С помощью этого руководства вы научитесь обрабатывать данные, применяя пользовательский образ Spark. Вы примените пользовательский образ, включающий библиотеки для работы с Object Storage и библиотеку NumPy. Для обработки данных вы используете скрипт, который объединит информацию о заказах из двух таблиц в единую витрину данных, найдет среднюю стоимость заказа и подсчитает разницу с ней для каждого заказа.
Вы будете использовать следующие сервисы:
Managed Spark — сервис, который позволяет развернуть кластерное вычислительное решение на основе Apache Spark для распределенной обработки данных.
Object Storage — сервис для хранения данных любого типа и объема. Будет использоваться в качестве хранилища для скриптов.
Artifact Registry — сервис для хранения и распространения артефактов.
Шаги:
Перед началом работы
Зарегистрируйтесь в личном кабинете Cloud.ru. Если вы уже зарегистрированы, войдите под своей учетной записью.
Создайте бакет Object Storage, в котором будут храниться необходимые файлы и логи.
Создайте кластер Data Platform, в котором будет размещен инстанс.
Скачайте и установите root-сертификат на устройство.
Создайте пароль и добавьте его в Secret Manager. Этот секрет станет паролем для доступа к интерфейсу Managed Spark.
Создайте реестр Artifact Registry, в котором будет храниться пользовательский образ Managed Spark.
1. Подготовьте файлы с данными
На этом шаге вы загрузите в хранилище Object Storage файлы с данными для обработки.
Скачайте CSV-таблицы client-spark-image.csv и sales-spark-image.csv: нажмите Скачать в правом верхнем углу.
В ранее созданном бакете Object Storage создайте папку input.
Загрузите CSV-таблицы в папку input.
2. Подготовьте скрипт задачи
На этом шаге вы загрузите в хранилище Object Storage файл, содержащий скрипт для обработки данных из CSV-таблицы.
Скопируйте скрипт и назовите файл script-spark-image.py.
import numpy as npimport timefrom pyspark.sql import SparkSessionfrom pyspark.sql.types import FloatTypefrom pyspark.sql.functions import lit, udfbucket_name = 'your-bucket-name'spark = (SparkSession.builder.appName("sales").getOrCreate())# Read the source data from csvdf_sales = spark.read \.format("csv") \.option("header", "true") \.option("inferSchema", "true") \.option("delimiter", ";") \.load(f"s3a://{bucket_name}/input/sales-spark-image.csv")df_client = spark.read \.format("csv") \.option("header", "true") \.option("inferSchema", "true") \.option("delimiter", ";") \.load(f"s3a://{bucket_name}/input/client-spark-image.csv")# get average cost for all salesnp_arr = np.array(df_sales.select('sales').collect())avg = np.average(np_arr)print(f'Average cost: {avg}')# define UDF@udf(returnType=FloatType())def calc_diff_avg(avg, val):return val - avg# Create result with sale price and diff between sale price and average pricedf_result = df_sales \.join(df_client, df_sales.order_number == df_client.order_number,"inner") \.select( \df_client.order_number, \df_client.order_date, \df_client.phone, \df_client.address_line1, \df_client.address_line2, \df_client.city, \df_client.state, \df_client.postal_code, \df_client.country, \df_client.territory, \df_client.contact_last_name, \df_client.contact_first_name, \df_client.deal_size, \df_client.car, \df_sales.sales, \calc_diff_avg(lit(avg), df_sales.sales).alias("diff_with_avg") \)# Write the result to csv filedf_result.write.mode('overwrite').option("header","true").csv(f"s3a://{bucket_name}/output/sales")В строке bucket_name = 'your-bucket-name' замените your-bucket-name на название бакета Object Storage.
В ранее созданном бакете Object Storage создайте папку jobs.
Загрузите скрипт в папку jobs.
В результате получится следующая структура бакета с файлами:
<bucket>
input
sales-spark-image.csv
client-spark-image.csv
jobs
script-spark-image.py
3. Подготовьте образ в Artifact Registry
На этом шаге вы подготовите пользовательский образ Managed Spark и загрузите его в сервис Artifact Registry.
Создайте Dockerfile для сборки образа.
FROM apache/spark:3.5.0-scala2.12-java11-python3-ubuntu# add S3 libsRUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -o /opt/spark/jars/aws-java-sdk-bundle-1.12.262.jarRUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar -o /opt/spark/jars/hadoop-aws-3.3.4.jarARG spark_uid=rootUSER ${spark_uid}# install compatible numpy versionRUN pip install numpy==1.21.6Чтобы собрать образ, выполните команду:
docker build . --tag <IMAGE-NAME>:<TAG> --platform linux/amd64Где:
<IMAGE-NAME> — имя образа.
<TAG> — тег образа.
Откройте сервис Artifact Registry.
4. Создайте задачу Managed Spark
На этом шаге вы запустите задачу Managed Spark с использованием подготовленного скрипта.
Для продолжения работы убедитесь, что статус инстанса Managed Spark изменился на «Готов».
Перейдите в сервис Managed Spark.
Откройте созданный ранее инстанс.
Перейдите на вкладку Задачи.
Нажмите Создать задачу.
В блоке Общие параметры введите название задачи, например spark-image-sales.
В блоке Образ:
Выберите Пользовательский.
Под полем URI образа нажмите Выбрать из реестра и выберите добавленный ранее образ.
В блоке Скрипт приложения:
В поле Тип запускаемой задачи выберите Python.
В поле Путь к запускаемому файлу укажите путь к файлу script-spark-image.py. В данном случае путь s3a://{bucket_name}/jobs/script-spark-image.py, где {bucket_name} — название созданного бакета Object Storage.
Нажмите Создать.
Задача Managed Spark начнет выполняться и отобразится на странице инстанса во вкладке Задачи.
5. Наблюдайте за ходом выполнения задачи
На этом шаге вы будете наблюдать за ходом выполнения задачи, просматривая информацию, поступающую в логи.
Вы можете посмотреть логи задачи, когда задача находится в статусах «Выполняется» и «Готово», то есть как в процессе выполнения, так и по завершению задачи.
Перейдите к логам
В строке задачи нажмите
и выберите Перейти к логам.
Используйте фильтр, чтобы найти логи, например, за определенное время.
Перейдите в Spark UI
Откройте инстанс Managed Spark.
Во вкладке Задачи нажмите Spark UI. В соседней вкладке откроется интерфейс Spark UI.
Вернитесь в инстанс и откройте вкладку Информация.
Скопируйте данные из блока Настройки доступа.
Введите данные инстанса:
Username — значение поля Пользователь.
Password — значение секрета в поле Пароль.
В интерфейсе Spark UI вы найдете информацию о ходе выполнения задачи.
Результат
Когда задача перейдет в статус «Выполнено», откройте файловый менеджер Object Storage. В бакете появится новая папка output, в которой будет храниться сводная таблица данных.
Вы применили пользовательский образ Managed Spark и скрипт для обработки данных и получили объединенную таблицу со всеми данными.
- Перед началом работы
- 1. Подготовьте файлы с данными
- 2. Подготовьте скрипт задачи
- 3. Подготовьте образ в Artifact Registry
- 4. Создайте задачу Managed Spark
- 5. Наблюдайте за ходом выполнения задачи
- Результат