Commit da3f0934 authored by zhuwenwen's avatar zhuwenwen
Browse files

delete unused files

parent c4dd1fd4
import torch
from . import BaseOpHook
from concurrent.futures import ThreadPoolExecutor
from colossalai.registry import OPHOOKS
from colossalai.logging import get_dist_logger
from time import sleep, time
import psutil
import pickle
def get_cuda_memory_used(device):
    """Return the number of bytes currently allocated by tensors on a CUDA device.

    The value is read with ``torch.cuda.memory_allocated`` *before* the peak
    statistics of the same device are reset, so it reflects the allocation at
    the moment of the call, not a peak.

    :param device: CUDA device to query; ``None`` selects the current device
    :type device: torch.device or int, optional
    :return: Bytes currently occupied by tensors on ``device``
    :rtype: int
    """
    # Query the device the caller asked for instead of silently falling back
    # to the current device.
    used = torch.cuda.memory_allocated(device)
    # Reset the peak counter so the next call starts a fresh window.
    if hasattr(torch.cuda, "reset_peak_memory_stats"):    # pytorch 1.4+
        torch.cuda.reset_peak_memory_stats(device)
    return used
class AsyncMemoryMonitor:
    """Background sampler of CUDA memory usage.

    While a measurement window is open, one worker thread repeatedly polls
    :func:`get_cuda_memory_used` and keeps the largest value seen. Closing
    the window with :meth:`finish` stores that value together with a
    timestamp.
    """

    def __init__(self, power=10):
        """Create an idle monitor.

        :param power: Exponent of the sampling period; the worker sleeps
            ``1 / (10 ** power)`` seconds between two polls
        :type power: int, optional
        """
        # Recorded history: one entry per closed measurement window.
        self.time_stamps = []
        self.mem_stats = []
        self.interval = 1 / (10**power)
        # A single worker is enough: only one window is sampled at a time.
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.monitor_thread = None
        self.keep_measuring = False

    def set_interval(self, power: int):
        """Change the sampling period to ``1 / (10 ** power)`` seconds."""
        self.interval = 1 / (10**power)

    def is_measuring(self):
        """Tell whether a measurement window is currently open."""
        return self.keep_measuring

    def start(self):
        """Open a measurement window and launch the sampling task."""
        self.keep_measuring = True
        self.monitor_thread = self.executor.submit(self._measure_usage)

    def finish(self):
        """Close the current window and record its peak.

        :return: Peak usage observed in the window, or ``0`` when no window
            was open
        :rtype: int
        """
        if self.keep_measuring is not False:
            # Dropping the flag makes the sampling loop exit; ``result``
            # then waits for the worker and hands back its maximum.
            self.keep_measuring = False
            peak = self.monitor_thread.result()
            self.monitor_thread = None
            self.time_stamps.append(time())
            self.mem_stats.append(peak)
            return peak
        return 0

    def _measure_usage(self):
        """Poll the memory usage until the window is closed; return the maximum."""
        device = torch.device(f"cuda:{torch.cuda.current_device()}")
        peak = 0
        while self.keep_measuring:
            sample = get_cuda_memory_used(device)
            if sample > peak:
                peak = sample
            sleep(self.interval)
        return peak

    def state_dict(self):
        """Return the recorded history as a plain dictionary."""
        return dict(time_stamps=self.time_stamps, mem_stats=self.mem_stats)

    def save(self, filename):
        """Pickle the recorded history to ``filename``."""
        with open(filename, "wb") as sink:
            pickle.dump(self.state_dict(), sink)
@OPHOOKS.register_module
class MemTracerOpHook(BaseOpHook):
    """Operator hook that samples CUDA memory around forward/backward calls.

    Sampling only happens while the hooked module is in training mode and
    fewer than ``niter`` iterations have been completed. When the iteration
    counter equals ``niter`` at the end of an iteration, the collected
    statistics are pickled to ``memstats.pkl``.

    :param niter: Number of iterations to trace
    :type niter: int, optional
    """

    def __init__(self, niter=5):
        super().__init__()
        self._logger = get_dist_logger()
        self._niter = niter
        self._curiter = 0
        self.async_mem_monitor = AsyncMemoryMonitor()

    def _isvalid(self, module):
        """Tell whether a hook call on ``module`` should be traced."""
        training = module.training
        if not training:
            return training
        return self._curiter < self._niter

    def _close_window(self, module, reopen):
        """Close the running measurement window if tracing applies.

        :param module: Module the hook was triggered for
        :param reopen: Whether a new window is opened right afterwards
        :return: ``True`` when the call was traced, ``False`` otherwise
        """
        if not self._isvalid(module):
            return False
        self.async_mem_monitor.finish()
        if reopen:
            self.async_mem_monitor.start()
        return True

    def niter(self):
        """Return the number of iterations to trace."""
        return self._niter

    def pre_fwd_exec(self, module: torch.nn.Module, *args):
        """Start a fresh measurement window before the forward of ``module``."""
        if self._close_window(module, reopen=True):
            self._logger.debug(f'FWD PRE {module.__class__.__name__}')

    def post_fwd_exec(self, module: torch.nn.Module, *args):
        """Close the measurement window after the forward of ``module``."""
        if self._close_window(module, reopen=False):
            self._logger.debug(f'FWD POST {module.__class__.__name__}')

    def pre_bwd_exec(self, module: torch.nn.Module, input, output):
        """Start a fresh measurement window before the backward of ``module``."""
        assert isinstance(module, torch.nn.Module)
        if self._close_window(module, reopen=True):
            self._logger.debug(f'BWD PRE {module.__class__.__name__}')

    def post_bwd_exec(self, module: torch.nn.Module, input):
        """Close the measurement window after the backward of ``module``."""
        assert isinstance(module, torch.nn.Module)
        if self._close_window(module, reopen=False):
            self._logger.debug(f'BWD POST {module.__class__.__name__}')

    def pre_iter(self):
        """Nothing to do before an iteration."""
        pass

    def post_iter(self):
        """Close any open window, dump the statistics once, and count the iteration."""
        self.async_mem_monitor.finish()
        dump_now = self._curiter == self._niter
        if dump_now:
            self._logger.info(
                f'dump a memory statistics as pickle to ./memstats.pkl')
            self.save_results("memstats.pkl")
        self._curiter += 1

    def save_results(self, filename):
        """Write the collected statistics to ``filename`` via the monitor."""
        self.async_mem_monitor.save(filename)
from ._base_schedule import BaseSchedule
from ._pipeline_schedule import PipelineSchedule, InterleavedPipelineSchedule
from ._non_pipeline_schedule import NonPipelineSchedule
__all__ = ['BaseSchedule', 'NonPipelineSchedule', 'PipelineSchedule', 'InterleavedPipelineSchedule']
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from abc import ABC, abstractmethod
import torch
from typing import Iterable, Callable
from .._base_engine import Engine
from colossalai.logging import get_dist_logger
from colossalai.utils import get_current_device
class BaseSchedule(ABC):
    """Abstract helper that drives one training or evaluation step.

    Subclasses implement :meth:`forward_backward_step`; this base class
    supplies batch loading, device placement and thin wrappers for calling
    the engine and its criterion.

    :param batch_data_process_func: Optional callable that turns a raw batch
        into a ``(data, label)`` pair; when omitted the batch itself is
        unpacked as ``(data, label)``
    :type batch_data_process_func: Callable, optional
    """

    def __init__(self, batch_data_process_func: Callable = None):
        self.batch_data_process_func = batch_data_process_func
        self.logger = get_dist_logger()

    @staticmethod
    def _move_tensor(element):
        """Return a detached copy on the current device for non-CUDA tensors.

        Anything that is not a tensor, or is already on CUDA, is returned
        unchanged.
        """
        needs_transfer = torch.is_tensor(element) and not element.is_cuda
        if not needs_transfer:
            return element
        return element.to(get_current_device()).detach()

    def _move_to_device(self, data):
        """Apply :meth:`_move_tensor` to ``data`` or to every value of a dict."""
        if not isinstance(data, dict):
            return self._move_tensor(data)
        return {key: self._move_tensor(value) for key, value in data.items()}

    @staticmethod
    def _check_sanity(data, tag: str):
        """Assert that ``data`` is a tensor or a dict; ``tag`` names it in the message."""
        supported = isinstance(data, (torch.Tensor, dict))
        assert supported, \
            f'{tag} must be torch.Tensor or dict'

    def load_batch(self, data_iter, to_gpu=True):
        """Fetch the next batch and split it into data and label.

        The size of the first dimension of the data (or of its first dict
        value) is stored in ``self.batch_size``.

        :param data_iter: Iterator yielding batches
        :type data_iter: DataIter
        :param to_gpu: Whether data and label are moved to the current device
        :type to_gpu: bool, optional
        :return: (data, label)
        :rtype: (:class:`Tensor`, :class:`torch.Tensor`)
        :raises RuntimeError: If ``data_iter`` is ``None``
        """
        if data_iter is None:
            raise RuntimeError('Dataloader is not defined.')

        raw_batch = next(data_iter)
        process = self.batch_data_process_func
        data, label = process(raw_batch) if process else raw_batch

        for payload, tag in ((data, 'data'), (label, 'label')):
            self._check_sanity(payload, tag)

        # For dict inputs the first value stands in for the whole batch.
        reference = data if isinstance(data, torch.Tensor) else next(iter(data.values()))
        self.batch_size = reference.size(0)

        if not to_gpu:
            return data, label
        return self._move_to_device(data), self._move_to_device(label)

    def pre_processing(self, engine: Engine):
        """Hook executed before the schedule runs; does nothing by default."""
        pass

    @abstractmethod
    def forward_backward_step(self,
                              engine: Engine,
                              data_iter: Iterable,
                              forward_only: bool,
                              return_loss: bool = True,
                              return_output_label: bool = True
                              ):
        """Process one batch for training or evaluation.

        :param engine: Colossalai training engine
        :type engine: colossalai.engine.Engine
        :param data_iter: Iterator yielding batches
        :type data_iter: DataIter
        :param forward_only: If True, no backward pass is run
        :type forward_only: bool
        :param return_loss: If False, the loss is not returned
        :type return_loss: bool, optional
        :param return_output_label: If False, output and label are not returned
        :type return_output_label: bool, optional
        """
        pass

    @staticmethod
    def _call_engine(engine, inputs):
        """Call ``engine`` with a tensor positionally or a mapping as keyword arguments."""
        return engine(inputs) if isinstance(inputs, torch.Tensor) else engine(**inputs)

    @staticmethod
    def _call_engine_criterion(engine, outputs, labels):
        """Evaluate ``engine.criterion`` on the model outputs and the labels.

        A single output tensor is wrapped in a tuple; tensor labels are
        appended positionally, any other labels are expanded as keyword
        arguments.
        """
        assert isinstance(outputs, (torch.Tensor, list, tuple)
                          ), f'Expect output of model is (torch.Tensor, list, tuple), got {type(outputs)}'
        positional = (outputs,) if isinstance(outputs, torch.Tensor) else outputs
        if not isinstance(labels, torch.Tensor):
            return engine.criterion(*positional, **labels)
        return engine.criterion(*positional, labels)
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from typing import Iterable
import torch
from colossalai.engine import Engine
from ._base_schedule import BaseSchedule
from colossalai.utils import conditional_context
class NonPipelineSchedule(BaseSchedule):
    """Schedule for runs without pipeline parallelism.

    One step loads a batch, feeds it to the engine, optionally evaluates the
    criterion and, unless running forward-only, back-propagates the loss.
    """

    def forward_backward_step(self,
                              engine: Engine,
                              data_iter: Iterable,
                              forward_only: bool = False,
                              return_loss: bool = True,
                              return_output_label: bool = True):
        """Load one batch, run the forward pass and optionally the backward pass.

        :param engine: Engine used for the forward call, criterion and backward
        :type engine: colossalai.engine.Engine
        :param data_iter: Data iterator of the dataloader, e.g. iter(dataloader)
        :type data_iter: Iterator
        :param forward_only: If True, the forward pass runs under ``torch.no_grad()``
            and no backward pass is executed
        :type forward_only: bool, optional
        :param return_loss: If True, the loss is computed and returned
        :type return_loss: bool, optional
        :param return_output_label: If True, output and label are returned
        :type return_output_label: bool, optional
        :return: (output, label, loss); entries that were not requested are None
        :rtype: Tuple[:class:`torch.Tensor`]
        """
        assert forward_only or return_loss, \
            "The argument 'return_loss' has to be True when 'forward_only' is False, but got False."

        data, label = self.load_batch(data_iter)

        # Gradients are disabled only when no backward pass will follow.
        with conditional_context(torch.no_grad(), enable=forward_only):
            output = self._call_engine(engine, data)
            if return_loss:
                loss = self._call_engine_criterion(engine, output, label)

        if not forward_only:
            engine.backward(loss)

        reported_loss = loss if return_loss else None
        if not return_output_label:
            return None, None, reported_loss
        return output, label, reported_loss
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import inspect
from typing import Callable, List, Tuple, Union
import colossalai.communication as comm
import torch.cuda
from colossalai.amp.naive_amp import NaiveAMPModel
from colossalai.context.parallel_mode import ParallelMode
from colossalai.core import global_context as gpc
from colossalai.logging import get_dist_logger
from colossalai.utils import switch_virtual_pipeline_parallel_rank
from colossalai.utils.cuda import get_current_device
from colossalai.zero import (ZeroRedundancyOptimizer_Level_2,
ZeroRedundancyOptimizer_Level_3)
from ._base_schedule import BaseSchedule
def pack_return_tensors(return_tensors):
    """Merge per-microbatch ``(output, label)`` pairs along dimension 0.

    :param return_tensors: Sequence of ``(output, label)`` pairs. Each output
        is a tensor or a list/tuple of tensors; each label is a tensor or a
        dict of tensors
    :return: ``(output, label)`` with every tensor concatenated on dim 0;
        list/tuple outputs become a tuple, dict labels stay a dict
    :raises TypeError: If the first output is neither a tensor nor a list/tuple
    """
    outputs, labels = tuple(zip(*return_tensors))

    head = outputs[0]
    if isinstance(head, torch.Tensor):
        packed_output = torch.cat(outputs, dim=0)
    elif isinstance(head, (list, tuple)):
        # Transpose so that the i-th tensors of all microbatches are grouped.
        packed_output = tuple(torch.cat(group, dim=0) for group in zip(*outputs))
    else:
        raise TypeError(f'Output of model must be tensor or list/tuple of tensors')

    if isinstance(labels[0], torch.Tensor):
        return packed_output, torch.cat(labels, dim=0)

    # Dict labels: collect the pieces per key, using the first label's keys.
    pieces = {key: [] for key in labels[0].keys()}
    for entry in labels:
        for key, value in entry.items():
            pieces[key].append(value)
    packed_label = {key: torch.cat(chunks, dim=0) for key, chunks in pieces.items()}
    return packed_output, packed_label
class PipelineSchedule(BaseSchedule):
    """A helper schedule class for pipeline parallelism running environment.
    It uses the non-interleaved 1F1B strategy: a batch is split into
    ``num_microbatches`` microbatches which are processed in a warmup phase
    (forward only), a steady phase (one forward followed by one backward) and
    a cooldown phase (remaining backwards).

    :param num_microbatches: The number of microbatches
    :type num_microbatches: int
    :param batch_data_process_func: The preprocessing function which receives a batch of data, and it will be executed in `load_batch`
    :type batch_data_process_func: Callable, optional
    :param tensor_shape: Specified shape in pipeline communication
    :type tensor_shape: torch.Size, optional
    :param scatter_gather_tensors: If set to `True`, communication will be reduced over pipeline when using 1D tensor parallelization
    :type scatter_gather_tensors: bool, optional
    """

    def __init__(self,
                 num_microbatches,
                 batch_data_process_func: Callable = None,
                 tensor_shape: Union[torch.Size, List[int], Tuple[int]] = None,
                 scatter_gather_tensors: bool = False):
        super().__init__(batch_data_process_func=batch_data_process_func)
        self.num_microbatches = num_microbatches
        # Communication dtype; switched to torch.half in pre_processing when
        # the model is wrapped in NaiveAMPModel.
        self.dtype = torch.float
        self.tensor_shape = tensor_shape
        # The caller's flag is only honored when 1D tensor parallelism is
        # initialized with more than one rank; otherwise it stays False.
        self.scatter_gather_tensors = False
        if gpc.is_initialized(ParallelMode.PARALLEL_1D) and gpc.get_world_size(ParallelMode.PARALLEL_1D) > 1:
            self.scatter_gather_tensors = scatter_gather_tensors
        self._logger = get_dist_logger()

    def load_batch(self, data_iter):
        """Load a whole batch without moving it to the device and prepare microbatching.

        Stores the batch in ``self.batch_data`` / ``self.batch_label``, resets
        ``self.microbatch_offset`` to 0 and derives ``self.microbatch_size``.
        Unlike the base class method, nothing is returned.

        :param data_iter: Iterator yielding batches
        """
        # Pipeline schedule just puts data in memory
        self.batch_data, self.batch_label = super().load_batch(data_iter, to_gpu=False)
        self.microbatch_offset = 0
        if isinstance(self.batch_data, torch.Tensor):
            batch_size = self.batch_data.size(0)
        else:
            batch_size = next(iter(self.batch_data.values())).size(0)
        assert batch_size % self.num_microbatches == 0, \
            "Batch size should divided by the number of microbatches"
        self.microbatch_size = batch_size // self.num_microbatches

    def _get_data_slice(self, data, offset):
        """Return the microbatch-sized slice of ``data`` starting at ``offset``.

        A tensor is sliced directly; otherwise every value of the mapping is
        sliced and a new dict is returned.
        """
        if isinstance(data, torch.Tensor):
            return data[offset: offset + self.microbatch_size]
        else:
            return {k: v[offset:offset + self.microbatch_size] for k, v in data.items()}

    def load_micro_batch(self):
        """Return the next microbatch ``(data, label)`` moved to the device.

        Advances ``self.microbatch_offset`` by one microbatch.
        """
        data = self._get_data_slice(self.batch_data, self.microbatch_offset)
        label = self._get_data_slice(self.batch_label, self.microbatch_offset)
        self.microbatch_offset += self.microbatch_size
        return self._move_to_device(data), self._move_to_device(label)

    def pre_processing(self, engine):
        """Validate the engine before the schedule is used.

        Sets ``self.dtype`` to ``torch.half`` when the model is a
        ``NaiveAMPModel`` and asserts that the (unwrapped) model's ``forward``
        does not take ``*args``.

        :param engine: Engine whose optimizer and model are inspected
        :raises TypeError: If the optimizer is a ZeRO level 2 or level 3 optimizer
        """
        if isinstance(engine.optimizer, (ZeroRedundancyOptimizer_Level_2, ZeroRedundancyOptimizer_Level_3)):
            raise TypeError(
                "Pipeline schedule is currently not compatible with ZeRO Level 2 and Level 3"
            )
        model = engine.model
        if isinstance(model, NaiveAMPModel):
            self.dtype = torch.half
            # Inspect the wrapped model's forward, not the AMP wrapper's.
            model = model.model
        sig = inspect.signature(model.forward)
        for p in sig.parameters.values():
            assert p.kind != inspect.Parameter.VAR_POSITIONAL, '*args is not supported'

    @staticmethod
    def _call_engine(model, input_tensor, batch_data):
        """Call ``model`` with the stage input and/or the microbatch data.

        The call form is chosen from the signature of the (unwrapped) model's
        ``forward``:

        * tensor ``batch_data``: passed alone when ``input_tensor`` is None,
          passed after ``input_tensor`` when ``forward`` has more than one
          parameter, otherwise dropped in favor of ``input_tensor`` only;
        * mapping ``batch_data``: expanded as keyword arguments, restricted to
          the parameter names of ``forward`` unless it accepts ``**kwargs``.

        :param model: Module to call (possibly a ``NaiveAMPModel`` wrapper)
        :param input_tensor: Input of this stage, or None
        :param batch_data: Microbatch data, a tensor or a mapping
        :return: Whatever ``model`` returns
        """
        if isinstance(model, NaiveAMPModel):
            sig = inspect.signature(model.model.forward)
        else:
            sig = inspect.signature(model.forward)
        if isinstance(batch_data, torch.Tensor):
            if input_tensor is None:
                return model(batch_data)
            elif len(sig.parameters) > 1:
                return model(input_tensor, batch_data)
            else:
                return model(input_tensor)
        else:
            # Keep every key only if forward declares **kwargs.
            filter_batch = True
            for p in sig.parameters.values():
                if p.kind == inspect.Parameter.VAR_KEYWORD:
                    filter_batch = False
            if filter_batch:
                batch_data = {k: v for k, v in batch_data.items() if k in sig.parameters}
            if input_tensor is None:
                return model(**batch_data)
            else:
                return model(input_tensor, **batch_data)

    def forward_step(self, engine, input_tensor, return_tensors, return_output_label=True, accum_loss=None):
        """Forward step for passed-in model. The next microbatch is always loaded
        and handed to the model together with ``input_tensor``.
        Returns output tensor. This is a helper function and can be ignored by users.

        On the last pipeline rank the ``(output, label)`` pair is appended to
        ``return_tensors`` when requested, and, if ``accum_loss`` is given, the
        criterion is evaluated, divided by ``num_microbatches`` and added
        (detached) to ``accum_loss``.

        :param engine: Your engine object
        :type engine: colossalai.engine.Engine
        :param input_tensor: Input tensor for this pipeline stage
        :type input_tensor: :class:`torch.Tensor`
        :param return_tensors: A list of tensors to return
        :type return_tensors: List[:class:`torch.Tensor`]
        :param return_output_label: Whether returns output labels
        :type return_output_label: bool, optional
        :param accum_loss: Where accumulated loss stores
        :type accum_loss: optional
        :return: output or the loss value of the current pipeline stage
        :rtype: :class:`torch.Tensor`
        """
        data, label = self.load_micro_batch()
        output_tensor = self._call_engine(engine.model, input_tensor, data)
        if gpc.is_last_rank(ParallelMode.PIPELINE):
            if return_output_label:
                return_tensors.append((output_tensor, label))
            if accum_loss is not None:
                loss_reduced = self._call_engine_criterion(engine, output_tensor, label) / self.num_microbatches
                accum_loss.add_(loss_reduced.detach())
                return loss_reduced
            else:
                # forward only, it's useless since backward is not needed
                return output_tensor
        else:
            assert isinstance(
                output_tensor, torch.Tensor), 'Output of model using pipeline parallelism must be a tensor (except the last stage).'
            self._logger.debug(
                f'Global rank {gpc.get_global_rank()}, pipeline rank {gpc.get_local_rank(ParallelMode.PIPELINE)} forward output tensor {output_tensor.shape}, dtype {output_tensor.dtype}')
            return output_tensor

    def backward_step(self, engine, input_tensor, output_tensor, output_tensor_grad):
        """Backward step through the passed-in output tensor. If ``output_tensor_grad``
        is None, ``engine.backward`` is called on ``output_tensor``; otherwise
        ``engine.backward_by_grad`` is called with the given gradient.
        Returns the gradients with respect to the input tensor (None if there is no input tensor).
        This is a helper function and can be ignored by users.

        :param engine: your engine object
        :type engine: colossalai.engine.Engine
        :param input_tensor: input tensor for this pipeline stage
        :type input_tensor: :class:`torch.Tensor`
        :param output_tensor: output tensor for this pipeline stage
        :type output_tensor: :class:`torch.Tensor`
        :param output_tensor_grad: gradient of output tensor for this pipeline stage
        :type output_tensor_grad: :class:`torch.Tensor`
        :return: gradient of input tensor
        :rtype: :class:`torch.Tensor`
        """
        # Retain the grad on the input_tensor.
        if input_tensor is not None:
            input_tensor.retain_grad()
        # Backward pass.
        if output_tensor_grad is None:
            engine.backward(output_tensor)
        else:
            engine.backward_by_grad(output_tensor, output_tensor_grad)
        # Collect the grad of the input_tensor.
        input_tensor_grad = None
        if input_tensor is not None:
            input_tensor_grad = input_tensor.grad
        return input_tensor_grad

    def forward_backward_step(self,
                              engine,
                              data_iter,
                              forward_only=False,
                              return_loss=True,
                              return_output_label=True):
        """Runs non-interleaved 1F1B schedule, with communication between pipeline stages.
        Returns ``(output, label, accum_loss)``. Output and label are None when
        no ``(output, label)`` pair was collected; ``accum_loss`` is None unless
        ``return_loss`` is set and this is the last pipeline stage.

        :param engine: Your engine object
        :type engine: colossalai.engine.Engine
        :param data_iter: Dataloader as the form of an iterator, obtained by calling iter(dataloader)
        :type data_iter: Iterable
        :param forward_only: Whether run forward step only. Default is false. If true, no backward will be run.
        :type forward_only: bool
        :param return_loss: Whether returns the loss value. Default is true.
        :type return_loss: bool
        :param return_output_label: If False, the output and label won't be returned
        :type return_output_label: bool
        :return: (output, label, loss)
        :rtype: Tuple[:class:`torch.Tensor`]
        """
        assert forward_only or return_loss, \
            'The argument \'return_loss\' has to be True when \'forward_only\' is False, but got False.'
        self.load_batch(data_iter)
        # Warmup depth is the number of stages after this one, capped at the
        # number of microbatches.
        num_warmup_microbatches = \
            (gpc.get_world_size(ParallelMode.PIPELINE) -
             gpc.get_local_rank(ParallelMode.PIPELINE) - 1)
        num_warmup_microbatches = min(num_warmup_microbatches,
                                      self.num_microbatches)
        num_microbatches_remaining = self.num_microbatches - num_warmup_microbatches
        # Input, output tensors only need to be saved when doing backward passes
        input_tensors = None
        output_tensors = None
        if not forward_only:
            input_tensors = []
            output_tensors = []
        return_tensors = []
        if return_loss and gpc.is_pipeline_last_stage(ignore_virtual=True):
            accum_loss = torch.zeros(1, device=get_current_device())
        else:
            accum_loss = None
        # Used for tensor meta information communication
        # ft_shape: shape used when receiving forward inputs;
        # bt_shape: shape used when receiving backward gradients (taken from
        # this stage's own output); fs_checker: flag threaded through
        # comm.send_tensor_meta, initially True only when no tensor_shape
        # was configured.
        ft_shape = self.tensor_shape
        bt_shape = None
        fs_checker = self.tensor_shape is None
        # Run warmup forward passes.
        for i in range(num_warmup_microbatches):
            if not gpc.is_first_rank(ParallelMode.PIPELINE):
                ft_shape = comm.recv_tensor_meta(ft_shape)
            input_tensor = comm.recv_forward(ft_shape, dtype=self.dtype,
                                             scatter_gather_tensors=self.scatter_gather_tensors)
            output_tensor = self.forward_step(
                engine, input_tensor, return_tensors,
                return_output_label=return_output_label,
                accum_loss=accum_loss
            )
            if not gpc.is_last_rank(ParallelMode.PIPELINE):
                bt_shape = output_tensor.shape
                fs_checker = comm.send_tensor_meta(output_tensor, fs_checker)
            comm.send_forward(output_tensor, scatter_gather_tensors=self.scatter_gather_tensors)
            if not forward_only:
                input_tensors.append(input_tensor)
                output_tensors.append(output_tensor)
        # Before running 1F1B, need to receive first forward tensor.
        # If all microbatches are run in warmup / cooldown phase, then no need to
        # receive this tensor here.
        if num_microbatches_remaining > 0:
            if not gpc.is_first_rank(ParallelMode.PIPELINE):
                ft_shape = comm.recv_tensor_meta(ft_shape)
            input_tensor = comm.recv_forward(ft_shape, dtype=self.dtype,
                                             scatter_gather_tensors=self.scatter_gather_tensors)
        # Run 1F1B in steady state.
        for i in range(num_microbatches_remaining):
            last_iteration = (i == (num_microbatches_remaining - 1))
            output_tensor = self.forward_step(
                engine, input_tensor, return_tensors,
                return_output_label=return_output_label,
                accum_loss=accum_loss
            )
            if forward_only:
                comm.send_forward(output_tensor, scatter_gather_tensors=self.scatter_gather_tensors)
                if not last_iteration:
                    input_tensor = comm.recv_forward(ft_shape, dtype=self.dtype,
                                                     scatter_gather_tensors=self.scatter_gather_tensors)
            else:
                output_tensor_grad = comm.send_forward_recv_backward(
                    output_tensor, bt_shape, dtype=self.dtype, scatter_gather_tensors=self.scatter_gather_tensors)
                # Add input_tensor and output_tensor to end of list.
                input_tensors.append(input_tensor)
                output_tensors.append(output_tensor)
                # Pop input_tensor and output_tensor from the start of the list for
                # the backward pass.
                input_tensor = input_tensors.pop(0)
                output_tensor = output_tensors.pop(0)
                input_tensor_grad = self.backward_step(
                    engine,
                    input_tensor, output_tensor,
                    output_tensor_grad
                )
                if last_iteration:
                    # No further forward input is expected after the last
                    # steady-state microbatch, so only the gradient is sent.
                    input_tensor = None
                    comm.send_backward(input_tensor_grad, scatter_gather_tensors=self.scatter_gather_tensors)
                else:
                    input_tensor = comm.send_backward_recv_forward(
                        input_tensor_grad, ft_shape, dtype=self.dtype, scatter_gather_tensors=self.scatter_gather_tensors)
        # Run cooldown backward passes.
        if not forward_only:
            # One backward per warmup forward that is still queued.
            for i in range(num_warmup_microbatches):
                input_tensor = input_tensors.pop(0)
                output_tensor = output_tensors.pop(0)
                output_tensor_grad = comm.recv_backward(bt_shape, dtype=self.dtype,
                                                        scatter_gather_tensors=self.scatter_gather_tensors)
                input_tensor_grad = self.backward_step(
                    engine,
                    input_tensor, output_tensor,
                    output_tensor_grad
                )
                comm.send_backward(input_tensor_grad, scatter_gather_tensors=self.scatter_gather_tensors)
        if len(return_tensors) > 0:
            output, label = pack_return_tensors(return_tensors)
            return output, label, accum_loss
        else:
            return None, None, accum_loss
class InterleavedPipelineSchedule(PipelineSchedule):
def __init__(self,
             num_microbatches,
             num_model_chunks,
             batch_data_process_func: Callable = None,
             tensor_shape: Union[torch.Size, List[int], Tuple[int]] = None,
             scatter_gather_tensors: bool = False):
    """Set up a schedule for the interleaved 1F1B pipeline strategy.

    Besides the parent initialisation, the virtual pipeline parallel size of
    the global context is set to ``num_model_chunks`` and its virtual rank is
    reset to 0.

    :param num_microbatches: The number of microbatches; must be a multiple of
        the pipeline parallel world size
    :type num_microbatches: int
    :param num_model_chunks: The number of model chunks
    :type num_model_chunks: int
    :param batch_data_process_func: The preprocessing function which receives a batch of data, and it will be executed in `load_batch`
    :type batch_data_process_func: Callable, optional
    :param tensor_shape: Specified shape in pipeline communication
    :type tensor_shape: torch.Size, optional
    :param scatter_gather_tensors: If set to `True`, communication will be reduced over pipeline when using 1D tensor parallelization
    :type scatter_gather_tensors: bool, optional
    """
    pipeline_world_size = gpc.get_world_size(ParallelMode.PIPELINE)
    evenly_divisible = num_microbatches % pipeline_world_size == 0
    assert evenly_divisible, \
        'num_microbatches must be an integer multiple of pipeline parallel world size'

    super().__init__(
        num_microbatches,
        batch_data_process_func=batch_data_process_func,
        tensor_shape=tensor_shape,
        scatter_gather_tensors=scatter_gather_tensors,
    )

    # Register the chunking with the global context, starting at chunk 0.
    gpc.set_virtual_pipeline_parallel_size(num_model_chunks)
    gpc.set_virtual_pipeline_parallel_rank(0)
    self.num_model_chunks = num_model_chunks
def pre_processing(self, engine):
    """Validate the engine before the interleaved schedule is used.

    Switches ``self.dtype`` to ``torch.half`` when the first model chunk is a
    ``NaiveAMPModel`` and asserts that no chunk's ``forward`` takes ``*args``.

    :param engine: Engine whose optimizer and model chunks are inspected
    :raises TypeError: If the optimizer is a ZeRO level 2 or level 3 optimizer
    """
    unsupported_optimizers = (ZeroRedundancyOptimizer_Level_2, ZeroRedundancyOptimizer_Level_3)
    if isinstance(engine.optimizer, unsupported_optimizers):
        raise TypeError(
            "Pipeline schedule is currently not compatible with ZeRO Level 2 and Level 3"
        )

    if isinstance(engine.model[0], NaiveAMPModel):
        self.dtype = torch.half

    for chunk in engine.model:
        # Look through the AMP wrapper to the real forward signature.
        target = chunk.model if isinstance(chunk, NaiveAMPModel) else chunk
        signature = inspect.signature(target.forward)
        for parameter in signature.parameters.values():
            assert parameter.kind != inspect.Parameter.VAR_POSITIONAL, '*args is not supported'
def load_batch(self, data_iter):
    """Load a batch via the parent class and reset the per-chunk offsets.

    :param data_iter: Iterator yielding batches
    """
    super().load_batch(data_iter)
    # Every model chunk walks through the same batch, so each one needs its
    # own offset instead of the single counter set by the parent.
    self.microbatch_offset = [0] * self.num_model_chunks
def load_micro_batch(self, model_chunk_id):
    """Return the next microbatch ``(data, label)`` for one model chunk.

    The offset of ``model_chunk_id`` is advanced by one microbatch; the
    slices are passed through ``_move_to_device`` before being returned.

    :param model_chunk_id: Index of the model chunk requesting data
    """
    start = self.microbatch_offset[model_chunk_id]
    micro_data = self._get_data_slice(self.batch_data, start)
    micro_label = self._get_data_slice(self.batch_label, start)
    self.microbatch_offset[model_chunk_id] += self.microbatch_size
    return self._move_to_device(micro_data), self._move_to_device(micro_label)
def forward_step(self, engine, model_chunk_id, input_tensor, return_tensors, return_output_label=True, accum_loss=None):
    """Run the forward pass of one model chunk on its next microbatch.

    On the last pipeline stage the ``(output, label)`` pair is appended to
    ``return_tensors`` when requested and, if ``accum_loss`` is given, the
    criterion result divided by ``num_microbatches`` is accumulated into it
    and returned. In every other case the model output is returned.
    This is a helper function and can be ignored by users.

    :param engine: Engine holding the model chunks and the criterion
    :param model_chunk_id: Index of the chunk to run
    :param input_tensor: Input tensor for this chunk, or None
    :param return_tensors: List collecting ``(output, label)`` pairs
    :param return_output_label: Whether output and label are collected
    :param accum_loss: Tensor accumulating the loss, or None
    :return: The scaled loss or the output tensor of the chunk
    """
    data, label = self.load_micro_batch(model_chunk_id)
    output_tensor = self._call_engine(engine.model[model_chunk_id], input_tensor, data)

    if not gpc.is_pipeline_last_stage():
        # Intermediate stages can only ship a single tensor downstream.
        assert isinstance(
            output_tensor, torch.Tensor), 'Output of model using pipeline parallelism must be a tensor (except the last stage).'
        self._logger.debug(
            f'Global rank {gpc.get_global_rank()}, pipeline rank {gpc.get_local_rank(ParallelMode.PIPELINE)} forward output tensor {output_tensor.shape}, dtype {output_tensor.dtype}')
        return output_tensor

    if return_output_label:
        return_tensors.append((output_tensor, label))

    if accum_loss is None:
        # No loss requested: hand back the raw output.
        return output_tensor

    loss_reduced = self._call_engine_criterion(engine, output_tensor, label) / self.num_microbatches
    accum_loss.add_(loss_reduced.detach())
    return loss_reduced
def forward_backward_step(self, engine, data_iter, forward_only=False, return_loss=True, return_output_label=True):
"""Run interleaved 1F1B schedule (model split into model chunks), with
communication between pipeline stages as needed.
Returns dictionary with losses if the last stage, empty dict otherwise.
:param engine: Your engine object
:type engine: colossalai.engine.Engine
:param data_iter: Dataloader as the form of an iterator, obtained by calling iter(dataloader)
:type data_iter: Iterable
:param forward_only: Whether run forward step only. Default is false. If true, no backward will be run.
:type forward_only: bool
:param return_loss: Whether returns the loss value. Default is true.
:type return_loss: bool
:param return_output_label: If False, the output and label won't be returned
:type return_output_label: bool
"""
assert forward_only or return_loss, \
'The argument \'return_loss\' has to be True when \'forward_only\' is False, but got False.'
self.load_batch(data_iter)
model = engine.model
input_tensors = [[] for _ in range(len(model))]
output_tensors = [[] for _ in range(len(model))]
return_tensors = []
if not forward_only:
output_tensor_grads = [[] for _ in range(len(model))]
if return_loss and gpc.is_pipeline_last_stage(ignore_virtual=True):
accum_loss = torch.zeros(1, device=get_current_device())
else:
accum_loss = None
# Used for tensor meta information communication
input_tensor_shapes = [self.tensor_shape for _ in range(len(model))]
output_tensor_shapes = [None for _ in range(len(model))]
send_tensor_shape_flags = [self.tensor_shape is None for _ in range(len(model))]
pipeline_parallel_size = gpc.get_world_size(ParallelMode.PIPELINE)
pipeline_parallel_rank = gpc.get_local_rank(ParallelMode.PIPELINE)
# Compute number of warmup and remaining microbatches.
num_model_chunks = len(model)
num_microbatches = self.num_microbatches * num_model_chunks
all_warmup_microbatches = False
if forward_only:
num_warmup_microbatches = num_microbatches
else:
# Run all forward passes and then all backward passes if number of
# microbatches is just the number of pipeline stages.
# Otherwise, perform (num_model_chunks-1)*pipeline_parallel_size on
# all workers, followed by more microbatches after depending on
# stage ID (more forward passes for earlier stages, later stages can
# immediately start with 1F1B).
if self.num_microbatches == pipeline_parallel_size:
num_warmup_microbatches = num_microbatches
all_warmup_microbatches = True
else:
num_warmup_microbatches = \
(pipeline_parallel_size - pipeline_parallel_rank - 1) * 2
num_warmup_microbatches += (
num_model_chunks - 1) * pipeline_parallel_size
num_warmup_microbatches = min(num_warmup_microbatches,
num_microbatches)
num_microbatches_remaining = \
num_microbatches - num_warmup_microbatches
def get_model_chunk_id(microbatch_id, forward):
"""Helper method to get the model chunk ID given the iteration number."""
microbatch_id_in_group = microbatch_id % (pipeline_parallel_size * num_model_chunks)
model_chunk_id = microbatch_id_in_group // pipeline_parallel_size
if not forward:
model_chunk_id = (num_model_chunks - model_chunk_id - 1)
return model_chunk_id
def forward_step_helper(microbatch_id):
"""Helper method to run forward step with model split into chunks
(run set_virtual_pipeline_model_parallel_rank() before calling
forward_step())."""
model_chunk_id = get_model_chunk_id(microbatch_id, forward=True)
gpc.set_virtual_pipeline_parallel_rank(model_chunk_id)
# forward step
if gpc.is_pipeline_first_stage():
if len(input_tensors[model_chunk_id]) == \
len(output_tensors[model_chunk_id]):
input_tensors[model_chunk_id].append(None)
input_tensor = input_tensors[model_chunk_id][-1]
output_tensor = self.forward_step(engine, model_chunk_id, input_tensor,
return_tensors, return_output_label=return_output_label, accum_loss=accum_loss)
output_tensors[model_chunk_id].append(output_tensor)
# if forward-only, no need to save tensors for a backward pass
if forward_only:
input_tensors[model_chunk_id].pop()
output_tensors[model_chunk_id].pop()
return output_tensor
def backward_step_helper(microbatch_id):
    """Run one backward step on the model chunk selected by ``microbatch_id``.

    The virtual pipeline rank held by ``gpc`` is switched to that chunk
    before ``self.backward_step`` is called. The oldest queued input tensor,
    output tensor and output gradient of the chunk are popped (index 0) and
    handed to ``self.backward_step``.

    :param microbatch_id: iteration number, translated by ``get_model_chunk_id``
    :return: the value returned by ``self.backward_step``
    """
    model_chunk_id = get_model_chunk_id(microbatch_id, forward=False)
    gpc.set_virtual_pipeline_parallel_rank(model_chunk_id)
    if gpc.is_pipeline_last_stage():
        # Nothing queued for this chunk: queue a ``None`` gradient so the
        # ``pop(0)`` below has an entry to take.
        if len(output_tensor_grads[model_chunk_id]) == 0:
            output_tensor_grads[model_chunk_id].append(None)
    # Entries are taken from the front of each list (first in, first out).
    input_tensor = input_tensors[model_chunk_id].pop(0)
    output_tensor = output_tensors[model_chunk_id].pop(0)
    output_tensor_grad = output_tensor_grads[model_chunk_id].pop(0)
    input_tensor_grad = self.backward_step(engine, input_tensor, output_tensor, output_tensor_grad)
    return input_tensor_grad
# Run warmup forward passes.
gpc.set_virtual_pipeline_parallel_rank(0)
if not gpc.is_pipeline_first_stage():
input_tensor_shapes[0] = comm.recv_tensor_meta(input_tensor_shapes[0])
input_tensors[0].append(comm.recv_forward(input_tensor_shapes[0], dtype=self.dtype,
scatter_gather_tensors=self.scatter_gather_tensors))
for k in range(num_warmup_microbatches):
model_chunk_id = get_model_chunk_id(k, forward=True)
output_tensor = forward_step_helper(k)
if not gpc.is_pipeline_last_stage():
output_tensor_shapes[model_chunk_id] = output_tensor.shape
send_tensor_shape_flags[model_chunk_id] = comm.send_tensor_meta(
output_tensor, send_tensor_shape_flags[model_chunk_id])
# Determine if tensor should be received from previous stage.
next_forward_model_chunk_id = get_model_chunk_id(k+1, forward=True)
recv_prev = True
if gpc.is_pipeline_first_stage(ignore_virtual=True):
if next_forward_model_chunk_id == 0:
recv_prev = False
if k == (num_microbatches - 1):
recv_prev = False
# Don't send tensor downstream if on last stage.
if gpc.is_pipeline_last_stage():
output_tensor = None
with switch_virtual_pipeline_parallel_rank(next_forward_model_chunk_id):
if not gpc.is_pipeline_first_stage():
input_tensor_shapes[next_forward_model_chunk_id] = comm.recv_tensor_meta(
input_tensor_shapes[next_forward_model_chunk_id])
# Send and receive tensors as appropriate (send tensors computed
# in this iteration; receive tensors for next iteration).
input_shape = input_tensor_shapes[next_forward_model_chunk_id] if recv_prev else None
if k == (num_warmup_microbatches - 1) and not forward_only and \
not all_warmup_microbatches:
input_tensor_grad = None
recv_next = True
if gpc.is_pipeline_last_stage(ignore_virtual=True):
recv_next = False
output_shape = output_tensor_shapes[num_model_chunks-1] if recv_next else None
input_tensor, output_tensor_grad = \
comm.send_forward_backward_recv_forward_backward(
output_tensor, input_tensor_grad,
input_shape,
output_shape,
recv_prev=recv_prev, recv_next=recv_next,
dtype=self.dtype,
scatter_gather_tensors=self.scatter_gather_tensors)
output_tensor_grads[num_model_chunks-1].append(output_tensor_grad)
else:
input_tensor = \
comm.send_forward_recv_forward(
output_tensor,
input_shape,
recv_prev=recv_prev,
dtype=self.dtype,
scatter_gather_tensors=self.scatter_gather_tensors)
input_tensors[next_forward_model_chunk_id].append(input_tensor)
# Run 1F1B in steady state.
for k in range(num_microbatches_remaining):
# Forward pass.
forward_k = k + num_warmup_microbatches
output_tensor = forward_step_helper(forward_k)
# Backward pass.
backward_k = k
input_tensor_grad = backward_step_helper(backward_k)
# Send output_tensor and input_tensor_grad, receive input_tensor
# and output_tensor_grad.
# Determine if current stage has anything to send in either direction,
# otherwise set tensor to None.
forward_model_chunk_id = get_model_chunk_id(forward_k, forward=True)
gpc.set_virtual_pipeline_parallel_rank(forward_model_chunk_id)
if gpc.is_pipeline_last_stage():
output_tensor = None
backward_model_chunk_id = get_model_chunk_id(backward_k, forward=False)
gpc.set_virtual_pipeline_parallel_rank(backward_model_chunk_id)
if gpc.is_pipeline_first_stage():
input_tensor_grad = None
# Determine if peers are sending, and where in data structure to put
# received tensors.
recv_prev = True
if gpc.is_pipeline_first_stage(ignore_virtual=True):
# First stage is ahead of last stage by (pipeline_parallel_size - 1).
next_forward_model_chunk_id = get_model_chunk_id(
forward_k - (pipeline_parallel_size - 1), forward=True)
if next_forward_model_chunk_id == (num_model_chunks - 1):
recv_prev = False
next_forward_model_chunk_id += 1
else:
next_forward_model_chunk_id = get_model_chunk_id(forward_k + 1,
forward=True)
recv_next = True
if gpc.is_pipeline_last_stage(ignore_virtual=True):
# Last stage is ahead of first stage by (pipeline_parallel_size - 1).
next_backward_model_chunk_id = get_model_chunk_id(
backward_k - (pipeline_parallel_size - 1), forward=False)
if next_backward_model_chunk_id == 0:
recv_next = False
next_backward_model_chunk_id -= 1
else:
next_backward_model_chunk_id = get_model_chunk_id(backward_k + 1,
forward=False)
# If last iteration, don't receive; we already received one extra
# before the start of the for loop.
if k == (num_microbatches_remaining - 1):
recv_prev = False
input_shape = input_tensor_shapes[next_forward_model_chunk_id] if recv_prev else None
output_shape = output_tensor_shapes[next_backward_model_chunk_id] if recv_next else None
# Communicate tensors.
input_tensor, output_tensor_grad = \
comm.send_forward_backward_recv_forward_backward(
output_tensor, input_tensor_grad,
input_shape,
output_shape,
recv_prev=recv_prev, recv_next=recv_next,
dtype=self.dtype,
scatter_gather_tensors=self.scatter_gather_tensors)
# Put input_tensor and output_tensor_grad in data structures in the
# right location.
if recv_prev:
input_tensors[next_forward_model_chunk_id].append(input_tensor)
if recv_next:
output_tensor_grads[next_backward_model_chunk_id].append(
output_tensor_grad)
# Run cooldown backward passes (flush out pipeline).
if not forward_only:
if all_warmup_microbatches:
output_tensor_grads[num_model_chunks-1].append(
comm.recv_backward(output_tensor_shapes[num_model_chunks-1], scatter_gather_tensors=self.scatter_gather_tensors))
for k in range(num_microbatches_remaining, num_microbatches):
input_tensor_grad = backward_step_helper(k)
next_backward_model_chunk_id = get_model_chunk_id(k+1, forward=False)
recv_next = True
if gpc.is_pipeline_last_stage(ignore_virtual=True):
if next_backward_model_chunk_id == (num_model_chunks - 1):
recv_next = False
if k == (num_microbatches - 1):
recv_next = False
output_shape = output_tensor_shapes[next_backward_model_chunk_id] if recv_next else None
output_tensor_grads[next_backward_model_chunk_id].append(
comm.send_backward_recv_backward(
input_tensor_grad,
output_shape,
recv_next=recv_next,
dtype=self.dtype,
scatter_gather_tensors=self.scatter_gather_tensors))
if len(return_tensors) > 0:
output, label = pack_return_tensors(return_tensors)
return output, label, accum_loss
else:
return None, None, accum_loss
from typing import Optional
class TensorParallelEnv(object):
    """Singleton container for tensor-parallel settings.

    Every call to ``TensorParallelEnv(...)`` returns the same instance.
    ``__init__`` still runs on each call and forwards its arguments to
    :meth:`load`, so constructing the class again overwrites all fields
    (fields not passed fall back to the defaults of :meth:`load`).
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use.

        The constructor arguments are accepted here only so that
        ``TensorParallelEnv(mode=...)`` is a valid call; they are consumed by
        ``__init__``. They must not be forwarded to ``object.__new__``, which
        raises ``TypeError`` for extra arguments when ``__new__`` is overridden.
        """
        if cls._instance is None:
            cls._instance = object.__new__(cls)
        return cls._instance

    def __init__(self, *args, **kwargs):
        self.load(*args, **kwargs)

    def load(self,
             mode: Optional[str] = None,
             vocab_parallel: bool = False,
             parallel_input_1d: bool = False,
             summa_dim: Optional[int] = None,
             tesseract_dim: Optional[int] = None,
             tesseract_dep: Optional[int] = None,
             depth_3d: Optional[int] = None,
             input_group_3d=None,
             weight_group_3d=None,
             output_group_3d=None):
        """Overwrite every setting with the given values.

        Each parameter is stored on the instance under an attribute of the
        same name; parameters that are not passed are reset to their default.
        """
        self.mode = mode
        self.vocab_parallel = vocab_parallel
        self.parallel_input_1d = parallel_input_1d
        self.summa_dim = summa_dim
        self.tesseract_dim = tesseract_dim
        self.tesseract_dep = tesseract_dep
        self.depth_3d = depth_3d
        self.input_group_3d = input_group_3d
        self.weight_group_3d = weight_group_3d
        self.output_group_3d = output_group_3d

    def save(self):
        """Return the current settings as a dict usable as ``load(**settings)``."""
        return dict(mode=self.mode,
                    vocab_parallel=self.vocab_parallel,
                    parallel_input_1d=self.parallel_input_1d,
                    summa_dim=self.summa_dim,
                    tesseract_dim=self.tesseract_dim,
                    tesseract_dep=self.tesseract_dep,
                    depth_3d=self.depth_3d,
                    input_group_3d=self.input_group_3d,
                    weight_group_3d=self.weight_group_3d,
                    output_group_3d=self.output_group_3d)
class MoeEnv:
    """Mixture-of-experts (MoE) environment variables.

    Holds the MoE data/model parallel sizes (unset until :meth:`setup` runs)
    and an auxiliary loss accumulator (unset until :meth:`reset_loss` runs).
    """

    def __init__(self):
        self.data_parallel_size = self.model_parallel_size = self.aux_loss = None

    def setup(self, moe_model_size):
        """Derive the MoE parallel sizes from the global context.

        :param moe_model_size: MoE model parallel size; must divide the
            global data parallel size
        :raises NotImplementedError: if tensor or pipeline parallel size is above 1
        """
        from .core import global_context as gpc

        mixes_other_parallelism = gpc.tensor_parallel_size > 1 or gpc.pipeline_parallel_size > 1
        if mixes_other_parallelism:
            raise NotImplementedError("Moe is not compatible with tensor or pipeline parallel")

        leftover = gpc.data_parallel_size % moe_model_size
        assert leftover == 0, \
            "The size of data parallel needs to be divided by moe model parallel size"

        self.data_parallel_size = gpc.data_parallel_size // moe_model_size
        self.model_parallel_size = moe_model_size

    def is_initialized(self):
        """Tell whether a model parallel size has been set."""
        return self.model_parallel_size is not None

    def reset_loss(self):
        """Restart the auxiliary loss accumulator at zero."""
        self.aux_loss = 0

    def add_loss(self, loss):
        """Add ``loss`` onto the accumulated auxiliary loss."""
        self.aux_loss += loss

    def get_loss(self):
        """Return the accumulated auxiliary loss."""
        return self.aux_loss
# Module-level instances created at import time.
# Both are built without arguments, so every field starts at its default.
tensor_parallel_env = TensorParallelEnv()
moe_env = MoeEnv()
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import argparse
import pprint
import os
from colossalai.nn.optimizer.colossalai_optimizer import ColossalaiOptimizer
import torch
import torch.nn as nn
from pathlib import Path
from typing import Iterable, Union, Optional, Tuple, List, Dict
from colossalai.amp import convert_to_amp, AMP_TYPE
from colossalai.context import Config, ParallelMode, ConfigException
from colossalai.core import global_context as gpc
from colossalai.engine import Engine
from colossalai.logging import get_dist_logger
from colossalai.utils import (accumulate_gradient, get_current_device,
sync_model_param, is_using_ddp, is_using_pp, is_using_sequence)
from colossalai.zero import convert_to_zero, ZeroRedundancyOptimizer_Level_2, ZeroRedundancyOptimizer_Level_3
from colossalai.builder.builder import build_gradient_handler
from torch.optim.optimizer import Optimizer
from torch.optim.lr_scheduler import _LRScheduler
from torch.utils.data import DataLoader
from torch.nn.modules.loss import _Loss
from torch.nn.parallel import DistributedDataParallel as DDP
from colossalai.global_variables import moe_env
def get_default_parser():
    """Build the default command line parser for distributed training.

    The parser knows ``--config``, ``--host``, ``--port``, ``--world_size``,
    ``--rank``, ``--local_rank`` and ``--backend`` (default ``'nccl'``).

    :return: Returns the parser with the default arguments, the user may add customized arguments into this parser
    :rtype: argparse.ArgumentParser
    """
    # (flag, keyword arguments for add_argument), in registration order
    option_table = (
        ('--config', dict(type=str, help='path to the config file')),
        ('--host', dict(type=str, help='the master address for distributed training')),
        ('--port', dict(type=int, help='the master port for distributed training')),
        ('--world_size', dict(type=int, help='world size for distributed training')),
        ('--rank', dict(type=int, help='rank for the default process group')),
        ('--local_rank', dict(type=int, help='local rank on the node')),
        ('--backend', dict(type=str, default='nccl', help='backend for distributed communication')),
    )

    parser = argparse.ArgumentParser()
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser
def launch(config: Union[str, Path, Config, Dict],
           rank: int,
           world_size: int,
           host: str,
           port: int,
           backend: str = 'nccl',
           local_rank: int = None,
           seed: int = 1024,
           verbose: bool = True):
    """Load the configuration and set up the distributed environment through the global context.

    The steps are: load ``config`` into the global context, create the default process group,
    create the process groups of the parallel modes described by the config, pick the CUDA device
    (only when CUDA is available) and set the random seed.

    :param config: Config object, dict, or path to a config file
    :type config: Union[str, Path, dict, Config]
    :param rank: Rank for the default process group
    :type rank: int
    :param world_size: World size of the default process group
    :type world_size: int
    :param host: The master address for distributed training
    :type host: str
    :param port: The master port for distributed training
    :type port: int
    :param backend: Backend for torch.distributed
    :type backend: str, optional
    :param local_rank: Rank for the process on the node and is used to set the default CUDA device, defaults to None.
        If local_rank = None, the default device ordinal will be calculated automatically
    :type local_rank: int, optional
    :param seed: Specified random seed for every processes
    :type seed: int, optional
    :param verbose: Whether to print logs
    :type verbose: bool, optional
    :raises AssertionError: if ``config`` is none of the accepted types
    """
    gpc.verbose = verbose

    # turn whatever was passed into a Config and hand it to the global context
    assert isinstance(config, (Config, str, Path, dict)), \
        f'expected argument config to be Config, str or Path, but got {type(config)}'
    if isinstance(config, (str, Path)):
        config = Config.from_file(config)
    elif isinstance(config, dict) and not isinstance(config, Config):
        config = Config(config)
    gpc.load_config(config)

    # default process group first, then the groups for the parallel modes in the config
    gpc.init_global_dist(rank, world_size, backend, host, port)
    gpc.init_parallel_groups()

    if torch.cuda.is_available():
        # a local_rank of None is passed through as-is
        gpc.set_device(local_rank)
    gpc.set_seed(seed)

    if not verbose:
        return
    logger = get_dist_logger()
    summary = (f'Distributed environment is initialized, '
               f'data parallel size: {gpc.data_parallel_size}, pipeline parallel size: {gpc.pipeline_parallel_size}, '
               f'tensor parallel size: {gpc.tensor_parallel_size}')
    logger.info(summary, ranks=[0])
def launch_from_slurm(config: Union[str, Path, Config, Dict],
                      host: str,
                      port: int,
                      backend: str = 'nccl',
                      seed: int = 1024,
                      verbose: bool = True):
    """Call :func:`launch` with rank and world size taken from the SLURM environment.

    ``SLURM_PROCID`` provides the rank and ``SLURM_NPROCS`` the world size. No local rank is
    passed on.

    :param config: Config object, dict, or path to a config file
    :type config: Union[str, Path, dict, Config]
    :param host: The master address for distributed training
    :type host: str
    :param port: The master port for distributed training
    :type port: int
    :param backend: Backend for torch.distributed
    :type backend: str, optional
    :param seed: Specified random seed for every processes
    :type seed: int, optional
    :param verbose: Whether to print logs
    :type verbose: bool, optional
    :raises KeyError: if one of the SLURM variables is not set
    """
    env = os.environ
    launch(config=config,
           rank=int(env['SLURM_PROCID']),
           world_size=int(env['SLURM_NPROCS']),
           host=host,
           port=port,
           backend=backend,
           seed=seed,
           verbose=verbose)
def launch_from_openmpi(config: Union[str, Path, Config, Dict],
                        host: str,
                        port: int,
                        backend: str = 'nccl',
                        seed: int = 1024,
                        verbose: bool = True):
    """Call :func:`launch` with rank information taken from the OpenMPI environment.

    ``OMPI_COMM_WORLD_RANK`` provides the rank, ``OMPI_COMM_WORLD_LOCAL_RANK`` the local rank
    and ``OMPI_COMM_WORLD_SIZE`` the world size.

    :param config: Config object, dict, or path to a config file
    :type config: Union[str, Path, dict, Config]
    :param host: The master address for distributed training
    :type host: str
    :param port: The master port for distributed training
    :type port: int
    :param backend: Backend for torch.distributed
    :type backend: str, optional
    :param seed: Specified random seed for every processes
    :type seed: int, optional
    :param verbose: Whether to print logs
    :type verbose: bool, optional
    :raises KeyError: if one of the OpenMPI variables is not set
    """
    env = os.environ
    # the variables are read in the same order as before: rank, local rank, world size
    launch(config=config,
           rank=int(env['OMPI_COMM_WORLD_RANK']),
           local_rank=int(env['OMPI_COMM_WORLD_LOCAL_RANK']),
           world_size=int(env['OMPI_COMM_WORLD_SIZE']),
           host=host,
           port=port,
           backend=backend,
           seed=seed,
           verbose=verbose)
def launch_from_torch(config: Union[str, Path, Config, Dict],
                      backend: str = 'nccl',
                      seed: int = 1024,
                      verbose: bool = True):
    """Call :func:`launch` with everything taken from the environment set by PyTorch launchers.

    ``RANK``, ``LOCAL_RANK`` and ``WORLD_SIZE`` provide the rank information; ``MASTER_ADDR``
    and ``MASTER_PORT`` provide host and port (as set by torchrun or torch.distributed.launch).

    :param config: Config object, dict, or path to a config file
    :type config: Union[str, Path, dict, Config]
    :param backend: Backend for torch.distributed
    :type backend: str, optional
    :param seed: Specified random seed for every processes
    :type seed: int, optional
    :param verbose: Whether to print logs
    :type verbose: bool, optional
    :raises KeyError: if one of the environment variables is not set
    """
    env = os.environ
    # the variables are read in the same order as before
    launch(config=config,
           rank=int(env['RANK']),
           local_rank=int(env['LOCAL_RANK']),
           world_size=int(env['WORLD_SIZE']),
           host=env['MASTER_ADDR'],
           port=int(env['MASTER_PORT']),
           backend=backend,
           seed=seed,
           verbose=verbose)
def initialize(model: Union[nn.Module, List[nn.Module]],
               optimizer: Union[Optimizer, List[Optimizer]],
               criterion: Union[_Loss, List[_Loss]],
               train_dataloader: Optional[Union[Iterable, List[Iterable]]] = None,
               test_dataloader: Optional[Union[Iterable, List[Iterable]]] = None,
               lr_scheduler: _LRScheduler = None,
               verbose: bool = True
               ) -> Tuple[Engine, DataLoader, DataLoader, _LRScheduler]:
    """Core function to wrap the essential training components with our functionality based on the config which is
    loaded into gpc.config.

    In order, this function: applies the cuDNN flags from the config, moves the model to the current device and
    synchronizes its parameters, applies AMP and/or ZeRO conversion, chooses gradient handlers (or wraps the model
    in PyTorch DDP), wraps the optimizer in ``ColossalaiOptimizer`` when needed, applies gradient accumulation and
    finally builds the ``Engine``.

    :param model: Your model instance
    :type model: :class:`torch.nn.Module`
    :param optimizer: Your optimizer instance
    :type optimizer: :class:`torch.optim.optimizer.Optimizer`
    :param criterion: Your criterion instance
    :type criterion: :class:`torch.nn.modules.loss._Loss`
    :param train_dataloader: Dataloader for training
    :type train_dataloader: :class:`torch.utils.data.DataLoader`, optional
    :param test_dataloader: Dataloader for testing
    :type test_dataloader: :class:`torch.utils.data.DataLoader`, optional
    :param lr_scheduler: Your lr scheduler instance
    :type lr_scheduler: :class:`torch.optim.lr_scheduler._LRScheduler`, optional
    :param verbose: Whether to print logs
    :type verbose: bool, optional
    :return: (engine, train_dataloader, test_dataloader, lr_scheduler)
    :rtype: Tuple
    :raises ConfigException: if both fp16 and zero are configured, if ``clip_grad_norm`` is set together with
        zero, or if ``gradient_handler`` in the config is not a list
    :raises AssertionError: if pipeline parallelism is used with an AMP mode other than ``AMP_TYPE.NAIVE``
    """
    # get logger
    logger = get_dist_logger()
    gpc.verbose = verbose

    # get config from gpc
    config = gpc.config

    # print config
    if verbose:
        logger.info(f"\n========== Your Config ========\n"
                    f"{pprint.pformat(gpc.config)}\n"
                    f"================================\n", ranks=[0])

    # cudnn
    cudnn_benchmark = config.get('cudnn_benchmark', True)
    cudnn_deterministic = config.get('cudnn_deterministic', False)
    torch.backends.cudnn.benchmark = cudnn_benchmark
    torch.backends.cudnn.deterministic = cudnn_deterministic
    if verbose:
        logger.info(
            f"cuDNN benchmark = {cudnn_benchmark}, deterministic = {cudnn_deterministic}", ranks=[0])

    # first sync model across dp ranks
    model.to(get_current_device())
    use_zero3 = hasattr(gpc.config, 'zero') and gpc.config.zero.level == 3
    if not moe_env.is_initialized() and not use_zero3:
        if is_using_sequence():
            sync_model_param(model, ParallelMode.SEQUENCE_DP)
        elif is_using_ddp():
            sync_model_param(model, ParallelMode.DATA)
    else:
        # with MoE or ZeRO level 3 no synchronization is performed here
        logger.warning(
            "The parameters of models is not automatically synchronized.\n"
            "Please make sure that all parameters are the same in data parallel group.",
            ranks=[0])

    # check amp and zero
    fp16_cfg = gpc.config.get('fp16', None)
    zero_cfg = gpc.config.get('zero', None)
    if fp16_cfg is not None and fp16_cfg.mode is not None and zero_cfg is not None:
        raise ConfigException(
            "It is not allowed to set fp16 and zero configuration in your config file at the same time")

    # clip grad norm: with zero, clipping has to be configured inside the zero configuration instead
    clip_grad_norm = gpc.config.get('clip_grad_norm', 0.0)
    if clip_grad_norm > 0 and zero_cfg is not None:
        raise ConfigException(
            "clip_grad_norm should not be specified with zero, you should specify clip_grad in zero configuration")

    # initialize amp
    amp_mode = None
    if fp16_cfg is not None and fp16_cfg.mode is not None:
        # work on a copy so that popping 'mode' does not alter the loaded config
        cfg_ = fp16_cfg.copy()
        amp_mode = cfg_.pop('mode')
        if is_using_pp():
            assert amp_mode == AMP_TYPE.NAIVE, 'Pipeline only support NaiveAMP currently'
        if amp_mode == AMP_TYPE.NAIVE:
            cfg_['clip_grad'] = clip_grad_norm
        model, optimizer, criterion = convert_to_amp(model=model,
                                                     optimizer=optimizer,
                                                     criterion=criterion,
                                                     mode=amp_mode,
                                                     amp_config=cfg_)

    if zero_cfg is not None:
        # work on a copy so that popping 'level' does not alter the loaded config
        cfg_ = zero_cfg.copy()
        level = cfg_.pop('level')
        model, optimizer = convert_to_zero(model=model,
                                           optimizer=optimizer,
                                           level=level,
                                           zero_config=cfg_
                                           )

    # gradient handler
    gradient_handler_cfg = gpc.config.get('gradient_handler', None)
    if gradient_handler_cfg is None:
        # if gradient handler is not specified in the configuration file,
        # check in the following order
        # 1. if optimizer is ZERO, then use zero grad handler
        # 2. if dp size is larger than 1 and pipeline is not used, use pytorch ddp
        # 3. if using pipeline and dp size larger than 1, use data parallel grad handler
        if isinstance(optimizer, (ZeroRedundancyOptimizer_Level_2,
                                  ZeroRedundancyOptimizer_Level_3)):
            gradient_handler_cfg = [dict(type='ZeROGradientHandler')]
            if verbose:
                logger.info(
                    "Training with zero is detected, ZeROGradientHandler is automatically "
                    "added even though not specified in the configuration",
                    ranks=[0])
        elif is_using_ddp() and moe_env.is_initialized():
            gradient_handler_cfg = [dict(type='MoeGradientHandler')]
            if verbose:
                logger.info(
                    "Data parallel training is detected with moe parallel, MoeGradientHandler is automatically "
                    "added even though not specified in the configuration",
                    ranks=[0])
        elif is_using_sequence():
            model = DDP(model, process_group=gpc.get_group(ParallelMode.SEQUENCE_DP), device_ids=[torch.cuda.current_device()])
            if verbose:
                logger.info(
                    'Model is using torch.nn.parallel.DistributedDataParallel for Sequence Parallelism', ranks=[0])
        elif is_using_ddp() and not is_using_pp() and amp_mode != AMP_TYPE.NAIVE:
            model = DDP(model, process_group=gpc.get_group(ParallelMode.DATA), device_ids=[torch.cuda.current_device()])
            if verbose:
                logger.info(
                    'Model is using torch.nn.parallel.DistributedDataParallel for Data Parallelism', ranks=[0])
        elif is_using_ddp():
            gradient_handler_cfg = [dict(type='DataParallelGradientHandler')]
            if verbose:
                logger.info(
                    "Data parallel training is detected when using pipeline parallel, DataParallelGradientHandler is automatically "
                    "added even though not specified in the configuration",
                    ranks=[0])
        # add pipeline parallel gradient handler, if pipeline shared module is detected
        for param in model.parameters():
            if getattr(param, 'pipeline_shared_module_pg', None) is not None:
                if gradient_handler_cfg is None:
                    gradient_handler_cfg = [dict(type='PipelineSharedModuleGradientHandler')]
                else:
                    gradient_handler_cfg.append(dict(type='PipelineSharedModuleGradientHandler'))
                if verbose:
                    logger.info(
                        "pipeline_shared_module is detected, PipelineSharedModuleGradientHandler is automatically "
                        "added even though not specified in the configuration",
                        ranks=[0])
                # one handler is enough, stop at the first such parameter
                break
    else:
        if not isinstance(gradient_handler_cfg, list):
            raise ConfigException(
                f"expected gradient_handler in the configuration file to be a list but got {type(gradient_handler_cfg)}")

    if gradient_handler_cfg is None:
        gradient_handlers = None
        if verbose and not isinstance(model, DDP):
            logger.warning(
                "No PyTorch DDP or gradient handler is set up, please make sure you do not need "
                "to all-reduce the gradients after a training step.",
                ranks=[0])
    else:
        gradient_handlers = [build_gradient_handler(cfg, model, optimizer) for cfg in gradient_handler_cfg]

    # check if optimizer is ColossalaiOptimizer
    if not isinstance(optimizer, (ColossalaiOptimizer, ZeroRedundancyOptimizer_Level_2, ZeroRedundancyOptimizer_Level_3)):
        optimizer = ColossalaiOptimizer(optim=optimizer)

    # gradient accumulation
    grad_accum_size = gpc.config.get('gradient_accumulation', None)
    if grad_accum_size is not None:
        optimizer, train_dataloader, gradient_handlers, lr_scheduler = accumulate_gradient(model=model,
                                                                                           optimizer=optimizer,
                                                                                           dataloader=train_dataloader,
                                                                                           accumulate_size=grad_accum_size,
                                                                                           gradient_handlers=gradient_handlers,
                                                                                           lr_scheduler=lr_scheduler)

    engine = Engine(
        model=model,
        optimizer=optimizer,
        criterion=criterion,
        gradient_handlers=gradient_handlers,
        clip_grad_norm=clip_grad_norm
    )

    return engine, train_dataloader, test_dataloader, lr_scheduler
from .cuda_native import LayerNorm, FusedScaleMaskSoftmax, MultiHeadAttention
# Public names of this package; all three are imported from .cuda_native.
__all__ = [
    "LayerNorm", "FusedScaleMaskSoftmax", "MultiHeadAttention"
]
from .layer_norm import MixedFusedLayerNorm as LayerNorm
from .scaled_softmax import FusedScaleMaskSoftmax
from .multihead_attention import MultiHeadAttention
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment