- tocdepth
2
Работа с пользовательским образом
В этой лабораторной работе вы обработаете данные с помощью пользовательского образа Spark.
Постановка задачи
Построить витрину данных, объединяющую информацию о заказах из двух таблиц.
Найти среднюю стоимость заказа.
Подсчитать расхождение суммы каждого заказа со средней стоимостью заказа.
Чтобы выполнить задачу, необходимо подготовить пользовательский образ и Python-скрипт. Пользовательский образ включает библиотеки для работы с S3 и библиотеку NumPy. Cкрипт выполняет действия с данными.
Описание работы Python-скрипта
читает CSV-таблицы;
подсчитывает среднюю сумму заказа;
добавляет колонку с разницей между суммой заказа и средней суммой заказа;
сохраняет результат в сводную CSV-таблицу.
Перед началом работы
Зарегистрируйтесь в личном кабинете Cloud.ru.
Если вы уже зарегистрированы, войдите под своей учетной записью.
Создайте публичный SNAT-шлюз, чтобы обеспечить инстансу доступ в интернет и связь с внешними источниками.
Создайте секрет в сервисе Secret Manager для доступа к Spark UI.
Создайте бакет Object Storage, в котором будут храниться логи, таблицы и скрипт.
Создайте реестр Artifact Registry, в котором будет храниться пользовательский образ Spark.
Создайте инстанс Spark
Перейдите в раздел Evolution и выберите сервис Managed Spark.
Нажмите Создать инстанс.
В блоке Общие параметры укажите название инстанса, например
spark-image
.В блоке Конфигурация оставьте значения по умолчанию.
В блоке Настройки:
Место хранения — выберите Object Storage.
Бакет — выберите ранее созданный бакет S3.
В поле Группа логов выберите группу логов.
Нажмите Продолжить.
В блоке Сетевые настройки:
Подсеть — выберите подсеть для инстанса Spark.
Группа безопасности — выберите группу безопасности по умолчанию.
В блоке Настройки доступа:
Подключить публичный хост — активируйте переключатель.
Логин — задайте логин для доступа к Spark.
Пароль — выберите секрет для доступа к Spark.
Нажмите Создать.
Создание инстанса занимает около 15 минут. Пока создается инстанс, выполните шаги по подготовке структуры бакета Object Storage, данных и скрипта.
Подготовьте файл CSV
Скачайте CSV-таблицы client-spark-image.csv и sales-spark-image.csv.
В файловом менеджере Object Storage создайте папку input и загрузите CSV-таблицы.
Подготовьте скрипт
Скопируйте скрипт и назовите файл
script-spark-image.py
.Python-скрипт
1# s3a://your-bucket-name/jobs/script-spark-image.py 2import numpy as np 3import time 4 5from pyspark.sql import SparkSession 6from pyspark.sql.types import FloatType 7from pyspark.sql.functions import lit, udf 8 9bucket_name = 'your-bucket-name' 10 11spark = (SparkSession.builder 12 .appName("sales") 13 .getOrCreate() 14 ) 15 16# Read the source data from csv 17df_sales = spark.read \ 18.format("csv") \ 19.option("header", "true") \ 20.option("inferSchema", "true") \ 21.option("delimiter", ";") \ 22.load(f"s3a://{bucket_name}/input/sales-spark-image.csv") 23 24df_client = spark.read \ 25.format("csv") \ 26.option("header", "true") \ 27.option("inferSchema", "true") \ 28.option("delimiter", ";") \ 29.load(f"s3a://{bucket_name}/input/client-spark-image.csv") 30 31# get average cost for all sales 32np_arr = np.array(df_sales.select('sales').collect()) 33avg = np.average(np_arr) 34print(f'Average cost: {avg}') 35# define UDF 36 37@udf(returnType=FloatType()) 38def calc_diff_avg(avg, val): 39 return val - avg 40# Create result with sale price and diff between sale price and average price 41 42df_result = df_sales \ 43.join(df_client, df_sales.order_number == df_client.order_number,"inner") \ 44.select( \ 45 df_client.order_number, \ 46 df_client.order_date, \ 47 df_client.phone, \ 48 df_client.address_line1, \ 49 df_client.address_line2, \ 50 df_client.city, \ 51 df_client.state, \ 52 df_client.postal_code, \ 53 df_client.country, \ 54 df_client.territory, \ 55 df_client.contact_last_name, \ 56 df_client.contact_first_name, \ 57 df_client.deal_size, \ 58 df_client.car, \ 59 df_sales.sales, \ 60 calc_diff_avg(lit(avg), df_sales.sales).alias("diff_with_avg") \ 61) 62 63# Write the result to csv file 64df_result.write.mode('overwrite').option("header","true").csv(f"s3a://{bucket_name}/output/sales")
В девятой строке скрипта замените
your-bucket-name
на название бакета Object Storage.В файловом менеджере Object Storage создайте папку jobs и загрузите скрипт.
В результате должна получиться следующая структура:
<bucket>
input
sales-spark-image.csv
client-spark-image.csv
jobs
script-spark-image.py
Подготовьте образ в Artifact Registry
Создайте образ в формате Docker.
FROM apache/spark:3.5.0-scala2.12-java11-python3-ubuntu # add S3 libs RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -o /opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -o /opt/spark/jars/hadoop-aws-3.3.4.jar ARG spark_uid=root USER ${spark_uid} # install compartible numpy version RUN pip install numpy==1.21.6
В сервисе Artifact Registry:
Создайте задачу
Для продолжения работы убедитесь, что статус инстанса Spark изменился на «Готов».
В списке инстансов Managed Spark откройте карточку инстанса «spark-image».
Перейдите на вкладку Задачи.
Нажмите Создать задачу.
В блоке Общие параметры введите название задачи, например
spark-image-sales
.В блоке Образ:
Выберите Пользовательский.
Под полем URI образа нажмите Выбрать из реестра и выберите добавленный ранее образ.
В блоке Скрипт приложения выберите Python.
Укажите путь к запускаемой программе. В данном случае путь
s3a://{bucket_name}/jobs/script-spark-image.py
, где{bucket_name}
— название созданного бакета Object Storage.Нажмите Создать.
Задача Spark начнет выполняться и отобразится на странице инстанса на вкладке Задачи.
Мониторинг выполнения задачи
Вы можете посмотреть логи задачи, когда задача находится в статусах «Выполняется» и «Готово», то есть как в процессе выполнения, так и по завершению задачи.
Перейдите к логам
Откройте карточку инстанса.
Во вкладке Задачи скопируйте ID задачи.
Нажмите и выберите Перейти к логам.
В поле Запрос введите
labels.spark_job_id="ID"
, где ID — идентификатор задачи, скопированный ранее.Нажмите Обновить.
В таблице отобразятся логи задачи. Нажмите на строку, чтобы развернуть запись.
Перейдите в Spark UI
Откройте карточку инстанса.
Во вкладке Задачи нажмите Spark UI.
В соседней вкладке откроется интерфейс Spark UI.
Вернитесь на карточку инстанса и откройте вкладку Информация.
Скопируйте данные из блока Настройки доступа.
Введите данные инстанса:
Username — значение поля Пользователь.
Password — значение секрета в поле Пароль.
В интерфейсе Spark UI вы найдете информацию о ходе выполнения задачи.
Проверьте результат
Когда задача перейдет в статус «Выполнено», откройте файловый менеджер Object Storage.
В бакете появится новая папка output
.
В ней хранится сводная таблица данных.
для Dev & Test