- tocdepth
2
Использовать Jupyter Server со Spark
Spark — кластерная вычислительная платформа с открытым исходным кодом для распределенной пакетной и потоковой обработки данных. С помощью сервиса пользователи могут конфигурировать и создавать кластеры Spark, чтобы запускать на них соответствующие задач препроцессинга данных. Работа с кластером Spark описана в разделах ниже. Пример, иллюстрирующий препроцессинг данных на кластере Spark, размещен на GitHub.
SparkSession и SparkContext
SparkSession, начиная с версии Spark 2.0, является основной точкой входа в программирование Spark с использованием Dataframe и Dataset. В ранних версиях Spark в качестве точки входа в программирование наборов данных RDD и соединения с регионом использовался SparkContext.
Существует точка входа SparkContext в Spark версий 2.0 и выше, что обеспечивает обратную совместимость.
Точка входа SparkSession представляет собой комбинированный класс для всех различных контекстов, которые использовались отдельно до версии Spark 2.0, включая SparkContext. Создание экземпляра SparkSession с использованием SparkSession.builder необходимо, чтобы начать работу с RDD, DataFrame и Dataset.
Spark Session включает в себя все API, доступные в контекстах:
Spark Context
SQL Context
Загрузка данных из хранилища S3
Данные, препроцессинг которых нужно осуществить в среде Spark, могут располагаться в хранилище S3.
Доступ к хранилищу S3 осуществляется по протоколу S3A.
Для получения доступа задайте конфигурацию для объекта класса SparkContext в атрибуте _jsc
при помощи метода hadoopConfiguration().set()
.
Адрес endpoint, access key и secret key задаются следующим образом:
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "https://your_endpoint_name")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "your_access_key")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "your_secret_key")
После указания данных параметров можно обращаться к данным в объектном хранилище S3 по ссылке, включающей в себя название бакета S3. Например:
path = 's3a://your_bucket_name/path/dataset.csv'
После этого вы можете загрузить данные и создать датасет Spark следующим образом:
df = spark.read.format('csv').load(path)
Чтение данных из хранилища S3 Advanced (OBS)
Подробнее про OBS можно почитать в документации.
Предлагаем использовать следующий скрипт для чтения данных из бакета OBS:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
if __name__ == "__main__":
# Create a SparkSession
print("Start")
ss = SparkSession.builder.master('local').getOrCreate()
spark = ss.sparkContext
# sc.setLogLevel("DEBUG");
folder_path = "s3a://bucket/folder/"
pattern_date_adv = "filename.parquet"
config_spark_s3_adv = {
'access_id': '',
'access_key': '',
'impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
'endpoint': 'https://obs.ru-moscow-1.hc.sbercloud.ru'
}
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", config_spark_s3_adv["impl"])
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", config_spark_s3_adv["endpoint"])
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", config_spark_s3_adv["access_id"])
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", config_spark_s3_adv["access_key"])
# spark._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true") # Check this option!
dummy_adv_dataframe = spark.read.format('parquet').option("header", "true").load(folder_path + pattern_date_adv)
print("The dataframe has this many rows:", dummy_adv_dataframe.count())
# Stop the session
spark.stop()
Установка дополнительных библиотек при работе с кластером Spark
Подробнее про установку дополнительных библиотек можно узнать в разделе Установка дополнительных библиотек в Jupyter Server.
Запуск препроцессинга данных на кластере Spark
Работа с датафреймами Spark ведется по принципу «ленивых вычислений».
Вычисления производятся тогда, когда пользователь запрашивает их результат.
Собрать можно с помощью функции collect()
, просмотреть состояние датасета — с помощью функции show()
.
Например, для выполнения SQL-запроса к содержимому датафрейма требуется создать временное представление, затем запустить задание Spark на препроцессинг и собрать данные:
df.createOrReplaceTempView('temp_view')
result = spark.sql('''
/* SQL-statement (FROM temp_view) */
''').collect()
Тарификация кластера Spark
- Jupyter со Spark с CPU (окружение)
- Вычислительные ресурсы
- Jupyter со Spark с CPU (окружение) состоит из двух частей
Ресурсы Driver — cpu-ai-small (4 vCPU, 16 GB);
ресурсы Executor (2 единицы) — cpu-ai-middle (8 vCPU, 32 GB).
- Время выполнения операций
11 минут 25 секунд (округляется до целого числа минут в большую сторону)
- Расчет стоимости (в рублях с учетом НДС):
\((0,14+0,29 \times 2) \times 12=8,64\)
Где:
0,14 — стоимость CPU-минуты с учетом НДС (по тарифу).
0,29 — стоимость CPU-минуты с учетом НДС (по тарифу).
2 — количество Executor.
12 — количество минут, которые проработала задача.
Масштабирование ресурсов
В данный момент для кластеров Spark нет динамического масштабирования ресурсов. Это означает, что количество Executor-ов на протяжении всего времени существования кластера останется неизменным и будет соответствовать значению, заданному при создании кластера.
Запуск задачи обучения, используя Spark
Используйте образ cr.ai.cloud.ru/aicloud-base-images/spark-3.2.1:0.0.32.1
для запуска задачи обучения Spark.
Подробнее Job.
Пример запуска задачи с использованием Spark
Пример будет использовать CSV-файл объемом 7.8 ГБ, собранный из данных, расположенных на сайте kaggle. Данные содержат записи с датчиков погоды.
Создайте Jupyter Server с доступом к кластеру Spark или подключитесь к уже существующему.
Загрузите скачанный пример в NFS-хранилище запущенного Jupyter Server с доступом к Spark.
Последовательно выполняйте блоки в запущенном Jupyter Server.
В результате будет осуществлена предварительная обработка данных, размер датасета уменьшится с 97 288 452 до 485 строк.
См.также
для Dev & Test