Unverified Commit 6a21f96a authored by flybird11111, committed by GitHub

[doc] update advanced tutorials, training gpt with hybrid parallelism (#4866)

* [doc]update advanced tutorials, training gpt with hybrid parallelism

* [doc]update advanced tutorials, training gpt with hybrid parallelism

* update vit tutorials

* update vit tutorials

* update vit tutorials

* update vit tutorials

* update en/train_vit_with_hybrid_parallel.py

* fix

* resolve comments

* fix
parent 8aed02b9
```diff
@@ -64,7 +64,6 @@
     "label": "Advanced Tutorials",
     "collapsed": true,
     "items": [
-      "advanced_tutorials/train_vit_using_pipeline_parallelism",
       "advanced_tutorials/train_vit_with_hybrid_parallelism",
       "advanced_tutorials/train_gpt_using_hybrid_parallelism",
       "advanced_tutorials/meet_gemini",
```
# Train ViT Using Pipeline Parallelism
Author: Hongxin Liu, Yongbin Li
**Example Code**
- [ColossalAI-Examples Pipeline Parallel ViT](https://github.com/hpcaitech/ColossalAI-Examples/tree/main/image/vision_transformer/pipeline_parallel)
**Related Paper**
- [Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM](https://arxiv.org/abs/2104.04473)
## Introduction
In this tutorial, you will learn how to train a Vision Transformer (ViT) for image classification from scratch, using pipeline parallelism.
Pipeline parallelism is a kind of model parallelism that is useful when your GPU memory cannot fit your model.
By using it, we split the original model into multiple stages, and each stage maintains a part of the original model.
We assume that your GPU memory cannot fit ViT-L/16, while your CPU memory can fit this model.
## Table of contents
In this tutorial we will cover:
1. The definition of the ViT model, based on [TIMM](https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/vision_transformer.py)
2. Processing the dataset
3. Training ViT using pipeline
## Import libraries
```python
import os
from collections import OrderedDict
from functools import partial
import colossalai
import colossalai.nn as col_nn
import torch
import torch.nn as nn
from colossalai.legacy.builder import build_pipeline_model
from colossalai.legacy.engine.schedule import (InterleavedPipelineSchedule,
PipelineSchedule)
from colossalai.logging import disable_existing_loggers, get_dist_logger
from colossalai.legacy.trainer import Trainer, hooks
from colossalai.utils import MultiTimer, get_dataloader
from timm.models import vision_transformer as vit
from torchvision import transforms
from torchvision.datasets import CIFAR10
```
## Define Vision Transformer model
Generally, we provide 3 ways to build a pipelined model:
1. `colossalai.legacy.builder.build_pipeline_model_from_cfg`
2. `colossalai.legacy.builder.build_pipeline_model`
3. Splitting the model into stages by yourself
When your memory can fit the model, you can use the first two methods to build it; otherwise, you must split the model by yourself. The first two methods first build the whole model on the CPU, then split it, and finally move the corresponding parts of the model onto the GPU.
`colossalai.legacy.builder.build_pipeline_model_from_cfg()` receives a model config file and can split the model uniformly (by layer) or in a balanced way (by parameter size).
If you are familiar with `PyTorch`, you can use `colossalai.legacy.builder.build_pipeline_model()`, which receives a `torch.nn.Sequential` model and splits it uniformly by layer.
In this tutorial, we will rewrite [TIMM/ViT](https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/vision_transformer.py) as a `torch.nn.Sequential` model and then use `colossalai.legacy.builder.build_pipeline_model()` to build the pipelined model.
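To make the second approach concrete, here is a minimal sketch, assuming ColossalAI has already been launched with a pipeline-parallel config (the toy layers are purely illustrative):
```python
# Minimal sketch: build_pipeline_model takes any torch.nn.Sequential and
# splits its layers uniformly across the pipeline stages configured at
# launch time.
toy_model = nn.Sequential(
    nn.Linear(256, 512),
    nn.GELU(),
    nn.Linear(512, 256),
    nn.GELU(),
)
pipelined_toy = build_pipeline_model(toy_model, num_chunks=1, verbose=True)
```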
When the data is **one** `Tensor`, you can use the positional argument in your model's `forward()` to get the data tensor. For the first pipeline stage, the first positional argument of `forward()` is the data tensor loaded from the data loader. For the other stages, it is the output tensor from the previous stage. Note that if a stage is not the last stage, the return value of `forward()` must be a `Tensor`.
When the data is a `dict` of `Tensor`s, you can use named keyword arguments in your model's `forward()` to get the data `dict`.
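Below is a hypothetical sketch of a first-stage module for the `dict` case (the keyword names are illustrative and must match the keys yielded by your data loader); the ViT stages in this tutorial use the single-`Tensor` convention instead.
```python
# Hypothetical sketch: a first-stage module for dict-shaped batches.
# The keyword argument names must match the data dict's keys, and a
# single Tensor must be returned so the next stage can consume it.
class ToyFirstStage(nn.Module):
    def __init__(self, vocab_size=50304, dim=768):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, dim)

    def forward(self, input_ids=None, attention_mask=None):
        return self.embed(input_ids)
```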
```python
class ViTEmbedding(nn.Module):
def __init__(self, img_size=224, patch_size=16, in_chans=3, embed_dim=768, embed_layer=vit.PatchEmbed, drop_rate=0., distilled=False):
super().__init__()
self.embed_dim = embed_dim # num_features for consistency with other models
self.num_tokens = 2 if distilled else 1
self.patch_embed = embed_layer(
img_size=img_size, patch_size=patch_size, in_chans=in_chans, embed_dim=embed_dim)
num_patches = self.patch_embed.num_patches
self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
self.dist_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) if distilled else None
self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + self.num_tokens, embed_dim))
self.pos_drop = nn.Dropout(p=drop_rate)
self.init_weights()
def forward(self, x):
x = self.patch_embed(x)
cls_token = self.cls_token.expand(x.shape[0], -1, -1) # stole cls_tokens impl from Phil Wang, thanks
if self.dist_token is None:
x = torch.cat((cls_token, x), dim=1)
else:
x = torch.cat((cls_token, self.dist_token.expand(x.shape[0], -1, -1), x), dim=1)
x = self.pos_drop(x + self.pos_embed)
return x
def init_weights(self):
vit.trunc_normal_(self.pos_embed, std=.02)
if self.dist_token is not None:
vit.trunc_normal_(self.dist_token, std=.02)
vit.trunc_normal_(self.cls_token, std=.02)
self.apply(vit._init_vit_weights)
class ViTHead(nn.Module):
def __init__(self, embed_dim=768, num_classes=1000, norm_layer=None, distilled=False, representation_size=None):
super().__init__()
norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6)
self.norm = norm_layer(embed_dim)
self.num_classes = num_classes
self.distilled = distilled
self.num_features = embed_dim
# Representation layer
if representation_size and not distilled:
self.num_features = representation_size
self.pre_logits = nn.Sequential(OrderedDict([
('fc', nn.Linear(embed_dim, representation_size)),
('act', nn.Tanh())
]))
else:
self.pre_logits = nn.Identity()
# Classifier head(s)
self.head = nn.Linear(self.num_features, num_classes) if num_classes > 0 else nn.Identity()
self.head_dist = None
if distilled:
self.head_dist = nn.Linear(embed_dim, num_classes) if num_classes > 0 else nn.Identity()
self.init_weights()
def forward(self, x):
x = self.norm(x)
if self.distilled:
x, x_dist = self.head(x[:, 0]), self.head_dist(x[:, 1])
if self.training and not torch.jit.is_scripting():
# during inference, return the average of both classifier predictions
return x, x_dist
else:
return (x + x_dist) / 2
else:
x = self.pre_logits(x[:, 0])
x = self.head(x)
return x
def init_weights(self):
self.apply(vit._init_vit_weights)
def sequential_vit(img_size=224, patch_size=16, in_chans=3, num_classes=1000, embed_dim=768, depth=12,
num_heads=12, mlp_ratio=4., qkv_bias=True, representation_size=None, distilled=False,
drop_rate=0., attn_drop_rate=0., drop_path_rate=0., embed_layer=vit.PatchEmbed, norm_layer=None,
act_layer=None):
norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6)
act_layer = act_layer or nn.GELU
embedding = ViTEmbedding(img_size=img_size, patch_size=patch_size, in_chans=in_chans,
embed_dim=embed_dim, embed_layer=embed_layer, drop_rate=drop_rate, distilled=distilled)
dpr = [x.item() for x in torch.linspace(0, drop_path_rate, depth)] # stochastic depth decay rule
blocks = [vit.Block(
dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, drop=drop_rate,
attn_drop=attn_drop_rate, drop_path=dpr[i], norm_layer=norm_layer, act_layer=act_layer)
for i in range(depth)]
for block in blocks:
block.apply(vit._init_vit_weights)
head = ViTHead(embed_dim=embed_dim, num_classes=num_classes, norm_layer=norm_layer,
distilled=distilled, representation_size=representation_size)
return nn.Sequential(embedding, *blocks, head)
def vit_large_patch16_224(**kwargs):
model_kwargs = dict(embed_dim=1024, depth=24, num_heads=16, **kwargs)
return sequential_vit(**model_kwargs)
```
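A quick sanity check of the resulting structure may help; a sketch (the count is one embedding module, 24 transformer blocks, and one head):
```python
# Sketch: sequential_vit returns a flat nn.Sequential, which is exactly
# the layer list that build_pipeline_model splits.
model = vit_large_patch16_224()
print(len(model))  # 26 top-level modules: 1 embedding + 24 blocks + 1 head
```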
## Process the dataset
Generally, we train ViT on a large dataset like ImageNet. For simplicity, we just use CIFAR-10 here, since this tutorial only demonstrates pipeline training.
```python
def build_cifar(batch_size):
transform_train = transforms.Compose([
transforms.RandomCrop(224, pad_if_needed=True),
transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
transform_test = transforms.Compose([
transforms.Resize(224),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
train_dataset = CIFAR10(root=os.environ['DATA'], train=True, download=True, transform=transform_train)
test_dataset = CIFAR10(root=os.environ['DATA'], train=False, transform=transform_test)
train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True)
test_dataloader = get_dataloader(dataset=test_dataset, batch_size=batch_size, pin_memory=True)
return train_dataloader, test_dataloader
```
## Training ViT using pipeline
You can set the size of pipeline parallelism and the number of micro-batches in the config. `NUM_CHUNKS` is useful when using an interleaved pipeline (see [Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM](https://arxiv.org/abs/2104.04473) for more details). The original batch will be split into `num_microbatches` micro-batches, and each stage will load one micro-batch at a time. If you deterministically know the shape of each stage's output tensor, you can set `tensor_shape` in the config to reduce communication. An appropriate schedule will then be generated for you to execute pipeline training. If you don't need the output and label of the model, you can set `return_output_label` to `False` when calling `trainer.fit()`, which further reduces GPU memory usage.
You should `export DATA=/path/to/cifar`.
```python
BATCH_SIZE = 16
NUM_EPOCHS = 60
NUM_CHUNKS = 1
CONFIG = dict(NUM_MICRO_BATCHES=4, parallel=dict(pipeline=2))
def train():
disable_existing_loggers()
parser = colossalai.get_default_parser()
args = parser.parse_args()
colossalai.launch_from_torch(backend=args.backend, config=CONFIG)
logger = get_dist_logger()
# build model
model = vit_large_patch16_224()
model = build_pipeline_model(model, num_chunks=NUM_CHUNKS, verbose=True)
# build criterion
criterion = nn.CrossEntropyLoss()
# optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0)
# build dataloader
train_dataloader, test_dataloader = build_cifar(BATCH_SIZE)
engine, train_dataloader, test_dataloader, _ = colossalai.initialize(model, optimizer, criterion,
train_dataloader, test_dataloader)
timer = MultiTimer()
trainer = Trainer(engine=engine, timer=timer, logger=logger)
hook_list = [
hooks.LossHook(),
hooks.AccuracyHook(col_nn.metric.Accuracy()),
hooks.LogMetricByEpochHook(logger),
]
trainer.fit(train_dataloader=train_dataloader,
epochs=NUM_EPOCHS,
test_dataloader=test_dataloader,
test_interval=1,
hooks=hook_list,
display_progress=True)
```
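Note that the snippet above only defines `train()`; as a sketch, the script tail would invoke it, launched with one process per pipeline stage (2 here; the script name is illustrative):
```python
# Sketch: entry point; launch with, e.g.,
#   torchrun --standalone --nproc_per_node=2 train_vit_pipeline.py
if __name__ == '__main__':
    train()
```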
<!-- doc-test-command: echo -->
# Train GPT-2 Using Hybrid Parallelism
Author: Hongxin Liu, Yongbin Li, Mingyan Jiang
**Prerequisite**
- [Booster Plugins](../basics/booster_plugins.md)
- [Booster API](../basics/booster_api.md)
**Example Code**
- [ColossalAI-Examples GPT2](https://github.com/hpcaitech/ColossalAI/blob/main/examples/language/gpt/hybridparallelism/finetune.py)
**Related Paper**
- [Colossal-AI: A Unified Deep Learning System For Large-Scale Parallel Training](https://arxiv.org/abs/2110.14883)
## Introduction
In the previous tutorial, we introduced how to train ViT with pipeline parallelism. In this tutorial, you will learn a more complex scenario: training GPT-2 with hybrid parallelism. In this case, GPT-2 is so large that even CPU memory cannot hold it. Therefore, the model must be split.
## Table of contents
In this tutorial, we will cover:
1. Initializing the hybrid parallelism plugin
2. Defining the training components of the GPT-2 model
3. Boosting the GPT-2 model with [HybridParallelPlugin](../basics/booster_plugins.md)
4. Training GPT-2 using hybrid parallelism
## Import libraries
```python
from functools import partial
from typing import Callable, List, Optional, Union

import datasets
import torch
import torch.nn as nn
from torch.optim import Optimizer
from torch.optim.lr_scheduler import _LRScheduler as LRScheduler
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import AutoConfig, GPT2ForSequenceClassification, get_linear_schedule_with_warmup
from transformers import AutoTokenizer

import colossalai
from colossalai.booster import Booster
from colossalai.booster.plugin import GeminiPlugin, HybridParallelPlugin, LowLevelZeroPlugin, TorchDDPPlugin
from colossalai.cluster import DistCoordinator
from colossalai.nn.optimizer import HybridAdam
from colossalai.utils import get_current_device
```
### Define the plugin
Create a [`HybridParallelPlugin`](../basics/booster_plugins.md) object and specify the parallelism strategies to use. In this example, pipeline parallelism and ZeRO-1 are used together.
```python
plugin = HybridParallelPlugin(
    tp_size=1,
    pp_size=2,
    num_microbatches=None,
    microbatch_size=1,
    enable_all_optimization=True,
    zero_stage=1,
    precision="fp16",
    initial_scale=1,
)
```
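The same constructor covers other strategies; for reference, a hypothetical variant that adds 2-way tensor parallelism on top of the pipeline (a sketch; it assumes at least `tp_size * pp_size = 4` GPUs are available):
```python
# Sketch: 2-way tensor parallelism combined with the 2-stage pipeline;
# requires at least 4 GPUs.
plugin_tp = HybridParallelPlugin(
    tp_size=2,
    pp_size=2,
    microbatch_size=1,
    enable_all_optimization=True,
    zero_stage=1,
    precision="fp16",
    initial_scale=1,
)
```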
## Create the distributed environment
```python
# Launch ColossalAI
colossalai.launch_from_torch(config={}, seed=42)
coordinator = DistCoordinator()
```
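`DistCoordinator` provides rank helpers that this tutorial also relies on later (`is_master`, `world_size`); for example:
```python
# Only the master process prints; world_size is the total process count.
if coordinator.is_master():
    print(f"Launched {coordinator.world_size} processes")
```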
## Define the training components of the GPT-2 model
Before training with hybrid parallelism, you need to define the components used for training.
Define the hyperparameters.
```python
NUM_EPOCHS = 3
BATCH_SIZE = 32
LEARNING_RATE = 2.4e-5
WEIGHT_DECAY = 0.01
WARMUP_FRACTION = 0.1
```
Get the dataset. You can use `plugin.prepare_dataloader` to generate a dataloader, or define your own dataloader.
```python
def tokenize_batch(batch, tokenizer: Optional[AutoTokenizer] = None, max_length: int = 2048):
    texts = [sample["sentence1"] + sample["sentence2"] for sample in batch]
    data = tokenizer(texts, return_tensors="pt", padding="max_length", truncation=True, max_length=max_length)
    data = {k: v.cuda() for k, v in data.items()}
    data["labels"] = data["input_ids"].clone()
    return data

tokenizer = AutoTokenizer.from_pretrained("gpt2")
dataset = datasets.load_dataset("glue", "mrpc")
train_dataloader = plugin.prepare_dataloader(
    dataset["train"],
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=True,
    collate_fn=partial(tokenize_batch, tokenizer=tokenizer, max_length=512),
)
```
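To sanity-check the input format, you can peek at one collated batch (a sketch; the shape follows the `max_length=512` setting above):
```python
# Sketch: tokenize_batch pads to max_length=512 and clones input_ids
# into labels, so each batch carries input_ids/attention_mask/labels.
sample = next(iter(train_dataloader))
print(sample["input_ids"].shape)  # torch.Size([32, 512])
```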
Define the GPT-2 model.
```python
cfg = AutoConfig.from_pretrained("gpt2", num_labels=2)
model = GPT2ForSequenceClassification.from_pretrained("gpt2", config=cfg).cuda()
```
Prepare the optimizer.
```python
lr = LEARNING_RATE * coordinator.world_size
no_decay = ["bias", "LayerNorm.weight"]
optimizer_grouped_parameters = [
    {
        "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
        "weight_decay": WEIGHT_DECAY,
    },
    {
        "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
        "weight_decay": 0.0,
    },
]
optimizer = HybridAdam(optimizer_grouped_parameters, lr=lr, eps=1e-8)
```
Prepare the `lr_scheduler` and `criterion`. Note that when hybrid parallelism includes pipeline parallelism, you must also define a `criterion` function. This function takes the input and output of the model's forward pass as arguments and returns the loss.
```python
# lr scheduler
total_steps = len(train_dataloader) * NUM_EPOCHS
num_warmup_steps = int(WARMUP_FRACTION * total_steps)
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=num_warmup_steps,
    num_training_steps=total_steps,
)

def _criterion(outputs, inputs):
    return outputs.loss
```
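`_criterion` above simply forwards the loss that the HuggingFace model computes internally (the batch contains `labels`). If your model returned raw logits instead, the criterion would have to compute the loss itself; a hypothetical sketch:
```python
# Hypothetical sketch: compute the loss from logits when the model
# output has no ready-made .loss field; "inputs" is the batch dict
# that was fed to forward().
def _criterion_from_logits(outputs, inputs):
    return nn.functional.cross_entropy(
        outputs.logits.view(-1, outputs.logits.size(-1)),
        inputs["labels"].view(-1),
    )
```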
## Boost the GPT-2 model
Define a booster with the `HybridParallelPlugin`. Based on the configured plugin parameters, the booster injects one or more parallelism strategies into the model. This example uses pipeline parallelism, ZeRO-1, and fp16 mixed-precision training.
```python
booster = Booster(plugin=plugin)
```
Boost these components with the defined booster.
```python
model, optimizer, _criterion, _, lr_scheduler = booster.boost(
    model, optimizer, criterion=_criterion, lr_scheduler=lr_scheduler
)
```
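After boosting, checkpoint I/O should also go through the booster so that each parallel group saves the shards it owns; a minimal sketch (the file name is illustrative):
```python
# Sketch: save the boosted model via the Booster checkpoint API.
booster.save_model(model, "gpt2_mrpc.pt")
```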
## Train GPT-2 using hybrid parallelism

In the previous tutorials, we explained how to inject various parallelism features into the model and its training components using Booster and HybridParallelPlugin. Now we can start training the model.
Define a training function. When pipeline parallelism is used, you need to call `booster.execute_pipeline` to schedule the stages of model training.
```python
def train_epoch(
    epoch: int,
    model: nn.Module,
    optimizer: Optimizer,
    _criterion: Callable,
    lr_scheduler: LRScheduler,
    train_dataloader: DataLoader,
    booster: Booster,
    coordinator: DistCoordinator,
):
    use_pipeline = isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1
    is_pp_last_stage = use_pipeline and booster.plugin.stage_manager.is_last_stage()
    print_flag = (not use_pipeline and coordinator.is_master()) or (use_pipeline and is_pp_last_stage)
    total_step = len(train_dataloader)

    model.train()
    optimizer.zero_grad()
    train_dataloader_iter = iter(train_dataloader)
    with tqdm(
        range(total_step),
        desc=f"Epoch [{epoch + 1}/{NUM_EPOCHS}]",
        disable=not print_flag,
    ) as pbar:
        for _ in pbar:
            if use_pipeline:
                # Forward and backward are both driven by the pipeline schedule.
                outputs = booster.execute_pipeline(
                    train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
                )
                # The loss is only returned on the last pipeline stage.
                if is_pp_last_stage:
                    loss = outputs["loss"]
                    pbar.set_postfix({"loss": loss.item()})
            else:
                data = next(train_dataloader_iter)
                data = {k: v.cuda() for k, v in data.items()}  # move the batch to GPU
                outputs = model(**data)
                loss = _criterion(outputs, None)
                # Backward
                booster.backward(loss, optimizer)
                pbar.set_postfix({"loss": loss.item()})

            optimizer.step()
            optimizer.zero_grad()
            lr_scheduler.step()
```
Train the GPT-2 model.
```python
for epoch in range(NUM_EPOCHS):
    train_epoch(epoch, model, optimizer, _criterion, lr_scheduler, train_dataloader, booster, coordinator)
```
<!-- doc-test-command: torchrun --standalone --nproc_per_node=1 train_gpt_using_hybrid_parallelism.py -->