Пример использования PyTorch DDP
Разберем предоставленный код подробно, объясняя каждый блок и его роль в процессе распределенного обучения с использованием PyTorch Distributed Data Parallel (DDP).
Полный исходный код доступен для копирования в репозитории на GitHub.
Код можно разделить на несколько основных частей:
настройка среды;
инициализация процесса обучения;
основная функция обучения;
завершение работы.
Основные понятия
Node Rank (ранг узла) — идентификатор узла в кластере.
В контексте DDP каждый узел обработки (например, каждый GPU) имеет свой уникальный ранг в глобальном пространстве всех узлов. Ранг можно получить с помощью dist.get_rank().
World Size (размер мира) — общее количество узлов или процессов в распределенной системе.
Это число указывает на общее количество процессов, участвующих в обучении. Получить размер мира можно с помощью dist.get_world_size().
Local Rank (локальный ранг) — идентификатор узла в пределах одного физического сервера или машины.
Если вы используете несколько GPU на одном сервере, каждый GPU будет иметь свой локальный ранг. В примере кода LOCAL_RANK получается из переменной окружения LOCAL_RANK, что является стандартным способом его определения при использовании скриптов запуска, предоставляемых PyTorch, таких как torch.distributed.launch или torchrun.
Шаг 1. Настройка среды
Рабочая директория определяется как абсолютный путь до каталога, в котором находится исполняемый файл. Это упрощает доступ к файлам и директориям относительно местоположения скрипта.
Логирование настраивается для записи в файл train.log событий процесса обучения: времени, уровня логирования и сообщений, что помогает в отладке и мониторинге процесса обучения.
Установка рабочей директории и логирование
import osimport torchfrom torch.nn.parallel import DistributedDataParallel as DDPimport torch.distributed as distimport pathlibimport loggingfrom datasets import load_datasetfrom peft import (LoraConfig,get_peft_model,prepare_model_for_kbit_training)from transformers import (AutoModelForCausalLM,AutoTokenizer,BitsAndBytesConfig,TrainingArguments,Trainer,DataCollatorForLanguageModeling)BASE_DIR = str(pathlib.Path(__file__).parent.absolute())print(f"Working dir: {BASE_DIR}")logging.basicConfig(filename=BASE_DIR + "/train.log",filemode='a',format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',datefmt='%H:%M:%S',level=logging.INFO)logging.info(f"Working dir: {BASE_DIR}")def cleanup():dist.destroy_process_group()
Шаг 2. Инициализация распределенного обучения
Переменные окружения WORLD_SIZE и WORLD_RANK получаются из переменных окружения, заданных внешне, и определяют общее количество процессов и ранг текущего процесса соответственно.
Инициализация группы процессов init_processes создает группу процессов для распределенного обучения, используя выбранный бэкенд (по умолчанию — nccl для NVIDIA GPUs). Затем вызывает функцию обучения с текущим рангом процесса, общим количеством процессов и локальным рангом.
def init_processes(fn, local_rank, backend='nccl'):dist.init_process_group(backend)fn(dist.get_rank(), dist.get_world_size(), local_rank)
Шаг 3. Запуск основной функции обучения
Включает в себя следующие шаги:
Настройка модели и токенизатора.
Загружаются модель и токенизатор, настраивается квантизация модели для уменьшения занимаемой памяти и увеличения скорости вычислений.
Распределенное обучение с DDP.
Модель оборачивается в DDP, что позволяет синхронизировать градиенты между процессами. device_ids=[local_rank] указывает, что каждый процесс использует свой собственный GPU.
Подготовка данных.
Загрузка и токенизация данных для обучения.
Настройка аргументов обучения и тренера.
Создание объекта Trainer с заданными параметрами обучения для управления процессом обучения.
Сохранение модели.
После обучения состояние модели сохраняется на диск. Это позволяет использовать модель после обучения.
Важно понимать роль node rank, world size и local rank для корректной работы в распределенной системе. Приведенный код демонстрирует основные шаги настройки и использования DDP для обучения модели языка с использованием технологий Hugging Face и PyTorch.
def train(rank, size, local_rank):epochs = 10 # Number of training epochsMODEL_NAME = "Intel/neural-chat-7b-v3-1"# Configuration for quantizing the modelbnb_config = BitsAndBytesConfig(load_in_4bit=True, # Download in 4-bit formatbnb_4bit_use_double_quant=True, # Using double quantizationbnb_4bit_quant_type="nf4", # Quantization typebnb_4bit_compute_dtype=torch.bfloat16 # Calculation data type)# Loading a pre-trained model with quantization settingsmodel = AutoModelForCausalLM.from_pretrained(MODEL_NAME,device_map="cuda", # Distribution of the model by CUDA devicestrust_remote_code=True, # Trusting remote codequantization_config=bnb_config # Applying quantize configuration)# Loading a tokenizer for a modeltokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)tokenizer.pad_token = tokenizer.eos_token # Setting the padding token equal to the end of line tokenmodel.gradient_checkpointing_enable() # Enable gradient control points to save memorymodel = prepare_model_for_kbit_training(model) # Preparing the model for training with kbit optimization# Configuration for optimization LoRAconfig = LoraConfig(r=8, # Projection dimensionlora_alpha=32, # Multiplier for LoRAtarget_modules=["q_proj", "k_proj", "v_proj", "o_proj"], # Target modules for optimizationlora_dropout=0.05, # Exclusion share for LoRAbias="none", # Offset Settingtask_type="CAUSAL_LM" # Task type)model = get_peft_model(model, config) # Applying PEFT optimization to a modelprint(f'local rank = {local_rank}, rank = {rank}') # Displaying information about the rank of a processdevice = torch.device(f'cuda:{local_rank}') # Setting the device for the current processmodel.to(device) # Moving a model to a devicemodel = DDP(model, device_ids=[local_rank], find_unused_parameters=False) # Model wrapper for distributed learning# Loading and preparing data# data = load_dataset("csv", data_files=BASE_DIR + "/midjourney_prompt_dataset.csv") # Loading a datasetdata = load_dataset("bittu9988/mid_journey_prompts")# Function for generating and tokenizing requestsdef generate_and_tokenize_prompt(data_point):full_prompt = f"""<human>: {data_point["User"]}<assistant>: {data_point["Prompt"]}""".strip() # Generating a complete requesttokenized_full_prompt = tokenizer(full_prompt, padding=True, truncation=True) # Request Tokenizationreturn tokenized_full_prompttokenized_data = data["train"].shuffle().map(generate_and_tokenize_prompt) # Tokenization and data shufflingtokenized_data = tokenized_data.remove_columns(data["train"].column_names) # Removing unnecessary columns from data# Settings for trainingtraining_args = TrainingArguments(per_device_train_batch_size=1, # Batch size per devicegradient_accumulation_steps=8, # Gradient accumulation stepsnum_train_epochs=epochs, # Number of training epochslearning_rate=2e-4, # Learning ratefp16=True, # Using 16-bit floating pointssave_total_limit=3, # Limit on the number of model saveslogging_steps=1, # Logging stepsoutput_dir="experiments", # Output directoryoptim="paged_adamw_8bit", # Optimizerlr_scheduler_type="cosine", # Learning Rate Scheduler Typewarmup_ratio=0.05, # Heating proportionremove_unused_columns=False, # Removing unused columnslocal_rank=local_rank # Local rank)# Initializing the trainer to train the modeltrainer = Trainer(model=model,train_dataset=tokenized_data,args=training_args,data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),)trainer.train() # Start trainingtorch.save(model.state_dict(), BASE_DIR + f"/model.bin") # Saving the trained modelcleanup() # Resource Cleanup