Примеры препроцессинга данных

Препроцессинг, или предварительная обработка данных, готовит их к анализу или обучению, повышает точность работы алгоритмов.

На GPU при помощи библиотеки RAPIDS

Используем образ cr.ai.cloud.ru/data-hub/jupyter-rapids-ai:latest:

client_lib.Job(base_image='cr.ai.cloud.ru/data-hub/jupyter-rapids-ai:latest',
                    script='preprocessing.py',
                    n_workers=1, instance_type='your instance_type')

Для получения значения instance_type воспользуйтесь инструкцией.

Пример 1. Использование одиночного GPU для препроцессинга

Использование GPU значительно ускоряет препроцессинг данных, ниже представлен график, показывающий во сколько раз 1 GPU быстрее 1 CPU по основным категориям обработки данных.

../../../_images/s__gpu-vs-cpu.jpg

Запуск тестовой задачи:

client_lib.Job(base_image='cr.ai.cloud.ru/data-hub/jupyter-rapids-ai:latest',
                    script='preprocessing.py',
                    n_workers=1, instance_type='your instance_type')

Для получения значения instance_type воспользуйтесь инструкцией.

Тестовый скрипт preprocessing.py:

import cudf
import pandas as pd
import time
from mpi4py import MPI
import cupy as cp
import numpy as np
import warnings

warnings.filterwarnings('ignore')

if __name__ == '__main__':
    comm = MPI.COMM_WORLD
    if comm.rank == 0:
        rand1 = np.random.randint(low=0, high=int(1e7), size=int(3e7))
        rand2 = np.random.random(size=int(3e7))
        rand3 = np.random.random(size=int(3e7))
        pdf = pd.DataFrame()
        pdf['a'] = rand1
        pdf['b'] = rand2
        pdf['c'] = rand3

        gpu_rand1 = cp.random.randint(low=0, high=int(1e7), size=int(3e7))
        gpu_rand2 = cp.random.random(size=int(3e7))
        gpu_rand3 = cp.random.random(size=int(3e7))

        gdf = cudf.DataFrame()
        gdf['a'] = gpu_rand1
        gdf['b'] = gpu_rand2
        gdf['c'] = gpu_rand3

        del gpu_rand1, gpu_rand2, gpu_rand3, rand1, rand2, rand3

        start_time = time.time()
        pdf.groupby('a')
        print(f'CPU groupby time = {round(time.time() - start_time, 3)} sec')

        start_time = time.time()
        gdf.groupby('a')
        print(f'GPU groupby time = {round(time.time() - start_time, 3)} sec')

        start_time = time.time()
        pdf = pdf.merge(pdf, on=['a'])
        print(f'CPU merge time = {round(time.time() - start_time, 3)} sec')

        start_time = time.time()
        gdf = gdf.merge(gdf, on=['a'])
        print(f'GPU merge time = {round(time.time() - start_time, 3)} sec')

        def my_udf(x):
            return (x ** 2) - x

        start_time = time.time()
        pdf['d'] = pdf['a'].apply(my_udf)
        print(f'CPU apply time = {round(time.time() - start_time, 3)} sec')

        start_time = time.time()
        gdf['d'] = gdf['a'].applymap(my_udf)
        print(f'GPU apply time = {round(time.time() - start_time, 3)} sec')

        pdf['d'] = pdf['a'].apply(my_udf)
        print(f'CPU apply time = {round(time.time() - start_time, 3)} sec')

        start_time = time.time()
        gdf['d'] = gdf['a'].applymap(my_udf)
        print(f'GPU apply time = {round(time.time() - start_time, 3)} sec')

Пример 2. Распределенный препроцессинг в рамках одного DGX

Для решения такого рода задач необходимо выставить количество процессов на один воркер (processes_per_worker) и количество воркеров в 1. Создание дополнительных процессов и воркеров по количеству GPU обеспечивает утилита LocalCudaCluster из пакета dask_cuda.

client_lib.Job(base_image='cr.ai.cloud.ru/data-hub/jupyter-rapids-ai:latest',
                    script='preprocessing.py',
                    n_workers=1, instance_type='your instance_type',
                    processes_per_worker=1)

Для получения значения instance_type воспользуйтесь инструкцией.

Пример простого скрипта, формирующего синтетическую выборку на GPU и выполняющего расчеты распределенно, приведен ниже.

preprocessing.py:

import cudf
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask
import dask_cudf
import cupy as cp
import warnings

warnings.filterwarnings('ignore')

if __name__ == '__main__':
    cluster = LocalCUDACluster()
    client = Client(cluster)

    def make_cudf_dataframe(nrows=int(1e8)):
        cudf_df = cudf.DataFrame()
        cudf_df['a'] = cp.random.randint(low=0, high=1000, size=nrows)
        cudf_df['b'] = cp.random.randint(low=0, high=1000, size=nrows)
        cudf_df['c'] = cp.random.random(nrows)
        cudf_df['d'] = cp.random.random(nrows)
        return cudf_df

    delayed_cudf_dataframe = [dask.delayed(make_cudf_dataframe)() for i in range(20)]
    ddf = dask_cudf.from_delayed(delayed_cudf_dataframe)
    ddf.groupby(['a', 'b']).agg({'c': ['sum', 'mean']}).compute()
    client.close()
    cluster.close()

Пример 3. Распределенный препроцессинг

Для использования библиотек rapids в интерактивном режиме в рамках Jupyter Server необходимо использовать образ с библиотекой rapids из DataHub. С примерами использования можно ознакомиться как на сайте Rapids, так и на нашем Github.

Запустили Evolution free tier
для Dev & Test
Получить