- tocdepth
2
Работа с таблицами Delta Lake
В этой лабораторной работе вы обработаете таблицу формата Delta Lake с помощью Spark.
Постановка задачи
Построить витрину данных, отражающую полную информацию о клиентах и их пути.
Сохранить результат в формате Delta Lake.
Выгрузить историю изменений таблицы в логи.
В лабораторной работе вам понадобятся:
CSV-таблица, в которой хранятся данные о поездке;
Python-скрипт.
Описание работы Python-скрипта
читает CSV-таблицу;
Исходные данные v. 0
v. 0 +-----------+----------+--------------------+-------------+--------------------+ | vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | +-----------+----------+--------------------+-------------+--------------------+ | 1| 1000371 | 1.8| 15.32| N| | 2| 1000372 | 1.8| 14.31| N| | 3| 1000373 | 1.8| 13.3| N| | 4| 1000374 | 1.8| 12.29| Y| +-----------+----------+--------------------+-------------+--------------------+дважды обновляет данные в колонке
trip_distance
: увеличивает значение на 2 для вендоров2
и4
;Версии таблицы
v. 1 +-----------+----------+--------------------+-------------+--------------------+ | vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | +-----------+----------+--------------------+-------------+--------------------+ | 1| 1000371 | 1.8| 15.32| N| | 2| 1000372 | 3.8| 14.31| N| | 3| 1000373 | 1.8| 13.3| N| | 4| 1000374 | 3.8| 12.29| Y| +-----------+----------+--------------------+-------------+--------------------+ v. 2 +-----------+---------+-------------------+-------------+--------------------+ | vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | +-----------+----------+------------------+-------------+--------------------+ | 1| 1000371 | 1.8| 15.32| N| | 2| 1000372 | 5.8| 14.31| N| | 3| 1000373 | 1.8| 13.3| N| | 4| 1000374 | 5.8| 12.29| Y| +-----------+---------+-------------------+--------------+-------------------+выводит историю изменений с помощью метода
history()
, которая отображается в логах;История изменений таблицы
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+ |version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|userMetadata| engineInfo| +-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+ | 2|2024-11-06 08:10:43| NULL| NULL| UPDATE|{predicate -> ["(...|NULL| NULL| NULL| 1| Serializable| false|{numRemovedFiles ...| NULL|Apache-Spark/3.5....| | 1|2024-11-06 08:10:35| NULL| NULL| UPDATE|{predicate -> ["(...|NULL| NULL| NULL| 0| Serializable| false|{numRemovedFiles ...| NULL|Apache-Spark/3.5....| | 0|2024-11-06 08:10:18| NULL| NULL| WRITE|{mode -> ErrorIfE...|NULL| NULL| NULL| NULL| Serializable| true|{numFiles -> 1, n...| NULL|Apache-Spark/3.5....| +-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+читает таблицу v. 1.
Таблица v. 1
v. 1 +-----------+----------+--------------------+-------------+--------------------+ | vendor_id | trip_id | trip_distance | fare_amount | store_and_fwd_flag | +-----------+----------+--------------------+-------------+--------------------+ | 1| 1000371 | 1.8| 15.32| N| | 2| 1000372 | 3.8| 14.31| N| | 3| 1000373 | 1.8| 13.3| N| | 4| 1000374 | 3.8| 12.29| Y| +-----------+----------+--------------------+-------------+--------------------+
Перед началом работы
Зарегистрируйтесь в личном кабинете Cloud.ru.
Если вы уже зарегистрированы, войдите под своей учетной записью.
Создайте публичный SNAT-шлюз, чтобы обеспечить инстансу доступ в интернет и связь с внешними источниками.
Создайте секрет в сервисе Secret Manager для доступа к Spark UI.
Создайте бакет Object Storage, в котором будут храниться логи, таблицы и скрипт.
Сверьте совместимость версий Spark и Delta Lake.
Создайте инстанс Spark
Перейдите в раздел Evolution и выберите сервис Managed Spark.
Нажмите Создать инстанс.
В блоке Общие параметры укажите название инстанса, например «spark-delta».
В блоке Конфигурация оставьте значения по умолчанию.
В блоке Настройки:
Место хранения — выберите Object Storage.
Бакет — выберите ранее созданный бакет S3.
В поле Группа логов выберите группу логов по умолчанию.
Нажмите Продолжить.
В блоке Сетевые настройки:
Подсеть — выберите подсеть для инстанса Spark.
Группа безопасности — выберите группу безопасности по умолчанию.
В блоке Настройки доступа:
Подключить публичный хост — активируйте переключатель.
Логин — задайте логин для доступа к Spark.
Пароль — выберите секрет для доступа к Spark.
Нажмите Создать.
Создание инстанса занимает около 15 минут. Пока создается инстанс, выполните шаги по подготовке структуры бакета Object Storage, данных и скрипта.
Подготовьте файл CSV
Скачайте CSV-таблицу delta-table.csv.
В файловом менеджере Object Storage создайте папку «input».
Загрузите CSV-таблицу в нее.
Подготовьте скрипт задачи
Скопируйте скрипт и назовите файл «delta-script.py».
Python-скрипт
1import time 2 3from pyspark.sql import SparkSession 4from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType 5from delta import * 6 7spark = (SparkSession.builder 8 .appName('Delta test') 9 .enableHiveSupport() 10 .getOrCreate() 11 ) 12 13SCHEMA = StructType([ 14 StructField("vendor_id", LongType(), True), 15 StructField("trip_id", LongType(), True), 16 StructField("trip_distance", FloatType(), True), 17 StructField("fare_amount", DoubleType(), True), 18 StructField("store_and_fwd_flag", StringType(), True) 19]) 20 21TABLE_TIME = time.strftime('%Y_%m_%d__%H_%M_%S') 22TABLE_NAME = "delta_lab" + TABLE_TIME 23ROOT_PATH = "s3a://your-bucket-name/" 24CSV_PATH = ROOT_PATH + "input/delta-table.csv" 25FULL_PATH_DELTA_TABLE = ROOT_PATH + "warehouse_delta/" + TABLE_NAME 26 27 28def read_csv_to_table(): 29 _csv_df = ( 30 spark 31 .read 32 .option("delimiter", ";") 33 .option("header", True) 34 .csv(CSV_PATH, schema=SCHEMA) 35 ) 36 _csv_df.show() 37 return _csv_df 38 39 40def insert_data_to_table(df): 41 df.write.format("delta").save(FULL_PATH_DELTA_TABLE) 42 43 44def read_data_from_table(): 45 df = spark.read.format("delta").load(FULL_PATH_DELTA_TABLE) 46 df.show() 47 48 49def update_delta_table(): 50 delta_table = DeltaTable.forPath(spark, FULL_PATH_DELTA_TABLE) 51 52 delta_table.update( 53 condition="vendor_id % 2 = 0", 54 set={ 55 "trip_distance": "trip_distance + 2" 56 } 57 ) 58 59 60def show_history_delta(): 61 delta_table = DeltaTable.forPath(spark, FULL_PATH_DELTA_TABLE) 62 history = delta_table.history() 63 history.show() 64 65 66def read_specific_version_delta(version: int): 67 df = spark.read.format("delta").option("versionAsOf", version).load(FULL_PATH_DELTA_TABLE) 68 df.show() 69 70 71if __name__ == "__main__": 72 csv_df = read_csv_to_table() 73 insert_data_to_table(df=csv_df) 74 read_data_from_table() 75 76 update_delta_table() 77 read_data_from_table() 78 79 update_delta_table() 80 read_data_from_table() 81 82 show_history_delta() 83 84 read_specific_version_delta(version=1) 85 86 spark.stop()
В 23-й строке скрипта замените
your-bucket-name
на название бакета Object Storage.В файловом менеджере Object Storage создайте папку «jobs».
Загрузите скрипт в нее.
В результате должна получиться следующая структура:
<bucket>
input
delta-table.csv
jobs
delta-script.py
Создайте задачу Spark
Для продолжения работы убедитесь, что статус инстанса Spark в личном кабинете изменился на «Готов».
В списке инстансов Managed Spark откройте карточку инстанса «spark-delta».
Перейдите во вкладку Задачи.
Нажмите Создать задачу.
В блоке Общие параметры введите название задачи, например «delta».
В блоке Скрипт приложения выберите Python:
Путь к запускаемому файлу — укажите путь к скрипту. В данном случае путь
s3a://{bucket_name}/jobs/delta-script.py
, где{bucket_name}
— название созданного бакета Object Storage.
В блоке Настройки активируйте переключатель Добавить Spark-конфигурацию (–conf) и введите:
Параметр
Значение
spark.jars.packages
io.delta:delta-spark_2.12:3.2.0
spark.sql.extensions
io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog
org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.log.level
ERROR
Нажмите Создать.
Задача Spark начнет выполняться и отобразится на странице инстанса во вкладке Задачи.
Мониторинг выполнения задачи
Вы можете посмотреть логи задачи, когда задача находится в статусах «Выполняется» и «Готово», то есть как в процессе выполнения, так и по завершению задачи.
Перейдите к логам
Откройте карточку инстанса.
Во вкладке Задачи скопируйте ID задачи.
Нажмите и выберите Перейти к логам.
В поле Запрос введите
labels.spark_job_id="ID"
, где ID — идентификатор задачи, скопированный ранее.Нажмите Обновить.
В таблице отобразятся логи задачи. Нажмите на строку, чтобы развернуть запись.
Перейдите в Spark UI
Откройте карточку инстанса.
Во вкладке Задачи нажмите Spark UI.
В соседней вкладке откроется интерфейс Spark UI.
Вернитесь на карточку инстанса и откройте вкладку Информация.
Скопируйте данные из блока Настройки доступа.
Введите данные инстанса:
Username — значение поля Пользователь.
Password — значение секрета в поле Пароль.
В интерфейсе Spark UI вы найдете информацию о ходе выполнения задачи.
Проверьте результат
Когда задача перейдет в статус «Выполнено», откройте Object Storage.
В бакете появится новая папка с названием формата delta-lab_<TIME_STAMP>
.
В этой папке хранятся:
версии таблицы «delta-table.csv»;
папка
_delta_log
с логами задачи.
Чтобы посмотреть историю изменений таблицы с помощью метода history()
:
На странице Managed Spark перейдите на вкладку Задачи.
Скопируйте ID задачи.
Нажмите и выберите Перейти к логам.
В поле Запрос введите
labels.spark_job_id="ID"
, где ID — идентификатор задачи, скопированный ранее.Нажмите Скачать журнал логов.
Выберите формат файла.
Нажмите Скачать.
Откройте скачанный файл.
История изменений отображается в нескольких сообщениях.
для Dev & Test