tocdepth

2

Работа с таблицами Delta Lake

В этой лабораторной работе вы обработаете таблицу формата Delta Lake с помощью Spark.

Постановка задачи

  1. Построить витрину данных, отражающую полную информацию о клиентах и их пути.

  2. Сохранить результат в формате Delta Lake.

  3. Выгрузить историю изменений таблицы в логи.

В лабораторной работе вам понадобятся:

  • CSV-таблица, в которой хранятся данные о поездке;

  • Python-скрипт.

Описание работы Python-скрипта

  • читает CSV-таблицу;

    Исходные данные v. 0

    v. 0
    
    +-----------+----------+--------------------+-------------+--------------------+
    | vendor_id | trip_id  | trip_distance      | fare_amount | store_and_fwd_flag |
    +-----------+----------+--------------------+-------------+--------------------+
    |          1| 1000371  |                 1.8|        15.32|                   N|
    |          2| 1000372  |                 1.8|        14.31|                   N|
    |          3| 1000373  |                 1.8|         13.3|                   N|
    |          4| 1000374  |                 1.8|        12.29|                   Y|
    +-----------+----------+--------------------+-------------+--------------------+
    
  • дважды обновляет данные в колонке trip_distance: увеличивает значение на 2 для вендоров 2 и 4;

    Версии таблицы

    v. 1
    
    +-----------+----------+--------------------+-------------+--------------------+
    | vendor_id | trip_id  | trip_distance      | fare_amount | store_and_fwd_flag |
    +-----------+----------+--------------------+-------------+--------------------+
    |          1| 1000371  |                 1.8|        15.32|                   N|
    |          2| 1000372  |                 3.8|        14.31|                   N|
    |          3| 1000373  |                 1.8|         13.3|                   N|
    |          4| 1000374  |                 3.8|        12.29|                   Y|
    +-----------+----------+--------------------+-------------+--------------------+
    
    v. 2
    
    +-----------+---------+-------------------+-------------+--------------------+
    | vendor_id | trip_id | trip_distance     | fare_amount | store_and_fwd_flag |
    +-----------+----------+------------------+-------------+--------------------+
    |          1| 1000371 |                1.8|        15.32|                   N|
    |          2| 1000372 |                5.8|        14.31|                   N|
    |          3| 1000373 |                1.8|         13.3|                   N|
    |          4| 1000374 |                5.8|        12.29|                   Y|
    +-----------+---------+-------------------+--------------+-------------------+
    
  • выводит историю изменений с помощью метода history(), которая отображается в логах;

    История изменений таблицы

    +-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
    |version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
    +-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
    |      2|2024-11-06 08:10:43|  NULL|    NULL|   UPDATE|{predicate -> ["(...|NULL|    NULL|     NULL|          1|  Serializable|        false|{numRemovedFiles ...|        NULL|Apache-Spark/3.5....|
    |      1|2024-11-06 08:10:35|  NULL|    NULL|   UPDATE|{predicate -> ["(...|NULL|    NULL|     NULL|          0|  Serializable|        false|{numRemovedFiles ...|        NULL|Apache-Spark/3.5....|
    |      0|2024-11-06 08:10:18|  NULL|    NULL|    WRITE|{mode -> ErrorIfE...|NULL|    NULL|     NULL|       NULL|  Serializable|         true|{numFiles -> 1, n...|        NULL|Apache-Spark/3.5....|
    +-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
    
  • читает таблицу v. 1.

    Таблица v. 1

    v. 1
    
    +-----------+----------+--------------------+-------------+--------------------+
    | vendor_id | trip_id  | trip_distance      | fare_amount | store_and_fwd_flag |
    +-----------+----------+--------------------+-------------+--------------------+
    |          1| 1000371  |                 1.8|        15.32|                   N|
    |          2| 1000372  |                 3.8|        14.31|                   N|
    |          3| 1000373  |                 1.8|         13.3|                   N|
    |          4| 1000374  |                 3.8|        12.29|                   Y|
    +-----------+----------+--------------------+-------------+--------------------+
    

Перед началом работы

  1. Зарегистрируйтесь в личном кабинете Cloud.ru.

    Если вы уже зарегистрированы, войдите под своей учетной записью.

  2. Создайте публичный SNAT-шлюз, чтобы обеспечить инстансу доступ в интернет и связь с внешними источниками.

  3. Создайте секрет в сервисе Secret Manager для доступа к Spark UI.

  4. Создайте бакет Object Storage, в котором будут храниться логи, таблицы и скрипт.

  5. Сверьте совместимость версий Spark и Delta Lake.

Создайте инстанс Spark

  1. Перейдите в раздел Evolution и выберите сервис Managed Spark.

  2. Нажмите Создать инстанс.

  3. В блоке Общие параметры укажите название инстанса, например «spark-delta».

  4. В блоке Конфигурация оставьте значения по умолчанию.

  5. В блоке Настройки:

    • Место хранения — выберите Object Storage.

    • Бакет — выберите ранее созданный бакет S3.

  6. В поле Группа логов выберите группу логов по умолчанию.

  7. Нажмите Продолжить.

  8. В блоке Сетевые настройки:

    • Подсеть — выберите подсеть для инстанса Spark.

    • Группа безопасности — выберите группу безопасности по умолчанию.

  9. В блоке Настройки доступа:

    • Подключить публичный хост — активируйте переключатель.

    • Логин — задайте логин для доступа к Spark.

    • Пароль — выберите секрет для доступа к Spark.

  10. Нажмите Создать.

Создание инстанса занимает около 15 минут. Пока создается инстанс, выполните шаги по подготовке структуры бакета Object Storage, данных и скрипта.

Подготовьте файл CSV

  1. Скачайте CSV-таблицу delta-table.csv.

  2. В файловом менеджере Object Storage создайте папку «input».

  3. Загрузите CSV-таблицу в нее.

Подготовьте скрипт задачи

  1. Скопируйте скрипт и назовите файл «delta-script.py».

    Python-скрипт

     1import time
     2
     3from pyspark.sql import SparkSession
     4from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType
     5from delta import *
     6
     7spark = (SparkSession.builder
     8        .appName('Delta test')
     9        .enableHiveSupport()
    10        .getOrCreate()
    11        )
    12
    13SCHEMA = StructType([
    14    StructField("vendor_id", LongType(), True),
    15    StructField("trip_id", LongType(), True),
    16    StructField("trip_distance", FloatType(), True),
    17    StructField("fare_amount", DoubleType(), True),
    18    StructField("store_and_fwd_flag", StringType(), True)
    19])
    20
    21TABLE_TIME = time.strftime('%Y_%m_%d__%H_%M_%S')
    22TABLE_NAME = "delta_lab" + TABLE_TIME
    23ROOT_PATH = "s3a://your-bucket-name/"
    24CSV_PATH = ROOT_PATH + "input/delta-table.csv"
    25FULL_PATH_DELTA_TABLE = ROOT_PATH + "warehouse_delta/" + TABLE_NAME
    26
    27
    28def read_csv_to_table():
    29    _csv_df = (
    30        spark
    31        .read
    32        .option("delimiter", ";")
    33        .option("header", True)
    34        .csv(CSV_PATH, schema=SCHEMA)
    35    )
    36    _csv_df.show()
    37    return _csv_df
    38
    39
    40def insert_data_to_table(df):
    41    df.write.format("delta").save(FULL_PATH_DELTA_TABLE)
    42
    43
    44def read_data_from_table():
    45    df = spark.read.format("delta").load(FULL_PATH_DELTA_TABLE)
    46    df.show()
    47
    48
    49def update_delta_table():
    50    delta_table = DeltaTable.forPath(spark, FULL_PATH_DELTA_TABLE)
    51
    52    delta_table.update(
    53        condition="vendor_id % 2 = 0",
    54        set={
    55            "trip_distance": "trip_distance + 2"
    56        }
    57    )
    58
    59
    60def show_history_delta():
    61    delta_table = DeltaTable.forPath(spark, FULL_PATH_DELTA_TABLE)
    62    history = delta_table.history()
    63    history.show()
    64
    65
    66def read_specific_version_delta(version: int):
    67    df = spark.read.format("delta").option("versionAsOf", version).load(FULL_PATH_DELTA_TABLE)
    68    df.show()
    69
    70
    71if __name__ == "__main__":
    72    csv_df = read_csv_to_table()
    73    insert_data_to_table(df=csv_df)
    74    read_data_from_table()
    75
    76    update_delta_table()
    77    read_data_from_table()
    78
    79    update_delta_table()
    80    read_data_from_table()
    81
    82    show_history_delta()
    83
    84    read_specific_version_delta(version=1)
    85
    86    spark.stop()
    
  2. В 23-й строке скрипта замените your-bucket-name на название бакета Object Storage.

  3. В файловом менеджере Object Storage создайте папку «jobs».

  4. Загрузите скрипт в нее.

В результате должна получиться следующая структура:

  • <bucket>

    • input

      • delta-table.csv

    • jobs

      • delta-script.py

Создайте задачу Spark

Для продолжения работы убедитесь, что статус инстанса Spark в личном кабинете изменился на «Готов».

  1. В списке инстансов Managed Spark откройте карточку инстанса «spark-delta».

  2. Перейдите во вкладку Задачи.

  3. Нажмите Создать задачу.

  4. В блоке Общие параметры введите название задачи, например «delta».

  5. В блоке Скрипт приложения выберите Python:

    • Путь к запускаемому файлу — укажите путь к скрипту. В данном случае путь s3a://{bucket_name}/jobs/delta-script.py, где {bucket_name} — название созданного бакета Object Storage.

  6. В блоке Настройки активируйте переключатель Добавить Spark-конфигурацию (–conf) и введите:

    Параметр

    Значение

    spark.jars.packages

    io.delta:delta-spark_2.12:3.2.0

    spark.sql.extensions

    io.delta.sql.DeltaSparkSessionExtension

    spark.sql.catalog.spark_catalog

    org.apache.spark.sql.delta.catalog.DeltaCatalog

    spark.log.level

    ERROR

  7. Нажмите Создать.

Задача Spark начнет выполняться и отобразится на странице инстанса во вкладке Задачи.

Мониторинг выполнения задачи

Вы можете посмотреть логи задачи, когда задача находится в статусах «Выполняется» и «Готово», то есть как в процессе выполнения, так и по завершению задачи.

Перейдите к логам

  1. Откройте карточку инстанса.

  2. Во вкладке Задачи скопируйте ID задачи.

  3. Нажмите Горизонтальное меню и выберите Перейти к логам.

  4. В поле Запрос введите labels.spark_job_id="ID", где ID — идентификатор задачи, скопированный ранее.

  5. Нажмите Обновить.

В таблице отобразятся логи задачи. Нажмите на строку, чтобы развернуть запись.

Перейдите в Spark UI

  1. Откройте карточку инстанса.

  2. Во вкладке Задачи нажмите Spark UI.

    В соседней вкладке откроется интерфейс Spark UI.

  3. Вернитесь на карточку инстанса и откройте вкладку Информация.

  4. Скопируйте данные из блока Настройки доступа.

  5. Введите данные инстанса:

    • Username — значение поля Пользователь.

    • Password — значение секрета в поле Пароль.

В интерфейсе Spark UI вы найдете информацию о ходе выполнения задачи.

../_images/spark-ui__delta-timeline.PNG ../_images/spark-ui__delta-completed-jobs.PNG

Проверьте результат

Когда задача перейдет в статус «Выполнено», откройте Object Storage. В бакете появится новая папка с названием формата delta-lab_<TIME_STAMP>. В этой папке хранятся:

  • версии таблицы «delta-table.csv»;

  • папка _delta_log с логами задачи.

Чтобы посмотреть историю изменений таблицы с помощью метода history():

  1. На странице Managed Spark перейдите на вкладку Задачи.

  2. Скопируйте ID задачи.

  3. Нажмите Горизонтальное меню и выберите Перейти к логам.

  4. В поле Запрос введите labels.spark_job_id="ID", где ID — идентификатор задачи, скопированный ранее.

  5. Нажмите Скачать журнал логов.

  6. Выберите формат файла.

  7. Нажмите Скачать.

  8. Откройте скачанный файл.

История изменений отображается в нескольких сообщениях.

Запустили Evolution free tier
для Dev & Test
Получить