tocdepth

2

Работа с таблицами Iceberg

В этой лабораторной работе вы обработаете таблицу формата Iceberg с помощью Managed Spark и преобразуете ее в таблицу Parquet.

Постановка задачи

  1. Построить витрину данных, отражающую информацию о продажах.

  2. Результат сохранить в формате Iceberg.

В лабораторной работе вам понадобятся:

  • CSV-таблица, в которой хранятся данные о поездке;

  • JAR-файл Iceberg;

  • Python-скрипт, который:

    • читает CSV-таблицу;

    • создает схему Data Frame;

    • выгружает данные в таблицу Parquet.

Перед началом работы

  1. Зарегистрируйтесь в личном кабинете Cloud.ru.

    Если вы уже зарегистрированы, войдите под своей учетной записью.

  2. Создайте публичный SNAT-шлюз, чтобы обеспечить инстансу доступ в интернет и связь с внешними источниками.

  3. Создайте секрет в сервисе Secret Manager для доступа к Spark UI.

  4. Создайте бакет Object Storage, в котором будут храниться логи, таблицы, JAR-файл Iceberg и скрипт.

  5. Установите и запустите CyberDuck.

  6. Сверьте совместимость версий Spark и Iceberg.

Создайте инстанс Spark

  1. Перейдите в раздел Evolution и выберите сервис Managed Spark.

  2. Нажмите Создать инстанс.

  3. В блоке Общие параметры укажите название инстанса, например «spark-iceberg».

  4. В блоке Конфигурация оставьте значения по умолчанию.

  5. В блоке Настройки:

    • Место хранения — выберите Object Storage.

    • Бакет — выберите ранее созданный бакет S3.

  6. В поле Группа логов выберите группу логов по умолчанию.

  7. Нажмите Продолжить.

  8. В блоке Сетевые настройки:

    • Подсеть — выберите подсеть для инстанса Spark.

    • Группа безопасности — выберите группу безопасности по умолчанию.

  9. В блоке Настройки доступа:

    • Подключить публичный хост — активируйте переключатель.

    • Логин — задайте логин для доступа к Spark.

    • Пароль — выберите секрет для доступа к Spark.

  10. Нажмите Создать.

Создание инстанса занимает около 15 минут. Пока создается инстанс, выполните шаги по подготовке структуры бакета Object Storage, данных и скрипта.

Подготовьте файл CSV

  1. Скачайте CSV-таблицу iceberg-table.csv.

  2. Откройте CyberDuck.

  3. В бакете Object Storage создайте папку «input» и загрузите CSV-таблицу в нее.

Подготовьте скрипт задачи

  1. Скопируйте скрипт и назовите файл «iceberg-script.py».

    Python-скрипт

     1import time
     2
     3from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType
     4from pyspark.sql import SparkSession
     5
     6spark = (SparkSession.builder
     7        .appName('Iceberg test')
     8        .enableHiveSupport()
     9        .getOrCreate()
    10        )
    11
    12DB_NAME = f"db_{time.strftime('%Y_%m_%d__%H_%M_%S')}"
    13CATALOG_NAME = "local"
    14TABLE_NAME = "my_table"
    15TABLE_PATH = f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}"
    16
    17
    18ROOT_PATH = "s3a://<your-bucket-name>/input/"
    19CSV_PATH = ROOT_PATH + "iceberg-table.csv"
    20
    21SCHEMA = StructType([
    22    StructField("vendor_id", LongType(), True),
    23    StructField("trip_id", LongType(), True),
    24    StructField("trip_distance", FloatType(), True),
    25    StructField("fare_amount", DoubleType(), True),
    26    StructField("store_and_fwd_flag", StringType(), True)
    27])
    28
    29
    30def create_table():
    31    df = spark.createDataFrame([], SCHEMA)
    32    df.writeTo(TABLE_PATH).create()
    33
    34
    35def read_csv_to_table():
    36    _csv_df = (
    37        spark
    38        .read
    39        .option("delimiter", ";")
    40        .option("header", True)
    41        .csv(CSV_PATH, schema=SCHEMA)
    42    )
    43    _csv_df.show()
    44    return _csv_df
    45
    46
    47def insert_data_to_table(df):
    48    df.writeTo(TABLE_PATH).append()
    49
    50
    51def read_data_from_table():
    52    spark.table(TABLE_PATH).show()
    53
    54
    55if __name__ == "__main__":
    56    create_table()
    57    csv_df = read_csv_to_table()
    58    insert_data_to_table(df=csv_df)
    59    read_data_from_table()
    60
    61    spark.stop()
    
  2. В 18-й строке скрипта замените your-bucket-name на название бакета Object Storage.

  3. В CyberDuck создайте папку «jobs» и загрузите скрипт в нее.

Подготовьте файл Iceberg JAR

  1. Скачайте JAR-файл Iceberg для соответствующей версии Spark. Например, если версия Spark 3.5., скачайте «1.6.1 Spark 3.5_with Scala 2.12 runtime Jar».

  2. Откройте CyberDuck.

  3. В бакете Object Storage создайте папку «iceberg» и загрузите JAR-файл в нее. В рамках лабораторной работы файл iceberg-spark-runtime-3.5_2.12-1.6.1.jar.

В результате должна получиться следующая структура:

../_images/cyberduck__iceberg-s3-structure.PNG

Создайте задачу Spark

Для продолжения работы убедитесь, что статус инстанса Spark в личном кабинете изменился на «Готов».

  1. В списке инстансов Managed Spark откройте карточку инстанса «spark-iceberg».

  2. Перейдите во вкладку Задачи.

  3. Нажмите Создать задачу.

  4. В блоке Общие параметры введите название задачи, например «iceberg».

  5. В блоке Скрипт приложения выберите Python.

    • Путь к запускамому файлу — укажите путь к скрипту. В данном случае путь s3a://{bucket_name}/jobs/iceberg-script.py, где {bucket_name} — название созданного бакета Object Storage.

  6. В блоке Настройки:

    • Добавить Spark-конфигурацию (–conf) — активируйте переключатель и введите:

      Параметр

      Значение

      spark.sql.catalog.local

      org.apache.iceberg.spark.SparkCatalog

      spark.sql.catalog.local.type

      hadoop

      spark.sql.catalog.local.warehouse

      s3a://{bucket_name}/

      {bucket_name} — название созданного бакета Object Storage

    • Добавить зависимости — укажите путь к JAR-файлу. В данном случае путь s3a://{bucket_name}/iceberg/iceberg-spark-runtime-3.5_2.12-1.6.1.jar, где {bucket_name} — название созданного бакета Object Storage.

  7. Нажмите Создать.

Задача Spark начнет выполняться и отобразится на странице инстанса во вкладке Задачи.

Мониторинг выполнения задачи

Вы можете посмотреть логи задачи, когда задача находится в статусах «Выполняется» и «Готово», то есть как в процессе выполнения, так и по завершению задачи.

Перейдите к логам

  1. Откройте карточку инстанса.

  2. Во вкладке Задачи скопируйте ID задачи.

  3. Нажмите Горизонтальное меню и выберите Перейти к логам.

  4. В поле Запрос введите labels.spark_job_id="ID", где ID — идентификатор задачи, скопированный ранее.

  5. Нажмите Обновить.

В таблице отобразятся логи задачи. Нажмите на строку, чтобы развернуть запись.

Перейдите в Spark UI

  1. Откройте карточку инстанса.

  2. Во вкладке Задачи нажмите Spark UI.

    В соседней вкладке откроется интерфейс Spark UI.

  3. Вернитесь на карточку инстанса и откройте вкладку Информация.

  4. Скопируйте данные из блока Настройки доступа.

  5. Введите данные инстанса:

    • Username — значение поля Пользователь.

    • Password — значение секрета в поле Пароль.

В интерфейсе Spark UI вы найдете информацию о ходе выполнения задачи.

../_images/spark-ui__iceberg-timeline.PNG ../_images/spark-ui__iceberg-completed-jobs.png

Проверьте результат

Когда задача перейдет в статус «Выполнено», откройте CyberDuck. В бакете появится новая папка с названием формата db_<YYYY_MM_DD_hrs_min_sec>. Внутри этой папки находятся две папки:

  • metadata с описательной частью данных;

  • data с таблицей Parquet с результатом работы скрипта.

../_images/cyberduck__iceberg-s3-result.PNG
Запустили Evolution free tier
для Dev & Test
Получить