tocdepth

2

Работа с пользовательским образом

В этой лабораторной работе вы обработаете данные с помощью пользовательского образа Spark.

Постановка задачи

  1. Построить витрину данных, объединяющую информацию о заказах из двух таблиц.

  2. Найти среднюю стоимость заказа.

  3. Подсчитать расхождение суммы каждого заказа со средней стоимостью заказа.

Чтобы выполнить задачу, необходимо подготовить пользовательский образ и Python-скрипт. Пользовательский образ включает библиотеки для работы с S3 и библиотеку NumPy. Cкрипт выполняет действия с данными.

Описание работы Python-скрипта

  • читает CSV-таблицы;

  • подсчитывает среднюю сумму заказа;

  • добавляет колонку с разницей между суммой заказа и средней суммой заказа;

  • сохраняет результат в сводную CSV-таблицу.

Перед началом работы

  1. Зарегистрируйтесь в личном кабинете Cloud.ru.

    Если вы уже зарегистрированы, войдите под своей учетной записью.

  2. Создайте публичный SNAT-шлюз, чтобы обеспечить инстансу доступ в интернет и связь с внешними источниками.

  3. Создайте секрет в сервисе Secret Manager для доступа к Spark UI.

  4. Создайте бакет Object Storage, в котором будут храниться логи, таблицы и скрипт.

  5. Создайте реестр Artifact Registry, в котором будет храниться пользовательский образ Spark.

Создайте инстанс Spark

  1. Перейдите в раздел Evolution и выберите сервис Managed Spark.

  2. Нажмите Создать инстанс.

  3. В блоке Общие параметры укажите название инстанса, например spark-image.

  4. В блоке Конфигурация оставьте значения по умолчанию.

  5. В блоке Настройки:

    • Место хранения — выберите Object Storage.

    • Бакет — выберите ранее созданный бакет S3.

  6. В поле Группа логов выберите группу логов.

  7. Нажмите Продолжить.

  8. В блоке Сетевые настройки:

    • Подсеть — выберите подсеть для инстанса Spark.

    • Группа безопасности — выберите группу безопасности по умолчанию.

  9. В блоке Настройки доступа:

    • Подключить публичный хост — активируйте переключатель.

    • Логин — задайте логин для доступа к Spark.

    • Пароль — выберите секрет для доступа к Spark.

  10. Нажмите Создать.

Создание инстанса занимает около 15 минут. Пока создается инстанс, выполните шаги по подготовке структуры бакета Object Storage, данных и скрипта.

Подготовьте файл CSV

  1. Скачайте CSV-таблицы client-spark-image.csv и sales-spark-image.csv.

  2. В файловом менеджере Object Storage создайте папку input и загрузите CSV-таблицы.

Подготовьте скрипт

  1. Скопируйте скрипт и назовите файл script-spark-image.py.

    Python-скрипт

     1# s3a://your-bucket-name/jobs/script-spark-image.py
     2import numpy as np
     3import time
     4
     5from pyspark.sql import SparkSession
     6from pyspark.sql.types import FloatType
     7from pyspark.sql.functions import lit, udf
     8
     9bucket_name = 'your-bucket-name'
    10
    11spark = (SparkSession.builder
    12        .appName("sales")
    13        .getOrCreate()
    14        )
    15
    16# Read the source data from csv
    17df_sales = spark.read \
    18.format("csv") \
    19.option("header", "true") \
    20.option("inferSchema", "true") \
    21.option("delimiter", ";") \
    22.load(f"s3a://{bucket_name}/input/sales-spark-image.csv")
    23
    24df_client = spark.read \
    25.format("csv") \
    26.option("header", "true") \
    27.option("inferSchema", "true") \
    28.option("delimiter", ";") \
    29.load(f"s3a://{bucket_name}/input/client-spark-image.csv")
    30
    31# get average cost for all sales
    32np_arr = np.array(df_sales.select('sales').collect())
    33avg = np.average(np_arr)
    34print(f'Average cost: {avg}')
    35# define UDF
    36
    37@udf(returnType=FloatType())
    38def calc_diff_avg(avg, val):
    39    return val - avg
    40# Create result with sale price and diff between sale price and average price
    41
    42df_result = df_sales \
    43.join(df_client, df_sales.order_number ==  df_client.order_number,"inner") \
    44.select( \
    45    df_client.order_number, \
    46    df_client.order_date, \
    47    df_client.phone, \
    48    df_client.address_line1, \
    49    df_client.address_line2, \
    50    df_client.city, \
    51    df_client.state, \
    52    df_client.postal_code, \
    53    df_client.country, \
    54    df_client.territory, \
    55    df_client.contact_last_name, \
    56    df_client.contact_first_name, \
    57    df_client.deal_size, \
    58    df_client.car, \
    59    df_sales.sales, \
    60    calc_diff_avg(lit(avg), df_sales.sales).alias("diff_with_avg") \
    61)
    62
    63# Write the result to csv file
    64df_result.write.mode('overwrite').option("header","true").csv(f"s3a://{bucket_name}/output/sales")
    
  2. В девятой строке скрипта замените your-bucket-name на название бакета Object Storage.

  3. В файловом менеджере Object Storage создайте папку jobs и загрузите скрипт.

В результате должна получиться следующая структура:

  • <bucket>

    • input

      • sales-spark-image.csv

      • client-spark-image.csv

    • jobs

      • script-spark-image.py

Подготовьте образ в Artifact Registry

  1. Создайте образ в формате Docker.

    FROM apache/spark:3.5.0-scala2.12-java11-python3-ubuntu
    
    # add S3 libs
    RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -o /opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar
    RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -o /opt/spark/jars/hadoop-aws-3.3.4.jar
    
    ARG spark_uid=root
    USER ${spark_uid}
    
    # install compartible numpy version
    RUN pip install numpy==1.21.6
    
  2. В сервисе Artifact Registry:

    1. Создайте репозиторий.

    2. Загрузите образ.

Создайте задачу

Для продолжения работы убедитесь, что статус инстанса Spark изменился на «Готов».

  1. В списке инстансов Managed Spark откройте карточку инстанса «spark-image».

  2. Перейдите на вкладку Задачи.

  3. Нажмите Создать задачу.

  4. В блоке Общие параметры введите название задачи, например spark-image-sales.

  5. В блоке Образ:

    1. Выберите Пользовательский.

    2. Под полем URI образа нажмите Выбрать из реестра и выберите добавленный ранее образ.

  6. В блоке Скрипт приложения выберите Python.

  7. Укажите путь к запускаемой программе. В данном случае путь s3a://{bucket_name}/jobs/script-spark-image.py, где {bucket_name} — название созданного бакета Object Storage.

  8. Нажмите Создать.

Задача Spark начнет выполняться и отобразится на странице инстанса на вкладке Задачи.

Мониторинг выполнения задачи

Вы можете посмотреть логи задачи, когда задача находится в статусах «Выполняется» и «Готово», то есть как в процессе выполнения, так и по завершению задачи.

Перейдите к логам

  1. Откройте карточку инстанса.

  2. Во вкладке Задачи скопируйте ID задачи.

  3. Нажмите Горизонтальное меню и выберите Перейти к логам.

  4. В поле Запрос введите labels.spark_job_id="ID", где ID — идентификатор задачи, скопированный ранее.

  5. Нажмите Обновить.

В таблице отобразятся логи задачи. Нажмите на строку, чтобы развернуть запись.

Перейдите в Spark UI

  1. Откройте карточку инстанса.

  2. Во вкладке Задачи нажмите Spark UI.

    В соседней вкладке откроется интерфейс Spark UI.

  3. Вернитесь на карточку инстанса и откройте вкладку Информация.

  4. Скопируйте данные из блока Настройки доступа.

  5. Введите данные инстанса:

    • Username — значение поля Пользователь.

    • Password — значение секрета в поле Пароль.

В интерфейсе Spark UI вы найдете информацию о ходе выполнения задачи.

Проверьте результат

Когда задача перейдет в статус «Выполнено», откройте файловый менеджер Object Storage.

В бакете появится новая папка output. В ней хранится сводная таблица данных.

Запустили Evolution free tier
для Dev & Test
Получить