- tocdepth
2
Обработка данных из Object Storage
На этой странице вы найдете лабораторную работу по обработке данных из Evolution Object Storage с помощью Managed Spark.
Постановка задачи
Необходимо построить витрину данных, отражающую полную информацию о клиентах и продажах.
Таблица client.csv
содержит информацию о клиентах: номер заказа, дату, город, имя и фамилию клиента, модель автомобиля и др.
Таблица sales.csv
содержит информацию о продажах: номер заказа и его сумму.
В результате должна получиться таблица, в которой данные объединены по номеру заказа.
Перед началом работы
Зарегистрируйтесь в личном кабинете Cloud.ru.
Если вы уже зарегистрированы, войдите под своей учетной записью.
Чтобы обеспечить инстансу доступ в интернет и связь с внешними источниками, создайте публичный SNAT-шлюз.
Создайте секрет в сервисе Secret Manager для доступа к Spark UI.
Создайте бакет Object Storage
Создайте бакет Object Storage по инструкции.
Установите и запустите CyberDuck по инструкции.
Создайте инстанс Spark
Перейдите в раздел Evolution и выберите сервис Managed Spark.
Нажмите Создать инстанс.
В блоке Общие параметры укажите название инстанса, например
spark-lab
.В блоке Конфигурация оставьте значения по умолчанию.
В блоке Настройки:
Место хранения — выберите Object Storage.
Бакет — выберите ранее созданный бакет S3.
В поле Группа логов выберите группу логов.
Нажмите Продолжить.
В блоке Сетевые настройки:
Подсеть — выберите подсеть для инстанса Spark.
Группа безопасности — выберите группу безопасности по умолчанию.
В блоке Настройки доступа:
Подключить публичный хост — активируйте переключатель.
Логин — задайте логин для доступа к Spark.
Пароль — выберите секрет для доступа к Spark.
Нажмите Создать.
Создание инстанса занимает около 15 минут. Пока создается инстанс, выполните шаги по подготовке структуры бакета Object Storage, данных и скрипта.
Подготовьте данные и скрипт
В лабораторной работе вам понадобятся:
таблица в формате CSV
client.csv
, которая содержит информацию о клиентах и деталях заказа;таблица в формате CSV
sales.csv
, которая содержит информацию о продажах: ID заказа и сумму.Python-скрипт, который:
читает таблицы
sales.csv
иclient.csv
;объединяет таблицы по колонке
order_number
;выгружает перечисленные в скрипте колонки.
Выполните следующие шаги, чтобы подготовить структуру бакета Object Storage и загрузить на него нужные файлы.
Подготовьте файлы CSV
Скачайте sales.csv и client.csv. Нажмите Скачать в правом верхнем углу.
Откройте CyberDuck.
В бакете Object Storage создайте папку
input
.В этой папке создайте папку
car-sales
.Переместите CSV-таблицы в папку
car-sales
.
Скопируйте скрипт задачи
Скопируйте скрипт и назовите файл
spark-sales-etl.py
.Python-скрипт
1# s3a://bucket-spark/jobs/spark-sales-etl.py 2 3import time 4from pyspark.sql import SparkSession 5from pyspark.sql.types import StructType, StructField, StringType, IntegerType 6from pyspark.sql.functions import count 7from pyspark.sql.types import IntegerType,BooleanType,DateType 8from pyspark.sql.functions import col 9from pyspark.sql.functions import sum,avg,max 10 11bucket_name = 'your-bucket-name' 12 13spark = (SparkSession.builder 14 .appName("sales") 15 .getOrCreate() 16 ) 17 18df_sales = spark.read \ 19 .format("csv") \ 20 .option("header", "true") \ 21 .option("inferSchema", "true") \ 22 .option("delimiter", ";") \ 23 .load(f"s3a://{bucket_name}/input/car-sales/sales.csv") 24 25df_client = spark.read \ 26 .format("csv") \ 27 .option("header", "true") \ 28 .option("inferSchema", "true") \ 29 .option("delimiter", ";") \ 30 .load(f"s3a://{bucket_name}/input/car-sales/client.csv") 31 32 33df_result = df_sales \ 34 .join(df_client, df_sales.order_number == df_client.order_number,"inner") \ 35 .select( \ 36 df_client.order_number, \ 37 df_client.order_date, \ 38 df_client.phone, \ 39 df_client.address_line1, \ 40 df_client.address_line2, \ 41 df_client.city, \ 42 df_client.state, \ 43 df_client.postal_code, \ 44 df_client.country, \ 45 df_client.territory, \ 46 df_client.contact_last_name, \ 47 df_client.contact_first_name, \ 48 df_client.deal_size, \ 49 df_client.car, \ 50 df_sales.sales \ 51 ) 52 53df_result.write.mode('overwrite').csv(f"s3a://{bucket_name}/output/sales")
В 11-й строке скрипта замените
your-bucket-name
на название бакета Object Storage.В CyberDuck создайте папку
jobs
.Поместите скрипт в нее.
В результате получится следующая структура бакета с файлами:
Создайте задачу Spark
Для продолжения работы убедитесь, что статус инстанса Spark изменился на «Готов».
В списке инстансов Managed Spark откройте карточку инстанса
spark-lab
.Перейдите во вкладку Задачи.
Нажмите Создать задачу.
В блоке Общие параметры введите название задачи, например
spark-sales
.В блоке Скрипт приложения выберите Python.
Укажите путь к запускаемой программе. В данном случае путь
s3a://{bucket_name}/jobs/spark-sales-etl.py
, где{bucket_name}
— название созданного бакета Object Storage.Нажмите Создать.
Задача Spark начнет выполняться и отобразится на странице инстанса во вкладке Задачи.
Мониторинг выполнения задачи
Вы можете посмотреть логи задачи, когда задача находится в статусах «Выполняется» и «Готово», то есть как в процессе выполнения, так и по завершению задачи.
Перейдите к логам
Откройте карточку инстанса.
Во вкладке Задачи скопируйте ID задачи.
Нажмите и выберите Перейти к логам.
В поле Запрос введите
labels.spark_job_id="ID"
, где ID — идентификатор задачи, скопированный ранее.Нажмите Обновить.
В таблице отобразятся логи задачи. Нажмите на строку, чтобы развернуть запись.
Перейдите в Spark UI
Откройте карточку инстанса.
Во вкладке Задачи нажмите Spark UI.
В соседней вкладке откроется интерфейс Spark UI.
Вернитесь на карточку инстанса и откройте вкладку Информация.
Скопируйте данные из блока Настройки доступа.
Введите данные инстанса:
Username — значение поля Пользователь.
Password — значение секрета в поле Пароль.
В интерфейсе Spark UI вы найдете информацию о ходе выполнения задачи.
Проверьте результат
Когда задача перейдет в статус «Выполнено», откройте CyberDuck. В бакете появятся:
новая папка
sales
;таблица с объединенными данными из
sales.csv
иclient.csv
.
для Dev & Test