Unverified Commit ca4ae52d authored by Frank Lee's avatar Frank Lee Committed by GitHub
Browse files

Set examples as submodule (#162)

* remove examples folder

* added examples as submodule

* update .gitmodules
parent 17ce8569
#!/usr/bin/env sh
## phase 1: self-supervised training
python -m torch.distributed.launch --nproc_per_node 1 train_simclr.py
## phase 2: linear evaluation
python -m torch.distributed.launch --nproc_per_node 1 train_linear.py
\ No newline at end of file
from colossalai.nn.metric import Accuracy
import torch
import colossalai
from colossalai.core import global_context as gpc
from colossalai.logging import get_dist_logger
from colossalai.trainer import Trainer, hooks
from colossalai.utils import get_dataloader, MultiTimer
from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR
from torchvision.datasets import CIFAR10
from myhooks import TotalBatchsizeHook
from models.linear_eval import Linear_eval
from augmentation import LeTransform
def build_dataset_train():
    """Create the shuffled CIFAR-10 training dataloader for linear evaluation.

    The dataset root and the batch size are read from ``gpc.config``; every
    sample goes through ``LeTransform``.
    """
    transform = LeTransform()
    dataset = CIFAR10(root=gpc.config.dataset.root, transform=transform, train=True)
    loader_options = dict(
        shuffle=True,
        num_workers=1,
        batch_size=gpc.config.BATCH_SIZE,
        pin_memory=True,
    )
    return get_dataloader(dataset=dataset, **loader_options)
def build_dataset_test():
    """Create the CIFAR-10 test dataloader for linear evaluation.

    Uses the same ``LeTransform`` as training and asks ``get_dataloader``
    not to attach a sampler (``add_sampler=False``).
    """
    transform = LeTransform()
    dataset = CIFAR10(root=gpc.config.dataset.root, transform=transform, train=False)
    loader_options = dict(
        add_sampler=False,
        num_workers=1,
        batch_size=gpc.config.BATCH_SIZE,
        pin_memory=True,
    )
    return get_dataloader(dataset=dataset, **loader_options)
def main():
    """Phase 2: train a linear classifier on top of a pretrained SimCLR model."""
    colossalai.launch_from_torch(config='./le_config.py')
    logger = get_dist_logger()

    # model and data
    model = Linear_eval(model='resnet18', class_num=10)
    train_dataloader = build_dataset_train()
    test_dataloader = build_dataset_test()

    # loss, optimizer and epoch-wise LR schedule
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = colossalai.nn.FusedSGD(
        model.parameters(),
        lr=gpc.config.LEARNING_RATE,
        weight_decay=gpc.config.WEIGHT_DECAY,
        momentum=gpc.config.MOMENTUM,
    )
    lr_scheduler = CosineAnnealingWarmupLR(
        optimizer,
        warmup_steps=5,
        total_steps=gpc.config.NUM_EPOCHS,
    )

    engine, train_dataloader, test_dataloader, _ = colossalai.initialize(
        model, optimizer, criterion, train_dataloader, test_dataloader)
    logger.info("initialized colossalai components", ranks=[0])

    # Load the self-supervised SimCLR weights; strict=False tolerates keys
    # that do not line up between the checkpoint and this model.
    pretrained = torch.load(
        f'./ckpt/{gpc.config.LOG_NAME}/epoch{gpc.config.EPOCH}-tp0-pp0.pt')['model']
    engine.model.load_state_dict(pretrained, strict=False)
    logger.info("pretrained model loaded", ranks=[0])

    trainer = Trainer(engine=engine, logger=logger, timer=MultiTimer())

    eval_tag = f'{gpc.config.LOG_NAME}-eval'
    hook_list = [
        hooks.LossHook(),
        hooks.AccuracyHook(accuracy_func=Accuracy()),
        hooks.LogMetricByEpochHook(logger),
        hooks.LRSchedulerHook(lr_scheduler, by_epoch=True),
        TotalBatchsizeHook(),
        # comment out the two hooks below if checkpoints / tensorboard are not needed
        hooks.SaveCheckpointHook(interval=5, checkpoint_dir=f'./ckpt/{eval_tag}'),
        hooks.TensorboardHook(log_dir=f'./tb_logs/{eval_tag}', ranks=[0]),
    ]

    trainer.fit(
        train_dataloader=train_dataloader,
        test_dataloader=test_dataloader,
        epochs=gpc.config.NUM_EPOCHS,
        hooks=hook_list,
        display_progress=True,
        test_interval=1,
    )
if __name__ == '__main__':
main()
\ No newline at end of file
import colossalai
from colossalai.core import global_context as gpc
from colossalai.logging import get_dist_logger
from colossalai.trainer import Trainer, hooks
from colossalai.utils import get_dataloader, MultiTimer
from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR
from torchvision.datasets import CIFAR10
from NT_Xentloss import NT_Xentloss
from myhooks import TotalBatchsizeHook
from models.simclr import SimCLR
from augmentation import SimCLRTransform
def build_dataset_train():
    """Create the shuffled CIFAR-10 training dataloader for SimCLR pre-training.

    The dataset is downloaded into ``gpc.config.dataset.root`` when missing
    and every sample goes through ``SimCLRTransform``.
    """
    transform = SimCLRTransform()
    dataset = CIFAR10(
        root=gpc.config.dataset.root,
        transform=transform,
        train=True,
        download=True,
    )
    loader_options = dict(
        shuffle=True,
        num_workers=1,
        batch_size=gpc.config.BATCH_SIZE,
        pin_memory=True,
    )
    return get_dataloader(dataset=dataset, **loader_options)
def build_dataset_test():
    """Create the CIFAR-10 test dataloader for SimCLR pre-training.

    Uses ``SimCLRTransform`` and asks ``get_dataloader`` not to attach a
    sampler (``add_sampler=False``).
    """
    transform = SimCLRTransform()
    dataset = CIFAR10(root=gpc.config.dataset.root, transform=transform, train=False)
    loader_options = dict(
        add_sampler=False,
        num_workers=1,
        batch_size=gpc.config.BATCH_SIZE,
        pin_memory=True,
    )
    return get_dataloader(dataset=dataset, **loader_options)
def main():
    """Phase 1: self-supervised SimCLR pre-training on CIFAR-10."""
    colossalai.launch_from_torch(config='./config.py')
    logger = get_dist_logger()

    # model and data
    model = SimCLR(model='resnet18')
    train_dataloader = build_dataset_train()
    test_dataloader = build_dataset_test()

    # contrastive loss, optimizer and epoch-wise LR schedule
    criterion = NT_Xentloss()
    optimizer = colossalai.nn.FusedSGD(
        model.parameters(),
        lr=gpc.config.LEARNING_RATE,
        weight_decay=gpc.config.WEIGHT_DECAY,
        momentum=gpc.config.MOMENTUM,
    )
    lr_scheduler = CosineAnnealingWarmupLR(
        optimizer,
        warmup_steps=10,
        total_steps=gpc.config.NUM_EPOCHS,
    )

    engine, train_dataloader, test_dataloader, _ = colossalai.initialize(
        model, optimizer, criterion, train_dataloader, test_dataloader)
    logger.info("initialized colossalai components", ranks=[0])

    trainer = Trainer(engine=engine, logger=logger, timer=MultiTimer())

    run_tag = gpc.config.LOG_NAME
    hook_list = [
        hooks.LossHook(),
        hooks.LogMetricByEpochHook(logger),
        hooks.LRSchedulerHook(lr_scheduler, by_epoch=True),
        TotalBatchsizeHook(),
        # comment out the two hooks below if checkpoints / tensorboard are not needed
        hooks.SaveCheckpointHook(interval=50, checkpoint_dir=f'./ckpt/{run_tag}'),
        hooks.TensorboardHook(log_dir=f'./tb_logs/{run_tag}', ranks=[0]),
    ]

    trainer.fit(
        train_dataloader=train_dataloader,
        test_dataloader=test_dataloader,
        epochs=gpc.config.NUM_EPOCHS,
        hooks=hook_list,
        display_progress=True,
        test_interval=1,
    )
if __name__ == '__main__':
main()
import torch
import numpy as np
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
from models.simclr import SimCLR
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from torchvision import transforms
log_name = 'cifar-simclr'
epoch = 800
fea_flag = True
tsne_flag = True
plot_flag = True
if fea_flag:
    # Restore the pretrained SimCLR network from the checkpoint of the
    # configured run/epoch and print the result of load_state_dict.
    path = f'ckpt/{log_name}/epoch{epoch}-tp0-pp0.pt'
    net = SimCLR('resnet18').cuda()
    load_result = net.load_state_dict(torch.load(path)['model'])
    print(load_result)

    # Plain evaluation transform: tensor conversion + per-channel normalisation.
    transform_eval = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
    ])

    def _eval_split(train):
        """Return (dataset, unshuffled loader) for one CIFAR-10 split."""
        dataset = CIFAR10(root='./dataset', train=train, transform=transform_eval)
        loader = DataLoader(dataset, batch_size=256, shuffle=False, num_workers=4)
        return dataset, loader

    train_dataset, train_dataloader = _eval_split(True)
    test_dataset, test_dataloader = _eval_split(False)
def feature_extractor(model, loader):
    """Run ``model.backbone`` over every batch of ``loader`` and collect features.

    The model is switched to eval mode and the forward passes run under
    ``torch.no_grad()`` so no autograd graph is built during extraction.

    :param model: network exposing a ``backbone`` callable
    :param loader: iterable yielding ``(img, target)`` batches
    :return: ``(features, targets)`` as NumPy arrays, concatenated over batches
    """
    model.eval()
    all_fea = []
    all_targets = []
    with torch.no_grad():
        for img, target in loader:
            img = img.cuda()
            fea = model.backbone(img)
            # move to host right away so GPU memory does not grow with the dataset
            all_fea.append(fea.detach().cpu())
            all_targets.append(target)
    all_fea = torch.cat(all_fea)
    all_targets = torch.cat(all_targets)
    return all_fea.numpy(), all_targets.numpy()
if tsne_flag:
    import os

    # np.savez does not create missing directories; make sure the output
    # folder exists before spending time on feature extraction and t-SNE.
    os.makedirs('results', exist_ok=True)

    # Embed the backbone features of both splits into 2-D with t-SNE.
    train_fea, train_targets = feature_extractor(net, train_dataloader)
    train_embedded = TSNE(n_components=2).fit_transform(train_fea)
    test_fea, test_targets = feature_extractor(net, test_dataloader)
    test_embedded = TSNE(n_components=2).fit_transform(test_fea)
    np.savez('results/embedding.npz',
             train_embedded=train_embedded,
             train_targets=train_targets,
             test_embedded=test_embedded,
             test_targets=test_targets)
if plot_flag:
    # Read the embeddings from the location the t-SNE step writes them to
    # ('results/embedding.npz'); the previous bare 'embedding.npz' pointed at
    # a different file.
    npz = np.load('results/embedding.npz')
    train_embedded = npz['train_embedded']
    train_targets = npz['train_targets']
    test_embedded = npz['test_embedded']
    test_targets = npz['test_targets']

    def _scatter_by_class(embedded, targets, title, out_path):
        """Draw one scatter series per class label and save the figure."""
        plt.figure(figsize=(16, 16))
        for cls in np.unique(targets):
            selected = targets == cls
            plt.scatter(embedded[selected, 0], embedded[selected, 1], label=int(cls))
        plt.title(title)
        plt.legend()
        plt.savefig(out_path)

    _scatter_by_class(train_embedded, train_targets, 'train', 'results/train_tsne.png')
    _scatter_by_class(test_embedded, test_targets, 'test', 'results/test_tsne.png')
# Overview
A common way to speed up AI model training is to implement large-batch training with the help of data parallelism, but this requires expensive supercomputer clusters. In this example, we used a small server with only 4 GPUs to reproduce the large-scale pre-training of Vision Transformer (ViT) on ImageNet-1K in 14 hours.
# How to run
On a single server, you can directly use torch.distributed to start pre-training on multiple GPUs in parallel. In Colossal-AI, we provided several launch methods to init the distributed backend. You can use `colossalai.launch` and `colossalai.get_default_parser` to pass the parameters via command line. If you happen to use launchers such as SLURM, OpenMPI and PyTorch launch utility, you can use `colossalai.launch_from_<torch/slurm/openmpi>` to read rank and world size from the environment variables directly for convenience. In this example, we use `launch_from_slurm` for demo purpose. You can check out more information about SLURM [here](https://slurm.schedmd.com/documentation.html).
```shell
HOST=<node name> srun bash ./scripts/train_slurm.sh
```
---
If you are using `colossalai.launch`, do this:
In your training script:
```python
# initialize distributed setting
parser = colossalai.get_default_parser()
args = parser.parse_args()
colossalai.launch(config=args.config,
rank=args.rank,
world_size=args.world_size,
host=args.host,
port=args.port,
backend=args.backend
)
```
In your terminal:
```shell
<some_launcher> python train.py --config ./config.py --rank <rank> --world_size <world_size> --host <node name> --port 29500
```
---
If you are using `colossalai.launch_from_torch`, do this:
In your training script:
```python
# initialize distributed setting
parser = colossalai.get_default_parser()
args = parser.parse_args()
colossalai.launch_from_torch(config=args.config)
```
In your terminal:
```shell
python -m torch.distributed.launch --nproc_per_node <world_size> train.py --config ./config.py --host <node name> --port 29500
```
# Experiments
To make it easier for more people to reproduce experiments with large-scale data parallelism, we pre-trained ViT-Base/32 in only 14.58 hours on a small server with 4 NVIDIA A100 GPUs using the ImageNet-1K dataset with batch size 32K for 300 epochs while maintaining accuracy. For the more complex pre-training of ViT-Base/16 and ViT-Large/32, it also takes only 78.58 hours and 37.83 hours to complete. Since the server used in this example is not a standard NVIDIA DGX A100 supercomputing unit, perhaps a better acceleration can be obtained on more professional hardware.
![Loss Curve](./results/loss.jpeg)
![Accuracy](./results/acc.jpeg)
As can be seen from the above figure, the ViT model eventually converges well after training 300 epochs. It is worth noting that, unlike the common small-batch training convergence process, the model performance has a temporary decline in the middle of the large-batch training process. This is due to the difficulty of convergence in large-batch training. As the number of iterations is reduced, a larger learning rate is needed to ensure the final convergence. Since we did not carefully adjust the parameters, perhaps other parameter settings could get better convergence.
# Details
`config.py`
This is a [configuration file](https://colossalai.org/config.html) that defines hyperparameters and the training scheme (fp16, gradient accumulation, etc.). The config content can be accessed through `gpc.config` in the program.
In this example, we trained ViT-Base/16 for 300 epochs on the ImageNet-1K dataset. The batch size is expanded to 32K through data parallelism. Since only 4 A100 GPUs on one small server are used, and the GPU memory is limited, the batch size of 32K cannot be used directly. Therefore, the batch size used on each GPU is only 256, and the 256 batch size is equivalently expanded to 8K through gradient accumulation 32 times. Finally, data parallelism is used between 4 GPUs to achieve an equivalent batch size of 32K.
Since the batch size of 32K far exceeds the use range of common optimizers and is difficult to train, we use the large-batch optimizer [LAMB](https://arxiv.org/abs/1904.00962) provided by Colossal-AI to achieve a better convergence. The learning rate and weight decay of [LAMB](https://arxiv.org/abs/1904.00962) are set to 1.8e-2 and 0.1, respectively. The learning rate scheduler uses a linear warmup strategy of 150 epochs. We also used FP16 mixed precision to speed up the training process, and introduced gradient clipping to help convergence. For simplicity and speed, we only use [Mixup](https://arxiv.org/abs/1710.09412) instead of `RandAug` in data augmentation.
By tuning the parallelism, this example can be quickly deployed to a single server with several GPUs or to a large cluster with lots of nodes and GPUs. If there are enough computing resources to allow data parallel to be directly extended to hundreds or even thousands of GPUs, the training process of several days on a single A100 GPU can be shortened to less than half an hour.
`imagenet_dali_dataloader.py`
To accelerate the training process, we use [DALI](https://github.com/NVIDIA/DALI) to read data and require the dataset to be in TFRecord format, which avoids directly reading a large number of raw image files and being limited by the efficiency of the file system.
`train.py`
We call DALI in this file to read data and start the training process using Colossal-AI.
`mixup.py`
Since Mixup is used as data augmentation, we define the loss function of Mixup here.
`myhooks.py`
We define hook functions that record running information to help debugging.
# How to build TFRecords dataset
As we use [DALI](https://github.com/NVIDIA/DALI) to read data, we use the TFRecords dataset instead of raw Imagenet dataset. If you don't have TFRecords dataset, follow [imagenet-tools](https://github.com/ver217/imagenet-tools) to build one.
\ No newline at end of file
from colossalai.amp import AMP_TYPE
# ViT Base
# Batch size handed to each DALI dataloader (`batch_size=gpc.config.BATCH_SIZE`
# in the training script).
BATCH_SIZE = 256
# NOTE(review): the visible training script passes a literal drop_rate=0.1 to
# the model instead of reading this value — confirm where it is consumed.
DROP_RATE = 0.1
# Number of epochs given to `trainer.fit` and used as the LR scheduler's total_steps.
NUM_EPOCHS = 300
# Mixed-precision settings; `mode` selects the AMP implementation.
fp16 = dict(
mode=AMP_TYPE.TORCH,
)
# Read by TotalBatchsizeHook when it logs the total batch size.
# NOTE(review): the README describes 32 accumulation steps, this file sets 16 — confirm.
gradient_accumulation = 16
# Presumably the max gradient norm applied by the engine — TODO confirm against Colossal-AI docs.
clip_grad_norm = 1.0
# Options for the DALI dataloader: TFRecord dataset root, whether decoding and
# augmentation run on the GPU, and the Mixup Beta-distribution parameter.
dali = dict(
# root='./dataset/ILSVRC2012_1k',
root='/project/scratch/p200012/dataset/ILSVRC2012_1k',
gpu_aug=True,
mixup_alpha=0.2
)
from nvidia.dali.pipeline import Pipeline
from nvidia.dali.plugin.pytorch import DALIClassificationIterator, LastBatchPolicy
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import nvidia.dali.tfrecord as tfrec
import torch
import numpy as np
from .rand_augment import RandAugment
class DaliDataloader(DALIClassificationIterator):
    """ImageNet TFRecord loader built on a DALI pipeline.

    The pipeline reads sharded TFRecords, decodes the images and applies
    random-resized-crop (training) or resize (evaluation) followed by a crop
    that converts to float.  ``__next__`` then permutes the batch with
    ``(0, 3, 1, 2)``, optionally applies ``RandAugment`` and Mixup, and
    rescales pixel values with ``(img - 127.5) / 127.5``.

    :param tfrec_filenames: TFRecord files passed to the reader as ``path``
    :param tfrec_idx_filenames: matching index files passed as ``index_path``
    :param shard_id: index of the shard this loader reads
    :param num_shards: total number of shards
    :param batch_size: pipeline batch size
    :param num_threads: pipeline worker threads
    :param resize: side length used by the evaluation resize
    :param crop: side length of the final crop
    :param prefetch: reader ``prefetch_queue_depth``
    :param training: enables shuffling and training augmentation and selects
        the last-batch policy (DROP for training, PARTIAL otherwise)
    :param gpu_aug: decode with the 'mixed' device and run ops on 'gpu'
    :param cuda: bind the pipeline to the current CUDA device and move the
        outputs to the GPU
    :param mixup_alpha: Beta-distribution parameter for Mixup; when > 0 the
        label is returned as a dict with ``targets_a``, ``targets_b``, ``lam``
    :param randaug_magnitude: magnitude forwarded to ``RandAugment``
    :param randaug_num_layers: number of RandAugment ops; 0 disables
        RandAugment and enables the random horizontal flip in the pipeline
    """

    def __init__(self,
                 tfrec_filenames,
                 tfrec_idx_filenames,
                 shard_id=0,
                 num_shards=1,
                 batch_size=128,
                 num_threads=4,
                 resize=256,
                 crop=224,
                 prefetch=2,
                 training=True,
                 gpu_aug=False,
                 cuda=True,
                 mixup_alpha=0.0,
                 randaug_magnitude=10,
                 randaug_num_layers=0):
        self.mixup_alpha = mixup_alpha
        self.training = training
        self.randaug_magnitude = randaug_magnitude
        self.randaug_num_layers = randaug_num_layers
        pipe = Pipeline(batch_size=batch_size,
                        num_threads=num_threads,
                        device_id=torch.cuda.current_device() if cuda else None,
                        seed=42)
        with pipe:
            inputs = fn.readers.tfrecord(
                path=tfrec_filenames,
                index_path=tfrec_idx_filenames,
                random_shuffle=training,
                shard_id=shard_id,
                num_shards=num_shards,
                initial_fill=10000,
                read_ahead=True,
                prefetch_queue_depth=prefetch,
                name='Reader',
                features={
                    'image/encoded': tfrec.FixedLenFeature((), tfrec.string, ""),
                    'image/class/label': tfrec.FixedLenFeature([1], tfrec.int64, -1),
                })
            images = inputs["image/encoded"]
            images = fn.decoders.image(images,
                                       device='mixed' if gpu_aug else 'cpu',
                                       output_type=types.RGB)
            if training:
                images = fn.random_resized_crop(images,
                                                size=crop,
                                                device='gpu' if gpu_aug else 'cpu')
                # the plain flip is only used when RandAugment is disabled
                if randaug_num_layers == 0:
                    flip_lr = fn.random.coin_flip(probability=0.5)
                    images = fn.flip(images, horizontal=flip_lr)
            else:
                images = fn.resize(images,
                                   device='gpu' if gpu_aug else 'cpu',
                                   resize_x=resize,
                                   resize_y=resize,
                                   dtype=types.FLOAT,
                                   interp_type=types.INTERP_TRIANGULAR)
            images = fn.crop(images,
                             dtype=types.FLOAT,
                             crop=(crop, crop))
            label = inputs["image/class/label"] - 1    # 0-999
            if cuda:    # transfer data to gpu
                pipe.set_outputs(images.gpu(), label.gpu())
            else:
                pipe.set_outputs(images, label)
        pipe.build()
        # Use the LastBatchPolicy enum (the documented argument type) rather
        # than bare strings, which do not compare equal to the enum members.
        last_batch_policy = LastBatchPolicy.DROP if training else LastBatchPolicy.PARTIAL
        super().__init__(pipe, reader_name="Reader",
                         auto_reset=True,
                         last_batch_policy=last_batch_policy)

    def __iter__(self):
        """Return ``self``, resetting first when the previous epoch was consumed."""
        # if not reset (after an epoch), reset; if just initialize, ignore
        if self._counter >= self._size or self._size < 0:
            self.reset()
        return self

    def __next__(self):
        """Return the next ``(img, label)`` batch after post-processing."""
        data = super().__next__()
        img, label = data[0]['data'], data[0]['label']
        # assumes DALI emits NHWC batches — permute to NCHW; TODO confirm
        img = img.permute(0, 3, 1, 2)
        if self.randaug_num_layers > 0 and self.training:
            img = RandAugment(img, num_layers=self.randaug_num_layers, magnitude=self.randaug_magnitude)
        img = (img - 127.5) / 127.5
        label = label.squeeze()
        if self.mixup_alpha > 0.0:
            if self.training:
                # mix every sample with a randomly chosen partner from the same batch
                lam = np.random.beta(self.mixup_alpha, self.mixup_alpha)
                idx = torch.randperm(img.size(0)).to(img.device)
                img = lam * img + (1 - lam) * img[idx, :]
                label_a, label_b = label, label[idx]
                lam = torch.tensor([lam], device=img.device, dtype=img.dtype)
                label = {'targets_a': label_a, 'targets_b': label_b, 'lam': lam}
            else:
                # evaluation keeps the same dict layout with lam fixed to 1
                label = {'targets_a': label, 'targets_b': label, 'lam': torch.ones(
                    1, device=img.device, dtype=img.dtype)}
            return img, label
        return img, label
import torch
import numpy as np
import torchvision.transforms.functional as TF
_MAX_LEVEL = 10
_HPARAMS = {
'cutout_const': 40,
'translate_const': 40,
}
_FILL = tuple([128, 128, 128])
# RGB
def blend(image0, image1, factor):
    """Linearly interpolate (or extrapolate) from ``image0`` towards ``image1``.

    ``factor == 0.0`` returns ``image0`` and ``factor == 1.0`` returns
    ``image1`` unchanged.  Otherwise the result is
    ``image0 + (image1 - image0) * factor`` as uint8; values are clamped to
    [0, 255] only when ``factor`` lies outside the open interval (0, 1).
    Only used by ``color``.
    """
    if factor == 0.0:
        return image0
    if factor == 1.0:
        return image1

    base = image0.type(torch.float32)
    target = image1.type(torch.float32)
    mixed = base + (target - base) * factor

    if not (0.0 < factor < 1.0):
        # extrapolation can leave the valid pixel range
        mixed = torch.clamp(mixed, 0, 255)
    return mixed.type(torch.uint8)
def autocontrast(image):
    """Apply ``TF.autocontrast`` to ``image`` and return the result."""
    return TF.autocontrast(image)
def equalize(image):
    """Apply ``TF.equalize`` to ``image`` and return the result."""
    return TF.equalize(image)
def rotate(image, degree, fill=_FILL):
    """Rotate ``image`` by ``degree`` via ``TF.rotate``, filling exposed areas with ``fill``."""
    rotated = TF.rotate(image, angle=degree, fill=fill)
    return rotated
def posterize(image, bits):
    """Apply ``TF.posterize`` to ``image`` with the given number of ``bits``."""
    return TF.posterize(image, bits)
def sharpness(image, factor):
    """Adjust the sharpness of ``image`` by ``factor`` via ``TF.adjust_sharpness``."""
    return TF.adjust_sharpness(image, sharpness_factor=factor)
def contrast(image, factor):
    """Adjust the contrast of ``image`` by ``factor`` via ``TF.adjust_contrast``."""
    return TF.adjust_contrast(image, factor)
def brightness(image, factor):
    """Adjust the brightness of ``image`` by ``factor`` via ``TF.adjust_brightness``."""
    return TF.adjust_brightness(image, factor)
def invert(image):
    """Return the pixel-wise complement ``255 - image``."""
    inverted = 255 - image
    return inverted
def solarize(image, threshold=128):
    """Invert every pixel that is not below ``threshold``; keep the others."""
    keep = image < threshold
    return torch.where(keep, image, 255 - image)
def solarize_add(image, addition=0, threshold=128):
    """Add ``addition`` (clamped to [0, 255]) to every pixel below ``threshold``.

    Pixels at or above ``threshold`` are returned unchanged.
    """
    # widen to int64 so the sum cannot wrap around before clamping
    shifted = torch.clamp(image.long() + addition, 0, 255).type(torch.uint8)
    below = image < threshold
    return torch.where(below, shifted, image)
def color(image, factor):
    """Blend a 3-channel grayscale copy of ``image`` with ``image`` by ``factor``."""
    gray = TF.rgb_to_grayscale(image, num_output_channels=3)
    return blend(gray, image, factor=factor)
def shear_x(image, level, fill=_FILL):
    """Shear ``image`` along x by ``level`` via ``TF.affine``, filling with ``fill``.

    NOTE(review): ``level`` is passed as the ``shear`` argument of
    ``TF.affine`` — confirm the unit matches what ``_shear_level_to_arg``
    produces.
    """
    return TF.affine(image, angle=0, translate=[0, 0], scale=1.0, shear=[level, 0], fill=fill)
def shear_y(image, level, fill=_FILL):
    """Shear ``image`` along y by ``level`` via ``TF.affine``, filling with ``fill``.

    NOTE(review): ``level`` is passed as the ``shear`` argument of
    ``TF.affine`` — confirm the unit matches what ``_shear_level_to_arg``
    produces.
    """
    return TF.affine(image, angle=0, translate=[0, 0], scale=1.0, shear=[0, level], fill=fill)
def translate_x(image, level, fill=_FILL):
    """Translate ``image`` horizontally by ``level`` via ``TF.affine``, filling with ``fill``."""
    return TF.affine(image, angle=0, translate=[level, 0], scale=1.0, shear=[0, 0], fill=fill)
def translate_y(image, level, fill=_FILL):
    """Translate ``image`` vertically by ``level`` via ``TF.affine``, filling with ``fill``."""
    return TF.affine(image, angle=0, translate=[0, level], scale=1.0, shear=[0, 0], fill=fill)
def cutout(image, pad_size, fill=_FILL):
    """Overwrite one random square patch of a batch with a constant colour.

    A centre ``(y, x)`` is drawn so that the ``2 * pad_size`` square lies
    inside the image, and the same patch is filled in every sample of the
    batch.  The input is not modified; a new tensor on the same device and
    with the same dtype is returned.

    :param image: tensor of shape (B, C, H, W)
    :param pad_size: half the side length of the patch
    :param fill: per-channel fill values, indexed by channel
    :raises ValueError: from ``np.random.randint`` when the patch does not
        fit (``pad_size >= H - pad_size`` or ``pad_size >= W - pad_size``)
    """
    b, c, h, w = image.shape
    y = np.random.randint(pad_size, h - pad_size)
    x = np.random.randint(pad_size, w - pad_size)
    # Write the patch into a copy instead of going through a uint8 CUDA mask:
    # this works for CPU tensors as well and does not treat a fill value of 1
    # as "keep the original pixel".
    result = image.clone()
    for i in range(c):
        result[:, i, (y - pad_size):(y + pad_size), (x - pad_size):(x + pad_size)] = fill[i]
    return result
def _randomly_negate_tensor(level):
# With 50% prob turn the tensor negative.
flip = np.random.randint(0, 2)
final_level = -level if flip else level
return final_level
def _rotate_level_to_arg(level):
    """Scale ``level`` to ``level / _MAX_LEVEL * 30`` and randomly flip its sign."""
    angle = (level / _MAX_LEVEL) * 30.
    return _randomly_negate_tensor(angle)
def _shear_level_to_arg(level):
    """Scale ``level`` to ``level / _MAX_LEVEL * 0.3`` and randomly flip its sign."""
    shear = (level / _MAX_LEVEL) * 0.3
    # sign is flipped with 50% chance
    return _randomly_negate_tensor(shear)
def _translate_level_to_arg(level, translate_const):
    """Scale ``level`` to ``level / _MAX_LEVEL * translate_const`` and randomly flip its sign."""
    offset = (level / _MAX_LEVEL) * float(translate_const)
    # sign is flipped with 50% chance
    return _randomly_negate_tensor(offset)
def level(hparams):
    """Build the table mapping each augmentation name to its magnitude converter.

    Every entry is a one-argument callable that turns a magnitude into the
    argument of the matching augmentation, or ``None`` for operations that
    take no argument.

    :param hparams: dict providing ``cutout_const`` and ``translate_const``
    """

    def _ratio(level):
        return level / _MAX_LEVEL

    def _no_arg(level):
        return None

    def _enhance(level):
        return _ratio(level) * 1.8 + 0.1

    def _translate(level):
        return _translate_level_to_arg(level, hparams['translate_const'])

    return {
        'AutoContrast': _no_arg,
        'Equalize': _no_arg,
        'Invert': _no_arg,
        'Rotate': _rotate_level_to_arg,
        'Posterize': lambda level: int(_ratio(level) * 4),
        'Solarize': lambda level: int(_ratio(level) * 200),
        'SolarizeAdd': lambda level: int(_ratio(level) * 110),
        'Color': _enhance,
        'Contrast': _enhance,
        'Brightness': _enhance,
        'Sharpness': _enhance,
        'ShearX': _shear_level_to_arg,
        'ShearY': _shear_level_to_arg,
        'Cutout': lambda level: int(_ratio(level) * hparams['cutout_const']),
        'TranslateX': _translate,
        'TranslateY': _translate,
    }
# Maps each augmentation name to the function implementing it.  The keys are
# the same names used by the table returned from `level`, which supplies the
# argument (if any) for the corresponding function.
AUGMENTS = {
'AutoContrast': autocontrast,
'Equalize': equalize,
'Invert': invert,
'Rotate': rotate,
'Posterize': posterize,
'Solarize': solarize,
'SolarizeAdd': solarize_add,
'Color': color,
'Contrast': contrast,
'Brightness': brightness,
'Sharpness': sharpness,
'ShearX': shear_x,
'ShearY': shear_y,
'TranslateX': translate_x,
'TranslateY': translate_y,
'Cutout': cutout,
}
def RandAugment(image, num_layers=2, magnitude=_MAX_LEVEL, augments=AUGMENTS):
    """Random Augment for images, following google randaug and the paper (https://arxiv.org/abs/2106.10270)

    With probability 0.5 the input is returned untouched.  Otherwise
    ``num_layers`` distinct operations are drawn from ``augments`` and applied
    one after another; the same operations are applied to the whole input.

    :param image: the input images; ``cutout`` unpacks the shape as
        (B, C, H, W), so a batched tensor is expected
    :type image: uint8 Tensor
    :param num_layers: how many operations are applied, default=2; must not
        exceed ``len(augments)`` because operations are drawn without replacement
    :type num_layers: int
    :param magnitude: the magnitude of random augment, default=10
    :type magnitude: int
    :param augments: mapping from operation name to the function implementing it
    :type augments: dict
    """
    if np.random.random() < 0.5:
        return image
    chosen = np.random.choice(a=list(augments.keys()),
                              size=num_layers,
                              replace=False)
    magnitude = float(magnitude)
    # the converter table does not depend on the chosen op: build it once
    level_fns = level(_HPARAMS)
    for name in chosen:
        arg = level_fns[name](magnitude)
        if arg is None:
            image = augments[name](image)
        else:
            image = augments[name](image, arg)
    return image
import torch.nn as nn
from colossalai.registry import LOSSES
import torch
@LOSSES.register_module
class MixupLoss(nn.Module):
    """Mixup loss: ``lam``-weighted sum of a base loss on both target sets.

    :param loss_fn_cls: class (or factory) of the base loss; it is
        instantiated without arguments
    """

    def __init__(self, loss_fn_cls):
        super().__init__()
        self.loss_fn = loss_fn_cls()

    def forward(self, inputs, targets_a, targets_b, lam):
        """Return ``lam * loss(inputs, targets_a) + (1 - lam) * loss(inputs, targets_b)``."""
        loss_a = self.loss_fn(inputs, targets_a)
        loss_b = self.loss_fn(inputs, targets_b)
        return lam * loss_a + (1 - lam) * loss_b
class MixupAccuracy(nn.Module):
    """Count predictions that match ``targets['targets_a']``."""

    def forward(self, logits, targets):
        """Return the number of samples whose argmax over the last dim equals ``targets_a``."""
        reference = targets['targets_a']
        predicted = torch.argmax(logits, dim=-1)
        return torch.sum(reference == predicted)
from colossalai.trainer.hooks import BaseHook
from colossalai.core import global_context as gpc
from colossalai.context import ParallelMode
from colossalai.logging import get_dist_logger
class TotalBatchsizeHook(BaseHook):
    """Trainer hook that logs the total batch size once before training starts."""

    def __init__(self, priority: int = 2) -> None:
        super().__init__(priority)
        self.logger = get_dist_logger()

    def before_train(self, trainer):
        """Log BATCH_SIZE * gradient_accumulation * data-parallel world size on rank 0."""
        config = gpc.config
        data_parallel_size = gpc.get_world_size(ParallelMode.DATA)
        total_batch_size = config.BATCH_SIZE * config.gradient_accumulation * data_parallel_size
        self.logger.info(f'Total batch size = {total_batch_size}', ranks=[0])
#!/usr/bin/env bash
# Launch ViT training; HOST must be set to the node name passed as --host.
# Quote the expansion so an empty or unusual value is not word-split.
python train.py --host "$HOST" --config ./config.py --port 29500
\ No newline at end of file
import glob
from math import log
import os
import colossalai
from colossalai.nn.metric import Accuracy
import torch
from colossalai.context import ParallelMode
from colossalai.core import global_context as gpc
from colossalai.logging import get_dist_logger
from colossalai.trainer import Trainer, hooks
from colossalai.nn.lr_scheduler import LinearWarmupLR
from dataloader.imagenet_dali_dataloader import DaliDataloader
from mixup import MixupLoss, MixupAccuracy
from timm.models import vit_base_patch16_224
from myhooks import TotalBatchsizeHook
def build_dali_train():
    """Create the DALI training loader over the TFRecords under ``gpc.config.dali.root``.

    Record files are taken from ``train/*`` and index files from
    ``idx_files/train/*`` (both sorted); the data is sharded across the
    data-parallel group and RandAugment runs with two layers.
    """
    data_root = gpc.config.dali.root
    record_files = sorted(glob.glob(os.path.join(data_root, 'train/*')))
    index_files = sorted(glob.glob(os.path.join(data_root, 'idx_files/train/*')))
    return DaliDataloader(
        record_files,
        index_files,
        batch_size=gpc.config.BATCH_SIZE,
        shard_id=gpc.get_local_rank(ParallelMode.DATA),
        num_shards=gpc.get_world_size(ParallelMode.DATA),
        gpu_aug=gpc.config.dali.gpu_aug,
        cuda=True,
        mixup_alpha=gpc.config.dali.mixup_alpha,
        randaug_num_layers=2,
    )
def build_dali_test():
    """Create the DALI validation loader over the TFRecords under ``gpc.config.dali.root``.

    Record files are taken from ``validation/*`` and index files from
    ``idx_files/validation/*`` (both sorted).  GPU augmentation is always
    disabled here, regardless of ``gpc.config.dali.gpu_aug``.
    """
    data_root = gpc.config.dali.root
    record_files = sorted(glob.glob(os.path.join(data_root, 'validation/*')))
    index_files = sorted(glob.glob(os.path.join(data_root, 'idx_files/validation/*')))
    return DaliDataloader(
        record_files,
        index_files,
        batch_size=gpc.config.BATCH_SIZE,
        shard_id=gpc.get_local_rank(ParallelMode.DATA),
        num_shards=gpc.get_world_size(ParallelMode.DATA),
        training=False,
        # gpu_aug=gpc.config.dali.gpu_aug,
        gpu_aug=False,
        cuda=True,
        mixup_alpha=gpc.config.dali.mixup_alpha,
    )
def main():
    """Pre-train ViT-Base/16 on ImageNet with DALI input, Mixup loss and LAMB."""
    # distributed environment, launched from a SLURM batch job
    args = colossalai.get_default_parser().parse_args()
    colossalai.launch_from_slurm(
        config=args.config,
        host=args.host,
        port=args.port,
        backend=args.backend,
    )
    # alternative when launching with the torch launcher:
    # colossalai.launch_from_torch(config=args.config)

    logger = get_dist_logger()
    logger.info("initialized distributed environment", ranks=[0])

    # model and data
    model = vit_base_patch16_224(drop_rate=0.1)
    train_dataloader = build_dali_train()
    test_dataloader = build_dali_test()

    # optimizer, loss and epoch-wise LR schedule
    optimizer = colossalai.nn.Lamb(model.parameters(), lr=1.8e-2, weight_decay=0.1)
    criterion = MixupLoss(loss_fn_cls=torch.nn.CrossEntropyLoss)
    lr_scheduler = LinearWarmupLR(
        optimizer,
        warmup_steps=50,
        total_steps=gpc.config.NUM_EPOCHS,
    )

    engine, train_dataloader, test_dataloader, _ = colossalai.initialize(
        model, optimizer, criterion, train_dataloader, test_dataloader)
    logger.info("initialized colossalai components", ranks=[0])

    trainer = Trainer(engine=engine, logger=logger)

    hook_list = [
        hooks.LossHook(),
        hooks.AccuracyHook(accuracy_func=MixupAccuracy()),
        hooks.LogMetricByEpochHook(logger),
        hooks.LRSchedulerHook(lr_scheduler, by_epoch=True),
        TotalBatchsizeHook(),
        # comment out the two hooks below if checkpoints / tensorboard are not needed
        hooks.SaveCheckpointHook(interval=1, checkpoint_dir='./ckpt'),
        hooks.TensorboardHook(log_dir='./tb_logs', ranks=[0]),
    ]

    trainer.fit(
        train_dataloader=train_dataloader,
        test_dataloader=test_dataloader,
        epochs=gpc.config.NUM_EPOCHS,
        hooks=hook_list,
        display_progress=True,
        test_interval=1,
    )
if __name__ == '__main__':
main()
# Overview
MoE is a new technique to enlarge neural networks while keeping the same throughput in our training.
It is designed to improve the performance of our models without any additional time penalty. But now using
our temporary moe parallelism will cause a moderate computation overhead and additional communication time.
The communication time depends on the topology of network in running environment. At present, moe parallelism
may not meet what you want. Optimized version of moe parallelism will come soon.
This is a simple example about how to run widenet-tiny on cifar10. More information about widenet can be
found [here](https://arxiv.org/abs/2107.11817).
# How to run
On a single server, you can directly use torchrun to start pre-training on multiple GPUs in parallel.
If you use the script here to train, just run the following command in your terminal. `nproc_per_node` is the
number of processes, which commonly equals the number of GPUs.
```shell
torchrun --nnodes=1 --nproc_per_node=4 train.py \
--config ./config.py
```
If you want to use multi servers, please check our document about environment initialization.
Make sure to initialize moe running environment by `moe_set_seed` before building the model.
# Result
The result of training widenet-tiny on cifar10 from scratch is 89.93%. Since moe makes the model larger
than other vit-tiny models, mixup and rand augmentation are needed.
\ No newline at end of file
# Training hyperparameters read by the training script through gpc.config.
BATCH_SIZE = 512    # global batch size; the script divides it by the data-parallel size
LEARNING_RATE = 2e-3
WEIGHT_DECAY = 3e-2

NUM_EPOCHS = 200
WARMUP_EPOCHS = 40

WORLD_SIZE = 4
MOE_MODEL_PARALLEL_SIZE = 4

# Parallel layout: size of the MoE model-parallel group.
parallel = dict(
    moe=dict(size=MOE_MODEL_PARALLEL_SIZE)
)

# Directory the rank-0 logger writes to (plain string: there is nothing to interpolate).
LOG_PATH = "./cifar10_moe"
import os
import colossalai
import torch
import torchvision
from torchvision import transforms
from colossalai.core import global_context as gpc
from colossalai.logging import get_dist_logger
from colossalai.nn import Accuracy
from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR
from colossalai.trainer import Trainer
from colossalai.trainer.hooks import (AccuracyHook, LogMemoryByEpochHook,
LogMetricByEpochHook,
LogMetricByStepHook,
LogTimingByEpochHook, LossHook,
LRSchedulerHook, ThroughputHook)
from colossalai.utils import MultiTimer, get_dataloader
from colossalai.nn.loss import MoeCrossEntropyLoss
from model_zoo.moe.models import Widenet
from colossalai.context.random import moe_set_seed
DATASET_PATH = str(os.environ['DATA']) # The directory of your dataset
def build_cifar(batch_size):
    """Build the CIFAR-10 train and test dataloaders.

    Training samples use random crop + AutoAugment (CIFAR10 policy); test
    samples are only resized.  Both are converted to tensors and normalised
    with the same per-channel statistics.

    :param batch_size: batch size used by both loaders
    :return: ``(train_dataloader, test_dataloader)``
    """
    mean, std = (0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)

    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10),
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ])
    transform_test = transforms.Compose([
        transforms.Resize(32),
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ])

    train_dataset = torchvision.datasets.CIFAR10(
        root=DATASET_PATH, train=True, download=True, transform=transform_train)
    test_dataset = torchvision.datasets.CIFAR10(
        root=DATASET_PATH, train=False, transform=transform_test)

    shared = dict(batch_size=batch_size, num_workers=4, pin_memory=True)
    train_dataloader = get_dataloader(dataset=train_dataset, shuffle=True, **shared)
    test_dataloader = get_dataloader(dataset=test_dataset, **shared)
    return train_dataloader, test_dataloader
def train_cifar():
    """Train Widenet (MoE) on CIFAR-10 with the settings from the config file.

    The config path comes from the default Colossal-AI argument parser and
    the distributed environment is initialised from the torch launcher.
    """
    args = colossalai.get_default_parser().parse_args()
    colossalai.launch_from_torch(config=args.config)
    logger = get_dist_logger()

    # Only global rank 0 logs to file.  makedirs(exist_ok=True) replaces the
    # exists()/mkdir() pair: no check-then-create race, and nested paths work.
    if hasattr(gpc.config, 'LOG_PATH'):
        if gpc.get_global_rank() == 0:
            log_path = gpc.config.LOG_PATH
            os.makedirs(log_path, exist_ok=True)
            logger.log_to_file(log_path)

    # the MoE seed has to be set before the model is built
    moe_set_seed(42)
    model = Widenet(
        num_experts=4,
        capacity_factor=1.2,
        img_size=32,
        patch_size=4,
        num_classes=10,
        depth=6,
        d_model=512,
        num_heads=2,
        d_kv=128,
        d_ff=2048
    )

    # BATCH_SIZE is global; each data-parallel rank loads its share
    train_dataloader, test_dataloader = build_cifar(gpc.config.BATCH_SIZE // gpc.data_parallel_size)

    criterion = MoeCrossEntropyLoss(aux_weight=0.01, label_smoothing=0.1)
    optimizer = torch.optim.AdamW(model.parameters(),
                                  lr=gpc.config.LEARNING_RATE,
                                  weight_decay=gpc.config.WEIGHT_DECAY)
    lr_scheduler = CosineAnnealingWarmupLR(optimizer=optimizer,
                                           total_steps=gpc.config.NUM_EPOCHS,
                                           warmup_steps=gpc.config.WARMUP_EPOCHS)

    engine, train_dataloader, test_dataloader, lr_scheduler = colossalai.initialize(
        model=model,
        optimizer=optimizer,
        criterion=criterion,
        train_dataloader=train_dataloader,
        test_dataloader=test_dataloader,
        lr_scheduler=lr_scheduler)
    logger.info("Engine is built", ranks=[0])

    timer = MultiTimer()
    trainer = Trainer(engine=engine, logger=logger, timer=timer)
    logger.info("Trainer is built", ranks=[0])

    hooks = [
        LogMetricByEpochHook(logger=logger),
        LogMetricByStepHook(),
        AccuracyHook(accuracy_func=Accuracy()),
        LossHook(),
        ThroughputHook(),
        LRSchedulerHook(lr_scheduler=lr_scheduler, by_epoch=True)
    ]

    logger.info("Train start", ranks=[0])
    trainer.fit(train_dataloader=train_dataloader,
                test_dataloader=test_dataloader,
                epochs=gpc.config.NUM_EPOCHS,
                hooks=hooks,
                display_progress=True,
                test_interval=1)
if __name__ == '__main__':
train_cifar()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment