Пример использования PyTorch DDP
Разберем предоставленный код подробно, объясняя каждый блок и его роль в процессе распределенного обучения с использованием PyTorch Distributed Data Parallel (DDP).
Код можно разделить на несколько основных частей:
настройка среды;
инициализация процесса обучения;
основная функция обучения;
завершение работы.
Основные понятия
Node Rank (ранг узла) — идентификатор узла в кластере.
В контексте DDP каждый узел обработки (например, каждый GPU) имеет свой уникальный ранг в глобальном пространстве всех узлов. Ранг можно получить с помощью
dist.get_rank()
.World Size (размер мира) — общее количество узлов или процессов в распределенной системе.
Это число указывает на общее количество процессов, участвующих в обучении. Получить размер мира можно с помощью
dist.get_world_size()
.Local Rank (локальный ранг) — идентификатор узла в пределах одного физического сервера или машины.
Если вы используете несколько GPU на одном сервере, каждый GPU будет иметь свой локальный ранг. В примере кода
LOCAL_RANK
получается из переменной окруженияLOCAL_RANK
, что является стандартным способом его определения при использовании скриптов запуска, предоставляемых PyTorch, таких какtorch.distributed.launch
илиtorchrun
.
Шаг 1. Настройка среды
Рабочая директория определяется как абсолютный путь до каталога, в котором находится исполняемый файл. Это упрощает доступ к файлам и директориям относительно местоположения скрипта.
Логирование настраивается для записи в файл train.log
событий процесса обучения: времени, уровня логирования и сообщений, что помогает в отладке и мониторинге процесса обучения.
Установка рабочей директории и логирование
import os
import torch
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
import pathlib
import logging
from datasets import load_dataset
from peft import (
LoraConfig,
get_peft_model,
prepare_model_for_kbit_training
)
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
BitsAndBytesConfig,
TrainingArguments,
Trainer,
DataCollatorForLanguageModeling
)
BASE_DIR = str(pathlib.Path(__file__).parent.absolute())
print(f"Working dir: {BASE_DIR}")
logging.basicConfig(filename=BASE_DIR + "/train.log",
filemode='a',
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO)
logging.info(f"Working dir: {BASE_DIR}")
def cleanup():
dist.destroy_process_group()
Шаг 2. Инициализация распределенного обучения
Переменные окружения WORLD_SIZE
и WORLD_RANK
получаются из переменных окружения, заданных внешне, и определяют общее количество процессов и ранг текущего процесса соответственно.
Инициализация группы процессов init_processes
создает группу процессов для распределенного обучения, используя выбранный бэкенд (по умолчанию — nccl
для NVIDIA GPUs).
Затем вызывает функцию обучения с текущим рангом процесса, общим количеством процессов и локальным рангом.
def init_processes(fn, local_rank, backend='nccl'):
dist.init_process_group(backend)
fn(dist.get_rank(), dist.get_world_size(), local_rank)
Шаг 3. Запуск основной функции обучения
Включает в себя следующие шаги:
Настройка модели и токенизатора.
Загружаются модель и токенизатор, настраивается квантизация модели для уменьшения занимаемой памяти и увеличения скорости вычислений.
Распределенное обучение с DDP.
Модель оборачивается в
DDP
, что позволяет синхронизировать градиенты между процессами.device_ids=[local_rank]
указывает, что каждый процесс использует свой собственный GPU.Подготовка данных.
Загрузка и токенизация данных для обучения.
Настройка аргументов обучения и тренера.
Создание объекта
Trainer
с заданными параметрами обучения для управления процессом обучения.Сохранение модели.
После обучения состояние модели сохраняется на диск. Это позволяет использовать модель после обучения.
Важно понимать роль node rank
, world size
и local rank
для корректной работы в распределенной системе.
Приведенный код демонстрирует основные шаги настройки и использования DDP для обучения модели языка с использованием технологий Hugging Face и PyTorch.
def train(rank, size, local_rank):
epochs = 1000
MODEL_NAME = BASE_DIR + "/neural-chat-7b-v3-1"
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.bfloat16
)
model = AutoModelForCausalLM.from_pretrained(
MODEL_NAME,
device_map="cuda",
trust_remote_code=True,
quantization_config=bnb_config
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token
model.gradient_checkpointing_enable()
model = prepare_model_for_kbit_training(model)
config = LoraConfig(
r=8,
lora_alpha=32,
target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
lora_dropout=0.05,
bias="none",
task_type="CAUSAL_LM"
)
model = get_peft_model(model, config)
print(f'local rank = {local_rank}, rank = {rank}')
device = torch.device(f'cuda:{local_rank}')
model.to(device)
model = DDP(model, device_ids=[local_rank], find_unused_parameters=False)
data = load_dataset("csv", data_files=BASE_DIR + "/midjourney_prompt_dataset.csv")
def generate_and_tokenize_prompt(data_point):
full_prompt = f"""
<human>: {data_point["User"]}
<assistant>: {data_point["Prompt"]}
""".strip()
tokenized_full_prompt = tokenizer(full_prompt, padding=True, truncation=True)
return tokenized_full_prompt
tokenized_data = data["train"].shuffle().map(generate_and_tokenize_prompt)
tokenized_data = tokenized_data.remove_columns(data["train"].column_names)
training_args = TrainingArguments(
per_device_train_batch_size=1,
gradient_accumulation_steps=8,
num_train_epochs=epochs,
learning_rate=2e-4,
fp16=True,
save_total_limit=3,
logging_steps=1,
output_dir="experiments",
optim="paged_adamw_8bit",
lr_scheduler_type="cosine",
warmup_ratio=0.05,
remove_unused_columns=False,
local_rank=local_rank
)
trainer = Trainer(
model=model,
train_dataset=tokenized_data,
args=training_args,
data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
)
#model.config.use_cache = False
trainer.train()
torch.save(model.state_dict(), BASE_DIR + f"/model.bin")
cleanup()
Полный скрипт обучения
Код из блока ниже можно сохранить в виде файла train_distributed_example.py
для дальнейшего использования в ML Space.
# Imported additional libraries and modules
import pathlib
import logging
import torch
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer, DataCollatorForLanguageModeling
from datasets import load_dataset
from bitsandbytes.optim import prepare_model_for_kbit_training, BitsAndBytesConfig
from peft.peft import get_peft_model, LoraConfig
# Determine the base directory where script is located
BASE_DIR = str(pathlib.Path(__file__).parent.absolute())
# Setting up logging to write logs to a file
logging.basicConfig(filename=BASE_DIR + "/train.log",
filemode='a', # Add mode ('a' - append)
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', # Log message format
datefmt='%H:%M:%S', # Date and time format
level=logging.INFO) # Logging level
logging.info(f"Working dir: {BASE_DIR}") # Logging information about the working directory
# Function to clean up a group of processes after shutdown
def cleanup():
dist.destroy_process_group()
# Main function for training
def train(rank, size, local_rank):
epochs = 1000 # Number of training epochs
MODEL_NAME = BASE_DIR + "/neural-chat-7b-v3-1" # Model path
# Configuration for quantizing the model
bnb_config = BitsAndBytesConfig(
load_in_4bit=True, # Download in 4-bit format
bnb_4bit_use_double_quant=True, # Using double quantization
bnb_4bit_quant_type="nf4", # Quantization type
bnb_4bit_compute_dtype=torch.bfloat16 # Calculation data type
)
# Loading a pre-trained model with quantization settings
model = AutoModelForCausalLM.from_pretrained(
MODEL_NAME,
device_map="cuda", # Distribution of the model by CUDA devices
trust_remote_code=True, # Trusting remote code
quantization_config=bnb_config # Applying quantize configuration
)
# Loading a tokenizer for a model
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token # Setting the padding token equal to the end of line token
model.gradient_checkpointing_enable() # Enable gradient control points to save memory
model = prepare_model_for_kbit_training(model) # Preparing the model for training with kbit optimization
# Configuration for optimization LoRA
config = LoraConfig(
r=8, # Projection dimension
lora_alpha=32, # Multiplier for LoRA
target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], # Target modules for optimization
lora_dropout=0.05, # Exclusion share for LoRA
bias="none", # Offset Setting
task_type="CAUSAL_LM" # Task type
)
model = get_peft_model(model, config) # Applying PEFT optimization to a model
print(f'local rank = {local_rank}, rank = {rank}') # Displaying information about the rank of a process
device = torch.device(f'cuda:{local_rank}') # Setting the device for the current process
model.to(device) # Moving a model to a device
model = DDP(model, device_ids=[local_rank], find_unused_parameters=False) # Model wrapper for distributed learning
# Loading and preparing data
data = load_dataset("csv", data_files=BASE_DIR + "/midjourney_prompt_dataset.csv") # Loading a dataset
# Function for generating and tokenizing requests
def generate_and_tokenize_prompt(data_point):
full_prompt = f"""
<human>: {data_point["User"]}
<assistant>: {data_point["Prompt"]}
""".strip() # Generating a complete request
tokenized_full_prompt = tokenizer(full_prompt, padding=True, truncation=True) # Request Tokenization
return tokenized_full_prompt
tokenized_data = data["train"].shuffle().map(generate_and_tokenize_prompt) # Tokenization and data shuffling
tokenized_data = tokenized_data.remove_columns(data["train"].column_names) # Removing unnecessary columns from data
# Settings for training
training_args = TrainingArguments(
per_device_train_batch_size=1, # Batch size per device
gradient_accumulation_steps=8, # Gradient accumulation steps
num_train_epochs=epochs, # Number of training epochs
learning_rate=2e-4, # Learning rate
fp16=True, # Using 16-bit floating points
save_total_limit=3, # Limit on the number of model saves
logging_steps=1, # Logging steps
output_dir="experiments", # Output directory
optim="paged_adamw_8bit", # Optimizer
lr_scheduler_type="cosine", # Learning Rate Scheduler Type
warmup_ratio=0.05, # Heating proportion
remove_unused_columns=False, # Removing unused columns
local_rank=local_rank # Local rank
)
# Initializing the trainer to train the model
trainer = Trainer(
model=model,
train_dataset=tokenized_data,
args=training_args,
data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
)
trainer.train() # Start training
torch.save(model.state_dict(), BASE_DIR + f"/model.bin") # Saving the trained model
cleanup() # Resource Cleanup
# Initializing Processes for Distributed Learning
def init_processes(fn, local_rank, backend='nccl'):
""" Initialize the distributed environment. """
dist.init_process_group(backend) # Initializing a process group
fn(dist.get_rank(), dist.get_world_size(), local_rank) # Launching the learning function
# Entry point to the program
if __name__ == "__main__":
LOCAL_RANK = int(os.environ['LOCAL_RANK']) # Getting local rank from environment variables
init_processes(train, LOCAL_RANK, backend='nccl') # Initializing processes with a given backend
Запуск распределенной задачи обучения с помощью client_lib
Запустим распределенную задачу обучения с помощью подготовленных на предыдущих шагах скриптов:
import client_lib
BASE_DIR = pathlib.Path().absolute()
print(f'Working dir: {BASE_DIR}')
REGION = 'SR005' # Region
INSTANCE_TYPE = 'a100plus.1gpu.80vG.12C.96G' # Instance_type
job_run = client_lib.Job(base_image='cr.msk.sbercloud.ru/aicloud-base-images/cuda12.2-torch2-py310:0.0.37',
script=f'{BASE_DIR}/train_distributed_example.py',
region=f'{REGION}',
instance_type=f'{INSTANCE_TYPE}',
type="pytorch2",
n_workers=1,
pytorch_use_env=True,
)
для Dev & Test