- tocdepth
2
Пример использования PyTorch DDP
Разберем предоставленный код подробно, объясняя каждый блок и его роль в процессе распределенного обучения с использованием PyTorch Distributed Data Parallel (DDP).
Примечание
Полный исходный код доступен для копирования в репозитории на GitHub.
Код можно разделить на несколько основных частей:
настройка среды;
инициализация процесса обучения;
основная функция обучения;
завершение работы.
Основные понятия
Node Rank (ранг узла) — идентификатор узла в кластере.
В контексте DDP каждый узел обработки (например, каждый GPU) имеет свой уникальный ранг в глобальном пространстве всех узлов. Ранг можно получить с помощью
dist.get_rank()
.World Size (размер мира) — общее количество узлов или процессов в распределенной системе.
Это число указывает на общее количество процессов, участвующих в обучении. Получить размер мира можно с помощью
dist.get_world_size()
.Local Rank (локальный ранг) — идентификатор узла в пределах одного физического сервера или машины.
Если вы используете несколько GPU на одном сервере, каждый GPU будет иметь свой локальный ранг. В примере кода
LOCAL_RANK
получается из переменной окруженияLOCAL_RANK
, что является стандартным способом его определения при использовании скриптов запуска, предоставляемых PyTorch, таких какtorch.distributed.launch
илиtorchrun
.
Шаг 1. Настройка среды
Рабочая директория определяется как абсолютный путь до каталога, в котором находится исполняемый файл. Это упрощает доступ к файлам и директориям относительно местоположения скрипта.
Логирование настраивается для записи в файл train.log
событий процесса обучения: времени, уровня логирования и сообщений, что помогает в отладке и мониторинге процесса обучения.
Установка рабочей директории и логирование
import os
import torch
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
import pathlib
import logging
from datasets import load_dataset
from peft import (
LoraConfig,
get_peft_model,
prepare_model_for_kbit_training
)
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
BitsAndBytesConfig,
TrainingArguments,
Trainer,
DataCollatorForLanguageModeling
)
BASE_DIR = str(pathlib.Path(__file__).parent.absolute())
print(f"Working dir: {BASE_DIR}")
logging.basicConfig(filename=BASE_DIR + "/train.log",
filemode='a',
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=logging.INFO)
logging.info(f"Working dir: {BASE_DIR}")
def cleanup():
dist.destroy_process_group()
Шаг 2. Инициализация распределенного обучения
Переменные окружения WORLD_SIZE
и WORLD_RANK
получаются из переменных окружения, заданных внешне, и определяют общее количество процессов и ранг текущего процесса соответственно.
Инициализация группы процессов init_processes
создает группу процессов для распределенного обучения, используя выбранный бэкенд (по умолчанию — nccl
для NVIDIA GPUs).
Затем вызывает функцию обучения с текущим рангом процесса, общим количеством процессов и локальным рангом.
def init_processes(fn, local_rank, backend='nccl'):
dist.init_process_group(backend)
fn(dist.get_rank(), dist.get_world_size(), local_rank)
Шаг 3. Запуск основной функции обучения
Включает в себя следующие шаги:
Настройка модели и токенизатора.
Загружаются модель и токенизатор, настраивается квантизация модели для уменьшения занимаемой памяти и увеличения скорости вычислений.
Распределенное обучение с DDP.
Модель оборачивается в
DDP
, что позволяет синхронизировать градиенты между процессами.device_ids=[local_rank]
указывает, что каждый процесс использует свой собственный GPU.Подготовка данных.
Загрузка и токенизация данных для обучения.
Настройка аргументов обучения и тренера.
Создание объекта
Trainer
с заданными параметрами обучения для управления процессом обучения.Сохранение модели.
После обучения состояние модели сохраняется на диск. Это позволяет использовать модель после обучения.
Важно понимать роль node rank
, world size
и local rank
для корректной работы в распределенной системе.
Приведенный код демонстрирует основные шаги настройки и использования DDP для обучения модели языка с использованием технологий Hugging Face и PyTorch.
def train(rank, size, local_rank):
epochs = 10 # Number of training epochs
MODEL_NAME = "Intel/neural-chat-7b-v3-1"
# Configuration for quantizing the model
bnb_config = BitsAndBytesConfig(
load_in_4bit=True, # Download in 4-bit format
bnb_4bit_use_double_quant=True, # Using double quantization
bnb_4bit_quant_type="nf4", # Quantization type
bnb_4bit_compute_dtype=torch.bfloat16 # Calculation data type
)
# Loading a pre-trained model with quantization settings
model = AutoModelForCausalLM.from_pretrained(
MODEL_NAME,
device_map="cuda", # Distribution of the model by CUDA devices
trust_remote_code=True, # Trusting remote code
quantization_config=bnb_config # Applying quantize configuration
)
# Loading a tokenizer for a model
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token # Setting the padding token equal to the end of line token
model.gradient_checkpointing_enable() # Enable gradient control points to save memory
model = prepare_model_for_kbit_training(model) # Preparing the model for training with kbit optimization
# Configuration for optimization LoRA
config = LoraConfig(
r=8, # Projection dimension
lora_alpha=32, # Multiplier for LoRA
target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], # Target modules for optimization
lora_dropout=0.05, # Exclusion share for LoRA
bias="none", # Offset Setting
task_type="CAUSAL_LM" # Task type
)
model = get_peft_model(model, config) # Applying PEFT optimization to a model
print(f'local rank = {local_rank}, rank = {rank}') # Displaying information about the rank of a process
device = torch.device(f'cuda:{local_rank}') # Setting the device for the current process
model.to(device) # Moving a model to a device
model = DDP(model, device_ids=[local_rank], find_unused_parameters=False) # Model wrapper for distributed learning
# Loading and preparing data
# data = load_dataset("csv", data_files=BASE_DIR + "/midjourney_prompt_dataset.csv") # Loading a dataset
data = load_dataset("bittu9988/mid_journey_prompts")
# Function for generating and tokenizing requests
def generate_and_tokenize_prompt(data_point):
full_prompt = f"""
<human>: {data_point["User"]}
<assistant>: {data_point["Prompt"]}
""".strip() # Generating a complete request
tokenized_full_prompt = tokenizer(full_prompt, padding=True, truncation=True) # Request Tokenization
return tokenized_full_prompt
tokenized_data = data["train"].shuffle().map(generate_and_tokenize_prompt) # Tokenization and data shuffling
tokenized_data = tokenized_data.remove_columns(data["train"].column_names) # Removing unnecessary columns from data
# Settings for training
training_args = TrainingArguments(
per_device_train_batch_size=1, # Batch size per device
gradient_accumulation_steps=8, # Gradient accumulation steps
num_train_epochs=epochs, # Number of training epochs
learning_rate=2e-4, # Learning rate
fp16=True, # Using 16-bit floating points
save_total_limit=3, # Limit on the number of model saves
logging_steps=1, # Logging steps
output_dir="experiments", # Output directory
optim="paged_adamw_8bit", # Optimizer
lr_scheduler_type="cosine", # Learning Rate Scheduler Type
warmup_ratio=0.05, # Heating proportion
remove_unused_columns=False, # Removing unused columns
local_rank=local_rank # Local rank
)
# Initializing the trainer to train the model
trainer = Trainer(
model=model,
train_dataset=tokenized_data,
args=training_args,
data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
)
trainer.train() # Start training
torch.save(model.state_dict(), BASE_DIR + f"/model.bin") # Saving the trained model
cleanup() # Resource Cleanup
для Dev & Test