- tocdepth
2
Работа с таблицами Iceberg
В этой лабораторной работе вы обработаете таблицу формата Iceberg с помощью Managed Spark и преобразуете ее в таблицу Parquet.
Постановка задачи
Построить витрину данных, отражающую информацию о продажах.
Результат сохранить в формате Iceberg.
В лабораторной работе вам понадобятся:
CSV-таблица, в которой хранятся данные о поездке;
JAR-файл Iceberg;
Python-скрипт, который:
читает CSV-таблицу;
создает схему Data Frame;
выгружает данные в таблицу Parquet.
Перед началом работы
Зарегистрируйтесь в личном кабинете Cloud.ru.
Если вы уже зарегистрированы, войдите под своей учетной записью.
Создайте публичный SNAT-шлюз, чтобы обеспечить инстансу доступ в интернет и связь с внешними источниками.
Создайте секрет в сервисе Secret Manager для доступа к Spark UI.
Создайте бакет Object Storage, в котором будут храниться логи, таблицы, JAR-файл Iceberg и скрипт.
Сверьте совместимость версий Spark и Iceberg.
Создайте инстанс Spark
Перейдите в раздел Evolution и выберите сервис Managed Spark.
Нажмите Создать инстанс.
В блоке Общие параметры укажите название инстанса, например «spark-iceberg».
В блоке Конфигурация оставьте значения по умолчанию.
В блоке Настройки:
Место хранения — выберите Object Storage.
Бакет — выберите ранее созданный бакет S3.
В поле Группа логов выберите группу логов по умолчанию.
Нажмите Продолжить.
В блоке Сетевые настройки:
Подсеть — выберите подсеть для инстанса Spark.
Группа безопасности — выберите группу безопасности по умолчанию.
В блоке Настройки доступа:
Подключить публичный хост — активируйте переключатель.
Логин — задайте логин для доступа к Spark.
Пароль — выберите секрет для доступа к Spark.
Нажмите Создать.
Создание инстанса занимает около 15 минут. Пока создается инстанс, выполните шаги по подготовке структуры бакета Object Storage, данных и скрипта.
Подготовьте файл CSV
Скачайте CSV-таблицу iceberg-table.csv.
Откройте CyberDuck.
В бакете Object Storage создайте папку «input» и загрузите CSV-таблицу в нее.
Подготовьте скрипт задачи
Скопируйте скрипт и назовите файл «iceberg-script.py».
Python-скрипт
1import time 2 3from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType 4from pyspark.sql import SparkSession 5 6spark = (SparkSession.builder 7 .appName('Iceberg test') 8 .enableHiveSupport() 9 .getOrCreate() 10 ) 11 12DB_NAME = f"db_{time.strftime('%Y_%m_%d__%H_%M_%S')}" 13CATALOG_NAME = "local" 14TABLE_NAME = "my_table" 15TABLE_PATH = f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}" 16 17 18ROOT_PATH = "s3a://<your-bucket-name>/input/" 19CSV_PATH = ROOT_PATH + "iceberg-table.csv" 20 21SCHEMA = StructType([ 22 StructField("vendor_id", LongType(), True), 23 StructField("trip_id", LongType(), True), 24 StructField("trip_distance", FloatType(), True), 25 StructField("fare_amount", DoubleType(), True), 26 StructField("store_and_fwd_flag", StringType(), True) 27]) 28 29 30def create_table(): 31 df = spark.createDataFrame([], SCHEMA) 32 df.writeTo(TABLE_PATH).create() 33 34 35def read_csv_to_table(): 36 _csv_df = ( 37 spark 38 .read 39 .option("delimiter", ";") 40 .option("header", True) 41 .csv(CSV_PATH, schema=SCHEMA) 42 ) 43 _csv_df.show() 44 return _csv_df 45 46 47def insert_data_to_table(df): 48 df.writeTo(TABLE_PATH).append() 49 50 51def read_data_from_table(): 52 spark.table(TABLE_PATH).show() 53 54 55if __name__ == "__main__": 56 create_table() 57 csv_df = read_csv_to_table() 58 insert_data_to_table(df=csv_df) 59 read_data_from_table() 60 61 spark.stop()
В 18-й строке скрипта замените
your-bucket-name
на название бакета Object Storage.В CyberDuck создайте папку «jobs» и загрузите скрипт в нее.
Подготовьте файл Iceberg JAR
Скачайте JAR-файл Iceberg для соответствующей версии Spark. Например, если версия Spark 3.5., скачайте «1.6.1 Spark 3.5_with Scala 2.12 runtime Jar».
Откройте CyberDuck.
В бакете Object Storage создайте папку «iceberg» и загрузите JAR-файл в нее. В рамках лабораторной работы файл
iceberg-spark-runtime-3.5_2.12-1.6.1.jar
.
В результате должна получиться следующая структура:
Создайте задачу Spark
Для продолжения работы убедитесь, что статус инстанса Spark в личном кабинете изменился на «Готов».
В списке инстансов Managed Spark откройте карточку инстанса «spark-iceberg».
Перейдите во вкладку Задачи.
Нажмите Создать задачу.
В блоке Общие параметры введите название задачи, например «iceberg».
В блоке Скрипт приложения выберите Python.
Путь к запускамому файлу — укажите путь к скрипту. В данном случае путь
s3a://{bucket_name}/jobs/iceberg-script.py
, где{bucket_name}
— название созданного бакета Object Storage.
В блоке Настройки:
Добавить Spark-конфигурацию (–conf) — активируйте переключатель и введите:
Параметр
Значение
spark.sql.catalog.local
org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type
hadoop
spark.sql.catalog.local.warehouse
s3a://{bucket_name}/
{bucket_name} — название созданного бакета Object Storage
Добавить зависимости — укажите путь к JAR-файлу. В данном случае путь
s3a://{bucket_name}/iceberg/iceberg-spark-runtime-3.5_2.12-1.6.1.jar
, где{bucket_name}
— название созданного бакета Object Storage.
Нажмите Создать.
Задача Spark начнет выполняться и отобразится на странице инстанса во вкладке Задачи.
Мониторинг выполнения задачи
Вы можете посмотреть логи задачи, когда задача находится в статусах «Выполняется» и «Готово», то есть как в процессе выполнения, так и по завершению задачи.
Перейдите к логам
Откройте карточку инстанса.
Во вкладке Задачи скопируйте ID задачи.
Нажмите и выберите Перейти к логам.
В поле Запрос введите
labels.spark_job_id="ID"
, где ID — идентификатор задачи, скопированный ранее.Нажмите Обновить.
В таблице отобразятся логи задачи. Нажмите на строку, чтобы развернуть запись.
Перейдите в Spark UI
Откройте карточку инстанса.
Во вкладке Задачи нажмите Spark UI.
В соседней вкладке откроется интерфейс Spark UI.
Вернитесь на карточку инстанса и откройте вкладку Информация.
Скопируйте данные из блока Настройки доступа.
Введите данные инстанса:
Username — значение поля Пользователь.
Password — значение секрета в поле Пароль.
В интерфейсе Spark UI вы найдете информацию о ходе выполнения задачи.
Проверьте результат
Когда задача перейдет в статус «Выполнено», откройте CyberDuck.
В бакете появится новая папка с названием формата db_<YYYY_MM_DD_hrs_min_sec>
.
Внутри этой папки находятся две папки:
metadata
с описательной частью данных;data
с таблицей Parquet с результатом работы скрипта.
для Dev & Test