Использовать Jupyter Server со Spark

Spark — кластерная вычислительная платформа с открытым исходным кодом для распределенной пакетной и потоковой обработки данных. С помощью сервиса пользователи могут конфигурировать и создавать кластеры Spark, чтобы запускать на них соответствующие задач препроцессинга данных. Работа с кластером Spark описана в разделах ниже. Пример, иллюстрирующий препроцессинг данных на кластере Spark, размещен на GitHub.

SparkSession и SparkContext

SparkSession, начиная с версии Spark 2.0, является основной точкой входа в программирование Spark с использованием Dataframe и Dataset. В ранних версиях Spark в качестве точки входа в программирование наборов данных RDD и соединения с регионом использовался SparkContext.

Существует точка входа SparkContext в Spark версий 2.0 и выше, что обеспечивает обратную совместимость.

Точка входа SparkSession представляет собой комбинированный класс для всех различных контекстов, которые использовались отдельно до версии Spark 2.0, включая SparkContext. Создание экземпляра SparkSession с использованием SparkSession.builder необходимо, чтобы начать работу с RDD, DataFrame и Dataset.

Spark Session включает в себя все API, доступные в контекстах:

  • Spark Context

  • SQL Context

Загрузка данных из хранилища S3

Данные, препроцессинг которых нужно осуществить в среде Spark, могут располагаться в хранилище S3. Доступ к хранилищу S3 осуществляется по протоколу S3A. Для получения доступа задайте конфигурацию для объекта класса SparkContext в атрибуте _jsc при помощи метода hadoopConfiguration().set(). Адрес endpoint, access key и secret key задаются следующим образом:

sc . _jsc . hadoopConfiguration () . set ( "fs.s3.impl" , "org.apache.hadoop.fs.s3a.S3AFileSystem" )
sc . _jsc . hadoopConfiguration () . set ( "fs.s3a.endpoint" , "https://your_endpoint_name" )
sc . _jsc . hadoopConfiguration () . set ( "fs.s3a.access.key" , "your_access_key" )
sc . _jsc . hadoopConfiguration () . set ( "fs.s3a.secret.key" , "your_secret_key" )

После указания данных параметров можно обращаться к данным в объектном хранилище S3 по ссылке, включающей в себя название бакета S3. Например:

path = 's3a://your_bucket_name/path/dataset.csv'

После этого вы можете загрузить данные и создать датасет Spark следующим образом:

df = spark . read . format ( 'csv' ) . load ( path )

Чтение данных из хранилища S3 Advanced (OBS)

Подробнее про OBS можно почитать в документации.

Предлагаем использовать следующий скрипт для чтения данных из бакета OBS:

from pyspark.sql import SparkSession
from pyspark.context import SparkContext
if __name__ == "__main__" :
# Create a SparkSession
print ( "Start" )
ss = SparkSession . builder . master ( 'local' ) . getOrCreate ()
spark = ss . sparkContext
# sc.setLogLevel("DEBUG");
folder_path = "s3a://bucket/folder/"
pattern_date_adv = "filename.parquet"
config_spark_s3_adv = {
'access_id' : '' ,
'access_key' : '' ,
'impl' : 'org.apache.hadoop.fs.s3a.S3AFileSystem' ,
'endpoint' : 'https://obs.ru-moscow-1.hc.sbercloud.ru'
}
spark . _jsc . hadoopConfiguration () . set ( "fs.s3a.impl" , config_spark_s3_adv [ "impl" ])
spark . _jsc . hadoopConfiguration () . set ( "fs.s3a.endpoint" , config_spark_s3_adv [ "endpoint" ])
spark . _jsc . hadoopConfiguration () . set ( "fs.s3a.access.key" , config_spark_s3_adv [ "access_id" ])
spark . _jsc . hadoopConfiguration () . set ( "fs.s3a.secret.key" , config_spark_s3_adv [ "access_key" ])
# spark._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
spark . _jsc . hadoopConfiguration () . set ( "fs.s3a.path.style.access" , "true" ) # Check this option!
dummy_adv_dataframe = spark . read . format ( 'parquet' ) . option ( "header" , "true" ) . load ( folder_path + pattern_date_adv )
print ( "The dataframe has this many rows:" , dummy_adv_dataframe . count ())
# Stop the session
spark . stop ()

Установка дополнительных библиотек при работе с кластером Spark

Подробнее про установку дополнительных библиотек можно узнать в разделе Установка дополнительных библиотек в Jupyter Server.

Запуск препроцессинга данных на кластере Spark

Работа с датафреймами Spark ведется по принципу «ленивых вычислений». Вычисления производятся тогда, когда пользователь запрашивает их результат. Собрать можно с помощью функции collect(), просмотреть состояние датасета — с помощью функции show().

Например, для выполнения SQL-запроса к содержимому датафрейма требуется создать временное представление, затем запустить задание Spark на препроцессинг и собрать данные:

df . createOrReplaceTempView ( 'temp_view' )
result = spark . sql ( '''
/* SQL-statement (FROM temp_view) */
''' ) . collect ()

Тарификация кластера Spark

Jupyter со Spark с CPU (окружение)
Вычислительные ресурсы
Jupyter со Spark с CPU (окружение) состоит из двух частей
  • Ресурсы Driver — cpu-ai-small (4 vCPU, 16 GB);

  • ресурсы Executor (2 единицы) — cpu-ai-middle (8 vCPU, 32 GB).

Время выполнения операций

11 минут 25 секунд (округляется до целого числа минут в большую сторону)

Расчет стоимости (в рублях с учетом НДС):

\((0,14+0,29 \times 2) \times 12=8,64\)

Где:

  • 0,14 — стоимость CPU-минуты с учетом НДС (по тарифу).

  • 0,29 — стоимость CPU-минуты с учетом НДС (по тарифу).

  • 2 — количество Executor.

  • 12 — количество минут, которые проработала задача.

Масштабирование ресурсов

В данный момент для кластеров Spark нет динамического масштабирования ресурсов. Это означает, что количество Executor-ов на протяжении всего времени существования кластера останется неизменным и будет соответствовать значению, заданному при создании кластера.

Запуск задачи обучения, используя Spark

Используйте образ cr.ai.cloud.ru/aicloud-base-images/spark-3.2.1:0.0.32.1 для запуска задачи обучения Spark. Подробнее Job.

Пример запуска задачи с использованием Spark

Пример будет использовать CSV-файл объемом 7.8 ГБ, собранный из данных, расположенных на сайте kaggle. Данные содержат записи с датчиков погоды.

  1. Скачайте пример Jupyter Notebook.

  2. Создайте Jupyter Server с доступом к кластеру Spark или подключитесь к уже существующему.

  3. Загрузите скачанный пример в NFS-хранилище запущенного Jupyter Server с доступом к Spark.

  4. Последовательно выполняйте блоки в запущенном Jupyter Server.

В результате будет осуществлена предварительная обработка данных, размер датасета уменьшится с 97 288 452 до 485 строк.

ML Space