- tocdepth
2
Примеры препроцессинга данных
Препроцессинг, или предварительная обработка данных, готовит их к анализу или обучению, повышает точность работы алгоритмов.
На GPU при помощи библиотеки RAPIDS
Используем образ cr.ai.cloud.ru/data-hub/jupyter-rapids-ai:latest
:
client_lib.Job(base_image='cr.ai.cloud.ru/data-hub/jupyter-rapids-ai:latest',
script='preprocessing.py',
n_workers=1, instance_type='your instance_type')
Для получения значения instance_type
воспользуйтесь инструкцией.
Пример 1. Использование одиночного GPU для препроцессинга
Использование GPU значительно ускоряет препроцессинг данных, ниже представлен график, показывающий во сколько раз 1 GPU быстрее 1 CPU по основным категориям обработки данных.
Запуск тестовой задачи:
client_lib.Job(base_image='cr.ai.cloud.ru/data-hub/jupyter-rapids-ai:latest',
script='preprocessing.py',
n_workers=1, instance_type='your instance_type')
Для получения значения instance_type
воспользуйтесь инструкцией.
Тестовый скрипт preprocessing.py:
import cudf
import pandas as pd
import time
from mpi4py import MPI
import cupy as cp
import numpy as np
import warnings
warnings.filterwarnings('ignore')
if __name__ == '__main__':
comm = MPI.COMM_WORLD
if comm.rank == 0:
rand1 = np.random.randint(low=0, high=int(1e7), size=int(3e7))
rand2 = np.random.random(size=int(3e7))
rand3 = np.random.random(size=int(3e7))
pdf = pd.DataFrame()
pdf['a'] = rand1
pdf['b'] = rand2
pdf['c'] = rand3
gpu_rand1 = cp.random.randint(low=0, high=int(1e7), size=int(3e7))
gpu_rand2 = cp.random.random(size=int(3e7))
gpu_rand3 = cp.random.random(size=int(3e7))
gdf = cudf.DataFrame()
gdf['a'] = gpu_rand1
gdf['b'] = gpu_rand2
gdf['c'] = gpu_rand3
del gpu_rand1, gpu_rand2, gpu_rand3, rand1, rand2, rand3
start_time = time.time()
pdf.groupby('a')
print(f'CPU groupby time = {round(time.time() - start_time, 3)} sec')
start_time = time.time()
gdf.groupby('a')
print(f'GPU groupby time = {round(time.time() - start_time, 3)} sec')
start_time = time.time()
pdf = pdf.merge(pdf, on=['a'])
print(f'CPU merge time = {round(time.time() - start_time, 3)} sec')
start_time = time.time()
gdf = gdf.merge(gdf, on=['a'])
print(f'GPU merge time = {round(time.time() - start_time, 3)} sec')
def my_udf(x):
return (x ** 2) - x
start_time = time.time()
pdf['d'] = pdf['a'].apply(my_udf)
print(f'CPU apply time = {round(time.time() - start_time, 3)} sec')
start_time = time.time()
gdf['d'] = gdf['a'].applymap(my_udf)
print(f'GPU apply time = {round(time.time() - start_time, 3)} sec')
pdf['d'] = pdf['a'].apply(my_udf)
print(f'CPU apply time = {round(time.time() - start_time, 3)} sec')
start_time = time.time()
gdf['d'] = gdf['a'].applymap(my_udf)
print(f'GPU apply time = {round(time.time() - start_time, 3)} sec')
Пример 2. Распределенный препроцессинг в рамках одного DGX
Для решения такого рода задач необходимо выставить количество процессов на один воркер (processes_per_worker
) и количество воркеров в 1.
Создание дополнительных процессов и воркеров по количеству GPU обеспечивает утилита LocalCudaCluster
из пакета dask_cuda
.
client_lib.Job(base_image='cr.ai.cloud.ru/data-hub/jupyter-rapids-ai:latest',
script='preprocessing.py',
n_workers=1, instance_type='your instance_type',
processes_per_worker=1)
Для получения значения instance_type
воспользуйтесь инструкцией.
Пример простого скрипта, формирующего синтетическую выборку на GPU и выполняющего расчеты распределенно, приведен ниже.
preprocessing.py
:
import cudf
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask
import dask_cudf
import cupy as cp
import warnings
warnings.filterwarnings('ignore')
if __name__ == '__main__':
cluster = LocalCUDACluster()
client = Client(cluster)
def make_cudf_dataframe(nrows=int(1e8)):
cudf_df = cudf.DataFrame()
cudf_df['a'] = cp.random.randint(low=0, high=1000, size=nrows)
cudf_df['b'] = cp.random.randint(low=0, high=1000, size=nrows)
cudf_df['c'] = cp.random.random(nrows)
cudf_df['d'] = cp.random.random(nrows)
return cudf_df
delayed_cudf_dataframe = [dask.delayed(make_cudf_dataframe)() for i in range(20)]
ddf = dask_cudf.from_delayed(delayed_cudf_dataframe)
ddf.groupby(['a', 'b']).agg({'c': ['sum', 'mean']}).compute()
client.close()
cluster.close()
Пример 3. Распределенный препроцессинг
Для использования библиотек rapids в интерактивном режиме в рамках Jupyter Server необходимо использовать образ с библиотекой rapids из DataHub. С примерами использования можно ознакомиться как на сайте Rapids, так и на нашем Github.
для Dev & Test