- Начало работы с ML Space
- Инструкции
- Работа в Jupyter Server
- Создать Jupyter Server и подключиться к нему через интерфейс ML Space
- Создать и активировать окружение в запущенном Jupyter Server
- Подключиться к Jupyter Server по SSH из локальной IDE или терминала
- Установить и обновить библиотеки в созданном Jupyter Server
- Собрать и использовать кастомный Docker-образ для Jupyter Server
- Использовать Jupyter Server со Spark
- Остановить или удалить Jupyter Server
- Использовать GitLab CI при работе с Environments
- Тарификация
- Термины и сокращения
- Обратиться в поддержку
Использовать Jupyter Server со Spark
Spark — кластерная вычислительная платформа с открытым исходным кодом для распределенной пакетной и потоковой обработки данных. С помощью сервиса пользователи могут конфигурировать и создавать кластеры Spark, чтобы запускать на них соответствующие задач препроцессинга данных. Работа с кластером Spark описана в разделах ниже. Пример, иллюстрирующий препроцессинг данных на кластере Spark, размещен на GitHub.
SparkSession и SparkContext
SparkSession, начиная с версии Spark 2.0, является основной точкой входа в программирование Spark с использованием Dataframe и Dataset. В ранних версиях Spark в качестве точки входа в программирование наборов данных RDD и соединения с регионом использовался SparkContext.
Существует точка входа SparkContext в Spark версий 2.0 и выше, что обеспечивает обратную совместимость.
Точка входа SparkSession представляет собой комбинированный класс для всех различных контекстов, которые использовались отдельно до версии Spark 2.0, включая SparkContext. Создание экземпляра SparkSession с использованием SparkSession.builder необходимо, чтобы начать работу с RDD, DataFrame и Dataset.
Spark Session включает в себя все API, доступные в контекстах:
Spark Context
SQL Context
Загрузка данных из хранилища S3
Данные, препроцессинг которых нужно осуществить в среде Spark, могут располагаться в хранилище S3. Доступ к хранилищу S3 осуществляется по протоколу S3A. Для получения доступа задайте конфигурацию для объекта класса SparkContext в атрибуте _jsc при помощи метода hadoopConfiguration().set(). Адрес endpoint, access key и secret key задаются следующим образом:
sc . _jsc . hadoopConfiguration () . set ( "fs.s3.impl" , "org.apache.hadoop.fs.s3a.S3AFileSystem" )sc . _jsc . hadoopConfiguration () . set ( "fs.s3a.endpoint" , "https://your_endpoint_name" )sc . _jsc . hadoopConfiguration () . set ( "fs.s3a.access.key" , "your_access_key" )sc . _jsc . hadoopConfiguration () . set ( "fs.s3a.secret.key" , "your_secret_key" )
После указания данных параметров можно обращаться к данным в объектном хранилище S3 по ссылке, включающей в себя название бакета S3. Например:
path = 's3a://your_bucket_name/path/dataset.csv'
После этого вы можете загрузить данные и создать датасет Spark следующим образом:
df = spark . read . format ( 'csv' ) . load ( path )
Чтение данных из хранилища S3 Advanced (OBS)
Подробнее про OBS можно почитать в документации.
Предлагаем использовать следующий скрипт для чтения данных из бакета OBS:
from pyspark.sql import SparkSessionfrom pyspark.context import SparkContextif __name__ == "__main__" :# Create a SparkSessionprint ( "Start" )ss = SparkSession . builder . master ( 'local' ) . getOrCreate ()spark = ss . sparkContext# sc.setLogLevel("DEBUG");folder_path = "s3a://bucket/folder/"pattern_date_adv = "filename.parquet"config_spark_s3_adv = {'access_id' : '' ,'access_key' : '' ,'impl' : 'org.apache.hadoop.fs.s3a.S3AFileSystem' ,'endpoint' : 'https://obs.ru-moscow-1.hc.sbercloud.ru'}spark . _jsc . hadoopConfiguration () . set ( "fs.s3a.impl" , config_spark_s3_adv [ "impl" ])spark . _jsc . hadoopConfiguration () . set ( "fs.s3a.endpoint" , config_spark_s3_adv [ "endpoint" ])spark . _jsc . hadoopConfiguration () . set ( "fs.s3a.access.key" , config_spark_s3_adv [ "access_id" ])spark . _jsc . hadoopConfiguration () . set ( "fs.s3a.secret.key" , config_spark_s3_adv [ "access_key" ])# spark._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")spark . _jsc . hadoopConfiguration () . set ( "fs.s3a.path.style.access" , "true" ) # Check this option!dummy_adv_dataframe = spark . read . format ( 'parquet' ) . option ( "header" , "true" ) . load ( folder_path + pattern_date_adv )print ( "The dataframe has this many rows:" , dummy_adv_dataframe . count ())# Stop the sessionspark . stop ()
Установка дополнительных библиотек при работе с кластером Spark
Подробнее про установку дополнительных библиотек можно узнать в разделе Установка дополнительных библиотек в Jupyter Server.
Запуск препроцессинга данных на кластере Spark
Работа с датафреймами Spark ведется по принципу «ленивых вычислений». Вычисления производятся тогда, когда пользователь запрашивает их результат. Собрать можно с помощью функции collect(), просмотреть состояние датасета — с помощью функции show().
Например, для выполнения SQL-запроса к содержимому датафрейма требуется создать временное представление, затем запустить задание Spark на препроцессинг и собрать данные:
df . createOrReplaceTempView ( 'temp_view' )result = spark . sql ( '''/* SQL-statement (FROM temp_view) */''' ) . collect ()
Тарификация кластера Spark
- Jupyter со Spark с CPU (окружение)
- Вычислительные ресурсы
- Jupyter со Spark с CPU (окружение) состоит из двух частей
Ресурсы Driver — cpu-ai-small (4 vCPU, 16 GB);
ресурсы Executor (2 единицы) — cpu-ai-middle (8 vCPU, 32 GB).
- Время выполнения операций
- Расчет стоимости (в рублях с учетом НДС):
0,14 — стоимость CPU-минуты с учетом НДС (по тарифу).
0,29 — стоимость CPU-минуты с учетом НДС (по тарифу).
2 — количество Executor.
12 — количество минут, которые проработала задача.
11 минут 25 секунд (округляется до целого числа минут в большую сторону)
Где:
Масштабирование ресурсов
В данный момент для кластеров Spark нет динамического масштабирования ресурсов. Это означает, что количество Executor-ов на протяжении всего времени существования кластера останется неизменным и будет соответствовать значению, заданному при создании кластера.
Запуск задачи обучения, используя Spark
Используйте образ cr.ai.cloud.ru/aicloud-base-images/spark-3.2.1:0.0.32.1 для запуска задачи обучения Spark. Подробнее Job.
Пример запуска задачи с использованием Spark
Пример будет использовать CSV-файл объемом 7.8 ГБ, собранный из данных, расположенных на сайте kaggle. Данные содержат записи с датчиков погоды.
Создайте Jupyter Server с доступом к кластеру Spark или подключитесь к уже существующему.
Загрузите скачанный пример в NFS-хранилище запущенного Jupyter Server с доступом к Spark.
Последовательно выполняйте блоки в запущенном Jupyter Server.
В результате будет осуществлена предварительная обработка данных, размер датасета уменьшится с 97 288 452 до 485 строк.
- SparkSession и SparkContext
- Загрузка данных из хранилища S3
- Чтение данных из хранилища S3 Advanced (OBS)
- Установка дополнительных библиотек при работе с кластером Spark
- Запуск препроцессинга данных на кластере Spark
- Тарификация кластера Spark
- Масштабирование ресурсов
- Запуск задачи обучения, используя Spark
- Пример запуска задачи с использованием Spark