tocdepth

2

Использовать Jupyter Server со Spark

Spark — кластерная вычислительная платформа с открытым исходным кодом для распределенной пакетной и потоковой обработки данных. С помощью сервиса пользователи могут конфигурировать и создавать кластеры Spark, чтобы запускать на них соответствующие задач препроцессинга данных. Работа с кластером Spark описана в разделах ниже. Пример, иллюстрирующий препроцессинг данных на кластере Spark, размещен на GitHub.

SparkSession и SparkContext

SparkSession, начиная с версии Spark 2.0, является основной точкой входа в программирование Spark с использованием Dataframe и Dataset. В ранних версиях Spark в качестве точки входа в программирование наборов данных RDD и соединения с регионом использовался SparkContext.

Существует точка входа SparkContext в Spark версий 2.0 и выше, что обеспечивает обратную совместимость.

Точка входа SparkSession представляет собой комбинированный класс для всех различных контекстов, которые использовались отдельно до версии Spark 2.0, включая SparkContext. Создание экземпляра SparkSession с использованием SparkSession.builder необходимо, чтобы начать работу с RDD, DataFrame и Dataset.

Spark Session включает в себя все API, доступные в контекстах:

  • Spark Context

  • SQL Context

Загрузка данных из хранилища S3

Данные, препроцессинг которых нужно осуществить в среде Spark, могут располагаться в хранилище S3. Доступ к хранилищу S3 осуществляется по протоколу S3A. Для получения доступа задайте конфигурацию для объекта класса SparkContext в атрибуте _jsc при помощи метода hadoopConfiguration().set(). Адрес endpoint, access key и secret key задаются следующим образом:

sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "https://your_endpoint_name")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "your_access_key")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "your_secret_key")

После указания данных параметров можно обращаться к данным в объектном хранилище S3 по ссылке, включающей в себя название бакета S3. Например:

path = 's3a://your_bucket_name/path/dataset.csv'

После этого вы можете загрузить данные и создать датасет Spark следующим образом:

df = spark.read.format('csv').load(path)

Чтение данных из хранилища S3 Advanced (OBS)

Подробнее про OBS можно почитать в документации.

Предлагаем использовать следующий скрипт для чтения данных из бакета OBS:

from pyspark.sql import SparkSession
from pyspark.context import SparkContext


if __name__ == "__main__":
    # Create a SparkSession
    print("Start")

    ss = SparkSession.builder.master('local').getOrCreate()
    spark = ss.sparkContext
    # sc.setLogLevel("DEBUG");


    folder_path = "s3a://bucket/folder/"
    pattern_date_adv = "filename.parquet"


    config_spark_s3_adv = {
        'access_id': '',
        'access_key': '',
        'impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
        'endpoint': 'https://obs.ru-moscow-1.hc.sbercloud.ru'
    }

    spark._jsc.hadoopConfiguration().set("fs.s3a.impl", config_spark_s3_adv["impl"])
    spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", config_spark_s3_adv["endpoint"])
    spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", config_spark_s3_adv["access_id"])
    spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", config_spark_s3_adv["access_key"])
    # spark._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
    spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")     # Check this option!

    dummy_adv_dataframe = spark.read.format('parquet').option("header", "true").load(folder_path + pattern_date_adv)
    print("The dataframe has this many rows:", dummy_adv_dataframe.count())

    # Stop the session
    spark.stop()

Установка дополнительных библиотек при работе с кластером Spark

Подробнее про установку дополнительных библиотек можно узнать в разделе Установка дополнительных библиотек в Jupyter Server.

Запуск препроцессинга данных на кластере Spark

Работа с датафреймами Spark ведется по принципу «ленивых вычислений». Вычисления производятся тогда, когда пользователь запрашивает их результат. Собрать можно с помощью функции collect(), просмотреть состояние датасета — с помощью функции show().

Например, для выполнения SQL-запроса к содержимому датафрейма требуется создать временное представление, затем запустить задание Spark на препроцессинг и собрать данные:

df.createOrReplaceTempView('temp_view')

result = spark.sql('''
    /* SQL-statement (FROM temp_view) */
''').collect()

Тарификация кластера Spark

Jupyter со Spark с CPU (окружение)
Вычислительные ресурсы
Jupyter со Spark с CPU (окружение) состоит из двух частей
  • Ресурсы Driver — cpu-ai-small (4 vCPU, 16 GB);

  • ресурсы Executor (2 единицы) — cpu-ai-middle (8 vCPU, 32 GB).

Время выполнения операций

11 минут 25 секунд (округляется до целого числа минут в большую сторону)

Расчет стоимости (в рублях с учетом НДС):

\((0,14+0,29 \times 2) \times 12=8,64\)

Где:

  • 0,14 — стоимость CPU-минуты с учетом НДС (по тарифу).

  • 0,29 — стоимость CPU-минуты с учетом НДС (по тарифу).

  • 2 — количество Executor.

  • 12 — количество минут, которые проработала задача.

Масштабирование ресурсов

В данный момент для кластеров Spark нет динамического масштабирования ресурсов. Это означает, что количество Executor-ов на протяжении всего времени существования кластера останется неизменным и будет соответствовать значению, заданному при создании кластера.

Запуск задачи обучения, используя Spark

Используйте образ cr.ai.cloud.ru/aicloud-base-images/spark-3.2.1:0.0.32.1 для запуска задачи обучения Spark. Подробнее Job.

Пример запуска задачи с использованием Spark

Пример будет использовать CSV-файл объемом 7.8 ГБ, собранный из данных, расположенных на сайте kaggle. Данные содержат записи с датчиков погоды.

  1. Скачайте пример Jupyter Notebook.

  2. Создайте Jupyter Server с доступом к кластеру Spark или подключитесь к уже существующему.

  3. Загрузите скачанный пример в NFS-хранилище запущенного Jupyter Server с доступом к Spark.

  4. Последовательно выполняйте блоки в запущенном Jupyter Server.

В результате будет осуществлена предварительная обработка данных, размер датасета уменьшится с 97 288 452 до 485 строк.

Запустили Evolution free tier
для Dev & Test
Получить