Unverified Commit 0f8c7f98 authored by HELSON's avatar HELSON Committed by GitHub
Browse files

Fixed docstring in colossalai (#171)

parent e2089c5c
......@@ -13,23 +13,25 @@ from colossalai.utils import clip_grad_norm_fp32
class ApexAMPOptimizer(ColossalaiOptimizer):
''' A wrapper class for APEX optimizer and it implements apex-specific backward and clip_grad_norm
""" A wrapper class for APEX optimizer and it implements apex-specific backward and clip_grad_norm
methods
'''
"""
def backward(self, loss: Tensor):
"""
:param loss: loss computed by a loss function
"""Backward pass to get all gradients
:param loss: Loss computed by a loss function
:type loss: torch.Tensor
"""
with apex_amp.scale_loss(loss, self.optim) as scaled_loss:
scaled_loss.backward()
def clip_grad_norm(self, model: nn.Module, max_norm: float):
"""
:param model: your model object
"""Clip gradients' norm
:param model: Your model object
:type model: torch.nn.Module
:param max_norm: the max norm value for gradient clipping
:param max_norm: The max norm value for gradient clipping
:type max_norm: float
"""
if max_norm > 0:
......
......@@ -15,7 +15,10 @@ from ._fp16_optimizer import FP16Optimizer
class NaiveAMPOptimizer(ColossalaiOptimizer):
"""A wrapper class for optimizer to cast all parameters to fp16
:param optim: a normal optimizer like Adam or SGD
:param optim: A normal optimizer like Adam or SGD
:param args: Args used to initialize FP16 optimizer
:param kwargs: Kwargs used to initialize FP16 optimizer
:type optim: torch.optim.Optimizer
"""
......@@ -24,7 +27,8 @@ class NaiveAMPOptimizer(ColossalaiOptimizer):
super().__init__(optim)
def backward(self, loss: Tensor):
"""backward with gradient scaler
"""Backward with gradient scaler
:param loss: loss computed by a loss function
:type loss: torch.Tensor
"""
......
......@@ -16,19 +16,11 @@ from colossalai.utils import clip_grad_norm_fp32
class TorchAMPOptimizer(ColossalaiOptimizer):
"""A wrapper class which integrate pytorch amp with an optimizer
:param optim: a normal optimizer like Adam or SGD
:type optim: torch.optim.Optimizer
:param init_scale: Initial scale factor
:type init_scale: float, optional, default=2.**16
:param growth_factor: Factor by which the scale is multiplied during :meth:`update` if no inf/NaN gradients occur for ``growth_interval`` consecutive iterations.
:type growth_factor: float, optional, default=2.0
:param backoff_factor: Factor by which the scale is multiplied during :meth:`update` if inf/NaN gradients occur in an iteration.
:type backoff_factor: float, optional, default=0.5
:param growth_interval: Number of consecutive iterations without inf/NaN gradients that must occur for the scale to be multiplied by ``growth_factor``.
:type growth_interval: int, optional, default=2000
:param enabled: If ``False``, disables gradient scaling. :meth:`step` simply invokes the underlying ``optimizer.step()``, and other methods become no-ops.
:type enabled: bool, optional, default=True
:param optim: A normal optimizer like Adam or SGD
:param args: Args used to initialize gradient scaler
:param kwargs: Kwargs used to initialize gradient scaler
:type optim: torch.optim.Optimizer
"""
def __init__(self, optim: Optimizer, *args, **kwargs):
......@@ -36,23 +28,25 @@ class TorchAMPOptimizer(ColossalaiOptimizer):
self.scaler = GradScaler(*args, **kwargs)
def backward(self, loss: Tensor):
"""backward with torch amp gradient scaler
:param loss: loss computed by a loss function
"""Backward with torch amp gradient scaler
:param loss: Loss computed by a loss function
:type loss: torch.Tensor
"""
self.scaler.scale(loss).backward()
def step(self):
"""update the parameters of the model
"""Update the parameters of the model
"""
self.scaler.step(self.optim)
self.scaler.update()
def clip_grad_norm(self, model: nn.Module, max_norm: float):
"""apply gradient clipping to the model parameters
:param model: your model object
"""Apply gradient clipping to the model parameters
:param model: Your model object
:type model: torch.nn.Module
:param max_norm: max norm value for gradient clipping
:param max_norm: Max norm value for gradient clipping
:type max_norm: float
"""
if max_norm > 0.0:
......@@ -76,7 +70,8 @@ class TorchAMPModel(nn.Module):
class TorchAMPLoss(nn.Module):
"""A wrapper class for a criterion object which computes the loss in mixed-precision context
:param loss: a loss function object
:param loss: A loss function object
:type loss: torch.nn.modules.loss._Loss
"""
......
......@@ -176,16 +176,16 @@ def build_pipeline_model_from_cfg(config, num_chunks: int = 1, partition_method:
...
)
:param config: configuration of the model
:param config: Configuration of the model
:type config: dict
:param num_chunks: the number of chunks you want to have on the current stage. This value should be 1
:param num_chunks: The number of chunks you want to have on the current stage. This value should be 1
in most cases unless you are using virtual pipeline parallelism.
:type num_chunks: int
:param partition_method: this parameter determines how you want to split your model layers into stages,
:type num_chunks: int, optional
:param partition_method: This parameter determines how you want to split your model layers into stages,
you can set it as 'layer' or 'parameter'
:type partition_method: str
:param verbose: whether to print the logs
:type verbose: bool
:type partition_method: str, optional
:param verbose: Whether to print the logs
:type verbose: bool, optional
"""
ori_model = build_model(config)
layers = ori_model.layers_cfg
......@@ -240,13 +240,13 @@ def build_pipeline_model(layers: nn.Sequential, num_chunks: int = 1, verbose: bo
"""An intializer to split the model into different stages for pipeline parallelism.
Note that `layer` must be `torch.nn.Sequential`.
:param layers: layers of model
:type config: `torch.nn.Sequential`
:param num_chunks: the number of chunks you want to have on the current stage. This value should be 1
:param layers: Layers of model
:type layers: `torch.nn.Sequential`
:param num_chunks: The number of chunks you want to have on the current stage. This value should be 1
in most cases unless you are using virtual pipeline parallelism.
:type num_chunks: int
:param verbose: whether to print the logs
:type verbose: bool
:type num_chunks: int, optional
:param verbose: Whether to print the logs
:type verbose: bool, optional
"""
pipeline_parallel_size = gpc.get_world_size(ParallelMode.PIPELINE)
pipeline_rank = gpc.get_local_rank(ParallelMode.PIPELINE)
......
......@@ -18,9 +18,13 @@ def all_gather(tensor: Tensor, dim: int, parallel_mode: ParallelMode, async_op:
:param tensor: Tensor to be gathered
:param dim: The dimension concatenating in
:param parallel_mode: Parallel group mode used in this communication
:param async_op: Whether operations are asynchronous
:type tensor: :class:`torch.Tensor`
:type dim: int
:type parallel_mode: :class:`colossalai.context.ParallelMode`
:type async_op: bool, optional
:return: The tensor generated by all-gather
:rtype: :class:`torch.Tensor`
"""
......@@ -56,9 +60,15 @@ def reduce_scatter(tensor: Tensor,
:param tensor: Tensor to be reduced and scattered
:param dim: The dimension scattering in
:param parallel_mode: Parallel group mode used in this communication
:param op: The type of reduce operation
:param async_op: Whether operations are asynchronous
:type tensor: :class:`torch.Tensor`
:type dim: int
:type parallel_mode: :class:`colossalai.context.ParallelMode`
:type op: ReduceOp, optional
:type async_op: bool, optional
:return: The tensor generated by reduce-scatter
:rtype: :class:`Tensor`
"""
......
......@@ -65,7 +65,17 @@ def recv_tensor_meta(tensor_shape, prev_rank=None):
def split_tensor_into_1d_equal_chunks(tensor, new_buffer=False):
"""Break a tensor into equal 1D chunks."""
"""Break a tensor into equal 1D chunks.
:param tensor: Tensor to be split before communication
:param new_buffer: Whether uses a new buffer to store sliced tensor
:type tensor: torch.Tensor
:type new_buffer: bool, optional
:return: The split tensor
:rtype: torch.Tensor
"""
partition_size = torch.numel(tensor) // gpc.get_world_size(ParallelMode.PARALLEL_1D)
start_index = partition_size * gpc.get_local_rank(ParallelMode.PARALLEL_1D)
end_index = start_index + partition_size
......@@ -80,7 +90,14 @@ def split_tensor_into_1d_equal_chunks(tensor, new_buffer=False):
def gather_split_1d_tensor(tensor):
"""Opposite of above function, gather values from model parallel ranks."""
"""Opposite of above function, gather values from model parallel ranks.
:param tensor: Tensor to be gathered after communication
:type tensor: torch.Tensor
:return: The gathered tensor
:rtype: torch.Tensor
"""
world_size = gpc.get_world_size(ParallelMode.PARALLEL_1D)
numel = torch.numel(tensor)
numel_gathered = world_size * numel
......
......@@ -307,6 +307,7 @@ class ParallelContext:
port: int
):
"""Initializes the global distributed environment
:param rank: rank for the default process group
:type rank: int
:param world_size: world size of the default process group
......@@ -462,7 +463,7 @@ class ParallelContext:
"""Sets distributed processes to be bound to devices.
:param device_ordinal: the device id to be bound to
:type device_ordinal: int
:type device_ordinal: int, optional
"""
global_rank = self.get_global_rank()
if device_ordinal is None:
......
......@@ -12,19 +12,22 @@ from colossalai.constants import PARALLEL_INPUT_1D
@DIST_GROUP_INITIALIZER.register_module
class Initializer_1D(ProcessGroupInitializer):
'''A ProcessGroupInitializer for 1d tensor parallelism.
'''
"""A ProcessGroupInitializer for 1d tensor parallelism.
:param args: Args used to initialize ProcessGroupInitializer
:param kwargs: Kwargs used to initialize ProcessGroupInitializer
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num_group = self.world_size // self.tensor_parallel_size
def init_dist_group(self):
'''Initialize 1D tensor parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize 1D tensor parallel groups, and assign local_ranks and groups to each gpu.
:return: (local_rank, group_world_size, process_group, ranks_in_group, mode)
:rtype: tuple
'''
:rtype: Tuple
"""
local_rank = None
ranks_in_group = None
process_group = None
......
......@@ -22,8 +22,16 @@ def _check_summa_env_var(summa_dim):
class Initializer_2D_Row(ProcessGroupInitializer):
'''2d tensor parallel initialization among rows.
'''
"""2d tensor parallel initialization among rows.
:param num_group: The number of all tensor groups
:param summa_dim: The dimension of SUMMA
:param args: Args used to initialize base class
:param kwargs: Kwargs used to initialize base class
:type num_group: int
:type summa_dim: int
"""
def __init__(self, num_group, summa_dim, *args, **kwargs):
super(Initializer_2D_Row, self).__init__(*args, **kwargs)
......@@ -31,11 +39,11 @@ class Initializer_2D_Row(ProcessGroupInitializer):
self.summa_dim = summa_dim
def init_dist_group(self):
'''Initialize 2D tensor row parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize 2D tensor row parallel groups, and assign local_ranks and groups to each gpu.
:return: 2D tensor row parallelism's information
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: 2D tensor row parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -58,8 +66,16 @@ class Initializer_2D_Row(ProcessGroupInitializer):
class Initializer_2D_Col(ProcessGroupInitializer):
'''2d tensor parallel initialization among cols.
'''
"""2d tensor parallel initialization among cols.
:param num_group: The number of all tensor groups
:param summa_dim: The dimension of SUMMA
:param args: Args used to initialize base class
:param kwargs: Kwargs used to initialize base class
:type num_group: int
:type summa_dim: int
"""
def __init__(self, num_group, summa_dim, *args, **kwargs):
super(Initializer_2D_Col, self).__init__(*args, **kwargs)
......@@ -67,11 +83,11 @@ class Initializer_2D_Col(ProcessGroupInitializer):
self.summa_dim = summa_dim
def init_dist_group(self):
'''Initialize 2D tensor row parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize 2D tensor row parallel groups, and assign local_ranks and groups to each gpu.
:return: 2D tensor col parallelism's information
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: 2D tensor col parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -97,6 +113,9 @@ class Initializer_2D_Col(ProcessGroupInitializer):
class Initializer_2D(ProcessGroupInitializer):
"""
Serve as the single entry point to 2D parallel initialization.
:param args: Args used to initialize ProcessGroupInitializer
:param kwargs: Kwargs used to initialize ProcessGroupInitializer
"""
def __init__(self, *args, **kwargs):
......@@ -112,12 +131,10 @@ class Initializer_2D(ProcessGroupInitializer):
self.row_initializer = Initializer_2D_Row(self.num_group, self.summa_dim, *args, **kwargs)
def init_dist_group(self):
'''Initialize 2D tensor row and col parallel groups, and assign local_ranks and groups to each gpu.
:return: 2D tensor parallelism's information
:rtype: list of tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
parallel_setting = []
parallel_setting.append(self.row_initializer.init_dist_group())
parallel_setting.append(self.col_initializer.init_dist_group())
"""Initialize 2D tensor row and col parallel groups, and assign local_ranks and groups to each gpu.
:return: 2D tensor parallelism's information
:rtype: list of Tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
parallel_setting = [self.row_initializer.init_dist_group(), self.col_initializer.init_dist_group()]
return parallel_setting
......@@ -33,8 +33,15 @@ def _check_tesseract_env_var(tesseract_dim: int,
# i row j col k dep
class Initializer_2p5D_ROW(ProcessGroupInitializer):
'''2p5d tensor parallel initialization among rows.
'''
"""2p5d tensor parallel initialization among rows.
:param tesseract_dim: The dimension of tesseract
:param tesseract_dep: The dimension of depth
:param args: Args used to initialize base class
:type tesseract_dim: int
:type tesseract_dep: int
"""
def __init__(self,
tesseract_dim: int,
......@@ -48,11 +55,11 @@ class Initializer_2p5D_ROW(ProcessGroupInitializer):
"Tensor parallel size should be depth * dim ** 2 in 2.5D parallel"
def init_dist_group(self):
'''Initialize 2p5D tensor row parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize 2p5D tensor row parallel groups, and assign local_ranks and groups to each gpu.
:return: 2p5D tensor row parallelism's information
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: 2p5D tensor row parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -76,8 +83,15 @@ class Initializer_2p5D_ROW(ProcessGroupInitializer):
class Initializer_2p5D_Col(ProcessGroupInitializer):
'''2p5d tensor parallel initialization among cols.
'''
"""2p5d tensor parallel initialization among cols.
:param tesseract_dim: The dimension of tesseract
:param tesseract_dep: The dimension of depth
:param args: Args used to initialize base class
:type tesseract_dim: int
:type tesseract_dep: int
"""
def __init__(self,
tesseract_dim: int,
......@@ -91,11 +105,11 @@ class Initializer_2p5D_Col(ProcessGroupInitializer):
"Tensor parallel size should be depth * dim ** 2 in 2.5D parallel"
def init_dist_group(self):
'''Initialize 2p5D tensor col parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize 2p5D tensor col parallel groups, and assign local_ranks and groups to each gpu.
:return: 2p5D tensor col parallelism's information
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: 2p5D tensor col parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -119,8 +133,15 @@ class Initializer_2p5D_Col(ProcessGroupInitializer):
class Initializer_2p5D_Dep(ProcessGroupInitializer):
'''2p5D tensor parallel initialization among depths.
'''
"""2p5D tensor parallel initialization among depths.
:param tesseract_dim: The dimension of tesseract
:param tesseract_dep: The dimension of depth
:param args: Args used to initialize base class
:type tesseract_dim: int
:type tesseract_dep: int
"""
def __init__(self,
tesseract_dim: int,
......@@ -134,11 +155,11 @@ class Initializer_2p5D_Dep(ProcessGroupInitializer):
"Tensor parallel size should be depth * dim ** 2 in 2.5D parallel"
def init_dist_group(self):
'''Initialize 2p5D tensor depth parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize 2p5D tensor depth parallel groups, and assign local_ranks and groups to each gpu.
:return: 2p5D tensor depth parallelism's information
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: 2p5D tensor depth parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -163,8 +184,15 @@ class Initializer_2p5D_Dep(ProcessGroupInitializer):
# i row j col k dep
class Initializer_2p5D_XZ(ProcessGroupInitializer):
'''2p5d tensor parallel initialization among cols times dep.
'''
"""2p5d tensor parallel initialization among cols times dep.
:param tesseract_dim: The dimension of tesseract
:param tesseract_dep: The dimension of depth
:param args: Args used to initialize base class
:type tesseract_dim: int
:type tesseract_dep: int
"""
def __init__(self,
tesseract_dim: int,
......@@ -178,11 +206,11 @@ class Initializer_2p5D_XZ(ProcessGroupInitializer):
"Tensor parallel size should be depth * dim ** 2 in 2.5D parallel"
def init_dist_group(self):
'''Initialize 2p5D tensor colXdepth parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize 2p5D tensor colXdepth parallel groups, and assign local_ranks and groups to each gpu.
:return: 2p5D tensor colXdepth parallelism's information
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: 2p5D tensor colXdepth parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -209,6 +237,22 @@ class Initializer_2p5D_XZ(ProcessGroupInitializer):
class Initializer_2p5D(ProcessGroupInitializer):
"""
Serve as the single entry point to Tesseract parallel initialization.
:param rank: The rank of current process
:param world_size: Size of whole communication world
:param config: Running configuration
:param data_parallel_size: Size of data parallel
:param pipeline_parallel_size: Size of pipeline parallel
:param tensor_parallel_size: Size of tensor parallel
:param depth: The depth of 2p5d parallel
:type rank: int
:type world_size: int
:type config: Config
:type data_parallel_size: int
:type pipeline_parallel_size: int
:type tensor_parallel_size: int
:type depth: int
"""
def __init__(self,
......@@ -216,11 +260,11 @@ class Initializer_2p5D(ProcessGroupInitializer):
world_size: int,
config: Config,
data_parallel_size: int,
pipeline_parlalel_size: int,
pipeline_parallel_size: int,
tensor_parallel_size: int,
depth: int
):
args = (rank, world_size, config, data_parallel_size, pipeline_parlalel_size, tensor_parallel_size)
args = (rank, world_size, config, data_parallel_size, pipeline_parallel_size, tensor_parallel_size)
super().__init__(*args)
self.num_group = self.world_size // self.tensor_parallel_size
self.tesseract_dim = int(math.sqrt(self.tensor_parallel_size / depth))
......@@ -236,14 +280,11 @@ class Initializer_2p5D(ProcessGroupInitializer):
self.xz_initializer = Initializer_2p5D_XZ(self.tesseract_dim, self.tesseract_dep, *args)
def init_dist_group(self):
'''Initialize 2p5D tensor row, col, depth, and colXdepth parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize 2p5D tensor row, col, depth, and colXdepth parallel groups, and assign local_ranks and groups to each gpu.
:return: Whole 2p5D tensor parallelism's information
:rtype: list of tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
parallel_setting = []
parallel_setting.append(self.col_initializer.init_dist_group())
parallel_setting.append(self.row_initializer.init_dist_group())
parallel_setting.append(self.dep_initializer.init_dist_group())
parallel_setting.append(self.xz_initializer.init_dist_group())
:rtype: list of Tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
parallel_setting = [self.col_initializer.init_dist_group(), self.row_initializer.init_dist_group(),
self.dep_initializer.init_dist_group(), self.xz_initializer.init_dist_group()]
return parallel_setting
......@@ -25,19 +25,26 @@ def _check_depth_env_var(depth):
class Initializer_3D_Input(ProcessGroupInitializer):
'''2D tensor parallel initialization among input.
'''
"""3D tensor parallel initialization among input.
:param num_group: The number of all tensor groups
:param depth: Depth of 3D parallelism
:param args: Args used in base class
:type num_group: int
:type depth: int
"""
def __init__(self, num_group: int, depth: int, *args):
super().__init__(*args)
self.num_group = num_group
self.depth = depth
def init_dist_group(self):
'''Initialize 3D tensor parallel groups among input, and assign local_ranks and groups to each gpu.
"""Initialize 3D tensor parallel groups among input, and assign local_ranks and groups to each gpu.
:return: 3D tensor parallelism's information among input
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: 3D tensor parallelism's information among input
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -64,8 +71,15 @@ class Initializer_3D_Input(ProcessGroupInitializer):
class Initializer_3D_Weight(ProcessGroupInitializer):
'''3D tensor parallel initialization among weight.
'''
"""3D tensor parallel initialization among weight.
:param num_group: The number of all tensor groups
:param depth: Depth of 3D parallelism
:param args: Args used in base class
:type num_group: int
:type depth: int
"""
def __init__(self, num_group: int, depth: int, *args):
super().__init__(*args)
......@@ -73,11 +87,11 @@ class Initializer_3D_Weight(ProcessGroupInitializer):
self.depth = depth
def init_dist_group(self):
'''Initialize 3D tensor parallel groups among weight, and assign local_ranks and groups to each gpu.
"""Initialize 3D tensor parallel groups among weight, and assign local_ranks and groups to each gpu.
:return: 3D tensor parallelism's information among weight
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: 3D tensor parallelism's information among weight
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -104,8 +118,15 @@ class Initializer_3D_Weight(ProcessGroupInitializer):
class Initializer_3D_Output(ProcessGroupInitializer):
'''2D tensor parallel initialization among weight.
'''
"""3D tensor parallel initialization among weight.
:param num_group: The number of all tensor groups
:param depth: Depth of 3D parallelism
:param args: Args used in base class
:type num_group: int
:type depth: int
"""
def __init__(self, num_group: int, depth: int, *args):
super().__init__(*args)
......@@ -113,11 +134,11 @@ class Initializer_3D_Output(ProcessGroupInitializer):
self.depth = depth
def init_dist_group(self):
'''Initialize 3D tensor parallel groups among output, and assign local_ranks and groups to each gpu.
"""Initialize 3D tensor parallel groups among output, and assign local_ranks and groups to each gpu.
:return: 3D tensor parallelism's information among output
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: 3D tensor parallelism's information among output
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -145,8 +166,10 @@ class Initializer_3D_Output(ProcessGroupInitializer):
@DIST_GROUP_INITIALIZER.register_module
class Initializer_3D(ProcessGroupInitializer):
'''Serve as the single entry point to 3D parallel initialization.
'''
"""Serve as the single entry point to 3D parallel initialization.
:param args: Args used to initialize ProcessGroupInitializer
"""
def __init__(self, *args):
super().__init__(*args)
self.num_group = self.world_size // self.tensor_parallel_size
......@@ -163,13 +186,11 @@ class Initializer_3D(ProcessGroupInitializer):
self.num_group, self.depth, *args)
def init_dist_group(self):
'''Initialize 3D tensor parallel groups, and assign local_ranks and groups to each gpu.
:return: 3D tensor parallelism's information
:rtype: list of tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
parallel_setting = []
parallel_setting.append(self.input_initializer.init_dist_group())
parallel_setting.append(self.weight_initializer.init_dist_group())
parallel_setting.append(self.output_initializer.init_dist_group())
"""Initialize 3D tensor parallel groups, and assign local_ranks and groups to each gpu.
:return: 3D tensor parallelism's information
:rtype: list of Tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
parallel_setting = [self.input_initializer.init_dist_group(), self.weight_initializer.init_dist_group(),
self.output_initializer.init_dist_group()]
return parallel_setting
......@@ -10,18 +10,21 @@ from ..parallel_mode import ParallelMode
@DIST_GROUP_INITIALIZER.register_module
class Initializer_Data(ProcessGroupInitializer):
'''A ProcessGroupInitializer for data parallelism.
'''
"""A ProcessGroupInitializer for data parallelism.
:param args: Args used to initialize ProcessGroupInitializer
:param kwargs: Kwargs used to initialize ProcessGroupInitializer
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num_data_parallel_group = self.world_size // self.data_parallel_size
def init_dist_group(self):
'''Initialize data parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize data parallel groups, and assign local_ranks and groups to each gpu.
:return: data parallelism's information
:rtype: tuple (local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: Data parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......
......@@ -11,8 +11,12 @@ from ..parallel_mode import ParallelMode
@DIST_GROUP_INITIALIZER.register_module
class Initializer_Model(ProcessGroupInitializer):
'''A ProcessGroupInitializer for model parallelism (model parallel group contains pipeline and tensor parallel groups).
'''
"""A ProcessGroupInitializer for model parallelism (model parallel group contains pipeline and tensor parallel
groups).
:param args: Args used to initialize ProcessGroupInitializer
:param kwargs: Kwargs used to initialize ProcessGroupInitializer
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
......@@ -20,11 +24,11 @@ class Initializer_Model(ProcessGroupInitializer):
self.num_group = self.world_size // self.model_parallel_size
def init_dist_group(self):
'''Initialize 1D tensor parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize model parallel groups, and assign local_ranks and groups to each gpu.
:return: (local_rank, group_world_size, process_group, ranks_in_group, mode)
:rtype: tuple
'''
:rtype: Tuple
"""
local_rank = None
ranks_in_group = None
process_group = None
......
......@@ -9,8 +9,15 @@ from ..parallel_mode import ParallelMode
@DIST_GROUP_INITIALIZER.register_module
class Initializer_Moemodel(ProcessGroupInitializer):
"""Model parallel initialization for MoE system.
"""
:param moe_model: Size of moe model parallel
:param moe_data: Size of moe data parallel
:param args: Args used in base class
:param kwargs: Kwargs used in base class
:type moe_model: int
:type moe_data: int
"""
def __init__(self, moe_model, moe_data, *args, **kwargs):
super().__init__(*args, **kwargs)
self.moe_model = moe_model
......@@ -19,8 +26,10 @@ class Initializer_Moemodel(ProcessGroupInitializer):
def init_dist_group(self):
"""Initialize model parallel groups in moe parallel environment,
and assign local_ranks and groups to each gpu.
"""
:return: MoE model parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -43,8 +52,15 @@ class Initializer_Moemodel(ProcessGroupInitializer):
@DIST_GROUP_INITIALIZER.register_module
class Initializer_Moedata(ProcessGroupInitializer):
"""Data parallel initialization for MoE system.
"""
:param moe_model: Size of moe model parallel
:param moe_data: Size of moe data parallel
:param args: Args used in base class
:param kwargs: Kwargs used in base class
:type moe_model: int
:type moe_data: int
"""
def __init__(self, moe_model, moe_data, *args, **kwargs):
super().__init__(*args, **kwargs)
self.moe_model = moe_model
......@@ -53,8 +69,10 @@ class Initializer_Moedata(ProcessGroupInitializer):
def init_dist_group(self):
"""Initialize data parallel groups in moe parallel environment,
and assign local_ranks and groups to each gpu.
"""
:return: MoE data parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -77,8 +95,10 @@ class Initializer_Moedata(ProcessGroupInitializer):
@DIST_GROUP_INITIALIZER.register_module
class Initializer_Moe(ProcessGroupInitializer):
"""Serves as the single entry point to MoE parallel initialization.
"""
:param args: Args used to initialize ProcessGroupInitializer
:param kwargs: Kwargs used to initialize ProcessGroupInitializer
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.moe_model = moe_env.model_parallel_size
......@@ -90,8 +110,10 @@ class Initializer_Moe(ProcessGroupInitializer):
def init_dist_group(self):
"""Initializes MoE parallel communication groups.
"""
:return: MoE parallelism's information
:rtype: list of Tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
parallel_setting = [self.model_initializer.init_dist_group(),
self.data_initializer.init_dist_group()]
return parallel_setting
......@@ -10,12 +10,22 @@ from ..parallel_mode import ParallelMode
@DIST_GROUP_INITIALIZER.register_module
class Initializer_Pipeline(ProcessGroupInitializer):
"""A ProcessGroupInitializer for pipeline parallelism.
:param args: Args used to initialize ProcessGroupInitializer
:param kwargs: Kwargs used to initialize ProcessGroupInitializer
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.data_group_size = self.world_size // self.data_parallel_size
self.pipeline_stage_size = self.data_group_size // self.pipeline_parallel_size
def init_dist_group(self):
"""Initialize pipeline parallel groups, and assign local_ranks and groups to each gpu.
:return: Pipeline parallelism's information
:rtype: list of Tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
dist_settings = list()
for i in range(self.data_parallel_size):
for j in range(self.pipeline_stage_size):
......
......@@ -10,12 +10,14 @@ from ..parallel_mode import ParallelMode
@DIST_GROUP_INITIALIZER.register_module
class Initializer_Sequence_DP(ProcessGroupInitializer):
'''A ProcessGroupInitializer for sequence parallelism all-reduce.
"""A ProcessGroupInitializer for sequence parallelism all-reduce.
In Sequence Parallelism, each GPU holds the full copy of model weights,
In Sequence Parallelism, each GPU holds the full copy of model weights,
thus, gradient all-reduce occurs across all processes in the same pipeline stage
'''
:param args: Args used to initialize ProcessGroupInitializer
:param kwargs: Kwargs used to initialize ProcessGroupInitializer
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
......@@ -23,10 +25,11 @@ class Initializer_Sequence_DP(ProcessGroupInitializer):
self.num_group = self.pipeline_parallel_size
def init_dist_group(self):
'''Initialize Sequence Parallel process groups used for gradient all-reduce.
"""Initialize Sequence Parallel process groups used for gradient all-reduce.
:return: (local_rank, group_world_size, process_group, ranks_in_group, mode)
:rtype: tuple
'''
:rtype: Tuple
"""
local_rank = None
ranks_in_group = None
process_group = None
......@@ -47,9 +50,11 @@ class Initializer_Sequence_DP(ProcessGroupInitializer):
@DIST_GROUP_INITIALIZER.register_module
class Initializer_Sequence(ProcessGroupInitializer):
'''A ProcessGroupInitializer for sequence parallelism.
'''
"""A ProcessGroupInitializer for sequence parallelism.
:param args: Args used to initialize ProcessGroupInitializer
:param kwargs: Kwargs used to initialize ProcessGroupInitializer
"""
def __init__(self,
*args, **kwargs):
super().__init__(*args, **kwargs)
......@@ -58,15 +63,15 @@ class Initializer_Sequence(ProcessGroupInitializer):
self._sequence_dp_initializer = Initializer_Sequence_DP(*args, **kwargs)
def init_dist_group(self):
'''Initialize Sequence parallel process groups and assign local_ranks and groups to each gpu.
"""Initialize Sequence parallel process groups and assign local_ranks and groups to each gpu.
Sequence parallelism requires 2 process groups. The first is for model forward where several processes
exchange paritial query, key and value embedding to compute self attention values. The second is for
all-reduce to synchronize the model parameters.
:return: 2D tensor parallelism's information
:rtype: list of tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: Sequence parallelism's information
:rtype: list of Tuples (local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
parallel_setting = []
......
......@@ -10,18 +10,21 @@ from ..parallel_mode import ParallelMode
@DIST_GROUP_INITIALIZER.register_module
class Initializer_Tensor(ProcessGroupInitializer):
'''A ProcessGroupInitializer for tensor parallelism.
'''
"""A ProcessGroupInitializer for tensor parallelism.
:param args: Args used to initialize ProcessGroupInitializer
:param kwargs: Kwargs used to initialize ProcessGroupInitializer
"""
def __init__(self, *args, **kwargs):
    # ProcessGroupInitializer.__init__ populates self.world_size and
    # self.tensor_parallel_size used below.
    super().__init__(*args, **kwargs)
    # Number of tensor-parallel groups the whole world is partitioned into.
    self.num_tensor_parallel_group = self.world_size // self.tensor_parallel_size
def init_dist_group(self):
'''Initialize tensor parallel groups, and assign local_ranks and groups to each gpu.
"""Initialize tensor parallel groups, and assign local_ranks and groups to each gpu.
:return: tensor parallelism's information
:rtype: tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
'''
:return: Tensor parallelism's information
:rtype: Tuple(local_rank, group_world_size, process_group, ranks_in_group, mode)
"""
local_rank = None
ranks_in_group = None
process_group = None
......
......@@ -7,21 +7,35 @@ from colossalai.context import Config
class ProcessGroupInitializer(ABC):
'''An object, knowing the parallelism configuration, that initializes parallel groups.
'''
"""An object, knowing the parallelism configuration, that initializes parallel groups.
:param rank: The rank of current process
:param world_size: Size of whole communication world
:param config: Running configuration
:param data_parallel_size: Size of data parallel
:param pipeline_parallel_size: Size of pipeline parallel
:param tensor_parallel_size: Size of tensor parallel
:type rank: int
:type world_size: int
:type config: Config
:type data_parallel_size: int
:type pipeline_parallel_size: int
:type tensor_parallel_size: int
"""
def __init__(self,
             rank: int,
             world_size: int,
             config: Config,
             data_parallel_size: int,
             pipeline_parallel_size: int,
             tensor_parallel_size: int
             ):
    """Stores the parallel configuration shared by all group initializers.

    :param rank: The rank of the current process
    :param world_size: Size of the whole communication world
    :param config: Running configuration
    :param data_parallel_size: Size of data parallelism
    :param pipeline_parallel_size: Size of pipeline parallelism
    :param tensor_parallel_size: Size of tensor parallelism
    """
    self.rank = rank
    self.world_size = world_size
    self.data_parallel_size = data_parallel_size
    self.config = config
    self.pipeline_parallel_size = pipeline_parallel_size
    self.tensor_parallel_size = tensor_parallel_size
    super().__init__()
......
......@@ -61,6 +61,8 @@ class SeedManager:
:type parallel_mode: :class:`colossalai.context.ParallelMode`
:param seed: The seed to be added
:type seed: int
:param overwrtie: Whether allows to overwrite the seed that has been set already
:type overwrtie: bool, optional
:raises AssertionError: Raises an AssertionError if `parallel_mode` is not an instance of
:class:`colossalai.context.ParallelMode` or the seed for `parallel_mode` has been added
"""
......
......@@ -22,8 +22,10 @@ class Engine:
:type optimizer: ``torch.optim.Optimizer``
:param criterion: Loss function for calculating loss
:type criterion: ``torch.nn.modules.loss._Loss``
:param gradient_clipping: The norm of gradient clipping
:type gradient_clipping: float, optional
:param gradient_handlers: A list of gradient handler used in backward
:type gradient_handlers: list
:param clip_grad_norm: The norm of gradient clipping
:type clip_grad_norm: float, optional
:param verbose: whether to display log info
:type verbose: bool
"""
......@@ -54,26 +56,26 @@ class Engine:
@property
def model(self):
    """Model attached to the engine"""
    return self._model
@property
def optimizer(self):
    """Optimizer attached to the engine"""
    return self._optimizer
@property
def criterion(self):
    """Criterion attached to the engine"""
    return self._criterion
def zero_grad(self):
    """Set the gradient of parameters to zero
    """
    self.optimizer.zero_grad()
def step(self):
"""execute parameter update
"""Execute parameter update
"""
self._all_reduce_gradients()
self.optimizer.clip_grad_norm(self.model, self._clip_grad_norm)
......@@ -82,7 +84,7 @@ class Engine:
def backward(self, loss: Tensor):
    """Start backward propagation given the loss value computed by a loss function

    :param loss: Loss value computed by a loss function
    :type loss: :class:`torch.Tensor`
    """
    # The wrapped optimizer owns the backward logic (e.g. AMP loss scaling).
    return self.optimizer.backward(loss)
......@@ -90,23 +92,28 @@ class Engine:
def backward_by_grad(self, tensor, grad):
    """Start backward propagation given the gradient of the output tensor

    :param tensor: Output tensor
    :type tensor: :class:`torch.Tensor`
    :param grad: Gradient passed back to the output
    :type grad: :class:`torch.Tensor`
    """
    # The wrapped optimizer owns the backward logic (e.g. AMP loss scaling).
    return self.optimizer.backward_by_grad(tensor, grad)
def calc_loss(self, *args, **kwargs):
    """Compute the loss value

    :param args: Args used in criterion function
    :param kwargs: Kwargs used in criterion function
    :return: The loss value
    :rtype: :class:`torch.Tensor`
    """
    return self.criterion(*args, **kwargs)
def __call__(self, *args, **kwargs):
    """Run the forward step for the model

    :param args: Args used in the model's forward
    :param kwargs: Kwargs used in the model's forward
    :return: Output of the model
    :rtype: Tuple[:class:`torch.Tensor`] or :class:`torch.Tensor`
    """
    return self.model(*args, **kwargs)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment