tocdepth

2

Обработка данных из Object Storage

На этой странице вы найдете лабораторную работу по обработке данных из Evolution Object Storage с помощью Managed Spark.

Постановка задачи

Необходимо построить витрину данных, отражающую полную информацию о клиентах и продажах.

Таблица client.csv содержит информацию о клиентах: номер заказа, дату, город, имя и фамилию клиента, модель автомобиля и др.

Таблица sales.csv содержит информацию о продажах: номер заказа и его сумму.

В результате должна получиться таблица, в которой данные объединены по номеру заказа.

Перед началом работы

  1. Зарегистрируйтесь в личном кабинете Cloud.ru.

    Если вы уже зарегистрированы, войдите под своей учетной записью.

  2. Чтобы обеспечить инстансу доступ в интернет и связь с внешними источниками, создайте публичный SNAT-шлюз.

  3. Создайте секрет в сервисе Secret Manager для доступа к Spark UI.

Создайте бакет Object Storage

  1. Создайте бакет Object Storage по инструкции.

  2. Установите и запустите CyberDuck по инструкции.

Создайте инстанс Spark

  1. Перейдите в раздел Evolution и выберите сервис Managed Spark.

  2. Нажмите Создать инстанс.

  3. В блоке Общие параметры укажите название инстанса, например spark-lab.

  4. В блоке Конфигурация оставьте значения по умолчанию.

  5. В блоке Настройки:

    • Место хранения — выберите Object Storage.

    • Бакет — выберите ранее созданный бакет S3.

  6. В поле Группа логов выберите группу логов.

  7. Нажмите Продолжить.

  8. В блоке Сетевые настройки:

    • Подсеть — выберите подсеть для инстанса Spark.

    • Группа безопасности — выберите группу безопасности по умолчанию.

  9. В блоке Настройки доступа:

    • Подключить публичный хост — активируйте переключатель.

    • Логин — задайте логин для доступа к Spark.

    • Пароль — выберите секрет для доступа к Spark.

  10. Нажмите Создать.

Создание инстанса занимает около 15 минут. Пока создается инстанс, выполните шаги по подготовке структуры бакета Object Storage, данных и скрипта.

Подготовьте данные и скрипт

В лабораторной работе вам понадобятся:

  • таблица в формате CSV client.csv, которая содержит информацию о клиентах и деталях заказа;

  • таблица в формате CSV sales.csv, которая содержит информацию о продажах: ID заказа и сумму.

  • Python-скрипт, который:

    • читает таблицы sales.csv и client.csv;

    • объединяет таблицы по колонке order_number;

    • выгружает перечисленные в скрипте колонки.

Выполните следующие шаги, чтобы подготовить структуру бакета Object Storage и загрузить на него нужные файлы.

Подготовьте файлы CSV

  1. Скачайте sales.csv и client.csv. Нажмите Скачать в правом верхнем углу.

  2. Откройте CyberDuck.

  3. В бакете Object Storage создайте папку input.

  4. В этой папке создайте папку car-sales.

  5. Переместите CSV-таблицы в папку car-sales.

Скопируйте скрипт задачи

  1. Скопируйте скрипт и назовите файл spark-sales-etl.py.

    Python-скрипт

     1# s3a://bucket-spark/jobs/spark-sales-etl.py
     2
     3import time
     4from pyspark.sql import SparkSession
     5from pyspark.sql.types import StructType, StructField, StringType, IntegerType
     6from pyspark.sql.functions import count
     7from pyspark.sql.types import IntegerType,BooleanType,DateType
     8from pyspark.sql.functions import col
     9from pyspark.sql.functions import sum,avg,max
    10
    11bucket_name = 'your-bucket-name'
    12
    13spark = (SparkSession.builder
    14         .appName("sales")
    15         .getOrCreate()
    16         )
    17
    18df_sales = spark.read \
    19   .format("csv") \
    20   .option("header", "true") \
    21   .option("inferSchema", "true") \
    22   .option("delimiter", ";") \
    23   .load(f"s3a://{bucket_name}/input/car-sales/sales.csv")
    24
    25df_client = spark.read \
    26   .format("csv") \
    27   .option("header", "true") \
    28   .option("inferSchema", "true") \
    29   .option("delimiter", ";") \
    30   .load(f"s3a://{bucket_name}/input/car-sales/client.csv")
    31
    32
    33df_result = df_sales \
    34   .join(df_client, df_sales.order_number ==  df_client.order_number,"inner") \
    35   .select( \
    36      df_client.order_number, \
    37      df_client.order_date, \
    38      df_client.phone, \
    39      df_client.address_line1, \
    40      df_client.address_line2, \
    41      df_client.city, \
    42      df_client.state, \
    43      df_client.postal_code, \
    44      df_client.country, \
    45      df_client.territory, \
    46      df_client.contact_last_name, \
    47      df_client.contact_first_name, \
    48      df_client.deal_size, \
    49      df_client.car, \
    50      df_sales.sales \
    51   )
    52
    53df_result.write.mode('overwrite').csv(f"s3a://{bucket_name}/output/sales")
    
  2. В 11-й строке скрипта замените your-bucket-name на название бакета Object Storage.

  3. В CyberDuck создайте папку jobs.

  4. Поместите скрипт в нее.

В результате получится следующая структура бакета с файлами:

../_images/cyberduck__s3-structure.PNG

Создайте задачу Spark

Для продолжения работы убедитесь, что статус инстанса Spark изменился на «Готов».

  1. В списке инстансов Managed Spark откройте карточку инстанса spark-lab.

  2. Перейдите во вкладку Задачи.

  3. Нажмите Создать задачу.

  4. В блоке Общие параметры введите название задачи, например spark-sales.

  5. В блоке Скрипт приложения выберите Python.

  6. Укажите путь к запускаемой программе. В данном случае путь s3a://{bucket_name}/jobs/spark-sales-etl.py, где {bucket_name} — название созданного бакета Object Storage.

  7. Нажмите Создать.

Задача Spark начнет выполняться и отобразится на странице инстанса во вкладке Задачи.

Мониторинг выполнения задачи

Вы можете посмотреть логи задачи, когда задача находится в статусах «Выполняется» и «Готово», то есть как в процессе выполнения, так и по завершению задачи.

Перейдите к логам

  1. Откройте карточку инстанса.

  2. Во вкладке Задачи скопируйте ID задачи.

  3. Нажмите Горизонтальное меню и выберите Перейти к логам.

  4. В поле Запрос введите labels.spark_job_id="ID", где ID — идентификатор задачи, скопированный ранее.

  5. Нажмите Обновить.

В таблице отобразятся логи задачи. Нажмите на строку, чтобы развернуть запись.

Перейдите в Spark UI

  1. Откройте карточку инстанса.

  2. Во вкладке Задачи нажмите Spark UI.

    В соседней вкладке откроется интерфейс Spark UI.

  3. Вернитесь на карточку инстанса и откройте вкладку Информация.

  4. Скопируйте данные из блока Настройки доступа.

  5. Введите данные инстанса:

    • Username — значение поля Пользователь.

    • Password — значение секрета в поле Пароль.

В интерфейсе Spark UI вы найдете информацию о ходе выполнения задачи.

../_images/spark-ui__timeline.png ../_images/spark-ui__completed-jobs.png

Проверьте результат

Когда задача перейдет в статус «Выполнено», откройте CyberDuck. В бакете появятся:

  • новая папка sales;

  • таблица с объединенными данными из sales.csv и client.csv.

../_images/cyberduck__result.PNG
Запустили Evolution free tier
для Dev & Test
Получить