Работа с таблицами Delta Lake
В этой лабораторной работе вы обработаете таблицу формата Delta Lake с помощью Spark.
Постановка задачи
-
Построить витрину данных, отражающую полную информацию о клиентах и их пути.
-
Сохранить результат в формате Delta Lake.
-
Выгрузить историю изменений таблицы в логи.
В лабораторной работе вам понадобятся:
-
CSV-таблица, в которой хранятся данные о поездке;
-
Python-скрипт.
Перед началом работы
-
Если вы уже зарегистрированы, войдите под своей учетной записью.
-
Создайте публичный SNAT-шлюз, для доступа инстанса к внешним источникам.
-
Создайте бакет Object Storage, в котором будут храниться логи, таблицы и скрипт.
-
Сверьте совместимость версий Spark и Delta Lake.
-
Создайте кластер Data Platform, в котором будет размещен инстанс.
Назовите кластер «dp-labs».
-
Скачайте и установите root-сертификат на устройство.
-
Создайте пароль и добавьте его в Secret Manager. Этот секрет станет паролем для доступа к интерфейсу Spark.
Создайте инстанс Spark
-
Перейдите в раздел Evolution и выберите сервис Managed Spark.
-
Нажмите Создать инстанс.
-
В блоке Общие параметры задайте:
-
Название — spark-delta.
-
Кластер — dp-labs.
-
-
В блоке Конфигурация оставьте значения по умолчанию.
-
В блоке Настройки выберите:
-
Место сохранения — Object Storage.
-
Бакет — ранее созданный бакет S3.
-
-
В поле Лог-группа выберите группу логов по умолчанию.
-
Нажмите Продолжить.
-
В блоке Сетевые настройки выберите:
-
Зона доступности — зону доступности.
-
Подсеть — подсеть со sNAT-шлюзом и с DNS-сервером.
Если нужной подсети нет, создайте новую, нажав Создать новую подсеть.
-
-
В блоке Настройки доступа:
-
Подключить публичный хост — активируйте опцию, чтобы опубликовать инстанс в интернете. Интерфейсы Spark History Server и Spark UI станут доступны из интернета.
-
Логин — задайте логин для доступа к Spark.
-
Пароль — выберите секрет для доступа к Spark.
Вы можете создать новый секрет, нажав Создать новый секрет.
-
-
В блоке Настройки доступа:
-
Подключить публичный хост — активируйте переключатель.
-
Логин — задайте логин для доступа к Spark.
-
Пароль — выберите секрет для доступа к Spark.
-
-
Нажмите Создать.
Создание инстанса занимает около 15 минут. Пока создается инстанс, выполните шаги по подготовке структуры бакета Object Storage, данных и скрипта.
Подготовьте файл CSV
-
Скачайте CSV-таблицу delta-table.csv. Нажмите Скачать в правом верхнем углу.
-
В файловом менеджере Object Storage создайте папку «input».
-
Загрузите CSV-таблицу в нее.
Подготовьте скрипт задачи
-
Скопируйте скрипт и назовите файл «delta-script.py».
Python-скрипт -
В строке ROOT_PATH = "s3a://your-bucket-name/" скрипта замените your-bucket-name на название бакета Object Storage.
-
В файловом менеджере Object Storage создайте папку «jobs».
-
Загрузите скрипт в нее.
В результате должна получиться следующая структура:
-
<bucket>
-
input
-
delta-table.csv
-
-
jobs
-
delta-script.py
-
-
Создайте задачу Spark
Для продолжения работы убедитесь, что статус инстанса Spark в личном кабинете изменился на «Готов».
-
Перейдите в раздел Evolution и выберите сервис Managed Spark.
-
В списке инстансов Managed Spark откройте карточку инстанса «spark-delta».
-
Перейдите во вкладку Задачи.
-
Нажмите Создать задачу.
-
В блоке Общие параметры введите название задачи, например «delta».
-
В блоке Скрипт приложения выберите Python:
-
Путь к запускаемому файлу — укажите путь к скрипту. В данном случае путь s3a://{bucket_name}/jobs/delta-script.py, где {bucket_name} — название созданного бакета Object Storage.
-
-
В блоке Настройки активируйте переключатель Добавить Spark-конфигурацию (–conf) и введите:
Параметр
Значение
spark.jars.packages
io.delta:delta-spark_2.12:3.2.0
spark.sql.extensions
io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog
org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.log.level
ERROR
-
Нажмите Создать.
Задача Spark начнет выполняться и отобразится на странице инстанса во вкладке Задачи.
Мониторинг выполнения задачи
Вы можете посмотреть логи задачи, когда задача находится в статусах «Выполняется» и «Готово», то есть как в процессе выполнения, так и по завершению задачи.
Перейдите к логам
-
Откройте карточку инстанса.
-
Во вкладке Задачи скопируйте ID задачи.
-
Нажмите
и выберите Перейти к логам.
-
В поле Запрос введите labels.spark_job_id="ID", где ID — идентификатор задачи, скопированный ранее.
-
Нажмите Обновить.
В таблице отобразятся логи задачи. Нажмите на строку, чтобы развернуть запись.
Перейдите в Spark UI
-
Откройте карточку инстанса.
-
Во вкладке Задачи нажмите Spark UI.
В соседней вкладке откроется интерфейс Spark UI.
-
Вернитесь на карточку инстанса и откройте вкладку Информация.
-
Скопируйте данные из блока Настройки доступа.
-
Введите данные инстанса:
-
Username — значение поля Пользователь.
-
Password — значение секрета в поле Пароль.
-
В интерфейсе Spark UI вы найдете информацию о ходе выполнения задачи.
Проверьте результат
Когда задача перейдет в статус «Выполнено», откройте Object Storage. В бакете появится новая папка с названием формата delta-lab_<TIME_STAMP>. В этой папке хранятся:
-
версии таблицы «delta-table.csv»;
-
папка _delta_log с логами задачи.
Чтобы посмотреть историю изменений таблицы с помощью метода history():
-
На странице Managed Spark перейдите на вкладку Задачи.
-
Скопируйте ID задачи.
-
Нажмите
и выберите Перейти к логам.
-
В поле Запрос введите labels.spark_job_id="ID", где ID — идентификатор задачи, скопированный ранее.
-
Нажмите Скачать журнал логов.
-
Выберите формат файла.
-
Нажмите Скачать.
-
Откройте скачанный файл.
История изменений отображается в нескольких сообщениях.
- Постановка задачи
- Перед началом работы
- Создайте инстанс Spark
- Подготовьте файл CSV
- Подготовьте скрипт задачи
- Создайте задачу Spark
- Мониторинг выполнения задачи
- Проверьте результат