Unverified Commit 35813ed3 authored by Frank Lee's avatar Frank Lee Committed by GitHub
Browse files

update examples and sphnix docs for the new api (#63)

parent 7d371105
...@@ -14,10 +14,12 @@ Blog: [Colossal-AI: A Unified Deep Learning System For Large-Scale Parallel Trai ...@@ -14,10 +14,12 @@ Blog: [Colossal-AI: A Unified Deep Learning System For Large-Scale Parallel Trai
pip install colossalai pip install colossalai
``` ```
### Install From Source ### Install From Source (Recommended)
> We **recommend** installing from source, as Colossal-AI is updated frequently in its early versions. The documentation stays in line with the main branch of the repository. Feel free to raise an issue if you encounter any problems. :)
```shell ```shell
git clone git@github.com:hpcaitech/ColossalAI.git git clone https://github.com/hpcaitech/ColossalAI.git
cd ColossalAI cd ColossalAI
# install dependency # install dependency
pip install -r requirements/requirements.txt pip install -r requirements/requirements.txt
...@@ -64,8 +66,8 @@ model = ... ...@@ -64,8 +66,8 @@ model = ...
# sampler by default # sampler by default
train_dataset = ... train_dataset = ...
train_dataloader = get_dataloader(dataset=dataset, train_dataloader = get_dataloader(dataset=dataset,
shuffle=True, shuffle=True,
) )
# build your # build your
......
...@@ -16,6 +16,22 @@ def convert_to_amp(model: nn.Module, ...@@ -16,6 +16,22 @@ def convert_to_amp(model: nn.Module,
criterion: _Loss, criterion: _Loss,
mode: AMP_TYPE, mode: AMP_TYPE,
amp_config: Config = None): amp_config: Config = None):
"""A helper function to wrap training components with AMP modules according to the given mode
:param model: your model object
:type model: :class:`torch.nn.Module`
:param optimizer: your optimizer object
:type optimizer: :class:`torch.optim.Optimizer`
:param criterion: your loss function object
:type criterion: :class:`torch.nn.modules.loss._Loss`
:param mode: amp mode
:type mode: :class:`colossalai.amp.AMP_TYPE`
:param amp_config: configuration for different amp modes
:type amp_config: :class:`colossalai.context.Config` or dict
:return: (model, optimizer, criterion)
:rtype: Tuple
"""
assert isinstance(mode, AMP_TYPE), \ assert isinstance(mode, AMP_TYPE), \
f'expected the argument mode be AMP_TYPE, but got {type(mode)}' f'expected the argument mode be AMP_TYPE, but got {type(mode)}'
......
...@@ -7,6 +7,18 @@ import apex.amp as apex_amp ...@@ -7,6 +7,18 @@ import apex.amp as apex_amp
def convert_to_apex_amp(model: nn.Module, def convert_to_apex_amp(model: nn.Module,
optimizer: Optimizer, optimizer: Optimizer,
amp_config): amp_config):
"""A helper function to wrap training components with NVIDIA Apex AMP modules
:param model: your model object
:type model: :class:`torch.nn.Module`
:param optimizer: your optimizer object
:type optimizer: :class:`torch.optim.Optimizer`
:param amp_config: configuration for nvidia apex
:type amp_config: :class:`colossalai.context.Config` or dict
:return: (model, optimizer)
:rtype: Tuple
"""
model, optimizer = apex_amp.initialize(model, optimizer, **amp_config) model, optimizer = apex_amp.initialize(model, optimizer, **amp_config)
optimizer = ApexAMPOptimizer(optimizer) optimizer = ApexAMPOptimizer(optimizer)
return model, optimizer return model, optimizer
......
...@@ -13,11 +13,24 @@ from colossalai.utils import clip_grad_norm_fp32 ...@@ -13,11 +13,24 @@ from colossalai.utils import clip_grad_norm_fp32
class ApexAMPOptimizer(ColossalaiOptimizer): class ApexAMPOptimizer(ColossalaiOptimizer):
''' A wrapper class for APEX optimizer and it implements apex-specific backward and clip_grad_norm
methods
'''
def backward(self, loss: Tensor): def backward(self, loss: Tensor):
"""
:param loss: loss computed by a loss function
:type loss: torch.Tensor
"""
with apex_amp.scale_loss(loss, self.optim) as scaled_loss: with apex_amp.scale_loss(loss, self.optim) as scaled_loss:
scaled_loss.backward() scaled_loss.backward()
def clip_grad_norm(self, model: nn.Module, max_norm: float): def clip_grad_norm(self, model: nn.Module, max_norm: float):
"""
:param model: your model object
:type model: torch.nn.Module
:param max_norm: the max norm value for gradient clipping
:type max_norm: float
"""
if max_norm > 0: if max_norm > 0:
clip_grad_norm_fp32(apex_amp.master_params(self.optim), max_norm) clip_grad_norm_fp32(apex_amp.master_params(self.optim), max_norm)
...@@ -8,6 +8,18 @@ from .naive_amp import NaiveAMPOptimizer, NaiveAMPModel ...@@ -8,6 +8,18 @@ from .naive_amp import NaiveAMPOptimizer, NaiveAMPModel
def convert_to_naive_amp(model: nn.Module, def convert_to_naive_amp(model: nn.Module,
optimizer: Optimizer, optimizer: Optimizer,
amp_config): amp_config):
"""A helper function to wrap training components with naive AMP modules
:param model: your model object
:type model: :class:`torch.nn.Module`
:param optimizer: your optimizer object
:type optimizer: :class:`torch.optim.Optimizer`
:param amp_config: configuration for naive mode amp
:type amp_config: :class:`colossalai.context.Config` or dict
:return: (model, optimizer)
:rtype: Tuple
"""
if is_no_pp_or_last_stage(): if is_no_pp_or_last_stage():
model = NaiveAMPModel(model, output_to_fp32=True) model = NaiveAMPModel(model, output_to_fp32=True)
else: else:
......
...@@ -146,26 +146,22 @@ class DynamicGradScaler: ...@@ -146,26 +146,22 @@ class DynamicGradScaler:
class FP16Optimizer(Optimizer): class FP16Optimizer(Optimizer):
"""Float16 optimizer for fp16 and bf16 data types. """Float16 optimizer for fp16 and bf16 data types.
Arguments: :param optimizer: base optimizer such as Adam or SGD
optimizer: base optimizer such as Adam or SGD :type optimizer: torch.optim.Optimizer
clip_grad: clip gradeints with this global L2 norm. Note :param clip_grad: clip gradients with this global L2 norm. Note that clipping is ignored if clip_grad == 0
that clipping is ignored if clip_grad == 0 :type clip_grad: float
log_num_zeros_in_grad: return number of zeros in the gradients. :param log_num_zeros_in_grad: return number of zeros in the gradients.
params_have_main_grad: flag indicating if parameters have :type log_num_zeros_in_grad: bool
a `main_grad` field. If this is set, we are assuming :param initial_scale: initial scale of gradient scaler
that the model parameters are store in the `main_grad` :type initial_scale: int
field instead of the typical `grad` field. This happens :param growth_factor: the growth rate of loss scale
for the DDP cases where there is a contihuous buffer :type growth_factor: int
holding the gradients. For example for bfloat16, we want :param backoff_factor: the decrease rate of loss scale
to do gradient accumulation and all-reduces in float32 :type backoff_factor: float
and as a result we store those gradients in the main_grad. :param hysterisis: delay shift in dynamic loss scaling
Note that main grad is not necessarily in float32. :type hysterisis: int
bf16: if true, the model is running in bfloat16. :param max_scale: maximum loss scale allowed
grad_scaler: used for scaling gradients. Note that this can be :type max_scale: int
None. This case happens when `bf16 = True` and we don't
use any loss scale. Note that for `bf16 = True`, we can have
a constnat gradient scaler. Also for `bf16 = False`, we
always require a grad scaler.
""" """
def __init__(self, def __init__(self,
......
...@@ -13,12 +13,21 @@ from ._fp16_optimizer import FP16Optimizer ...@@ -13,12 +13,21 @@ from ._fp16_optimizer import FP16Optimizer
class NaiveAMPOptimizer(ColossalaiOptimizer): class NaiveAMPOptimizer(ColossalaiOptimizer):
"""A wrapper class for optimizer to cast all parameters to fp16
:param optim: a normal optimizer like Adam or SGD
:type optim: torch.optim.Optimizer
"""
def __init__(self, optim: Optimizer, *args, **kwargs): def __init__(self, optim: Optimizer, *args, **kwargs):
optim = FP16Optimizer(optimizer=optim, *args, **kwargs) optim = FP16Optimizer(optimizer=optim, *args, **kwargs)
super().__init__(optim) super().__init__(optim)
def backward(self, loss: Tensor): def backward(self, loss: Tensor):
"""backward with gradient scaler
:param loss: loss computed by a loss function
:type loss: torch.Tensor
"""
loss = self.optim.scale_loss(loss) loss = self.optim.scale_loss(loss)
loss.backward() loss.backward()
...@@ -30,6 +39,9 @@ class NaiveAMPOptimizer(ColossalaiOptimizer): ...@@ -30,6 +39,9 @@ class NaiveAMPOptimizer(ColossalaiOptimizer):
class NaiveAMPModel(nn.Module): class NaiveAMPModel(nn.Module):
"""A wrapper class for model to cast the model into fp16 and
automatically cast the input and output
"""
def __init__(self, def __init__(self,
model: nn.Module, model: nn.Module,
......
...@@ -9,6 +9,20 @@ def convert_to_torch_amp(model: nn.Module, ...@@ -9,6 +9,20 @@ def convert_to_torch_amp(model: nn.Module,
optimizer: Optimizer, optimizer: Optimizer,
criterion: _Loss, criterion: _Loss,
amp_config: Config): amp_config: Config):
"""A helper function to wrap training components with Torch AMP modules
:param model: your model object
:type model: :class:`torch.nn.Module`
:param optimizer: your optimizer object
:type optimizer: :class:`torch.optim.Optimizer`
:param criterion: your loss function object
:type criterion: :class:`torch.nn.modules.loss._Loss`
:param amp_config: configuration for different amp modes
:type amp_config: :class:`colossalai.context.Config` or dict
:return: (model, optimizer, criterion)
:rtype: Tuple
"""
model = TorchAMPModel(model) model = TorchAMPModel(model)
optimizer = TorchAMPOptimizer(optimizer, **amp_config) optimizer = TorchAMPOptimizer(optimizer, **amp_config)
criterion = TorchAMPLoss(criterion) criterion = TorchAMPLoss(criterion)
......
...@@ -14,25 +14,45 @@ from colossalai.utils import clip_grad_norm_fp32 ...@@ -14,25 +14,45 @@ from colossalai.utils import clip_grad_norm_fp32
class TorchAMPOptimizer(ColossalaiOptimizer): class TorchAMPOptimizer(ColossalaiOptimizer):
"""A wrapper class which integrates pytorch amp with an optimizer
:param optim: a normal optimizer like Adam or SGD
:type optim: torch.optim.Optimizer
"""
def __init__(self, optim: Optimizer, *args, **kwargs): def __init__(self, optim: Optimizer, *args, **kwargs):
super().__init__(optim) super().__init__(optim)
self.scaler = GradScaler(*args, **kwargs) self.scaler = GradScaler(*args, **kwargs)
def backward(self, loss: Tensor): def backward(self, loss: Tensor):
"""backward with torch amp gradient scaler
:param loss: loss computed by a loss function
:type loss: torch.Tensor
"""
self.scaler.scale(loss).backward() self.scaler.scale(loss).backward()
def step(self): def step(self):
"""update the parameters of the model
"""
self.scaler.step(self.optim) self.scaler.step(self.optim)
self.scaler.update() self.scaler.update()
def clip_grad_norm(self, model: nn.Module, max_norm: float): def clip_grad_norm(self, model: nn.Module, max_norm: float):
"""apply gradient clipping to the model parameters
:param model: your model object
:type model: torch.nn.Module
:param max_norm: max norm value for gradient clipping
:type max_norm: float
"""
if max_norm > 0.0: if max_norm > 0.0:
self.scaler.unscale_(self.optim) self.scaler.unscale_(self.optim)
clip_grad_norm_fp32(model.parameters(), max_norm) clip_grad_norm_fp32(model.parameters(), max_norm)
class TorchAMPModel(nn.Module): class TorchAMPModel(nn.Module):
"""A wrapper class for a model object which executes forward with values automatically
cast to fp16
"""
def __init__(self, model: nn.Module) -> None: def __init__(self, model: nn.Module) -> None:
super().__init__() super().__init__()
...@@ -44,7 +64,10 @@ class TorchAMPModel(nn.Module): ...@@ -44,7 +64,10 @@ class TorchAMPModel(nn.Module):
class TorchAMPLoss(nn.Module): class TorchAMPLoss(nn.Module):
"""A wrapper class for a criterion object which computes the loss in mixed-precision context
:param loss: a loss function object
:type loss: torch.nn.modules.loss._Loss
"""
def __init__(self, loss: _Loss): def __init__(self, loss: _Loss):
super().__init__() super().__init__()
self.loss = loss self.loss = loss
......
...@@ -16,8 +16,8 @@ def build_from_config(module, config: dict): ...@@ -16,8 +16,8 @@ def build_from_config(module, config: dict):
of the return object of the return object
:type config: dict :type config: dict
:raises AssertionError: Raises an AssertionError if `module` is not a class :raises AssertionError: Raises an AssertionError if `module` is not a class
:return: An object of :class:`module` :return: An object of interest
:rtype: :class:`module` :rtype: Object
""" """
assert inspect.isclass(module), 'module must be a class' assert inspect.isclass(module), 'module must be a class'
return module(**config) return module(**config)
...@@ -62,8 +62,8 @@ def build_layer(config): ...@@ -62,8 +62,8 @@ def build_layer(config):
:param config: A python dict or a :class:`colossalai.context.Config` object :param config: A python dict or a :class:`colossalai.context.Config` object
containing information used in the construction of the return object containing information used in the construction of the return object
:type config: dict or :class:`colossalai.context.Config` :type config: dict or :class:`colossalai.context.Config`
:return: An object of :class:`nn.Module` :return: An object of :class:`torch.nn.Module`
:rtype: :class:`nn.Module` :rtype: :class:`torch.nn.Module`
""" """
return build_from_registry(config, LAYERS) return build_from_registry(config, LAYERS)
...@@ -75,8 +75,8 @@ def build_loss(config): ...@@ -75,8 +75,8 @@ def build_loss(config):
:param config: A python dict or a :class:`colossalai.context.Config` object :param config: A python dict or a :class:`colossalai.context.Config` object
containing information used in the construction of the return object containing information used in the construction of the return object
:type config: dict or :class:`colossalai.context.Config` :type config: dict or :class:`colossalai.context.Config`
:return: An object of :class:`torch.autograd.Function` :return: An object of :class:`torch.nn.modules.loss._Loss`
:rtype: :class:`torch.autograd.Function` :rtype: :class:`torch.nn.modules.loss._Loss`
""" """
return build_from_registry(config, LOSSES) return build_from_registry(config, LOSSES)
...@@ -87,8 +87,8 @@ def build_model(config): ...@@ -87,8 +87,8 @@ def build_model(config):
:param config: A python dict or a :class:`colossalai.context.Config` object :param config: A python dict or a :class:`colossalai.context.Config` object
containing information used in the construction of the return object containing information used in the construction of the return object
:type config: dict or :class:`colossalai.context.Config` :type config: dict or :class:`colossalai.context.Config`
:return: An object of :class:`nn.Module` :return: An object of :class:`torch.nn.Module`
:rtype: :class:`nn.Module` :rtype: :class:`torch.nn.Module`
""" """
return build_from_registry(config, MODELS) return build_from_registry(config, MODELS)
...@@ -134,8 +134,8 @@ def build_gradient_handler(config, model, optimizer): ...@@ -134,8 +134,8 @@ def build_gradient_handler(config, model, optimizer):
:type model: :class:`nn.Module` :type model: :class:`nn.Module`
:param optimizer: An optimizer object containing parameters for the gradient handler :param optimizer: An optimizer object containing parameters for the gradient handler
:type optimizer: :class:`torch.optim.Optimizer` :type optimizer: :class:`torch.optim.Optimizer`
:return: An object of :class:`BaseGradientHandler` :return: An object of :class:`colossalai.engine.BaseGradientHandler`
:rtype: :class:`BaseGradientHandler` :rtype: :class:`colossalai.engine.BaseGradientHandler`
""" """
config_ = config.copy() config_ = config.copy()
config_['model'] = model config_['model'] = model
...@@ -151,8 +151,8 @@ def build_hooks(config, trainer): ...@@ -151,8 +151,8 @@ def build_hooks(config, trainer):
:type config: dict or :class:`colossalai.context.Config` :type config: dict or :class:`colossalai.context.Config`
:param trainer: A :class:`Trainer` object containing parameters for the hook :param trainer: A :class:`Trainer` object containing parameters for the hook
:type trainer: :class:`Trainer` :type trainer: :class:`Trainer`
:return: An object of :class:`BaseHook` :return: An object of :class:`colossalai.trainer.hooks.BaseHook`
:rtype: :class:`BaseHook` :rtype: :class:`colossalai.trainer.hooks.BaseHook`
""" """
config_ = config.copy() config_ = config.copy()
config_['trainer'] = trainer config_['trainer'] = trainer
...@@ -182,8 +182,8 @@ def build_data_sampler(config, dataset): ...@@ -182,8 +182,8 @@ def build_data_sampler(config, dataset):
:param dataset: An object of :class:`torch.utils.data.Dataset` containing information :param dataset: An object of :class:`torch.utils.data.Dataset` containing information
used in the construction of the return object used in the construction of the return object
:type dataset: :class:`torch.utils.data.Dataset` :type dataset: :class:`torch.utils.data.Dataset`
:return: An object of :class:`colossalai.nn.data.sampler.BaseSampler` :return: An object of :class:`colossalai.utils.data_sampler.BaseSampler`
:rtype: :class:`colossalai.nn.data.sampler.BaseSampler` :rtype: :class:`colossalai.utils.data_sampler.BaseSampler`
""" """
config_ = config.copy() config_ = config.copy()
config_['dataset'] = dataset config_['dataset'] = dataset
...@@ -200,10 +200,6 @@ def build_lr_scheduler(config, optimizer): ...@@ -200,10 +200,6 @@ def build_lr_scheduler(config, optimizer):
:param optimizer: An optimizer object containing parameters for the learning rate :param optimizer: An optimizer object containing parameters for the learning rate
scheduler scheduler
:type optimizer: :class:`torch.optim.Optimizer` :type optimizer: :class:`torch.optim.Optimizer`
:param total_steps: Number of total steps of the learning rate scheduler
:type total_steps: int
:param num_steps_per_epoch: number of steps per epoch of the learning rate scheduler
:type num_steps_per_epoch: int
:return: An object of :class:`torch.optim.lr_scheduler` :return: An object of :class:`torch.optim.lr_scheduler`
:rtype: :class:`torch.optim.lr_scheduler` :rtype: :class:`torch.optim.lr_scheduler`
""" """
......
...@@ -151,6 +151,28 @@ def _partition_balanced(weights, pipeline_parallel_size, num_chunks): ...@@ -151,6 +151,28 @@ def _partition_balanced(weights, pipeline_parallel_size, num_chunks):
class PipelineModelInitializer(): class PipelineModelInitializer():
"""An initializer to split the model into different stages for pipeline parallelism.
An example for the model config is shown below. The class VisionTransformerFromConfig should
inherit colossalai.nn.model.ModelFromConfig to allow this initializer to build model from a sequence
of layer configurations.
model_config = dict(
type='VisionTransformerFromConfig',
embedding_cfg=dict(...),
...
)
:param config: configuration of the model
:type config: dict
:param num_chunks: the number of chunks you want to have on the current stage. This value should be 1
in most cases unless you are using virtual pipeline parallelism.
:type num_chunks: int
:param verbose: whether to print the logs
:type verbose: bool
"""
def __init__(self, config, num_chunks, verbose=False): def __init__(self, config, num_chunks, verbose=False):
self.num_chunks = num_chunks self.num_chunks = num_chunks
self.ori_model = build_model(config) self.ori_model = build_model(config)
...@@ -161,6 +183,13 @@ class PipelineModelInitializer(): ...@@ -161,6 +183,13 @@ class PipelineModelInitializer():
self._logger.info(f"The total length of layers is {layer_length}", ranks=[0]) self._logger.info(f"The total length of layers is {layer_length}", ranks=[0])
def initialize(self, partition_method='parameter'): def initialize(self, partition_method='parameter'):
"""Initialize the model object from the config passed
:param partition_method: this parameter determines how you want to split your model layers into stages,
you can set it as 'layer' or 'parameter'
:type partition_method: str
"""
# Some space for initializing comunication groups # Some space for initializing comunication groups
self._interval = None self._interval = None
self._partition_layers(method=partition_method) self._partition_layers(method=partition_method)
...@@ -183,7 +212,7 @@ class PipelineModelInitializer(): ...@@ -183,7 +212,7 @@ class PipelineModelInitializer():
# print_rank_0(param_counts) # print_rank_0(param_counts)
self.parts = _partition_balanced(param_counts, pipeline_parallel_size, self.num_chunks) self.parts = _partition_balanced(param_counts, pipeline_parallel_size, self.num_chunks)
else: else:
assert method == 'layer', "Method should be a pre-set string" raise ValueError("Method should be a pre-set string in [layer, parameter]")
# Display the partition # Display the partition
if gpc.get_global_rank() == 0 and self.verbose: if gpc.get_global_rank() == 0 and self.verbose:
......
...@@ -18,11 +18,11 @@ def all_gather(tensor: Tensor, dim: int, ...@@ -18,11 +18,11 @@ def all_gather(tensor: Tensor, dim: int,
:param tensor: Tensor to be gathered :param tensor: Tensor to be gathered
:param dim: The dimension concatenating in :param dim: The dimension concatenating in
:param parallel_mode: Parallel group mode used in this communication :param parallel_mode: Parallel group mode used in this communication
:type tensor: Tensor :type tensor: :class:`torch.Tensor`
:type dim: int :type dim: int
:type parallel_mode: ParallelMode :type parallel_mode: :class:`colossalai.context.ParallelMode`
:return: The tensor generated by all-gather :return: The tensor generated by all-gather
:rtype: Tensor :rtype: :class:`torch.Tensor`
""" """
depth = gpc.get_world_size(parallel_mode) depth = gpc.get_world_size(parallel_mode)
temp = tensor.clone() temp = tensor.clone()
...@@ -54,11 +54,11 @@ def reduce_scatter(tensor: Tensor, dim: int, ...@@ -54,11 +54,11 @@ def reduce_scatter(tensor: Tensor, dim: int,
:param tensor: Tensor to be reduced and scattered :param tensor: Tensor to be reduced and scattered
:param dim: The dimension scattering in :param dim: The dimension scattering in
:param parallel_mode: Parallel group mode used in this communication :param parallel_mode: Parallel group mode used in this communication
:type tensor: Tensor :type tensor: :class:`torch.Tensor`
:type dim: int :type dim: int
:type parallel_mode: ParallelMode :type parallel_mode: :class:`colossalai.context.ParallelMode`
:return: The tensor generated by reduce-scatter :return: The tensor generated by reduce-scatter
:rtype: Tensor :rtype: :class:`torch.Tensor`
""" """
depth = gpc.get_world_size(parallel_mode) depth = gpc.get_world_size(parallel_mode)
# temp = list(torch.chunk(tensor, depth, dim=dim)) # temp = list(torch.chunk(tensor, depth, dim=dim))
......
...@@ -96,7 +96,7 @@ def recv_forward(input_tensor_shape, prev_rank=None): ...@@ -96,7 +96,7 @@ def recv_forward(input_tensor_shape, prev_rank=None):
:type input_tensor_shape: torch.Size :type input_tensor_shape: torch.Size
:type prev_rank: int, optional :type prev_rank: int, optional
:return: The input tensor in forward step :return: The input tensor in forward step
:rtype: Tensor :rtype: :class:`torch.Tensor`
""" """
if gpc.is_first_rank(ParallelMode.PIPELINE): if gpc.is_first_rank(ParallelMode.PIPELINE):
input_tensor = None input_tensor = None
...@@ -115,7 +115,7 @@ def recv_backward(output_grad_shape, next_rank=None): ...@@ -115,7 +115,7 @@ def recv_backward(output_grad_shape, next_rank=None):
:type output_grad_shape: torch.Size :type output_grad_shape: torch.Size
:type next_rank: int, optional :type next_rank: int, optional
:return: The grad of output tensor in forward step :return: The grad of output tensor in forward step
:rtype: Tensor :rtype: :class:`torch.Tensor`
""" """
if gpc.is_last_rank(ParallelMode.PIPELINE): if gpc.is_last_rank(ParallelMode.PIPELINE):
output_tensor_grad = None output_tensor_grad = None
...@@ -131,7 +131,7 @@ def send_forward(output_tensor, next_rank=None): ...@@ -131,7 +131,7 @@ def send_forward(output_tensor, next_rank=None):
:param output_tensor: Tensor to be sent :param output_tensor: Tensor to be sent
:param next_rank: The rank of the recipient of the tensor :param next_rank: The rank of the recipient of the tensor
:type output_tensor: Tensor :type output_tensor: :class:`torch.Tensor`
:type next_rank: int, optional :type next_rank: int, optional
""" """
if not gpc.is_last_rank(ParallelMode.PIPELINE): if not gpc.is_last_rank(ParallelMode.PIPELINE):
...@@ -144,7 +144,7 @@ def send_backward(input_tensor_grad, prev_rank=None): ...@@ -144,7 +144,7 @@ def send_backward(input_tensor_grad, prev_rank=None):
:param input_tensor_grad: Tensor to be sent :param input_tensor_grad: Tensor to be sent
:param prev_rank: The rank of the recipient of the tensor :param prev_rank: The rank of the recipient of the tensor
:type input_tensor_grad: Tensor :type input_tensor_grad: :class:`torch.Tensor`
:type prev_rank: int, optional :type prev_rank: int, optional
""" """
if not gpc.is_first_rank(ParallelMode.PIPELINE): if not gpc.is_first_rank(ParallelMode.PIPELINE):
...@@ -162,10 +162,10 @@ def send_forward_recv_backward(output_tensor, ...@@ -162,10 +162,10 @@ def send_forward_recv_backward(output_tensor,
:param output_tensor: Tensor to be sent :param output_tensor: Tensor to be sent
:param output_grad_shape: The shape of the tensor to be received :param output_grad_shape: The shape of the tensor to be received
:type output_tensor: Tensor :type output_tensor: :class:`torch.Tensor`
:type output_grad_shape: torch.Size :type output_grad_shape: :class:`torch.Size`
:return: The grad of output tensor in forward step :return: The grad of output tensor in forward step
:rtype: Tensor :rtype: :class:`torch.Tensor`
""" """
if gpc.is_last_rank(ParallelMode.PIPELINE): if gpc.is_last_rank(ParallelMode.PIPELINE):
output_tensor_grad = None output_tensor_grad = None
...@@ -187,10 +187,10 @@ def send_backward_recv_forward(input_tensor_grad, ...@@ -187,10 +187,10 @@ def send_backward_recv_forward(input_tensor_grad,
:param input_tensor_grad: Tensor to be sent :param input_tensor_grad: Tensor to be sent
:param input_tensor_shape: The shape of the tensor to be received :param input_tensor_shape: The shape of the tensor to be received
:type input_tensor_grad: Tensor :type input_tensor_grad: :class:`torch.Tensor`
:type input_tensor_shape: torch.Size :type input_tensor_shape: :class:`torch.Size`
:return: The input tensor in forward step :return: The input tensor in forward step
:rtype: Tensor :rtype: :class:`torch.Tensor`
""" """
if gpc.is_first_rank(ParallelMode.PIPELINE): if gpc.is_first_rank(ParallelMode.PIPELINE):
input_tensor = None input_tensor = None
...@@ -213,10 +213,10 @@ def send_forward_recv_forward(output_tensor, ...@@ -213,10 +213,10 @@ def send_forward_recv_forward(output_tensor,
:param output_tensor: Tensor to be sent :param output_tensor: Tensor to be sent
:param input_tensor_shape: The shape of the tensor to be received :param input_tensor_shape: The shape of the tensor to be received
:type output_tensor: Tensor :type output_tensor: :class:`torch.Tensor`
:type input_tensor_shape: torch.Size :type input_tensor_shape: :class:`torch.Size`
:return: The input tensor in forward step :return: The input tensor in forward step
:rtype: Tensor :rtype: :class:`torch.Tensor`
""" """
input_tensor, _ = _communicate(tensor_send_next=output_tensor, input_tensor, _ = _communicate(tensor_send_next=output_tensor,
recv_prev=recv_prev, recv_prev=recv_prev,
...@@ -237,10 +237,10 @@ def send_backward_recv_backward(input_tensor_grad, ...@@ -237,10 +237,10 @@ def send_backward_recv_backward(input_tensor_grad,
:param input_tensor_grad: Tensor to be sent :param input_tensor_grad: Tensor to be sent
:param output_grad_shape: The shape of the tensor to be received :param output_grad_shape: The shape of the tensor to be received
:type input_tensor_grad: Tensor :type input_tensor_grad: :class:`torch.Tensor`
:type output_grad_shape: torch.Size :type output_grad_shape: :class:`torch.Size`
:return: The grad of output tensor in forward step :return: The grad of output tensor in forward step
:rtype: Tensor :rtype: :class:`torch.Tensor`
""" """
_, output_tensor_grad = _communicate(tensor_send_prev=input_tensor_grad, _, output_tensor_grad = _communicate(tensor_send_prev=input_tensor_grad,
recv_next=recv_next, recv_next=recv_next,
...@@ -266,10 +266,10 @@ def send_forward_backward_recv_forward_backward(output_tensor, ...@@ -266,10 +266,10 @@ def send_forward_backward_recv_forward_backward(output_tensor,
:param input_tensor_grad: Tensor sent to the previous :param input_tensor_grad: Tensor sent to the previous
:param input_tensor_shape: The shape of the tensor received from the previous :param input_tensor_shape: The shape of the tensor received from the previous
:param output_grad_shape: The shape of the tensor received from the next :param output_grad_shape: The shape of the tensor received from the next
:type output_tensor: Tensor :type output_tensor: :class:`torch.Tensor`
:type input_tensor_grad: Tensor :type input_tensor_grad: :class:`torch.Tensor`
:type input_tensor_shape: torch.Size :type input_tensor_shape: :class:`torch.Size`
:type output_grad_shape: torch.Size :type output_grad_shape: :class:`torch.Size`
:return: (the input tensor in forward step, the grad of output tensor in forward step) :return: (the input tensor in forward step, the grad of output tensor in forward step)
:rtype: (Tensor, Tensor) :rtype: (Tensor, Tensor)
""" """
......
...@@ -14,10 +14,10 @@ def ring_forward(tensor_send_next: torch.Tensor, parallel_mode: ParallelMode): ...@@ -14,10 +14,10 @@ def ring_forward(tensor_send_next: torch.Tensor, parallel_mode: ParallelMode):
:param tensor_send_next: Tensor sent to next member :param tensor_send_next: Tensor sent to next member
:param parallel_mode: Parallel group mode used in this communication :param parallel_mode: Parallel group mode used in this communication
:type tensor_send_next: Tensor :type tensor_send_next: :class:`torch.Tensor`
:type parallel_mode: ParallelMode :type parallel_mode: :class:`colossalai.context.ParallelMode`
:return: The tensor received from the previous :return: The tensor received from the previous
:rtype: Tensor :rtype: :class:`torch.Tensor`
""" """
buffer_shape = tensor_send_next.size() buffer_shape = tensor_send_next.size()
......
...@@ -433,6 +433,9 @@ class ParallelContext: ...@@ -433,6 +433,9 @@ class ParallelContext:
def set_device(self, device_ordinal: int = None): def set_device(self, device_ordinal: int = None):
"""Sets distributed processes to be bound to devices. """Sets distributed processes to be bound to devices.
:param device_ordinal: the device id to be bound to
:type device_ordinal: int
""" """
global_rank = self.get_global_rank() global_rank = self.get_global_rank()
if device_ordinal is None: if device_ordinal is None:
...@@ -445,6 +448,9 @@ class ParallelContext: ...@@ -445,6 +448,9 @@ class ParallelContext:
def set_seed(self, seed: int): def set_seed(self, seed: int):
"""Sets seeds for all random libraries. """Sets seeds for all random libraries.
:param seed: seed for random states
:type seed: int
""" """
random.seed(seed) random.seed(seed)
np.random.seed(seed) np.random.seed(seed)
......
...@@ -57,38 +57,61 @@ class Engine: ...@@ -57,38 +57,61 @@ class Engine:
@property @property
def model(self): def model(self):
"""model attached to the engine"""
return self._model return self._model
@property @property
def optimizer(self): def optimizer(self):
"""optimizer attached to the engine"""
return self._optimizer return self._optimizer
@property @property
def criterion(self): def criterion(self):
"""criterion attached to the engine"""
return self._criterion return self._criterion
@property
def schedule(self):
return self._schedule
def zero_grad(self): def zero_grad(self):
"""set the gradient of parameters to zero
"""
self.optimizer.zero_grad() self.optimizer.zero_grad()
def step(self): def step(self):
"""execute parameter update
"""
self._all_reduce_gradients() self._all_reduce_gradients()
self.optimizer.clip_grad_norm(self.model, self._clip_grad_norm) self.optimizer.clip_grad_norm(self.model, self._clip_grad_norm)
self.optimizer.step() self.optimizer.step()
def backward(self, loss: Tensor): def backward(self, loss: Tensor):
"""Start backward propagation given the loss value computed by a loss function
:param loss: loss value computed by a loss function
:type loss: :class:`torch.Tensor`
"""
return self.optimizer.backward(loss) return self.optimizer.backward(loss)
def backward_by_grad(self, tensor, grad): def backward_by_grad(self, tensor, grad):
"""Start backward propagation given the gradient of the output tensor
:param tensor: output tensor
:type tensor: :class:`torch.Tensor`
:param grad: gradient passed back to the output
:type grad: :class:`torch.Tensor`
"""
return self.optimizer.backward_by_grad(tensor, grad) return self.optimizer.backward_by_grad(tensor, grad)
def calc_loss(self, *args, **kwargs): def calc_loss(self, *args, **kwargs):
"""compute the loss value
:return: the loss value
:rtype: :class:`torch.Tensor`
"""
return self.criterion(*args, **kwargs) return self.criterion(*args, **kwargs)
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
"""run the forward step for the model
:return: output of the model
:rtype: Tuple[:class:`torch.Tensor`] or :class:`torch.Tensor`
"""
return self.model(*args, **kwargs) return self.model(*args, **kwargs)
def _all_reduce_gradients(self): def _all_reduce_gradients(self):
......
...@@ -48,7 +48,7 @@ class BaseSchedule(ABC): ...@@ -48,7 +48,7 @@ class BaseSchedule(ABC):
already in the same GPU as where the model's. already in the same GPU as where the model's.
:return: (data, label) :return: (data, label)
:rtype: (Tensor, Tensor) :rtype: (:class:`torch.Tensor`, :class:`torch.Tensor`)
""" """
if data_iter is None: if data_iter is None:
raise RuntimeError('Dataloader is not defined.') raise RuntimeError('Dataloader is not defined.')
......
...@@ -38,7 +38,9 @@ class NonPipelineSchedule(BaseSchedule): ...@@ -38,7 +38,9 @@ class NonPipelineSchedule(BaseSchedule):
:type data_iter: Iterator :type data_iter: Iterator
:type forward_only: bool, optional :type forward_only: bool, optional
:type return_loss: bool, optional :type return_loss: bool, optional
:return: (output, label, loss) :return: (output, label, loss)
:rtype: Tuple[:class:`torch.Tensor`]
""" """
assert forward_only or return_loss, \ assert forward_only or return_loss, \
"The argument 'return_loss' has to be True when 'forward_only' is False, but got False." "The argument 'return_loss' has to be True when 'forward_only' is False, but got False."
......
...@@ -133,6 +133,16 @@ class PipelineSchedule(BaseSchedule): ...@@ -133,6 +133,16 @@ class PipelineSchedule(BaseSchedule):
"""Forward step for passed-in model. If it is the first stage, the input tensor """Forward step for passed-in model. If it is the first stage, the input tensor
is obtained from data_iterator, otherwise the passed-in input_tensor is used. is obtained from data_iterator, otherwise the passed-in input_tensor is used.
Returns output tensor. This is a helper function and can be ignored by users. Returns output tensor. This is a helper function and can be ignored by users.
:param engine: your engine object
:type engine: colossalai.engine.Engine
:param input_tensor: input tensor for this pipeline stage
:type input_tensor: :class:`torch.Tensor`
:param return_tensors: a list of tensors to return
:type return_tensors: List[:class:`torch.Tensor`]
:return: output or the loss value of the current pipeline stage
:rtype: :class:`torch.Tensor`
""" """
if input_tensor is None: if input_tensor is None:
...@@ -162,6 +172,18 @@ class PipelineSchedule(BaseSchedule): ...@@ -162,6 +172,18 @@ class PipelineSchedule(BaseSchedule):
output_tensor_grad is None, otherwise it is the gradients with respect to stage's output tensor. output_tensor_grad is None, otherwise it is the gradients with respect to stage's output tensor.
Returns the gradients with respect to the input tensor (None if first stage). Returns the gradients with respect to the input tensor (None if first stage).
This is a helper function and can be ignored by users. This is a helper function and can be ignored by users.
:param engine: your engine object
:type engine: colossalai.engine.Engine
:param input_tensor: input tensor for this pipeline stage
:type input_tensor: :class:`torch.Tensor`
:param output_tensor: output tensor for this pipeline stage
:type output_tensor: :class:`torch.Tensor`
:param output_tensor_grad: gradient of output tensor for this pipeline stage
:type output_tensor_grad: :class:`torch.Tensor`
:return: gradient of input tensor
:rtype: :class:`torch.Tensor`
""" """
# Retain the grad on the input_tensor. # Retain the grad on the input_tensor.
...@@ -189,7 +211,17 @@ class PipelineSchedule(BaseSchedule): ...@@ -189,7 +211,17 @@ class PipelineSchedule(BaseSchedule):
"""Runs non-interleaved 1F1B schedule, with communication between pipeline stages. """Runs non-interleaved 1F1B schedule, with communication between pipeline stages.
Returns a tuple with losses if the last stage, an empty tuple otherwise. Returns a tuple with losses if the last stage, an empty tuple otherwise.
:param engine: your engine object
:type engine: colossalai.engine.Engine
:param data_iter: dataloader in the form of an iterator, obtained by calling iter(dataloader)
:type data_iter: Iterable
:param forward_only: whether to run the forward step only. Default is false. If true, no backward pass will be run.
:type forward_only: bool
:param return_loss: whether to return the loss value. Default is true.
:type return_loss: bool
:return: (output, label, loss) :return: (output, label, loss)
:rtype: Tuple[:class:`torch.Tensor`]
""" """
assert forward_only or return_loss, \ assert forward_only or return_loss, \
......
...@@ -82,6 +82,8 @@ def launch(config: Union[str, Path, Config, Dict], ...@@ -82,6 +82,8 @@ def launch(config: Union[str, Path, Config, Dict],
:param local_rank: rank for the process on the node and is used to set the default CUDA device, :param local_rank: rank for the process on the node and is used to set the default CUDA device,
defaults to None. If local_rank = None, the default device ordinal will be calculated automatically defaults to None. If local_rank = None, the default device ordinal will be calculated automatically
:type local_rank: int, optional :type local_rank: int, optional
:param verbose: whether to print logs
:type verbose: bool
:raises Exception: raise exception when config type is wrong :raises Exception: raise exception when config type is wrong
''' '''
gpc.verbose = verbose gpc.verbose = verbose
...@@ -121,6 +123,20 @@ def launch_from_slurm(config: Union[str, Path, Config, Dict], ...@@ -121,6 +123,20 @@ def launch_from_slurm(config: Union[str, Path, Config, Dict],
backend: str = 'nccl', backend: str = 'nccl',
seed: int = 1024, seed: int = 1024,
verbose: bool = True): verbose: bool = True):
'''A wrapper for colossalai.launch for SLURM launcher by reading rank and world size from the environment variables
set by SLURM
:param config: config file or config file path are both acceptable
:type config: Union[str, dict, Config]
:param host: the master address for distributed training
:type host: str
:param port: the master port for distributed training
:type port: str
:param backend: backend for torch.distributed
:type backend: str
:param verbose: whether to print logs
:type verbose: bool
'''
rank = int(os.environ['SLURM_PROCID']) rank = int(os.environ['SLURM_PROCID'])
world_size = int(os.environ['SLURM_NPROCS']) world_size = int(os.environ['SLURM_NPROCS'])
launch(config=config, launch(config=config,
...@@ -139,6 +155,20 @@ def launch_from_openmpi(config: Union[str, Path, Config, Dict], ...@@ -139,6 +155,20 @@ def launch_from_openmpi(config: Union[str, Path, Config, Dict],
backend: str = 'nccl', backend: str = 'nccl',
seed: int = 1024, seed: int = 1024,
verbose: bool = True): verbose: bool = True):
'''A wrapper for colossalai.launch for OpenMPI launcher by reading rank and world size from the environment variables
set by OpenMPI
:param config: config file or config file path are both acceptable
:type config: Union[str, dict, Config]
:param host: the master address for distributed training
:type host: str
:param port: the master port for distributed training
:type port: str
:param backend: backend for torch.distributed
:type backend: str
:param verbose: whether to print logs
:type verbose: bool
'''
rank = int(os.environ['OMPI_COMM_WORLD_RANK']) rank = int(os.environ['OMPI_COMM_WORLD_RANK'])
local_rank = int(os.environ['OMPI_COMM_WORLD_LOCAL_RANK']) local_rank = int(os.environ['OMPI_COMM_WORLD_LOCAL_RANK'])
world_size = int(os.environ['OMPI_COMM_WORLD_SIZE']) world_size = int(os.environ['OMPI_COMM_WORLD_SIZE'])
...@@ -159,6 +189,20 @@ def launch_from_torch(config: Union[str, Path, Config, Dict], ...@@ -159,6 +189,20 @@ def launch_from_torch(config: Union[str, Path, Config, Dict],
backend: str = 'nccl', backend: str = 'nccl',
seed: int = 1024, seed: int = 1024,
verbose: bool = True): verbose: bool = True):
'''A wrapper for colossalai.launch for torchrun or torch.distributed.launch by reading rank and world size
from the environment variables set by PyTorch
:param config: config file or config file path are both acceptable
:type config: Union[str, dict, Config]
:param host: the master address for distributed training
:type host: str
:param port: the master port for distributed training
:type port: str
:param backend: backend for torch.distributed
:type backend: str
:param verbose: whether to print logs
:type verbose: bool
'''
rank = int(os.environ['RANK']) rank = int(os.environ['RANK'])
local_rank = int(os.environ['LOCAL_RANK']) local_rank = int(os.environ['LOCAL_RANK'])
world_size = int(os.environ['WORLD_SIZE']) world_size = int(os.environ['WORLD_SIZE'])
...@@ -184,16 +228,20 @@ def initialize(model: Union[nn.Module, List[nn.Module]], ...@@ -184,16 +228,20 @@ def initialize(model: Union[nn.Module, List[nn.Module]],
''' Core function to wrap the essential training components with our functionality based on the config which is loaded into gpc.config. ''' Core function to wrap the essential training components with our functionality based on the config which is loaded into gpc.config.
:param model: your model instance :param model: your model instance
:type model: a single or a list of ``torch.nn.Module`` objects :type model: :class:`torch.nn.Module`
:param optimizer: your optimizer instance :param optimizer: your optimizer instance
:type optimizer: a single or a list of ``torch.optim.optimizer.Optimizer`` objects :type optimizer: :class:`torch.optim.optimizer.Optimizer`
:param criterion: your criterion instance :param criterion: your criterion instance
:type criterion: a single or a list of ``torch.nn.modules.loss._Loss`` objects :type criterion: :class:`torch.nn.modules.loss._Loss`
:param train_dataloader: dataloaders for training data :param train_dataloader: dataloader for training data
:type train_dataloader: a single or a list of ``torch.utils.data.DataLoader`` objects, defaults to None :type train_dataloader: :class:`torch.utils.data.DataLoader`
:param train_dataloader: dataloaders for testing data :param test_dataloader: dataloader for testing data
:type train_dataloader: a single or a list of ``torch.utils.data.DataLoader`` objects, defaults to None :type test_dataloader: :class:`torch.utils.data.DataLoader`
:return: (engine, criterion, train_dataloader, test_dataloader) :param lr_scheduler: your lr scheduler instance
:type lr_scheduler: :class:`torch.optim.lr_scheduler._LRScheduler`
:param verbose: whether to print logs
:type verbose: bool
:return: (engine, train_dataloader, test_dataloader, lr_scheduler)
:rtype: tuple :rtype: tuple
''' '''
# get logger # get logger
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment