Unverified commit da01c234, authored by Frank Lee, committed by GitHub

Develop/experiments (#59)



* Add gradient accumulation, fix lr scheduler

* fix FP16 optimizer and adapted torch amp with tensor parallel (#18)

* fixed bugs in compatibility between torch amp and tensor parallel and performed some minor fixes

* fixed trainer

* Revert "fixed trainer"

This reverts commit 2e0b0b76990e8d4e337add483d878c0f61cf5097.

* improved consistency between trainer, engine and schedule (#23)
Co-authored-by: 1SAA <c2h214748@gmail.com>

* Split conv2d, class token, positional embedding in 2d, Fix random number in ddp
Fix convergence in cifar10, Imagenet1000

* Integrate 1d tensor parallel in Colossal-AI (#39)

* fixed 1D and 2D convergence (#38)

* optimized 2D operations

* fixed 1D ViT convergence problem

* Feature/ddp (#49)

* remove redundancy func in setup (#19) (#20)

* use env to control the language of doc (#24) (#25)

* Support TP-compatible Torch AMP and Update trainer API (#27)

* Add gradient accumulation, fix lr scheduler

* fix FP16 optimizer and adapted torch amp with tensor parallel (#18)

* fixed bugs in compatibility between torch amp and tensor parallel and performed some minor fixes

* fixed trainer

* Revert "fixed trainer"

This reverts commit 2e0b0b76990e8d4e337add483d878c0f61cf5097.

* improved consistency between trainer, engine and schedule (#23)
Co-authored-by: 1SAA <c2h214748@gmail.com>
Co-authored-by: 1SAA <c2h214748@gmail.com>
Co-authored-by: ver217 <lhx0217@gmail.com>

* add an example of ViT-B/16 and remove w_norm clipping in LAMB (#29)

* add explanation for ViT example (#35) (#36)

* support torch ddp

* fix loss accumulation

* add log for ddp

* change seed

* modify timing hook
Co-authored-by: Frank Lee <somerlee.9@gmail.com>
Co-authored-by: 1SAA <c2h214748@gmail.com>
Co-authored-by: binmakeswell <binmakeswell@gmail.com>

* Feature/pipeline (#40)

* remove redundancy func in setup (#19) (#20)

* use env to control the language of doc (#24) (#25)

* Support TP-compatible Torch AMP and Update trainer API (#27)

* Add gradient accumulation, fix lr scheduler

* fix FP16 optimizer and adapted torch amp with tensor parallel (#18)

* fixed bugs in compatibility between torch amp and tensor parallel and performed some minor fixes

* fixed trainer

* Revert "fixed trainer"

This reverts commit 2e0b0b76990e8d4e337add483d878c0f61cf5097.

* improved consistency between trainer, engine and schedule (#23)
Co-authored-by: 1SAA <c2h214748@gmail.com>
Co-authored-by: 1SAA <c2h214748@gmail.com>
Co-authored-by: ver217 <lhx0217@gmail.com>

* add an example of ViT-B/16 and remove w_norm clipping in LAMB (#29)

* add explanation for ViT example (#35) (#36)

* optimize communication of pipeline parallel

* fix grad clip for pipeline
Co-authored-by: Frank Lee <somerlee.9@gmail.com>
Co-authored-by: 1SAA <c2h214748@gmail.com>
Co-authored-by: binmakeswell <binmakeswell@gmail.com>

* optimized 3d layer to fix slow computation ; tested imagenet performance with 3d; reworked lr_scheduler config definition; fixed launch args; fixed some printing issues; simplified apis of 3d layers (#51)

* Update 2.5d layer code to get a similar accuracy on imagenet-1k dataset

* update api for better usability (#58)

update api for better usability
Co-authored-by: 1SAA <c2h214748@gmail.com>
Co-authored-by: ver217 <lhx0217@gmail.com>
Co-authored-by: puck_WCR <46049915+WANG-CR@users.noreply.github.com>
Co-authored-by: binmakeswell <binmakeswell@gmail.com>
Co-authored-by: アマデウス <kurisusnowdeng@users.noreply.github.com>
Co-authored-by: BoxiangW <45734921+BoxiangW@users.noreply.github.com>
import math
def set_parallel_size(obj, config: dict, key: str, attr_name: str):
if key in config:
ele = config[key]
if isinstance(ele, int):
setattr(obj, attr_name, ele)
elif isinstance(ele, dict):
setattr(obj, attr_name, ele['size'])
else:
raise NotImplementedError(
f"Parallel configuration does not support this kind of argument, please use int or dict"
)
def add_tensor_pg(pg_init, mode, size, depth=None):
if mode == '1d':
pg_init.append(dict(
type='Initializer1D',
parallel_size=size
))
elif mode == '2d':
dim = math.floor(math.sqrt(size))
pg_init.append(dict(
type='Initializer2D_Col',
summa_dim=dim
))
pg_init.append(dict(
type='Initializer2D_Row',
summa_dim=dim
))
elif mode == '2.5d':
dim = math.floor(math.sqrt(size // depth))
pg_init.append(dict(
type='Initializer_Tesseract_ROW',
tesseract_dim=dim,
tesseract_dep=depth
))
pg_init.append(dict(
type='Initializer_Tesseract_COL',
tesseract_dim=dim,
tesseract_dep=depth
))
pg_init.append(dict(
type='Initializer_Tesseract_DEP',
tesseract_dim=dim,
tesseract_dep=depth
))
pg_init.append(dict(
type='Initializer_Tesseract_XZ',
tesseract_dim=dim,
tesseract_dep=depth
))
elif mode == '3d':
dim = math.floor(math.pow(size, 1.0 / 3.0) + 0.5)
pg_init.append(dict(
type='ParallelInitializer3D_Input',
depth=dim
))
pg_init.append(dict(
type='ParallelInitializer3D_Weight',
depth=dim
))
pg_init.append(dict(
type='ParallelInitializer3D_Output',
depth=dim
))
else:
raise NotImplementedError("This kind of tensor splitting has not been implemented yet")
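# Hedged usage sketch (not part of the diff): how the two helpers above turn a
# user-facing parallel config into process-group initializer entries. The names
# below (_Holder, holder, cfg) are illustrative only.
if __name__ == '__main__':
    class _Holder:
        pass

    holder = _Holder()
    cfg = {'tensor': {'size': 8, 'mode': '2.5d', 'depth': 2}}
    set_parallel_size(holder, cfg, 'tensor', 'tensor_parallel_size')  # sets 8
    pg_init = []
    add_tensor_pg(pg_init, mode='2.5d', size=8, depth=2)
    # pg_init now holds the four 2.5-D initializer dicts with tesseract_dim=2 and tesseract_dep=2
    print(holder.tensor_parallel_size, [d['type'] for d in pg_init])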
......@@ -97,3 +97,7 @@ class Config(dict):
sys.path.pop(0)
return config
class ConfigException(Exception):
pass
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import os
import random
from typing import Union
......@@ -11,8 +10,8 @@ import torch.distributed as dist
from colossalai.constants import ALLOWED_MODES, INITIALIZER_MAPPING
from colossalai.context.config import Config
from colossalai.logging import get_dist_logger
from colossalai.registry import DIST_GROUP_INITIALIZER
from ._utils import set_parallel_size
from .parallel_mode import ParallelMode
from .random import add_seed, get_seeds, set_mode
......@@ -21,11 +20,24 @@ class ParallelContext:
"""This class provides interface functions for users to get the parallel context,
such as the global rank, the local rank, the world size, etc. of each device.
:param args: The distributed arguments in the system
:type args: dict
"""
def __init__(self, args=None):
__instance = None
@staticmethod
def get_instance():
if ParallelContext.__instance is None:
ParallelContext()
return ParallelContext.__instance
def __init__(self):
# create a singleton instance
if ParallelContext.__instance is not None:
raise Exception(
'ParallelContext is a singleton class, you should get the instance by colossalai.core.global_context')
else:
ParallelContext.__instance = self
# distributed settings
self._global_ranks = dict()
self._local_ranks = dict()
......@@ -34,7 +46,6 @@ class ParallelContext:
self._ranks_in_group = dict()
# load config from file
self._dist_args = args
self._config = None
# default 3D parallel args, will be overwritten during process group initialization
......@@ -43,10 +54,22 @@ class ParallelContext:
self.pipeline_parallel_size = 1
self.tensor_parallel_size = 1
# logging
self._verbose = False
self._logger = get_dist_logger()
@property
def config(self):
return self._config
@property
def verbose(self):
return self._verbose
@verbose.setter
def verbose(self, verbose_: bool):
self._verbose = verbose_
def load_config(self, config: Union[dict, str]):
"""Loads the configuration from either a dict or a file.
......@@ -62,14 +85,6 @@ class ParallelContext:
else:
raise TypeError("Invalid type for config, only dictionary or string is supported")
def set_dist_args(self, args):
"""Sets the distributed arguments.
:param args: The distributed arguments in the system
:type args: dict
"""
self._dist_args = args
@staticmethod
def _check_parallel_mode(parallel_mode: ParallelMode):
assert isinstance(parallel_mode, ParallelMode)
......@@ -268,32 +283,36 @@ class ParallelContext:
self._check_parallel_mode(parallel_mode)
self._ranks_in_group[parallel_mode] = ranks
def init_global_dist(self, addr=None, port=None):
"""Initializes the global distributed environment.
:param addr: The IP address of the current device
:type addr: str, optional
:param port: The port to be used in the system of the current device
:type port: int, optional
def init_global_dist(self,
rank: int,
world_size: int,
backend: str,
host: str,
port: int
):
"""Initializes the global distributed environment
:param rank: rank for the default process group
:type rank: int
:param world_size: world size of the default process group
:type world_size: int
:param host: the master address for distributed training
:type host: str
:param port: the master port for distributed training
:type port: str
:param backend: backend for torch.distributed
:type backend: str
"""
# get config
rank = self._dist_args.local_rank
world_size = self._dist_args.world_size
# default env config, overwrite by exporting
# them in your bash script
addr = os.getenv('MASTER_ADDR', 'localhost') if addr is None else addr
port = os.getenv('MASTER_PORT', '8008') if port is None else port
init_method = f'tcp://{addr}:{port}'
dist.init_process_group(backend=self._dist_args.backend,
rank=rank,
# initialize the default process group
init_method = f'tcp://{host}:{port}'
dist.init_process_group(rank=rank,
world_size=world_size,
backend=backend,
init_method=init_method)
# None will give the default global process group for pytorch dist operations
self._register_dist(rank, world_size, None,
list(range(world_size)), ParallelMode.GLOBAL)
self._global_ranks[ParallelMode.GLOBAL] = rank
self.add_global_rank(ParallelMode.GLOBAL, rank)
def _register_dist(self, local_rank, world_size,
process_group, ranks_in_group, mode):
......@@ -312,7 +331,20 @@ class ParallelContext:
pps = self.pipeline_parallel_size
tps = self.tensor_parallel_size
ws = self.world_size
assert ws == dps * pps * tps, f"Expected the world size {ws} to be equal to data parallel size ({dps}) * pipeline parallel size ({pps}) * tensor parallel size ({tps})"
assert ws == dps * pps * \
tps, f"Expected the world size {ws} to be equal to data parallel size ({dps}) * pipeline parallel size ({pps}) * tensor parallel size ({tps})"
def _set_parallel_size_from_config(self, config: dict, key: str, attr_name: str):
if key in config:
ele = config[key]
if isinstance(ele, int):
setattr(self, attr_name, ele)
elif isinstance(ele, dict):
setattr(self, attr_name, ele['size'])
else:
raise NotImplementedError(
f"Parallel configuration does not support this kind of argument, please use int or dict"
)
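# Hedged example of the config shape consumed by this helper (keys follow
# init_parallel_groups below; the sizes are illustrative):
#   parallel = dict(
#       pipeline=2,                       # int form  -> pipeline_parallel_size = 2
#       tensor=dict(size=4, mode='2d'),   # dict form -> tensor_parallel_size = 4
#   )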
def init_parallel_groups(self):
"""Initializes the parallel groups.
......@@ -325,21 +357,20 @@ class ParallelContext:
world_size = self.get_world_size(ParallelMode.GLOBAL)
self.world_size = world_size
assert hasattr(self.config, 'parallel'), 'Expected the field parallel to be present in the config file'
# set parallel size as attributes for global context
parallel_config = self.config.parallel
set_parallel_size(self, parallel_config, 'pipeline',
'pipeline_parallel_size')
set_parallel_size(self, parallel_config, 'tensor',
'tensor_parallel_size')
parallel_config = self.config.get('parallel', None)
if parallel_config is not None:
self._set_parallel_size_from_config(parallel_config, 'pipeline', 'pipeline_parallel_size')
self._set_parallel_size_from_config(parallel_config, 'tensor', 'tensor_parallel_size')
# the user should not set the data parallel size manually
# instead, it should be calculated based on other parallel config
self.data_parallel_size = self.world_size // (self.pipeline_parallel_size * self.tensor_parallel_size)
# get the tensor parallel mode and check
tensor_parallel_mode = parallel_config['tensor'].get('mode', None)
tensor_parallel_mode = None
if parallel_config is not None and 'tensor' in parallel_config and 'mode' in parallel_config['tensor']:
tensor_parallel_mode = parallel_config['tensor']['mode']
assert tensor_parallel_mode in ALLOWED_MODES, f"mode in the parallel config must be set to one of {ALLOWED_MODES}"
self.check_sanity()
......@@ -400,23 +431,21 @@ class ParallelContext:
# destroy global process group
dist.destroy_process_group()
def set_device(self):
def set_device(self, device_ordinal: int = None):
"""Sets distributed processes to be bound to devices.
"""
devices_per_node = torch.cuda.device_count()
global_rank = self.get_global_rank()
device = global_rank % devices_per_node
torch.cuda.set_device(device)
print(f'process rank {global_rank} is bound to device {device}')
if device_ordinal is None:
devices_per_node = torch.cuda.device_count()
device_ordinal = global_rank % devices_per_node
torch.cuda.set_device(device_ordinal)
if self._verbose:
self._logger.info(f'process rank {global_rank} is bound to device {device_ordinal}')
def set_seed(self):
def set_seed(self, seed: int):
"""Sets seeds for all random libraries.
"""
if hasattr(self.config, 'seed'):
seed = getattr(self.config, 'seed')
else:
seed = 2 # default seed
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
......@@ -444,11 +473,18 @@ class ParallelContext:
seeds = get_seeds()
seed_str = ', '.join([f'{k}: {v}' for k, v in seeds.items()])
print(f"initialized seed on rank {global_rank}, "
f"numpy: {seed}, python random: {seed}, {seed_str},"
f"the default parallel seed is {ParallelMode.DATA}.", flush=True)
if self._verbose:
self._logger.info(
f"initialized seed on rank {global_rank}, "
f"numpy: {seed}, python random: {seed}, {seed_str},"
f"the default parallel seed is {ParallelMode.DATA}.",
ranks=[0])
else:
print(f"initialized seed on rank {global_rank}, "
f"numpy: {seed}, python random: {seed}, pytorch: {seed}", flush=True)
print('WARNING: CUDA is not available, thus CUDA RNG cannot be used to track CUDA random number states',
flush=True)
if self._verbose:
self._logger.info(
f"initialized seed on rank {global_rank}, "
f"numpy: {seed}, python random: {seed}, pytorch: {seed}",
ranks=[0])
self._logger.info(
'WARNING: CUDA is not available, thus CUDA RNG cannot be used to track CUDA random number states',
ranks=[0])
......@@ -4,7 +4,6 @@
import torch.distributed as dist
from colossalai.context import Config
from colossalai.core import global_context as gpc
from colossalai.registry import DIST_GROUP_INITIALIZER
from .process_group_initializer import ProcessGroupInitializer
from ..parallel_mode import ParallelMode
......@@ -8,7 +8,6 @@ import torch.distributed as dist
from colossalai.constants import TESSERACT_DIM, TESSERACT_DEP
from colossalai.context import Config
from colossalai.core import global_context as gpc
from colossalai.registry import DIST_GROUP_INITIALIZER
from .process_group_initializer import ProcessGroupInitializer
from ..parallel_mode import ParallelMode
......@@ -42,8 +41,6 @@ class Initializer_2p5D_ROW(ProcessGroupInitializer):
tesseract_dep: int,
*args):
super(Initializer_2p5D_ROW, self).__init__(*args)
self.tensor_parallel_size = gpc.tensor_parallel_size
self.num_group = self.world_size // self.tensor_parallel_size
self.tesseract_dep = tesseract_dep
self.tesseract_dim = tesseract_dim
......@@ -66,7 +63,7 @@ class Initializer_2p5D_ROW(ProcessGroupInitializer):
for j in range(self.tesseract_dim):
for k in range(self.tesseract_dep):
ranks = [h * self.tensor_parallel_size + i + self.tesseract_dim * (
j + self.tesseract_dim * k) for i in range(self.tesseract_dim)]
j + self.tesseract_dim * k) for i in range(self.tesseract_dim)]
group = dist.new_group(ranks)
if self.rank in ranks:
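# Worked example (illustrative values): with tensor_parallel_size = 8, i.e.
# tesseract_dim = 2 and tesseract_dep = 2, the row groups built here for h = 0 are
# [0, 1], [2, 3], [4, 5] and [6, 7]; each group fixes (j, k) and varies i.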
......@@ -81,13 +78,12 @@ class Initializer_2p5D_ROW(ProcessGroupInitializer):
class Initializer_2p5D_Col(ProcessGroupInitializer):
'''2p5d tensor parallel initialization among cols.
'''
def __init__(self,
tesseract_dim: int,
tesseract_dep: int,
*args):
super(Initializer_2p5D_Col, self).__init__(*args)
self.tensor_parallel_size = gpc.tensor_parallel_size
self.num_group = self.world_size // self.tensor_parallel_size
self.tesseract_dep = tesseract_dep
self.tesseract_dim = tesseract_dim
......@@ -110,7 +106,7 @@ class Initializer_2p5D_Col(ProcessGroupInitializer):
for i in range(self.tesseract_dim):
for k in range(self.tesseract_dep):
ranks = [h * self.tensor_parallel_size + i + self.tesseract_dim * (
j + self.tesseract_dim * k) for j in range(self.tesseract_dim)]
j + self.tesseract_dim * k) for j in range(self.tesseract_dim)]
group = dist.new_group(ranks)
if self.rank in ranks:
......@@ -125,13 +121,12 @@ class Initializer_2p5D_Col(ProcessGroupInitializer):
class Initializer_2p5D_Dep(ProcessGroupInitializer):
'''2p5D tensor parallel initialization among depths.
'''
def __init__(self,
tesseract_dim: int,
tesseract_dep: int,
*args):
super(Initializer_2p5D_Dep, self).__init__(*args)
self.tensor_parallel_size = gpc.tensor_parallel_size
self.num_group = self.world_size // self.tensor_parallel_size
self.tesseract_dep = tesseract_dep
self.tesseract_dim = tesseract_dim
......@@ -154,7 +149,7 @@ class Initializer_2p5D_Dep(ProcessGroupInitializer):
for i in range(self.tesseract_dim):
for j in range(self.tesseract_dim):
ranks = [h * self.tensor_parallel_size + i + self.tesseract_dim * (
j + self.tesseract_dim * k) for k in range(self.tesseract_dep)]
j + self.tesseract_dim * k) for k in range(self.tesseract_dep)]
group = dist.new_group(ranks)
if self.rank in ranks:
......@@ -170,13 +165,12 @@ class Initializer_2p5D_Dep(ProcessGroupInitializer):
class Initializer_2p5D_XZ(ProcessGroupInitializer):
'''2p5d tensor parallel initialization among cols times dep.
'''
def __init__(self,
tesseract_dim: int,
tesseract_dep: int,
*args):
super(Initializer_2p5D_XZ, self).__init__(*args)
self.tensor_parallel_size = gpc.tensor_parallel_size
self.num_group = self.world_size // self.tensor_parallel_size
self.tesseract_dep = tesseract_dep
self.tesseract_dim = tesseract_dim
......@@ -198,8 +192,8 @@ class Initializer_2p5D_XZ(ProcessGroupInitializer):
for h in range(self.num_group):
for i in range(self.tesseract_dim):
ranks = [h * self.tensor_parallel_size + i + self.tesseract_dim * (
j + self.tesseract_dim * k) for k in range(self.tesseract_dep) for j in
range(self.tesseract_dim)]
j + self.tesseract_dim * k) for k in range(self.tesseract_dep) for j in
range(self.tesseract_dim)]
group = dist.new_group(ranks)
if self.rank in ranks:
......@@ -5,7 +5,7 @@ import math
import os
import torch.distributed as dist
from colossalai.constants import DEPTH_3D
from colossalai.constants import DEPTH_3D, INPUT_GROUP_3D, WEIGHT_GROUP_3D, OUTPUT_GROUP_3D
from colossalai.registry import DIST_GROUP_INITIALIZER
from ..parallel_mode import ParallelMode
......@@ -18,7 +18,7 @@ def _check_depth_env_var(depth):
if env_depth:
assert int(env_depth) == depth, \
'SUMMA_DIM has been set in the current environment and ' \
'DEPTH_3D has been set in the current environment and ' \
'does not match the value passed to this initializer'
else:
os.environ[DEPTH_3D] = str(depth)
......@@ -43,6 +43,7 @@ class Initializer_3D_Input(ProcessGroupInitializer):
process_group = None
group_world_size = None
mode = ParallelMode.PARALLEL_3D_INPUT
os.environ[INPUT_GROUP_3D] = INPUT_GROUP_3D
for h in range(self.num_group):
for i in range(self.depth):
......@@ -82,6 +83,7 @@ class Initializer_3D_Weight(ProcessGroupInitializer):
process_group = None
group_world_size = None
mode = ParallelMode.PARALLEL_3D_WEIGHT
os.environ[WEIGHT_GROUP_3D] = WEIGHT_GROUP_3D
for h in range(self.num_group):
for k in range(self.depth):
......@@ -121,6 +123,7 @@ class Initializer_3D_Output(ProcessGroupInitializer):
process_group = None
group_world_size = None
mode = ParallelMode.PARALLEL_3D_OUTPUT
os.environ[OUTPUT_GROUP_3D] = OUTPUT_GROUP_3D
for h in range(self.num_group):
for i in range(self.depth):
......@@ -3,14 +3,4 @@
from colossalai.context import ParallelContext
global_context = ParallelContext()
def set_global_context(context: ParallelContext):
'''Reset global context to be identical to a given :class:ParallelContext.
:param context: Parallel context to generate our global parallel context.
:type context: ParallelContext
'''
global global_context
global_context = context
global_context = ParallelContext.get_instance()
from ._base_engine import Engine
from .gradient_handler import *
from .schedule import *
from .amp import *
__all__ = ['Engine']
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import torch
from typing import List
from torch.nn import Module
from torch.nn.modules.loss import _Loss
from torch.optim import Optimizer
from colossalai.builder import build_gradient_handler
from colossalai.context import ParallelMode
from colossalai.core import global_context as gpc
from colossalai.logging import get_global_dist_logger
from colossalai.nn import (ZeroRedundancyOptimizer_Level_2,
ZeroRedundancyOptimizer_Level_3)
from .schedule import BaseSchedule
from colossalai.logging import get_dist_logger
from colossalai.utils import is_using_ddp, is_using_pp
from torch import Tensor
class Engine:
......@@ -20,74 +20,40 @@ class Engine:
It controls an iteration in training.
:param model: The neural network model
:type model: ``torch.nn.Module``
:param optimizer: Optimizer for updating the parameters
:param step_schedule: Running schedule in :meth:`step`
:param gradient_accumulation: Steps of gradient accumulation
:type optimizer: ``torch.optim.Optimizer``
:param criterion: Loss function for calculating loss
:type criterion: ``torch.nn.modules.loss._Loss``
:param gradient_clipping: The norm of gradient clipping
:type model: Module
:type optimizer: Optimizer
:type step_schedule: BaseSchedule, optional
:type gradient_accumulation: int, optional
:type gradient_clipping: float, optional
:param verbose: whether to display log info
:type verbose: bool
"""
def __init__(self,
model: Module,
optimizer: Optimizer,
criterion: _Loss,
step_schedule: BaseSchedule,
gradient_handlers: list = None,
gradient_accumulation: int = 1,
gradient_clipping: float = 0.0,
gradient_handlers: List = None,
clip_grad_norm: float = 0.0,
verbose: bool = True
):
self._model = model
self._optimizer = optimizer
self._criterion = criterion
self._schedule = step_schedule
# schedule initialize
self._schedule.initialize(model, optimizer)
self._clip_grad_norm = clip_grad_norm
self._verbose = verbose
self._logger = get_dist_logger()
# state
self.training = True # default
# gradient accumulation
assert gradient_accumulation > 0, 'gradient accumulation size must be larger than 0'
self._grad_accum_size = gradient_accumulation
self._grad_clip = gradient_clipping
self._logger = get_global_dist_logger()
# build gradient handler
self._gradient_handlers = []
if gradient_handlers is not None:
assert isinstance(gradient_handlers, list), \
f'argument gradient_handler_cfg expected type list, ' \
f'but got type {type(gradient_handlers)}'
elif isinstance(optimizer, (ZeroRedundancyOptimizer_Level_2,
ZeroRedundancyOptimizer_Level_3)):
gradient_handlers = [dict(type='ZeROGradientHandler')]
self._logger.info(
"Training with zero is detected, ZeROGradientHandler is automatically "
"added even though not specified in the configuration",
ranks=[0])
elif gpc.is_initialized(ParallelMode.DATA) and gpc.get_world_size(
ParallelMode.DATA) > 1:
gradient_handlers = [dict(type='DataParallelGradientHandler')]
self._logger.info(
"Data parallel training is detected, DataParallelGradientHandler is automatically "
"added even though not specified in the configuration",
ranks=[0])
if gradient_handlers is None:
self._logger.warning(
"No gradient handler is set up, please make sure you do not need "
"to all-reduce the gradients after a training step.",
ranks=[0])
if gradient_handlers:
self._gradient_handlers = gradient_handlers
else:
for cfg in gradient_handlers:
handler = build_gradient_handler(cfg, model, optimizer)
self._gradient_handlers.append(handler)
self._gradient_handlers = []
@property
def model(self):
......@@ -105,11 +71,27 @@ class Engine:
def schedule(self):
return self._schedule
@property
def gradient_accumulation(self):
return self._grad_accum_size
def zero_grad(self):
self.optimizer.zero_grad()
def step(self):
self._all_reduce_gradients()
self.optimizer.clip_grad_norm(self.model, self._clip_grad_norm)
self.optimizer.step()
def backward(self, loss: Tensor):
return self.optimizer.backward(loss)
def handle_gradient(self):
def backward_by_grad(self, tensor, grad):
return self.optimizer.backward_by_grad(tensor, grad)
def calc_loss(self, *args, **kwargs):
return self.criterion(*args, **kwargs)
def __call__(self, *args, **kwargs):
return self.model(*args, **kwargs)
def _all_reduce_gradients(self):
"""Handles all-reduce operations of gradients across different parallel groups.
"""
for handler in self._gradient_handlers:
......@@ -126,51 +108,3 @@ class Engine:
"""
self.training = False
self._model.eval()
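# Hedged training-loop sketch (not part of the diff): with the reworked API the
# engine only wraps forward/backward/step and a schedule drives each iteration.
# `engine`, `schedule` and `train_dataloader` are assumed to be created elsewhere
# (e.g. by colossalai.initialize):
#   data_iter = iter(train_dataloader)
#   engine.zero_grad()
#   output, label, loss = schedule.forward_backward_step(
#       engine, data_iter, forward_only=False, return_loss=True)
#   engine.step()  # all-reduces gradients via the handlers, clips them, then calls optimizer.step()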
def step(self,
data_iter,
is_last_iteration: bool = False,
return_loss=True):
"""A running step based on the schedule. Usually, it runs a training or
evaluation over a batch of dataset.
:param data_iter: Data iterator of the dataset
:param is_last_iteration: If True, this iteration is the last iteration in the epoch
:param return_loss: loss will be returned if True
:type data_iter: Iterator
:type is_last_iteration: bool, optional
:type return_loss: bool, optional
:return: (output, label, loss)
"""
if self.training:
self._optimizer.zero_grad()
# differentiate training and eval with grad accum
if self.training:
for i in range(self._grad_accum_size):
output, label, loss = self._schedule.forward_backward_step(
data_iter, self._model, self._criterion, self._optimizer,
forward_only=False,
grad_accum_size=self._grad_accum_size,
return_loss=return_loss)
if i == self._grad_accum_size - 1:
# all reduce gradients
self.handle_gradient()
self._schedule.optimizer_step(self._model, self._optimizer, self._grad_clip)
else:
output, label, loss = self._schedule.forward_backward_step(
data_iter, self._model, self._criterion, self._optimizer,
forward_only=True,
grad_accum_size=1,
return_loss=return_loss)
# consume the remaining dataset left out due to gradient accumulation
if is_last_iteration:
while True:
try:
_ = next(data_iter)
except StopIteration:
break
return output, label, loss
from .grad_scaler import GradScaler
from .amp_type import AMP_TYPE
from ._base_schedule import BaseSchedule
from ._no_pipeline import NoPipelineSchedule
from ._pipeline import PipelineSchedule
from ._pipeline_schedule import PipelineSchedule
from ._non_pipeline_schedule import NonPipelineSchedule
__all__ = ['BaseSchedule', 'NoPipelineSchedule', 'PipelineSchedule']
__all__ = ['BaseSchedule', 'PipelineSchedule', 'NonPipelineSchedule']
......@@ -5,8 +5,10 @@ from abc import ABC, abstractmethod
import torch
from colossalai.core import global_context as gpc
from colossalai.logging import get_global_dist_logger
from torch import Tensor
from typing import Iterable, Union, List, Callable
from .._base_engine import Engine
from colossalai.logging import get_dist_logger
from colossalai.utils import get_current_device
......@@ -18,8 +20,9 @@ class BaseSchedule(ABC):
control of FP16 in class schedule.
"""
def __init__(self):
self.logger = get_global_dist_logger()
def __init__(self, batch_data_process_func: Callable = None):
self.logger = get_dist_logger()
self.batch_data_process_func = batch_data_process_func
@staticmethod
def _move_tensor(element):
......@@ -35,6 +38,11 @@ class BaseSchedule(ABC):
data = data.to(get_current_device()).detach()
return data
def _to_list(self, data):
if torch.is_tensor(data):
return [data]
return data
def load_batch(self, data_iter):
"""Loads a batch from data iterator. It returns the data and labels which are
already in the same GPU as where the model's.
......@@ -44,46 +52,34 @@ class BaseSchedule(ABC):
"""
if data_iter is None:
raise RuntimeError('Dataloader is not defined.')
data, label = next(data_iter)
return self._move_to_device(data), self._move_to_device(label)
batch_data = next(data_iter)
def initialize(self, model, optimizer):
"""Initializes the model and the optimizer before training.
This is often used in FP16 training.
if self.batch_data_process_func:
data, label = self.batch_data_process_func(batch_data)
else:
data, label = batch_data
:param model: The neural network model
:param optimizer: Optimizer for updating the parameters
data, label = self._to_list(data), self._to_list(label)
return self._move_to_device(data), self._move_to_device(label)
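# Hedged sketch (not part of the diff): batch_data_process_func adapts arbitrary
# batch formats to the (data, label) pair expected here. Assuming a dataloader that
# yields dicts with hypothetical keys 'image' and 'target', and that the concrete
# schedule forwards the argument to BaseSchedule:
#   def split_dict_batch(batch):
#       return batch['image'], batch['target']
#   schedule = NonPipelineSchedule(batch_data_process_func=split_dict_batch)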
def pre_processing(self, engine: Engine):
"""To perform actions before running the schedule.
"""
return model, optimizer
pass
@abstractmethod
def forward_backward_step(self,
data_iter,
model,
criterion,
optimizer=None,
forward_only=False,
grad_accum_size: int = 1,
return_loss=True):
engine: Engine,
data_iter: Iterable,
forward_only: bool,
return_loss: bool = True
):
"""The process function over a batch of dataset for training or evaluation.
:param data_iter: Data iterator of the dataset
:param model: Model used in training or evaluation
:param optimizer: Optimizer used in training or evaluation
:param criterion: Loss function
:param engine: Colossalai training engine
:param inputs: input data
:param labels: ground truth
:param forward_only: If True, the process won't include backward
:param grad_accum_size: Steps of gradient accumulation
:param return_loss: If False, the loss won't be returned
"""
pass
@abstractmethod
def optimizer_step(self, model, optimizer, grad_clipping: float = 0.0):
"""Updates the parameters with the optimizer.
:param model: The neural network model
:param optimizer: Optimizer for updating the parameters
:param grad_clipping: The norm of gradient clipping
:type grad_clipping: float, optional
"""
pass
pass
\ No newline at end of file
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
try:
import apex.amp as apex_amp
except ImportError:
pass
try:
import torch.cuda.amp as torch_amp
except ImportError:
pass
from typing import Iterable
import torch.nn as nn
from torch.optim import Optimizer
from colossalai.nn import (ZeroRedundancyOptimizer_Level_2,
ZeroRedundancyOptimizer_Level_3)
from colossalai.nn.optimizer._utils import clip_grad_norm_fp32
from ._base_schedule import BaseSchedule
from ._utils import convert_to_fp16, convert_to_fp32
from ..amp import AMP_TYPE, GradScaler
class NoPipelineSchedule(BaseSchedule):
"""A helper schedule class for no pipeline parallelism running environment.
During one process, it loads a batch of dataset and feeds it to the model.
After getting the output and calculating the loss, it will use :meth:`step`
to update the parameters if it is in training mode.
:param amp_type: The type of automatic mixed precision
:param amp_config: The configuration of automatic mixed precision
:type amp_type: AMP_TYPE
:type amp_config: dict
"""
def __init__(
self,
amp_type: AMP_TYPE = None,
amp_config: dict = None,
):
super().__init__()
# mixed precision training
assert amp_type is None or isinstance(amp_type, AMP_TYPE), \
'unrecognised value for argument fp16, it can only be None, torch or apex'
self.use_zero_level_2_3 = False
if amp_type is not None:
self.fp16 = True
self.amp_type = amp_type
if amp_config is not None:
assert isinstance(amp_config, dict), \
f'expected argument fp16_config to be type dictionary, but got {type(amp_config)}'
if self.amp_type == AMP_TYPE.TORCH:
# torch amp
if amp_config is None:
amp_config = dict()
self.amp_cfg = amp_config
elif self.amp_type == AMP_TYPE.APEX:
# apex amp
if amp_config is None:
amp_config = dict(opt_level='O2')
self.logger.warning(
'apex is deprecated, please consider using torch.cuda.amp instead.'
)
self.amp_cfg = amp_config
elif self.amp_type == AMP_TYPE.PARALLEL:
# use fp16 optimizer for tensor parallel training
if amp_config is None:
amp_config = dict()
self.amp_cfg = amp_config
else:
self.fp16 = False
self.amp_type = None
def initialize(self, model: nn.Module, optimizer: Optimizer):
if isinstance(optimizer, (ZeroRedundancyOptimizer_Level_2,
ZeroRedundancyOptimizer_Level_3)):
self.use_zero_level_2_3 = True
assert self.amp_type != AMP_TYPE.PARALLEL, \
'ZeRO Level 2 and 3 are mutually exclusive with AMP_TYPE.PARALLEL'
if self.fp16:
if self.amp_type == AMP_TYPE.TORCH:
self._torch_amp_scaler = GradScaler(**self.amp_cfg)
elif self.amp_type == AMP_TYPE.APEX:
model, optimizer = apex_amp.initialize(model, optimizer, **self.amp_cfg)
return model, optimizer
def forward_backward_step(self,
data_iter: Iterable,
model: nn.Module,
criterion: nn.modules.loss._Loss,
optimizer: Optimizer = None,
forward_only: bool = False,
grad_accum_size: int = 1,
return_loss: bool = True):
"""The process function that loads loads a batch of dataset and feeds it to the model.
The returned labels and loss will None if :attr:`return_loss` is False.
:param data_iter: Data iterator of the dataloader, e.g. iter(dataloader)
:param model: Model for training and inference
:param criterion: Loss function for training
:param optimizer: Optimizer used for training
:param forward_only: If True, the model is run for the forward pass, else back propagation will be executed
:param grad_accum_size: The number of iterations for gradient accumulation
:param return_loss: Loss will be returned if True
:type data_iter: Iterator
:type model: torch.nn.Module
:type criterion: torch.nn.modules.loss._Loss
:type optimizer: torch.optim.Optimizer
:type forward_only: bool, optional
:type grad_accum_size: int
:type return_loss: bool, optional
:return: (output, label, loss)
"""
assert forward_only or return_loss, \
'The argument \'return_loss\' has to be True when \'forward_only\' is False, but got False.'
data, label = self.load_batch(data_iter)
loss = None
# forward
if self.fp16 and self.amp_type == AMP_TYPE.TORCH:
with torch_amp.autocast():
output = model(*data)
if not isinstance(output, (tuple, list)):
output = (output,)
if return_loss:
loss = criterion(*output, *label)
else:
if self.use_zero_level_2_3 or self.amp_type == AMP_TYPE.PARALLEL:
data = convert_to_fp16(data)
output = model(*data)
if self.use_zero_level_2_3 or self.amp_type == AMP_TYPE.PARALLEL:
output = convert_to_fp32(output)
if not isinstance(output, (tuple, list)):
output = (output,)
if return_loss:
loss = criterion(*output, *label)
loss /= grad_accum_size
if not forward_only:
# backward
if self.use_zero_level_2_3:
optimizer.backward(loss)
elif self.fp16:
if self.amp_type == AMP_TYPE.APEX:
with apex_amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
elif self.amp_type == AMP_TYPE.TORCH:
self._torch_amp_scaler.scale(loss).backward()
elif self.amp_type == AMP_TYPE.PARALLEL:
loss = optimizer.scale_loss(loss)
loss.backward()
# scale back to display the original value in logs
loss.div_(optimizer.grad_scaler.scale)
else:
loss.backward()
if return_loss:
return output, label, loss * grad_accum_size
else:
return output, None, None
def optimizer_step(self, model: nn.Module, optimizer: Optimizer, grad_clipping: float = 0.0):
# step optimizer
if self.fp16 and self.amp_type == AMP_TYPE.TORCH:
if grad_clipping > 0.0:
self._torch_amp_scaler.unscale_(optimizer)
clip_grad_norm_fp32(model.parameters(), grad_clipping)
self._torch_amp_scaler.step(optimizer)
self._torch_amp_scaler.update()
else:
if not self.fp16 and not self.use_zero_level_2_3 and grad_clipping > 0.0:
clip_grad_norm_fp32(model.parameters(), grad_clipping)
optimizer.step()
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from typing import Iterable
import torch
import torch.nn as nn
from colossalai.engine import Engine
from torch.optim import Optimizer
from ._base_schedule import BaseSchedule
from colossalai.utils import conditional_context
class NonPipelineSchedule(BaseSchedule):
"""A helper schedule class for no pipeline parallelism running environment.
During one process, it loads a batch of dataset and feeds it to the model.
After getting the output and calculating the loss, it will use :meth:`step`
to update the parameters if it is in training mode.
"""
def forward_backward_step(self,
engine: Engine,
data_iter: Iterable,
forward_only: bool = False,
return_loss: bool = True):
"""The process function that loads loads a batch of dataset and feeds it to the model.
The returned labels and loss will None if :attr:`return_loss` is False.
:param engine: Colossalai training engine
:param data_iter: Data iterator of the dataloader, e.g. iter(dataloader)
:param forward_only: If True, the model is run for the forward pass, else back propagation will be executed
:param return_loss: Loss will be returned if True
:type engine: Engine
:type data_iter: Iterator
:type forward_only: bool, optional
:type return_loss: bool, optional
:return: (output, label, loss)
"""
assert forward_only or return_loss, \
"The argument 'return_loss' has to be True when 'forward_only' is False, but got False."
data, label = self.load_batch(data_iter)
# forward
with conditional_context(torch.no_grad(), enable=forward_only):
output = engine(*data)
if not isinstance(output, (tuple, list)):
output = (output,)
if return_loss:
loss = engine.criterion(*output, *label)
if not forward_only:
engine.backward(loss)
if return_loss:
return output, label, loss
else:
return output, None, None
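# Hedged evaluation sketch (not part of the diff): with forward_only=True the forward
# pass runs under torch.no_grad() via conditional_context and no backward is executed.
#   schedule = NonPipelineSchedule()
#   engine.eval()
#   output, label, loss = schedule.forward_backward_step(
#       engine, iter(test_dataloader), forward_only=True, return_loss=True)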
......@@ -10,12 +10,12 @@ from torch import Tensor
from colossalai.communication import *
from colossalai.context.parallel_mode import ParallelMode
from colossalai.core import global_context as gpc
from colossalai.nn import (ZeroRedundancyOptimizer_Level_2,
ZeroRedundancyOptimizer_Level_3)
from colossalai.amp.naive_amp import NaiveAMPModel
from colossalai.zero import (ZeroRedundancyOptimizer_Level_2,
ZeroRedundancyOptimizer_Level_3)
from colossalai.utils import get_current_device
from ._base_schedule import BaseSchedule
from ._utils import convert_to_fp16
from ..amp import AMP_TYPE
from colossalai.amp import AMP_TYPE
def squeeze(x: Union[Tensor, tuple, list]):
......@@ -28,32 +28,25 @@ def squeeze(x: Union[Tensor, tuple, list]):
class PipelineSchedule(BaseSchedule):
"""A helper schedule class for pipeline parallelism running environment.
It uses the non-interleaved 1F1B strategy. Other properties are similar to
:class:`NoPipelineSchedule`.
:class:`NonPipelineSchedule`.
:param num_microbatches: The number of microbatches
:param amp_type: The type of automatic mixed precision
:param amp_config: The configuration of automatic mixed precision
:param sync_data: If set to `True`, will sync data every batch over pipeline stages
:type num_microbatches: int
:type amp_type: AMP_TYPE
:type amp_config: dict
:type sync_data: bool
"""
def __init__(self,
num_microbatches,
amp_type: AMP_TYPE = None,
amp_config: dict = None):
sync_data: bool = True):
super().__init__()
self.num_microbatches = num_microbatches
self.data_sync = True # close after making sure data is identical
# amp
# LSGL: amp_config is not used, but leave here for future extension
self.amp_type = amp_type
self.amp_config = amp_config
if self.amp_type is not None:
assert self.amp_type == AMP_TYPE.PARALLEL, 'We only support AMP_TYPE.PARALLEL for pipeline training for now'
self.sync_data = sync_data
def _move_to_device(self, data):
if isinstance(data, (
......@@ -67,30 +60,37 @@ class PipelineSchedule(BaseSchedule):
return data
def _sync_data(self):
reqs = []
if gpc.is_first_rank(ParallelMode.PIPELINE):
src_rank = gpc.get_global_rank()
dist.broadcast(
reqs.append(dist.broadcast(
tensor=self.batch_data,
src=src_rank,
group=gpc.get_group(ParallelMode.PIPELINE_PREV)
)
dist.broadcast(
group=gpc.get_group(ParallelMode.PIPELINE_PREV),
async_op=True
))
reqs.append(dist.broadcast(
tensor=self.batch_label,
src=src_rank,
group=gpc.get_group(ParallelMode.PIPELINE_PREV)
)
group=gpc.get_group(ParallelMode.PIPELINE_PREV),
async_op=True
))
if gpc.is_last_rank(ParallelMode.PIPELINE):
src_rank = gpc.get_next_global_rank(ParallelMode.PIPELINE)
dist.broadcast(
reqs.append(dist.broadcast(
tensor=self.batch_data,
src=src_rank,
group=gpc.get_group(ParallelMode.PIPELINE_NEXT)
)
dist.broadcast(
group=gpc.get_group(ParallelMode.PIPELINE_NEXT),
async_op=True
))
reqs.append(dist.broadcast(
tensor=self.batch_label,
src=src_rank,
group=gpc.get_group(ParallelMode.PIPELINE_NEXT)
)
group=gpc.get_group(ParallelMode.PIPELINE_NEXT),
async_op=True
))
for req in reqs:
req.wait()
# Pipeline schedule just puts data in memory
def load_batch(self, data_iter):
......@@ -104,7 +104,7 @@ class PipelineSchedule(BaseSchedule):
assert batch_size % self.num_microbatches == 0, \
"Batch size should divided by the number of microbatches"
self.microbatch_size = batch_size // self.num_microbatches
if self.data_sync:
if self.sync_data:
self._sync_data()
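# Worked example: with a global batch of 32 samples and num_microbatches = 4,
# each pipeline microbatch holds 32 // 4 = 8 samples.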
def _get_data_slice(self, tensor):
......@@ -116,21 +116,20 @@ class PipelineSchedule(BaseSchedule):
self.batch_pos += self.microbatch_size
return (data,), (label,)
def initialize(self, model, optimizer):
if isinstance(optimizer, (ZeroRedundancyOptimizer_Level_2, ZeroRedundancyOptimizer_Level_3)):
def pre_processing(self, engine):
if isinstance(engine.optimizer, (ZeroRedundancyOptimizer_Level_2, ZeroRedundancyOptimizer_Level_3)):
raise TypeError(
"Pipeline schedule is currently not compatible with ZeRO Level 2 and Level 3"
)
# LSG: set default dtype to fp16 for communication
if self.amp_type == AMP_TYPE.PARALLEL:
if isinstance(engine.model, NaiveAMPModel):
torch.set_default_dtype(torch.half)
self.logger.info(
self.logger.warning(
'default tensor dtype is set to torch.half for fp16 training',
ranks=[0])
def forward_step(self, model, criterion, input_tensor, return_tensors,
grad_accum_size, return_loss=True):
def forward_step(self, engine, input_tensor, return_tensors, return_loss=True):
"""Forward step for passed-in model. If it is the first stage, the input tensor
is obtained from data_iterator, otherwise the passed-in input_tensor is used.
Returns output tensor. This is a helper function and can be ignored by users.
......@@ -138,17 +137,16 @@ class PipelineSchedule(BaseSchedule):
if input_tensor is None:
input_tensor, label = self.load_micro_batch()
if self.amp_type == AMP_TYPE.PARALLEL:
input_tensor = convert_to_fp16(input_tensor)
input_tensor = squeeze(input_tensor)
output_tensor = model(input_tensor)
output_tensor = engine(input_tensor)
output_tensor = squeeze(output_tensor)
if gpc.is_last_rank(ParallelMode.PIPELINE):
if return_loss:
input_tensor, label = self.load_micro_batch()
loss_reduced = criterion(output_tensor, *label) \
/ (self.num_microbatches * grad_accum_size)
loss_reduced = engine.criterion(output_tensor, *label) \
/ self.num_microbatches
return_tensors.append(
tuple((output_tensor, label[0], loss_reduced)))
return loss_reduced
......@@ -159,7 +157,7 @@ class PipelineSchedule(BaseSchedule):
else:
return output_tensor
def backward_step(self, optimizer, input_tensor, output_tensor, output_tensor_grad):
def backward_step(self, engine, input_tensor, output_tensor, output_tensor_grad):
"""Backward step through the passed-in output tensor. If it is the last stage, the
output_tensor_grad is None, otherwise it is the gradients with respect to stage's output tensor.
Returns the gradients with respect to the input tensor (None if first stage).
......@@ -171,9 +169,10 @@ class PipelineSchedule(BaseSchedule):
input_tensor.retain_grad()
# Backward pass.
if output_tensor_grad is None and self.amp_type == AMP_TYPE.PARALLEL:
output_tensor = optimizer.scale_loss(output_tensor)
torch.autograd.backward(output_tensor, grad_tensors=output_tensor_grad)
if output_tensor_grad is None:
engine.backward(output_tensor)
else:
engine.backward_by_grad(output_tensor, output_tensor_grad)
# Collect the grad of the input_tensor.
input_tensor_grad = None
......@@ -183,12 +182,9 @@ class PipelineSchedule(BaseSchedule):
return input_tensor_grad
def forward_backward_step(self,
engine,
data_iter,
model,
criterion,
optimizer=None,
forward_only=False,
grad_accum_size: int = 1,
return_loss=True):
"""Runs non-interleaved 1F1B schedule, with communication between pipeline stages.
Returns a tuple with losses if the last stage, an empty tuple otherwise.
......@@ -226,9 +222,8 @@ class PipelineSchedule(BaseSchedule):
ft_shape = recv_tensor_meta(ft_shape)
input_tensor = recv_forward(ft_shape)
output_tensor = self.forward_step(
model, criterion,
input_tensor, return_tensors,
grad_accum_size, return_loss=return_loss
engine, input_tensor, return_tensors,
return_loss=return_loss
)
if not gpc.is_last_rank(ParallelMode.PIPELINE):
bt_shape = output_tensor.shape
......@@ -252,9 +247,8 @@ class PipelineSchedule(BaseSchedule):
last_iteration = (i == (num_microbatches_remaining - 1))
output_tensor = self.forward_step(
model, criterion,
input_tensor, return_tensors,
grad_accum_size, return_loss=return_loss
engine, input_tensor, return_tensors,
return_loss=return_loss
)
if forward_only:
send_forward(output_tensor)
......@@ -276,7 +270,7 @@ class PipelineSchedule(BaseSchedule):
output_tensor = output_tensors.pop(0)
input_tensor_grad = self.backward_step(
optimizer,
engine,
input_tensor, output_tensor,
output_tensor_grad
)
......@@ -297,7 +291,7 @@ class PipelineSchedule(BaseSchedule):
output_tensor_grad = recv_backward(bt_shape)
input_tensor_grad = self.backward_step(
optimizer,
engine,
input_tensor, output_tensor,
output_tensor_grad
)
......@@ -309,11 +303,8 @@ class PipelineSchedule(BaseSchedule):
output, label, loss = tuple(map(list, zip(*return_tensors)))
return (torch.cat(output, dim=0),
torch.cat(label, dim=0),
sum(loss) * grad_accum_size)
sum(loss))
else:
return tuple((torch.cat(return_tensors, dim=0), None, None))
else:
return tuple((None, None, None))
def optimizer_step(self, model, optimizer, grad_clipping: float = 0.0):
optimizer.step()
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from typing import Union, List
from torch import Tensor
def convert_to_fp16(data: Union[Tensor, List[Tensor]]):
if isinstance(data, Tensor):
ret = data.half()
elif isinstance(data, (list, tuple)):
ret = [val.half() for val in data]
else:
raise TypeError(f"Expected argument 'data' to be a Tensor or a list/tuple of Tensor, but got {type(data)}")
return ret
def convert_to_fp32(data: Union[Tensor, List[Tensor]]):
if isinstance(data, Tensor):
ret = data.float()
elif isinstance(data, (list, tuple)):
ret = [val.float() for val in data]
else:
raise TypeError(f"Expected argument 'data' to be a Tensor or a list/tuple of Tensor, but got {type(data)}")
return ret
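# Hedged usage sketch (not part of the diff): casting a list of tensors to half
# precision (as done for AMP_TYPE.PARALLEL-style training) and back again.
if __name__ == '__main__':
    import torch
    tensors = [torch.randn(2, 2), torch.randn(3)]
    halves = convert_to_fp16(tensors)   # every element becomes torch.float16
    floats = convert_to_fp32(halves)    # back to torch.float32
    print([t.dtype for t in halves], [t.dtype for t in floats])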
......@@ -3,377 +3,326 @@
import argparse
import pprint
import random
from pathlib import Path
from typing import Callable, Iterable, Optional, Union
from typing import Tuple
import os
from colossalai.nn.optimizer.colossalai_optimizer import ColossalaiOptimizer
import numpy as np
import torch
from torch.utils.data import DataLoader
import torch.nn as nn
from pathlib import Path
from typing import Iterable, Union, Optional, Tuple, List, Dict
from colossalai.engine import AMP_TYPE, NoPipelineSchedule, PipelineSchedule
from colossalai.amp import convert_to_amp, AMP_TYPE
from colossalai.context import Config, ParallelMode, ConfigException
from colossalai.core import global_context as gpc
from colossalai.engine import Engine
from colossalai.logging import get_global_dist_logger, init_global_dist_logger
from colossalai.nn import DataParallelSampler
from colossalai.nn.model.base_model import BaseModel
from .builder import (ModelInitializer, build_dataset, build_loss,
build_model, build_optimizer,
build_optimizer_wrapper, build_schedule)
from .context import Config, ParallelMode
from .core import global_context as gpc
from .utils import get_current_device, sync_model_param_in_dp
def parse_args():
from colossalai.logging import get_dist_logger
from colossalai.utils import (accumulate_gradient, get_current_device,
sync_model_param_in_dp, is_using_ddp, is_using_pp)
from colossalai.zero import convert_to_zero, ZeroRedundancyOptimizer_Level_2, ZeroRedundancyOptimizer_Level_3
from colossalai.builder.builder import build_gradient_handler
from torch.optim.optimizer import Optimizer
from torch.optim.lr_scheduler import _LRScheduler
from torch.utils.data import DataLoader
from torch.nn.modules.loss import _Loss
from torch.nn.parallel import DistributedDataParallel as DDP
def get_default_parser():
'''Reads user command line and uses an argument parser to parse the input arguments.
Input arguments include configuration, host, port, world size, local rank, backend for torch.distributed.
:return: call the parse arguments function
:return: returns the parser with the default arguments, the user may add customized arguments into this parser
:rtype: Namespace
'''
parser = argparse.ArgumentParser()
parser.add_argument('--config', type=str, help='path to the config file')
parser.add_argument('--host',
type=str,
default=None,
help='the master address for distributed training')
parser.add_argument('--port',
type=str,
default=None,
type=int,
help='the master port for distributed training')
parser.add_argument('--world_size', type=int, help='world size for ')
parser.add_argument('--world_size', type=int, help='world size for distributed training')
parser.add_argument('--rank', type=int, help='rank for the default process group')
parser.add_argument('--local_rank',
type=int,
help='rank for the default process group')
help='local rank on the node')
parser.add_argument('--backend',
type=str,
default='nccl',
help='backend for torch.distributed')
return parser.parse_args()
def init_dist(config: Union[str, dict] = None,
local_rank: int = None,
world_size: int = None,
host: str = None,
port: str = None,
backend: str = None):
help='backend for distributed communication')
return parser
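# Hedged usage sketch (not part of the diff): how the default parser and launch()
# are meant to be combined in a user training script. The function name below is
# illustrative only; argument names follow the parser defined above.
def _parse_and_launch_example():
    parser = get_default_parser()
    args = parser.parse_args()
    launch(config=args.config,
           rank=args.rank,
           world_size=args.world_size,
           host=args.host,
           port=args.port,
           backend=args.backend,
           local_rank=args.local_rank)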
def launch(config: Union[str, Path, Config, Dict],
rank: int,
world_size: int,
host: str,
port: int,
backend: str = 'nccl',
local_rank: int = None,
seed: int = 1024,
verbose: bool = True):
'''This function first parses the configuration arguments, using :func:parse_args() in case one of the input arguments is not given.
Then initialize and set distributed environment by calling global_context's functions.
Then initialize and set distributed environment by calling global_context's functions.
:param config: config file or config file path are both acceptable
:type config: Union[str, dict], optional
:param local_rank: rank for the default process group, defaults to None
:type config: Union[str, dict, Config]
:param rank: rank for the default process group
:type rank: int
:param world_size: world size of the default process group
:type world_size: int
:param host: the master address for distributed training
:type host: str
:param port: the master port for distributed training
:type port: str
:param backend: backend for torch.distributed
:type backend: str
:param local_rank: rank for the process on the node and is used to set the default CUDA device,
defaults to None. If local_rank = None, the default device ordinal will be calculated automatically
:type local_rank: int, optional
:param world_size: world size of GPUs, defaults to None
:type world_size: int, optional
:param host: the master address for distributed training, defaults to None
:type host: str, optional
:param port: the master port for distributed training, defaults to None
:type port: str, optional
:param backend: backend for torch.distributed, defaults to None
:type backend: str, optional
:raises Exception: raise exception when config type is wrong
'''
args = [config, local_rank, world_size, host, port, backend]
arg_given = [arg is not None for arg in args]
if not all(arg_given):
args = parse_args()
if config is None:
config = args.config
if local_rank is None:
local_rank = args.local_rank
if world_size is None:
world_size = args.world_size
if host is None:
host = args.host
if port is None:
port = args.port
if backend is None:
backend = args.backend
args = Config(
dict(config=config,
host=host,
port=port,
world_size=world_size,
local_rank=local_rank,
backend=backend))
# set distributed settings
dist_args = Config(
dict(local_rank=args.local_rank,
world_size=args.world_size,
backend=args.backend))
gpc.set_dist_args(dist_args)
gpc.verbose = verbose
# set config
if isinstance(args.config, dict):
cfg = args.config
elif isinstance(args.config, (str, Path)):
cfg = Config.from_file(args.config)
else:
raise Exception('Config type error: {}'.format(type(args.config)))
gpc.load_config(cfg)
# init dist groups
gpc.init_global_dist(args.host, args.port)
assert isinstance(config, (Config, str, Path, dict)), \
f'expected argument config to be Config, str or Path, but got {type(config)}'
if not isinstance(config, Config) and isinstance(config, dict):
config = Config(config)
if isinstance(config, (str, Path)):
config = Config.from_file(config)
gpc.load_config(config)
# init default process group
gpc.init_global_dist(rank, world_size, backend, host, port)
# init process groups for different parallel modes from config
gpc.init_parallel_groups()
# init dist logger
init_global_dist_logger()
# set cuda device
if torch.cuda.is_available():
gpc.set_device()
def get_dataloader(dataset, seed=1024, add_sampler_if_possible=False, **kwargs):
'''Sets up a deterministic dataloader (also configures seed workers, samplers, and whether to shuffle)
.. note: when pipeline parallel is enabled, shuffle cannot be True
as it will result in mismatch between input data on the 1st
stage and label on the last stage
:param dataset: a :class:`torch.utils.data.Dataset` object
:param seed: random worker seed, defaults to 1024
:type seed: int, optional
:param add_sampler_if_possible: add a :class:`DataParallelSampler` when data parallel training is enabled, defaults to False
:type add_sampler_if_possible: bool, optional
:return: a dataloader for the given dataset
:rtype: torch.utils.data.DataLoader
'''
_kwargs = kwargs.copy()
if 'shuffle' in _kwargs:
shuffle = _kwargs.pop('shuffle')
else:
shuffle = False
if add_sampler_if_possible and gpc.is_initialized(ParallelMode.DATA) and gpc.get_world_size(ParallelMode.DATA) > 1:
sampler = DataParallelSampler(dataset, shuffle=shuffle)
else:
sampler = None
# Deterministic dataloader
def seed_worker(worker_id):
worker_seed = seed
np.random.seed(worker_seed)
torch.manual_seed(worker_seed)
random.seed(worker_seed)
if sampler is None:
return DataLoader(dataset,
worker_init_fn=seed_worker,
shuffle=shuffle,
**_kwargs)
else:
return DataLoader(dataset,
sampler=sampler,
worker_init_fn=seed_worker,
**_kwargs)
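# Hedged usage sketch (not part of the diff): building a deterministic, data-parallel
# aware dataloader with the helper above. torchvision/CIFAR10 and the hyper-parameters
# are placeholders.
def _build_cifar10_loader_example():
    from torchvision import datasets, transforms
    train_set = datasets.CIFAR10(root='./data', train=True, download=True,
                                 transform=transforms.ToTensor())
    return get_dataloader(train_set,
                          seed=1024,
                          add_sampler_if_possible=True,
                          batch_size=64,
                          shuffle=True,
                          num_workers=2)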
def initialize(config: Union[str, dict] = None,
local_rank: int = None,
world_size: int = None,
host: str = None,
port: str = None,
backend: str = None,
train_dataloader: Optional[Union[Iterable, Callable]] = None,
test_dataloader: Optional[Union[Iterable, Callable]] = None,
# if local rank is not given, calculate automatically
gpc.set_device(local_rank)
gpc.set_seed(seed)
if verbose:
logger = get_dist_logger()
logger.info(f'Distributed environment is initialized, '
f'data parallel size: {gpc.data_parallel_size}, pipeline parallel size: {gpc.pipeline_parallel_size}, '
f'tensor parallel size: {gpc.tensor_parallel_size}', ranks=[0])
def launch_from_slurm(config: Union[str, Path, Config, Dict],
host: str,
port: int,
backend: str = 'nccl',
seed: int = 1024,
verbose: bool = True):
rank = int(os.environ['SLURM_PROCID'])
world_size = int(os.environ['SLURM_NPROCS'])
launch(config=config,
rank=rank,
world_size=world_size,
host=host,
port=port,
backend=backend,
seed=seed,
verbose=verbose)
def launch_from_openmpi(config: Union[str, Path, Config, Dict],
host: str,
port: int,
backend: str = 'nccl',
seed: int = 1024,
verbose: bool = True):
rank = int(os.environ['OMPI_COMM_WORLD_RANK'])
local_rank = int(os.environ['OMPI_COMM_WORLD_LOCAL_RANK'])
world_size = int(os.environ['OMPI_COMM_WORLD_SIZE'])
launch(config=config,
local_rank=local_rank,
rank=rank,
world_size=world_size,
host=host,
port=port,
backend=backend,
seed=seed,
verbose=verbose)
def launch_from_torch(config: Union[str, Path, Config, Dict],
host: str,
port: int,
backend: str = 'nccl',
seed: int = 1024,
verbose: bool = True):
rank = int(os.environ['RANK'])
local_rank = int(os.environ['LOCAL_RANK'])
world_size = int(os.environ['WORLD_SIZE'])
launch(config=config,
local_rank=local_rank,
rank=rank,
world_size=world_size,
host=host,
port=port,
backend=backend,
seed=seed,
verbose=verbose)
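# Hedged usage sketch (not part of the diff): launch_from_torch() reads RANK,
# LOCAL_RANK and WORLD_SIZE from the environment, so it pairs with a launcher that
# exports them (e.g. `torchrun --nproc_per_node 4 train.py`). The script name and
# config path are placeholders.
def _launch_from_torch_example():
    launch_from_torch(config='./config.py', host='localhost', port=29500)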
def initialize(model: Union[nn.Module, List[nn.Module]],
optimizer: Union[Optimizer, List[Optimizer]],
criterion: Union[_Loss, List[_Loss]],
train_dataloader: Optional[Union[Iterable, List[Iterable]]] = None,
test_dataloader: Optional[Union[Iterable, List[Iterable]]] = None,
lr_scheduler: _LRScheduler = None,
verbose: bool = True
) -> Tuple[Engine, DataLoader, DataLoader]:
'''Core function that initializes distributed environment, logger, cudnn, data, model, loss function, optimizer, and lr_scheduler (their configs are in gpc.config).
:param config: config file or config file path are both acceptable
:type config: Union[str, dict], optional
:param local_rank: rank for the default process group, defaults to None
:type local_rank: int, optional
:param world_size: world size of GPUs, defaults to None
:type world_size: int, optional
:param host: the master address for distributed training, defaults to None
:type host: str, optional
:param port: the master port for distributed training, defaults to None
:type port: str, optional
:param backend: backend for torch.distributed, defaults to None
:type backend: str, optional
:param train_dataloader: If None, the config is used to build a dataloader; otherwise, it should be a dataloader object or an argument-free callable that builds one, defaults to None
:type train_dataloader: Optional[Union[Iterable, Callable]], optional
:param test_dataloader: If None, the config is used to build a dataloader; otherwise, it should be a dataloader object or an argument-free callable that builds one, defaults to None
:type test_dataloader: Optional[Union[Iterable, Callable]], optional
:return: (engine, train_dataloader, test_dataloader, criterion)
'''Core function that wraps the essential training components with the functionality specified in the config, which has been loaded into gpc.config.
:param model: your model instance
:type model: a single or a list of ``torch.nn.Module`` objects
:param optimizer: your optimizer instance
:type optimizer: a single or a list of ``torch.optim.optimizer.Optimizer`` objects
:param criterion: your criterion instance
:type criterion: a single or a list of ``torch.nn.modules.loss._Loss`` objects
:param train_dataloader: dataloaders for training data
:type train_dataloader: a single or a list of ``torch.utils.data.DataLoader`` objects, defaults to None
:param test_dataloader: dataloaders for testing data
:type test_dataloader: a single or a list of ``torch.utils.data.DataLoader`` objects, defaults to None
:return: (engine, train_dataloader, test_dataloader, lr_scheduler)
:rtype: tuple
'''
# initialize distributed environment
init_dist(config=config,
local_rank=local_rank,
world_size=world_size,
host=host,
port=port,
backend=backend)
# init logger
logger = get_global_dist_logger()
logger.info(f'Distributed environment is initialized, '
f'data parallel size: {gpc.data_parallel_size}, pipeline parallel size: {gpc.pipeline_parallel_size}, '
f'tensor parallel size: {gpc.tensor_parallel_size}', ranks=[0])
# get logger
logger = get_dist_logger()
gpc.verbose = verbose
# get config from gpc
config = gpc.config
# print config
logger.info(f"\n========== Your Config ========\n"
f"{pprint.pformat(gpc.config)}\n"
f"================================", ranks=[0])
if verbose:
logger.info(f"\n========== Your Config ========\n"
f"{pprint.pformat(gpc.config)}\n"
f"================================\n", ranks=[0])
# cudnn
cudnn_benchmark = gpc.config.get('cudnn_benchmark', True)
cudnn_deterministic = gpc.config.get('cudnn_deterministic', False)
cudnn_benchmark = config.get('cudnn_benchmark', True)
cudnn_deterministic = config.get('cudnn_deterministic', False)
torch.backends.cudnn.benchmark = cudnn_benchmark
torch.backends.cudnn.deterministic = cudnn_deterministic
logger.info(
f"cuDNN benchmark = {cudnn_benchmark}, deterministic = {cudnn_deterministic}", ranks=[0])
# set seed, cuda seed is only set when cuda is avail
gpc.set_seed()
# return_items = list()
# check fp16 and zero
should_convert_model_to_half = False
should_wrap_fp16_optimizer = False
should_wrap_zero_optimizer_level_2_3 = False
if hasattr(gpc.config, 'fp16'):
fp16_mode = gpc.config.fp16.mode
if fp16_mode == AMP_TYPE.PARALLEL:
should_convert_model_to_half = True
should_wrap_fp16_optimizer = True
if hasattr(gpc.config, 'zero'):
should_wrap_zero_optimizer_level_2_3 = True
zero_type = gpc.config.zero.type
if zero_type in ['ZeroRedundancyOptimizer_Level_2', 'ZeroRedundancyOptimizer_Level_3']:
should_convert_model_to_half = True
assert not should_wrap_fp16_optimizer, \
'AMP_TYPE.PARALLEL is mutually exclusive with zero level 2 and 3'
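# For illustration, the (hypothetical) config entries these checks look at:
#   fp16 = dict(mode=AMP_TYPE.PARALLEL)
#   zero = dict(type='ZeroRedundancyOptimizer_Level_2', ...)
# i.e. parallel AMP and zero level 2/3 each require a half-precision model,
# but they may not be enabled together.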
# build model
logger.info('Building model ...', ranks=[0])
assert hasattr(
gpc.config, 'model'), "Build error: configuration 'model' is missing"
if gpc.pipeline_parallel_size > 1:
model = ModelInitializer(gpc.config.model, 1, verbose=True)
model = model.model_initialize()
else:
model = build_model(gpc.config.model)
if isinstance(model, BaseModel):
model.build_from_cfg()
model = model.to(get_current_device())
sync_model_param_in_dp(model)
logger.info('Model is created', ranks=[0])
if should_convert_model_to_half:
model = model.half()
logger.info("Model is cast to fp16", ranks=[0])
# training data
if callable(train_dataloader):
if verbose:
logger.info(
f'Build train data loader from {train_dataloader}', ranks=[0])
train_dataloader = train_dataloader()
if train_dataloader is None and hasattr(gpc.config, 'train_data'):
logger.info('Preparing data ...', ranks=[0])
# assert hasattr(gpc.config, 'train_data'), "Build error: configuration 'train_data' is missing."
train_dataset = build_dataset(gpc.config.train_data.dataset)
logger.info('Train dataset is ready.', ranks=[0])
train_dataloader = get_dataloader(train_dataset,
gpc.config.get('seed', 1024),
True,
**gpc.config.train_data.dataloader,
)
logger.info(
f'Loaded {len(train_dataset)} samples in {len(train_dataloader)} batches for training', ranks=[0])
f"cuDNN benchmark = {cudnn_benchmark}, deterministic = {cudnn_deterministic}", ranks=[0])
if callable(test_dataloader):
logger.info(
f'Build test data loader from {test_dataloader}', ranks=[0])
test_dataloader = test_dataloader()
# testing data, allowed to be None
if test_dataloader is None and hasattr(gpc.config, 'test_data'):
test_dataset = build_dataset(gpc.config.test_data.dataset)
test_dataloader = get_dataloader(
test_dataset, add_sampler_if_possible=True, **gpc.config.test_data.dataloader)
logger.info(
f'Loaded {len(test_dataset)} samples in {len(test_dataloader)} batches for testing', ranks=[0])
# build loss function
assert hasattr(gpc.config, 'loss'), \
'Build error: configuration \'loss\' is missing.'
criterion = build_loss(gpc.config.loss)
logger.info('Loss function is created', ranks=[0])
# build optimizer
assert hasattr(gpc.config, 'optimizer'), \
"Build error: configuration 'optimizer' is missing."
optim_type = gpc.config.optimizer.type
is_pytorch_native_zero_level_1 = optim_type == 'ZeroRedundancyOptimizer'
if is_pytorch_native_zero_level_1:
original_cfg_copy = gpc.config.optimizer.copy()
original_cfg_copy.pop('type')
cfg = dict(type=optim_type, process_group=gpc.get_group(
ParallelMode.DATA), **original_cfg_copy)
optimizer = build_optimizer(cfg, model)
else:
optimizer = build_optimizer(gpc.config.optimizer, model)
if should_wrap_zero_optimizer_level_2_3:
optimizer = build_optimizer_wrapper(gpc.config.zero, optimizer, model)
if should_wrap_fp16_optimizer:
# replace the field mode with type
fp16_cfg = gpc.config.fp16.copy()
amp_type = fp16_cfg.pop('mode')
assert amp_type == AMP_TYPE.PARALLEL, 'The FP16 optimizer should only be used with AMP_TYPE.PARALLEL'
fp16_cfg['type'] = 'FP16Optimizer'
optimizer = build_optimizer_wrapper(fp16_cfg, optimizer)
logger.info('Optimizer is created', ranks=[0])
# build schedule and engine
if hasattr(gpc.config, 'fp16'):
amp_type = gpc.config.fp16.mode
amp_cfg = gpc.config.fp16.copy()
amp_cfg.pop('mode')
# first sync model across dp ranks
model.to(get_current_device())
sync_model_param_in_dp(model)
# check amp and zero
fp16_cfg = gpc.config.get('fp16', None)
zero_cfg = gpc.config.get('zero', None)
if fp16_cfg is not None and fp16_cfg.mode is not None and zero_cfg is not None:
raise ConfigException(
"It is not allowed to set fp16 and zero configuration in your config file at the same time")
# initialize amp
amp_mode = None
if fp16_cfg is not None and fp16_cfg.mode is not None:
cfg_ = fp16_cfg.copy()
amp_mode = cfg_.pop('mode')
model, optimizer, criterion = convert_to_amp(model=model,
optimizer=optimizer,
criterion=criterion,
mode=amp_mode,
amp_config=cfg_)
if zero_cfg is not None:
cfg_ = zero_cfg.copy()
level = cfg_.pop('level')
model, optimizer = convert_to_zero(model=model,
optimizer=optimizer,
level=level,
zero_config=cfg_
)
# gradient handler
gradient_handler_cfg = gpc.config.get('gradient_handler', None)
if gradient_handler_cfg is None:
# if gradient handler is not specified in the configuration file,
# check in the following order
# 1. if optimizer is ZERO, then use zero grad handler
# 2. if dp size is larger than 1 and pipeline is not used, use pytorch ddp
# 3. if using pipeline and dp size larger than 1, use data parallel grad handler
if isinstance(optimizer, (ZeroRedundancyOptimizer_Level_2,
ZeroRedundancyOptimizer_Level_3)):
gradient_handler_cfg = [dict(type='ZeROGradientHandler')]
if verbose:
logger.info(
"Training with zero is detected, ZeROGradientHandler is automatically "
"added even though not specified in the configuration",
ranks=[0])
elif is_using_ddp() and not is_using_pp() and amp_mode != AMP_TYPE.NAIVE:
model = DDP(model, process_group=gpc.get_group(ParallelMode.DATA))
if verbose:
logger.info(
'Model is using torch.nn.parallel.DistributedDataParallel', ranks=[0])
elif is_using_ddp():
gradient_handler_cfg = [dict(type='DataParallelGradientHandler')]
if verbose:
logger.info(
"Data parallel training is detected when using pipeline parallel, DataParallelGradientHandler is automatically "
"added even though not specified in the configuration",
ranks=[0])
else:
amp_type = None
amp_cfg = None
engine_cfg = gpc.config.get('engine', dict())
schedule_cfg = engine_cfg.pop('schedule', None)
schedule_type = None
if schedule_cfg is not None:
schedule_type = schedule_cfg.get('type', None)
if schedule_type is not None:
# run customized schedule
schedule_cfg['amp_type'] = amp_type
schedule_cfg['amp_config'] = amp_cfg
schedule = build_schedule(schedule_cfg)
elif gpc.is_initialized(ParallelMode.PIPELINE) and gpc.get_world_size(ParallelMode.PIPELINE) > 1:
assert schedule_cfg is not None, \
"Config 'engine.schedule' not found in your configuration file for pipeline parallel training"
schedule = PipelineSchedule(
amp_type=amp_type, amp_config=amp_cfg, **schedule_cfg.copy())
if not isinstance(gradient_handler_cfg, list):
raise ConfigException(
f"expected gradient_handler in the configuration file to be a list but got {type(gradient_handler_cfg)}")
if gradient_handler_cfg is None:
gradient_handlers = None
if verbose and not isinstance(model, DDP):
logger.warning(
"No PyTorch DDP or gradient handler is set up, please make sure you do not need "
"to all-reduce the gradients after a training step.",
ranks=[0])
else:
schedule = NoPipelineSchedule(amp_type=amp_type, amp_config=amp_cfg)
gradient_handlers = [build_gradient_handler(cfg, model, optimizer) for cfg in gradient_handler_cfg]
# check if optimizer is ColossalaiOptimizer
if not isinstance(optimizer, (ColossalaiOptimizer, ZeroRedundancyOptimizer_Level_2, ZeroRedundancyOptimizer_Level_3)):
optimizer = ColossalaiOptimizer(optim=optimizer)
# gradient accumulation
grad_accum_size = gpc.config.get('gradient_accumulation', None)
if grad_accum_size is not None:
optimizer, train_dataloader, gradient_handlers, lr_scheduler = accumulate_gradient(model=model,
optimizer=optimizer,
dataloader=train_dataloader,
accumulate_size=grad_accum_size,
gradient_handlers=gradient_handlers,
lr_scheduler=lr_scheduler)
# clip grad norm
clip_grad_norm = gpc.config.get('clip_grad_norm', 0.0)
if clip_grad_norm > 0:
if zero_cfg is not None:
raise ConfigException(
"clip_grad_norm should be specified with zero, you should specify clip_grad in zero configuration")
elif fp16_cfg is not None and fp16_cfg.mode == AMP_TYPE.NAIVE:
raise ConfigException(
"clip_grad_norm should be specified with AMP_TYPE.NAIVE, you should specify clip_grad in fp16 configuration")
engine = Engine(
model=model,
optimizer=optimizer,
criterion=criterion,
step_schedule=schedule,
**gpc.config.get('engine', dict())
gradient_handlers=gradient_handlers,
clip_grad_norm=clip_grad_norm
)
return engine, train_dataloader, test_dataloader
return engine, train_dataloader, test_dataloader, lr_scheduler
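A minimal call sequence for the new API sketched above; every component below is a toy placeholder, and it assumes initialize and launch_from_torch are exported at the package level:
# Hypothetical usage sketch of the reworked initialize(); all components are placeholders.
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import colossalai

colossalai.launch_from_torch(config='./config.py', host='localhost', port=29500)

model = nn.Linear(32, 10)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
criterion = nn.CrossEntropyLoss()
train_loader = DataLoader(TensorDataset(torch.randn(64, 32),
                                        torch.randint(0, 10, (64,))), batch_size=8)

# The final return statement above yields four values, including the (possibly wrapped) lr_scheduler.
engine, train_dataloader, test_dataloader, lr_scheduler = colossalai.initialize(
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    train_dataloader=train_loader)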
from colossalai.core import global_context as gpc
from .logging import DistributedLogger
__all__ = ['get_global_dist_logger', 'get_dist_logger', 'DistributedLogger', 'init_global_dist_logger']
__all__ = ['get_dist_logger', 'DistributedLogger']
_GLOBAL_LOGGER: DistributedLogger = None
def get_dist_logger(name, level='INFO', root_path: str = None, mode='a'):
return DistributedLogger(name=name, level=level, root_path=root_path, mode=mode)
def get_global_dist_logger():
assert _GLOBAL_LOGGER is not None, 'Global distributed logger is not initialized'
return _GLOBAL_LOGGER
def init_global_dist_logger():
rank = gpc.get_global_rank()
if hasattr(gpc.config, 'logging'):
logger = get_dist_logger(name=f'rank_{rank}', **gpc.config.logging)
else:
logger = get_dist_logger(name=f'rank_{rank}', level='INFO')
global _GLOBAL_LOGGER
assert _GLOBAL_LOGGER is None, 'Global distributed logger has already been initialized'
_GLOBAL_LOGGER = logger
def get_dist_logger(name='root'):
"""Get logger instance based on name. The DistributedLogger will create singleton instances,
which means that only one logger instance is created per name.
"""
return DistributedLogger.get_instance(name=name)
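A short sketch of the singleton behaviour this provides, assuming this module is importable as colossalai.logging; the message is illustrative:
# Hypothetical usage sketch: repeated calls with the same name return one cached instance.
from colossalai.logging import get_dist_logger

logger = get_dist_logger()          # defaults to the 'root' logger
assert logger is get_dist_logger()  # same object, instances are cached per name
logger.info('environment ready', ranks=[0])   # with ranks, only the listed ranks log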
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import colossalai
import logging
from pathlib import Path
from typing import Union
from colossalai.context.parallel_mode import ParallelMode
from colossalai.core import global_context as gpc
_FORMAT = 'colossalai - %(name)s - %(asctime)s %(levelname)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=_FORMAT)
@@ -16,40 +18,92 @@ class DistributedLogger:
:param name: The name of the logger
:type name: str
:param level: The threshold for the logger. Logging messages which are less severe than `level`
will be ignored
:type level: str
:param root_path: The root path where logs are stored
:type root_path: str, optional
:param mode: The mode that the file is opened in. Defaults to 'a'
:type mode: str, optional
"""
def __init__(self, name, level='INFO', root_path: str = None, mode='a'):
self._logger = logging.getLogger(name)
__instances = dict()
@staticmethod
def get_instance(name: str):
"""Get the unique single logger instance based on name.
:param name: The name of the logger
:type name: str
:return: a DistributedLogger object
:rtype: DistributedLogger
"""
if name in DistributedLogger.__instances:
return DistributedLogger.__instances[name]
else:
logger = DistributedLogger(name=name)
return logger
def __init__(self, name):
if name in DistributedLogger.__instances:
raise Exception('A logger with the same name has already been created; use colossalai.logging.get_dist_logger to retrieve it')
else:
self._name = name
self._logger = logging.getLogger(name)
DistributedLogger.__instances[name] = self
@staticmethod
def _check_valid_logging_level(level: str):
assert level in ['INFO', 'DEBUG', 'WARNING', 'ERROR'], 'found invalid logging level'
def set_level(self, level: str):
"""Set the logging level
:param level: can only be INFO, DEBUG, WARNING and ERROR
:type level: str
"""
self._check_valid_logging_level(level)
self._logger.setLevel(getattr(logging, level))
if root_path is not None:
log_root_path = Path(root_path)
# create path if not exists
log_root_path.mkdir(parents=True, exist_ok=True)
log_path = log_root_path.joinpath(f'{name}.log')
file_handler = logging.FileHandler(log_path, mode)
file_handler.setLevel(getattr(logging, level))
formatter = logging.Formatter(_FORMAT)
file_handler.setFormatter(formatter)
self._logger.addHandler(file_handler)
def log_to_file(self,
path: Union[str, Path],
mode: str = 'a',
level: str = 'INFO',
suffix: str = None):
"""Save the logs to file
:param path: the file to save the log
:type path: a string or pathlib.Path object
:param mode: the mode to write log into the file
:type mode: str
:param level: can only be INFO, DEBUG, WARNING and ERROR
:type level: str
:param suffix: a suffix appended to the log file name, e.g. rank_0_<suffix>.log
:type suffix: str, optional
"""
assert isinstance(path, (str, Path)), \
f'expected argument path to be type str or Path, but got {type(path)}'
self._check_valid_logging_level(level)
if isinstance(path, str):
path = Path(path)
# set the default file name if path is a directory
if not colossalai.core.global_context.is_initialized(ParallelMode.GLOBAL):
rank = 0
else:
rank = colossalai.core.global_context.get_global_rank()
if suffix is not None:
log_file_name = f'rank_{rank}_{suffix}.log'
else:
log_file_name = f'rank_{rank}.log'
path = path.joinpath(log_file_name)
# add file handler
file_handler = logging.FileHandler(path, mode)
file_handler.setLevel(getattr(logging, level))
formatter = logging.Formatter(_FORMAT)
file_handler.setFormatter(formatter)
self._logger.addHandler(file_handler)
def _log(self, level, message: str, parallel_mode: ParallelMode = ParallelMode.GLOBAL, ranks: list = None):
if ranks is None:
getattr(self._logger, level)(message)
else:
local_rank = gpc.get_local_rank(parallel_mode)
local_rank = colossalai.core.global_context.get_local_rank(parallel_mode)
if local_rank in ranks:
getattr(self._logger, level)(message)
def info(self, message: str, parallel_mode: ParallelMode = ParallelMode.GLOBAL, ranks: list = None):
"""Stores an info log message.
"""Log an info message.
:param message: The message to be logged
:type message: str
@@ -61,7 +115,7 @@ class DistributedLogger:
self._log('info', message, parallel_mode, ranks)
def warning(self, message: str, parallel_mode: ParallelMode = ParallelMode.GLOBAL, ranks: list = None):
"""Stores a warning log message.
"""Log a warning message.
:param message: The message to be logged
:type message: str
@@ -73,7 +127,7 @@ class DistributedLogger:
self._log('warning', message, parallel_mode, ranks)
def debug(self, message: str, parallel_mode: ParallelMode = ParallelMode.GLOBAL, ranks: list = None):
"""Stores a debug log message.
"""Log a debug message.
:param message: The message to be logged
:type message: str
@@ -85,7 +139,7 @@ class DistributedLogger:
self._log('debug', message, parallel_mode, ranks)
def error(self, message: str, parallel_mode: ParallelMode = ParallelMode.GLOBAL, ranks: list = None):
"""Stores an error log message.
"""Log an error message.
:param message: The message to be logged
:type message: str
......
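To close the logging section, a hedged end-to-end sketch of the reworked DistributedLogger API; the directory, suffix and message are illustrative, and log_to_file assumes the target directory already exists since it no longer creates it:
# Hypothetical usage sketch of the new DistributedLogger methods.
from colossalai.logging import get_dist_logger

logger = get_dist_logger('train')
logger.set_level('DEBUG')
# Writes to ./logs/rank_<rank>_epoch0.log; the directory must exist beforehand.
logger.log_to_file('./logs', mode='a', level='INFO', suffix='epoch0')
logger.warning('loss became NaN', ranks=[0])   # only global rank 0 emits this record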
from .data import *
from .layer import *
from .loss import *
from .lr_scheduler import *
......