Пример использования PyTorch DDP

Разберем предоставленный код подробно, объясняя каждый блок и его роль в процессе распределенного обучения с использованием PyTorch Distributed Data Parallel (DDP).

Код можно разделить на несколько основных частей:

  • настройка среды;

  • инициализация процесса обучения;

  • основная функция обучения;

  • завершение работы.

Основные понятия

  • Node Rank (ранг узла) — идентификатор узла в кластере.

    В контексте DDP каждый узел обработки (например, каждый GPU) имеет свой уникальный ранг в глобальном пространстве всех узлов. Ранг можно получить с помощью dist.get_rank().

  • World Size (размер мира) — общее количество узлов или процессов в распределенной системе.

    Это число указывает на общее количество процессов, участвующих в обучении. Получить размер мира можно с помощью dist.get_world_size().

  • Local Rank (локальный ранг) — идентификатор узла в пределах одного физического сервера или машины.

    Если вы используете несколько GPU на одном сервере, каждый GPU будет иметь свой локальный ранг. В примере кода LOCAL_RANK получается из переменной окружения LOCAL_RANK, что является стандартным способом его определения при использовании скриптов запуска, предоставляемых PyTorch, таких как torch.distributed.launch или torchrun.

Шаг 1. Настройка среды

Рабочая директория определяется как абсолютный путь до каталога, в котором находится исполняемый файл. Это упрощает доступ к файлам и директориям относительно местоположения скрипта.

Логирование настраивается для записи в файл train.log событий процесса обучения: времени, уровня логирования и сообщений, что помогает в отладке и мониторинге процесса обучения.

Установка рабочей директории и логирование

import os
import torch
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
import pathlib
import logging
from datasets import load_dataset
from peft import (
    LoraConfig,
    get_peft_model,
    prepare_model_for_kbit_training
)
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling
)

BASE_DIR = str(pathlib.Path(__file__).parent.absolute())

print(f"Working dir: {BASE_DIR}")



logging.basicConfig(filename=BASE_DIR + "/train.log",

                    filemode='a',

                    format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',

                    datefmt='%H:%M:%S',

                    level=logging.INFO)

logging.info(f"Working dir: {BASE_DIR}")

def cleanup():
    dist.destroy_process_group()

Шаг 2. Инициализация распределенного обучения

Переменные окружения WORLD_SIZE и WORLD_RANK получаются из переменных окружения, заданных внешне, и определяют общее количество процессов и ранг текущего процесса соответственно.

Инициализация группы процессов init_processes создает группу процессов для распределенного обучения, используя выбранный бэкенд (по умолчанию — nccl для NVIDIA GPUs). Затем вызывает функцию обучения с текущим рангом процесса, общим количеством процессов и локальным рангом.

def init_processes(fn, local_rank, backend='nccl'):

    dist.init_process_group(backend)

    fn(dist.get_rank(), dist.get_world_size(), local_rank)

Шаг 3. Запуск основной функции обучения

Включает в себя следующие шаги:

  • Настройка модели и токенизатора.

    Загружаются модель и токенизатор, настраивается квантизация модели для уменьшения занимаемой памяти и увеличения скорости вычислений.

  • Распределенное обучение с DDP.

    Модель оборачивается в DDP, что позволяет синхронизировать градиенты между процессами. device_ids=[local_rank] указывает, что каждый процесс использует свой собственный GPU.

  • Подготовка данных.

    Загрузка и токенизация данных для обучения.

  • Настройка аргументов обучения и тренера.

    Создание объекта Trainer с заданными параметрами обучения для управления процессом обучения.

  • Сохранение модели.

    После обучения состояние модели сохраняется на диск. Это позволяет использовать модель после обучения.

Важно понимать роль node rank, world size и local rank для корректной работы в распределенной системе. Приведенный код демонстрирует основные шаги настройки и использования DDP для обучения модели языка с использованием технологий Hugging Face и PyTorch.

def train(rank, size, local_rank):
   epochs = 1000
    MODEL_NAME = BASE_DIR + "/neural-chat-7b-v3-1"
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16
    )

    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        device_map="cuda",
        trust_remote_code=True,
        quantization_config=bnb_config
    )

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    tokenizer.pad_token = tokenizer.eos_token

    model.gradient_checkpointing_enable()
    model = prepare_model_for_kbit_training(model)

    config = LoraConfig(
        r=8,
        lora_alpha=32,
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
        lora_dropout=0.05,
        bias="none",
        task_type="CAUSAL_LM"
    )

    model = get_peft_model(model, config)
    print(f'local rank = {local_rank}, rank = {rank}')
    device = torch.device(f'cuda:{local_rank}')
    model.to(device)
    model = DDP(model, device_ids=[local_rank], find_unused_parameters=False)

    data = load_dataset("csv", data_files=BASE_DIR + "/midjourney_prompt_dataset.csv")

    def generate_and_tokenize_prompt(data_point):
        full_prompt = f"""
        <human>: {data_point["User"]}
        <assistant>: {data_point["Prompt"]}
        """.strip()
        tokenized_full_prompt = tokenizer(full_prompt, padding=True, truncation=True)
        return tokenized_full_prompt

    tokenized_data = data["train"].shuffle().map(generate_and_tokenize_prompt)
    tokenized_data = tokenized_data.remove_columns(data["train"].column_names)
    training_args = TrainingArguments(
        per_device_train_batch_size=1,
        gradient_accumulation_steps=8,
        num_train_epochs=epochs,
        learning_rate=2e-4,
        fp16=True,
        save_total_limit=3,
        logging_steps=1,
        output_dir="experiments",
        optim="paged_adamw_8bit",
        lr_scheduler_type="cosine",
        warmup_ratio=0.05,
        remove_unused_columns=False,
        local_rank=local_rank
    )

    trainer = Trainer(
        model=model,
        train_dataset=tokenized_data,
        args=training_args,
        data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),

    )

    #model.config.use_cache = False
    trainer.train()
    torch.save(model.state_dict(), BASE_DIR + f"/model.bin")
    cleanup()

Полный скрипт обучения

Код из блока ниже можно сохранить в виде файла train_distributed_example.py для дальнейшего использования в ML Space.

# Imported additional libraries and modules
import pathlib
import logging
import torch
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer, DataCollatorForLanguageModeling
from datasets import load_dataset
from bitsandbytes.optim import prepare_model_for_kbit_training, BitsAndBytesConfig
from peft.peft import get_peft_model, LoraConfig

# Determine the base directory where script is located
BASE_DIR = str(pathlib.Path(__file__).parent.absolute())

# Setting up logging to write logs to a file
logging.basicConfig(filename=BASE_DIR + "/train.log",
                    filemode='a',  # Add mode ('a' - append)
                    format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',  # Log message format
                    datefmt='%H:%M:%S',  # Date and time format
                    level=logging.INFO)  # Logging level
logging.info(f"Working dir: {BASE_DIR}")  # Logging information about the working directory

# Function to clean up a group of processes after shutdown
def cleanup():
    dist.destroy_process_group()

# Main function for training
def train(rank, size, local_rank):
    epochs = 1000  # Number of training epochs
    MODEL_NAME = BASE_DIR + "/neural-chat-7b-v3-1"  # Model path
    # Configuration for quantizing the model
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,  # Download in 4-bit format
        bnb_4bit_use_double_quant=True,  # Using double quantization
        bnb_4bit_quant_type="nf4",  # Quantization type
        bnb_4bit_compute_dtype=torch.bfloat16  # Calculation data type
    )

    # Loading a pre-trained model with quantization settings
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        device_map="cuda",  # Distribution of the model by CUDA devices
        trust_remote_code=True,  # Trusting remote code
        quantization_config=bnb_config  # Applying quantize configuration
    )

    # Loading a tokenizer for a model
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    tokenizer.pad_token = tokenizer.eos_token  # Setting the padding token equal to the end of line token

    model.gradient_checkpointing_enable()  # Enable gradient control points to save memory
    model = prepare_model_for_kbit_training(model)  # Preparing the model for training with kbit optimization

    # Configuration for optimization LoRA
    config = LoraConfig(
        r=8,  # Projection dimension
        lora_alpha=32,  # Multiplier for LoRA
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],  # Target modules for optimization
        lora_dropout=0.05,  # Exclusion share for LoRA
        bias="none",  # Offset Setting
        task_type="CAUSAL_LM"  # Task type
    )

    model = get_peft_model(model, config)  # Applying PEFT optimization to a model
    print(f'local rank = {local_rank}, rank = {rank}')  # Displaying information about the rank of a process
    device = torch.device(f'cuda:{local_rank}')  # Setting the device for the current process
    model.to(device)  # Moving a model to a device
    model = DDP(model, device_ids=[local_rank], find_unused_parameters=False)  # Model wrapper for distributed learning

    # Loading and preparing data
    data = load_dataset("csv", data_files=BASE_DIR + "/midjourney_prompt_dataset.csv")  # Loading a dataset

    # Function for generating and tokenizing requests
    def generate_and_tokenize_prompt(data_point):
        full_prompt = f"""
        <human>: {data_point["User"]}
        <assistant>: {data_point["Prompt"]}
        """.strip()  # Generating a complete request
        tokenized_full_prompt = tokenizer(full_prompt, padding=True, truncation=True)  # Request Tokenization
        return tokenized_full_prompt

    tokenized_data = data["train"].shuffle().map(generate_and_tokenize_prompt)  # Tokenization and data shuffling
    tokenized_data = tokenized_data.remove_columns(data["train"].column_names)  # Removing unnecessary columns from data
    # Settings for training
    training_args = TrainingArguments(
        per_device_train_batch_size=1,  # Batch size per device
        gradient_accumulation_steps=8,  # Gradient accumulation steps
        num_train_epochs=epochs,  # Number of training epochs
        learning_rate=2e-4,  # Learning rate
        fp16=True,  # Using 16-bit floating points
        save_total_limit=3,  # Limit on the number of model saves
        logging_steps=1,  # Logging steps
        output_dir="experiments",  # Output directory
        optim="paged_adamw_8bit",  # Optimizer
        lr_scheduler_type="cosine",  # Learning Rate Scheduler Type
        warmup_ratio=0.05,  # Heating proportion
        remove_unused_columns=False,  # Removing unused columns
        local_rank=local_rank  # Local rank
    )

    # Initializing the trainer to train the model
    trainer = Trainer(
        model=model,
        train_dataset=tokenized_data,
        args=training_args,
        data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
    )

    trainer.train()  # Start training
    torch.save(model.state_dict(), BASE_DIR + f"/model.bin")  # Saving the trained model
    cleanup()  # Resource Cleanup

# Initializing Processes for Distributed Learning
def init_processes(fn, local_rank, backend='nccl'):
    """ Initialize the distributed environment. """
    dist.init_process_group(backend)  # Initializing a process group
    fn(dist.get_rank(), dist.get_world_size(), local_rank)  # Launching the learning function

# Entry point to the program
if __name__ == "__main__":
    LOCAL_RANK = int(os.environ['LOCAL_RANK'])  # Getting local rank from environment variables
    init_processes(train, LOCAL_RANK, backend='nccl')  # Initializing processes with a given backend

Запуск распределенной задачи обучения с помощью client_lib

Запустим распределенную задачу обучения с помощью подготовленных на предыдущих шагах скриптов:

import client_lib

BASE_DIR = pathlib.Path().absolute()
print(f'Working dir: {BASE_DIR}')

REGION = 'SR005' # Region
INSTANCE_TYPE = 'a100plus.1gpu.80vG.12C.96G' # Instance_type

job_run = client_lib.Job(base_image='cr.msk.sbercloud.ru/aicloud-base-images/cuda12.2-torch2-py310:0.0.37',
                              script=f'{BASE_DIR}/train_distributed_example.py',
                              region=f'{REGION}',
                              instance_type=f'{INSTANCE_TYPE}',
                              type="pytorch2",
                              n_workers=1,
                              pytorch_use_env=True,
                             )
Запустили Evolution free tier
для Dev & Test
Получить