Unverified Commit 5221a388 authored by Hu Di, committed by GitHub

[Feature] Support MMCV on IPU (#1882)

* implement runner on IPU

* adjust import

* adjust import

* add ignore for ipu on without ipu

* remove compilation cache

* remove ipu from mmcv/runner.__all__

* adjust IS_IPU and IS_MLU

* adjust by isort

* add ipuHardwareIsAvailable

* remove engine_cache

* code review 9
parent 42e7e2ee
......@@ -45,7 +45,7 @@ jobs:
- name: Run unittests and generate coverage report
run: |
pip install -r requirements/test.txt
pytest tests/ --ignore=tests/test_runner --ignore=tests/test_optimizer.py --ignore=tests/test_cnn --ignore=tests/test_parallel.py --ignore=tests/test_ops --ignore=tests/test_load_model_zoo.py --ignore=tests/test_utils/test_logging.py --ignore=tests/test_image/test_io.py --ignore=tests/test_utils/test_registry.py --ignore=tests/test_utils/test_parrots_jit.py --ignore=tests/test_utils/test_trace.py --ignore=tests/test_utils/test_hub.py --ignore=tests/test_device/test_mlu/test_mlu_parallel.py
pytest tests/ --ignore=tests/test_runner --ignore=tests/test_device/test_ipu --ignore=tests/test_optimizer.py --ignore=tests/test_cnn --ignore=tests/test_parallel.py --ignore=tests/test_ops --ignore=tests/test_load_model_zoo.py --ignore=tests/test_utils/test_logging.py --ignore=tests/test_image/test_io.py --ignore=tests/test_utils/test_registry.py --ignore=tests/test_utils/test_parrots_jit.py --ignore=tests/test_utils/test_trace.py --ignore=tests/test_utils/test_hub.py --ignore=tests/test_device/test_mlu/test_mlu_parallel.py
build_without_ops:
runs-on: ubuntu-18.04
......
# Copyright (c) OpenMMLab. All rights reserved.
from . import mlu
from . import ipu, mlu
__all__ = ['mlu']
__all__ = ['mlu', 'ipu']
# Copyright (c) OpenMMLab. All rights reserved.
from mmcv.utils import IS_IPU_AVAILABLE
if IS_IPU_AVAILABLE:
from .dataloader import IPUDataLoader
from .hook_wrapper import IPUFp16OptimizerHook
from .model_wrapper import ipu_model_wrapper
from .runner import IPUBaseRunner, IPUEpochBasedRunner, IPUIterBasedRunner
from .utils import cfg2options
__all__ = [
'cfg2options', 'ipu_model_wrapper', 'IPUFp16OptimizerHook',
'IPUDataLoader', 'IPUBaseRunner', 'IPUEpochBasedRunner',
'IPUIterBasedRunner'
]
# Copyright (c) OpenMMLab. All rights reserved.
from collections.abc import Mapping, Sequence
from functools import partial
import poptorch
from torch.utils.data.dataloader import default_collate
from mmcv.parallel import DataContainer
def collate(batch, samples_per_gpu=1):
"""Put each data field into a tensor/DataContainer with outer dimension
batch size.
TODO: add support for
:type:`~mmcv.parallel.DataContainer`. Currently, it is ignored.
There are 3 cases.
1. cpu_only = True, e.g., meta data.
2. cpu_only = False, stack = True, e.g., images tensors.
3. cpu_only = False, stack = False, e.g., gt bboxes.
"""
if not isinstance(batch, Sequence):
raise TypeError(
f'`batch` should be a sequence, but got {type(batch)}.')
if isinstance(batch[0], DataContainer):
# TODO `DataContainer` will be supported in the future.
raise TypeError('DataContainer is not supported in ipu data loader.')
elif isinstance(batch[0], Sequence):
transposed = zip(*batch)
collated_batch = []
for samples in transposed:
if not isinstance(samples[0], DataContainer):
# At present, we skip the processing of DataContainer,
# which reduces the performance of the IPU DataLoader
collated_batch.append(collate(samples, samples_per_gpu))
return collated_batch
elif isinstance(batch[0], Mapping):
collated_batch = {}
for key in batch[0]:
if not isinstance(batch[0][key], DataContainer):
# At present, we skip the processing of DataContainer,
# which reduces the performance of the IPU DataLoader
collated_batch[key] = collate([d[key] for d in batch])
return collated_batch
else:
return default_collate(batch)
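As a quick, hedged illustration of the collate behaviour above (not part of the commit; assumes ``torch`` is importable), collating two dict samples sends tensor and scalar fields through ``default_collate``, while any ``DataContainer`` field would simply be skipped:

import torch
batch = [
    {'img': torch.ones(3, 4), 'label': 1},
    {'img': torch.zeros(3, 4), 'label': 0},
]
collated = collate(batch)
# collated['img'].shape == (2, 3, 4); collated['label'] == tensor([1, 0])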
class IPUDataLoader(poptorch.DataLoader):
"""Thin wrapper of `torch.utils.data.DataLoader`.
Compared with the pytorch DataLoader, this DataLoader changes the way the
batch size is calculated and adds the AsynchronousDataAccessor to
load and release data faster in cpu mode.
If this data loader is used in a distributed execution environment, it will
ensure that each process uses a different subset of the dataset, provided
you first call ``options.randomSeed(N)`` with an integer N which is the
same across all hosts.
Args:
dataset (torch.utils.data.Dataset): The dataset to get the data from.
options (poptorch.Options): Options that will be used to compile
and run the model.
batch_size (int, optional): This is the batch size in the conventional
sense of being the size that runs through an operation in the model
at any given time.
shuffle (bool, optional): set to ``True`` to have the data reshuffled
at every epoch (default: ``False``).
num_workers (int, optional): how many subprocesses to use for data
loading. ``0`` means that the data will be loaded in the main
process. (default: ``0``)
drop_last (bool, optional): If True and the number of elements in the
dataset is not a multiple of the combined batch size then the
incomplete batch at the end will be dropped.
persistent_workers (bool, optional): Re-use workers between
iterations if True.
auto_distributed_partitioning (bool, optional): If True, partitions the
dataset for distributed execution automatically. Otherwise, it is
assumed that partitioning has been handled manually.
mode (poptorch.DataLoaderMode, optional): If `DataLoaderMode.Async`,
uses an :py:class:`~poptorch.AsynchronousDataAccessor` to access
the dataset. If `DataLoaderMode.Sync`, accesses the dataset
synchronously.
async_options (Dict[str, Any], optional): Options to pass to
:py:class:`~poptorch.AsynchronousDataAccessor`.
rebatched_worker_size (int, optional): When using AsyncRebatched: batch
size of the tensors loaded by the workers.
Default to the combined batch size.
If specified the ``rebatched_worker_size`` must be less than
or equal to the combined batch size.
kwargs (Dict[str, Any], optional): Other options to pass to PyTorch's
``DataLoader`` constructor.
"""
def __init__(self,
dataset,
options,
batch_size=1,
shuffle=False,
num_workers=0,
drop_last=True,
persistent_workers=True,
auto_distributed_partitioning=True,
mode='sync',
async_options=None,
rebatched_worker_size=None,
**kwargs):
"""Lazy init:
In many frameworks, the dataloader is constructed before the IPU
options are initialized, so lazy initialization is used here: the real
initialization is deferred until the dataloader is actually needed and
the options are passed in.
"""
# lazy init: sometimes, we cannot get IPU options when build data
# loader
self.kwargs = {
'dataset': dataset,
'batch_size': batch_size,
'shuffle': shuffle,
'num_workers': num_workers,
'drop_last': drop_last,
'persistent_workers': persistent_workers,
'auto_distributed_partitioning': auto_distributed_partitioning,
'mode': mode,
'collate_fn': partial(collate, samples_per_gpu=batch_size),
'async_options': async_options,
'rebatched_worker_size': rebatched_worker_size,
**kwargs
}
self.dataset = dataset
self.initialized = False
if options:
self.init(options=options)
def init(self, options, **kwargs):
if not self.initialized:
kwargs = {**self.kwargs, **kwargs, 'options': options}
if kwargs['mode'] == 'sync':
kwargs['mode'] = poptorch.DataLoaderMode.Sync
elif kwargs['mode'] == 'async':
kwargs['mode'] = poptorch.DataLoaderMode.AsyncRebatched
if kwargs['async_options'] is None:
kwargs['async_options'] = {
'load_indefinitely': True,
'buffer_size': 8
}
if kwargs['rebatched_worker_size'] is None:
kwargs['rebatched_worker_size'] = 128
super().__init__(**kwargs)
self.initialized = True
return self
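A hedged usage sketch of the lazy-initialization flow described above; ``ToyDataset`` is a placeholder ``torch.utils.data.Dataset`` and the options come from ``cfg2options`` in this package:

loader = IPUDataLoader(ToyDataset(), None, batch_size=16, mode='async')
ipu_options = cfg2options({'train_cfg': {}, 'eval_cfg': {}})
loader.init(options=ipu_options['training'])  # real poptorch.DataLoader init happens here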
# Copyright (c) OpenMMLab. All rights reserved.
import warnings
import numpy as np
import torch
from mmcv.parallel import DataContainer
# A customized None type for HierarchicalDataManager
HierarchicalDataNone = object()
class HierarchicalDataManager:
"""A class manage all the tensors in the hierarchical data.
At present, the input data structure accepted by IPU is limited,
when the input data structure of mmcv varies.
Here, an intermediate class is needed to get and update tensors
from the original data.
HierarchicalDataManager will record a hierarchical input/output data in
self._hierarchical_data. For example, we have an input data:
{'img': tensorA, 'label': tensorB, 'img_metas': [tensorC, tensorD]}
To enable IPU to use the input, HierarchicalDataManager will collect
the torch tensors from self._hierarchical_data into a tuple like:
(tensorA, tensorB, tensorC, tensorD).
Meanwhile, the return of IPU is a tuple of tensors, HierarchicalDataManager
also have a function named update_all_tensors to update tensors in
self._hierarchical_data which is the output for upper calls.
Args:
logger (:obj:`logging.Logger`): Logger used during running.
Defaults to None.
"""
def __init__(self, logger=None):
self.atomic_types = (int, str, float, np.ndarray, type(None))
self.warning = warnings.warn if logger is None else logger.warning
# enable or disable input data's shape and value check
self.quick_mode = False
self._hierarchical_data = None
def quick(self):
self.quick_mode = True
def compare_atomic_type(self, a, b):
"""Compare data, supported datatypes are numpy array and python basic
types."""
if isinstance(a, np.ndarray):
return np.all(a == b)
else:
return a == b
def record_hierarchical_data(self, data):
"""Record a hierarchical data."""
if self._hierarchical_data is not None:
if isinstance(data, torch.Tensor):
assert isinstance(self._hierarchical_data, torch.Tensor), \
'original hierarchical data is not torch.tensor'
self._hierarchical_data = data
else:
self.update_hierarchical_data(data)
else:
self._hierarchical_data = data
@property
def hierarchical_data(self):
return self._hierarchical_data
def update_hierarchical_data(self,
dataA,
dataB=HierarchicalDataNone,
strict=True,
address='data'):
"""Update dataB with dataA in-place.
Args:
dataA (list or dict or tuple): New hierarchical data.
dataB (list or dict or tuple): Hierarchical data to update.
If not specified, self.hierarchical_data will be updated.
strict (bool, optional): If True, an error will be reported
when the following conditions occur:
1. Non-torch.Tensor data changed.
2. torch.Tensor data shape changed.
address (str): Record the address of current data to be updated.
Default: 'data'.
"""
if dataB is HierarchicalDataNone:
dataB = self.hierarchical_data
# Update with data of the same structure
# but different values (tensors and basic python data types)
if isinstance(dataA, (tuple, list)):
for idx, node in enumerate(dataA):
new_address = ''
if not self.quick_mode:
new_address = address + f'[{str(idx)}]'
assert isinstance(node, type(dataB[idx])),\
f'data structure changed: {new_address}'
if isinstance(node, torch.Tensor):
dataB[idx] = node
else:
self.update_hierarchical_data(
node, dataB[idx], strict, address=new_address)
elif isinstance(dataA, dict):
for k, v in dataA.items():
new_address = ''
if not self.quick_mode:
new_address = address + f'[{str(k)}]'
assert isinstance(v, type(dataB[k])),\
f'data structure changed: {new_address}'
if isinstance(v, torch.Tensor):
dataB[k] = v
else:
self.update_hierarchical_data(
v, dataB[k], strict, address=new_address)
elif isinstance(dataA, self.atomic_types):
if not self.quick_mode:
is_equal = self.compare_atomic_type(dataA, dataB)
if not is_equal:
if strict:
raise ValueError(
'all data except torch.Tensor should be same, '
f'but data({address}) is changed.')
else:
self.warning(
f'find a non-torch.Tensor data({type(dataA)}) '
f'changed, and the address is {address}')
elif isinstance(dataA, DataContainer):
if not self.quick_mode:
assert isinstance(dataB, DataContainer)
new_address = address + '.data'
self.update_hierarchical_data(
dataA.data, dataB.data, False, address=new_address)
else:
raise NotImplementedError(
f'not supported datatype:{type(dataA)}, address is {address}')
def collect_all_tensors(self, hierarchical_data=None):
"""Collect torch.Tensor data from self.hierarchical_data to a list and
return."""
# get a list of tensor from self._hierarchical_data
if hierarchical_data is None:
hierarchical_data = self._hierarchical_data
tensors = []
if isinstance(hierarchical_data, torch.Tensor):
tensors = [hierarchical_data]
else:
self._collect_tensors(hierarchical_data, tensors)
return tensors
def _collect_tensors(self, data, tensors):
if isinstance(data, (tuple, list)):
for node in data:
if isinstance(node, torch.Tensor):
tensors.append(node)
else:
self._collect_tensors(node, tensors)
elif isinstance(data, dict):
for v in data.values():
if isinstance(v, torch.Tensor):
tensors.append(v)
else:
self._collect_tensors(v, tensors)
elif isinstance(data, self.atomic_types):
pass
elif isinstance(data, DataContainer):
self._collect_tensors(data.data, tensors)
else:
raise NotImplementedError(f'not supported datatype:{type(data)}')
def update_all_tensors(self, tensors):
"""Put tensors from tuple back to self.hierarchical_data."""
if isinstance(self._hierarchical_data, torch.Tensor):
assert len(tensors) == 1
assert isinstance(tensors[0], torch.Tensor)
self._hierarchical_data = tensors[0]
else:
# convert to list if tensors is tuple
tensors = list(tensors)
self._set_tensors(self._hierarchical_data, tensors)
return self.hierarchical_data
def _set_tensors(self, data, tensors):
if isinstance(data, tuple):
data = list(data)
for idx in range(len(data)):
if isinstance(data[idx], torch.Tensor):
data[idx] = tensors.pop(0)
else:
self._set_tensors(data[idx], tensors)
data = tuple(data)
elif isinstance(data, list):
for idx in range(len(data)):
if isinstance(data[idx], torch.Tensor):
data[idx] = tensors.pop(0)
else:
self._set_tensors(data[idx], tensors)
elif isinstance(data, dict):
for k, v in data.items():
if isinstance(v, torch.Tensor):
data[k] = tensors.pop(0)
else:
self._set_tensors(v, tensors)
elif isinstance(data, self.atomic_types):
pass
elif isinstance(data, DataContainer):
self._set_tensors(data.data, tensors)
else:
raise NotImplementedError(f'not supported datatype:{type(data)}')
def clean_all_tensors(self):
"""Delete tensors from self.hierarchical_data."""
self._clean_tensors(self._hierarchical_data)
def _clean_tensors(self, data):
if isinstance(data, tuple):
data = list(data)
for idx in range(len(data)):
if isinstance(data[idx], torch.Tensor):
data[idx] = None
else:
self._clean_tensors(data[idx])
data = tuple(data)
elif isinstance(data, list):
for idx in range(len(data)):
if isinstance(data[idx], torch.Tensor):
data[idx] = None
else:
self._clean_tensors(data[idx])
elif isinstance(data, dict):
for k, v in data.items():
if isinstance(v, torch.Tensor):
data[k] = None
else:
self._clean_tensors(v)
elif isinstance(data, self.atomic_types):
pass
elif isinstance(data, DataContainer):
self._clean_tensors(data.data)
else:
raise NotImplementedError(f'not supported datatype:{type(data)}')
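To make the record/collect/update round trip concrete, a minimal hedged sketch (assuming ``torch`` is available; the values are arbitrary):

manager = HierarchicalDataManager()
data = {'img': torch.rand(2, 3), 'meta': {'scale': 1.0, 'feat': torch.rand(4)}}
manager.record_hierarchical_data(data)
flat = manager.collect_all_tensors()          # [img tensor, feat tensor]
flat = [t + 1 for t in flat]                  # e.g. tensors coming back from the IPU
restored = manager.update_all_tensors(flat)   # same structure, tensors replaced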
# Copyright (c) OpenMMLab. All rights reserved.
from mmcv.runner import HOOKS, LrUpdaterHook, OptimizerHook
from mmcv.utils import TORCH_VERSION, digit_version
def wrap_lr_updater_hook(lr_hook_class):
"""A wrapper function to wrap any subclass of LrUpdaterHook.
IPU needs extra operations to upload optimizer settings. This wrapper
overrides the ``_set_lr`` method of the given LrUpdaterHook subclass.
"""
assert issubclass(lr_hook_class, LrUpdaterHook)
class ipu_lr_hook_class(lr_hook_class):
def _set_lr(self, runner, *args, **kwargs):
super()._set_lr(runner, *args, **kwargs)
# convert torch optimizer to poptorch optimizer
runner.model.setOptimizer(runner.optimizer)
return ipu_lr_hook_class
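A hedged sketch of using this wrapper directly; ``StepLrUpdaterHook`` is an existing mmcv hook and the step values are arbitrary (inside ``IPUBaseRunner`` the wrapping is done via ``build_from_cfg_with_wrapper`` instead):

from mmcv.runner.hooks import StepLrUpdaterHook
IPUStepLrUpdaterHook = wrap_lr_updater_hook(StepLrUpdaterHook)
lr_hook = IPUStepLrUpdaterHook(step=[8, 11])  # _set_lr now also updates the poptorch optimizer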
def wrap_optimizer_hook(optimizer_hook_class):
"""A wrapper function to wrap OptimizerHook.
This is a non-intrusive way of wrapping the optimizer hook (otherwise
every config file would have to be changed to use an IPU optimizer hook).
IPU's clip-norm implementation differs from pytorch's, so an error is
raised when clip-norm is used.
"""
class ipu_optimizer_hook_class(OptimizerHook):
def __init__(self, **kwargs):
super().__init__(**kwargs)
if self.grad_clip is not None:
raise NotImplementedError('IPU does not support gradient clip')
return ipu_optimizer_hook_class
if (TORCH_VERSION != 'parrots'
and digit_version(TORCH_VERSION) >= digit_version('1.6.0')):
@HOOKS.register_module()
class IPUFp16OptimizerHook(OptimizerHook):
"""FP16 optimizer hook (using PyTorch's implementation).
If you are using PyTorch >= 1.6, torch.cuda.amp is used as the backend,
to take care of the optimization procedure.
Args:
loss_scale (float | str | dict): Scale factor configuration.
If loss_scale is a float, static loss scaling will be used with
the specified scale. The 'dynamic' and dict (``GradScaler`` argument)
forms accepted by the base ``Fp16OptimizerHook`` are not supported
in IPU mode yet and will raise ``NotImplementedError``.
Defaults to 512.
Examples:
>>> optimizer_hook = IPUFp16OptimizerHook(loss_scale=512.)
"""
def __init__(self,
grad_clip=None,
coalesce=True,
bucket_size_mb=-1,
loss_scale=512.,
distributed=True):
assert grad_clip is None,\
'IPU mode does not support `grad_clip` currently'
assert coalesce,\
'only coalesced allreduce is implemented in distributed training currently'
assert bucket_size_mb == -1,\
'`bucket_size_mb` should not be set in IPU mode'
self.distributed = distributed
self._scale_update_param = None
if loss_scale == 'dynamic':
raise NotImplementedError(
'IPU mode does not support dynamic loss scale currently')
elif isinstance(loss_scale, float):
self.loss_scale = loss_scale
elif isinstance(loss_scale, dict):
raise NotImplementedError(
'IPU mode supports single scale currently')
else:
raise ValueError(
f'loss_scale should be float, but got {loss_scale} ')
def after_train_iter(self, runner):
pass
else:
raise RuntimeError('The IPU mode only supports torch 1.6 and above')
# Copyright (c) OpenMMLab. All rights reserved.
import copy
import inspect
from collections import OrderedDict
from typing import Optional, Union
import poptorch
import torch
import torch.nn as nn
from poptorch import PoplarExecutor, __version__, identity_loss
from poptorch._args_parser import ArgsParser
from mmcv.runner import auto_fp16
from .hierarchical_data_manager import HierarchicalDataManager
from .utils import compare_ndarray, model_sharding, recomputation_checkpoint
class DictArgsParser(ArgsParser):
"""A helper class for handling model input.
Args:
inputs (list): Inputs of model.
"""
def __init__(self, inputs):
# Combine args and kwargs:
self._has_variadic_arguments = True
self._varnames = list(inputs.keys())
self._defaults = [inspect.Parameter.empty for _ in self._varnames]
self._warned_not_contiguous_input = False
class WrappedNet(nn.Module):
"""A net wrapper for model conversion.
This wrapper makes some changes and adds some extra functions to the
training/inference model.
Args:
model (:obj:`nn.Module`): The model to run.
inputs_manager (:obj:`HierarchicalDataManager`): A parser
converting inputs from tuple to dictionary.
outputs_manager (:obj:`HierarchicalDataManager`): A parser
converting outputs from dictionary to tuple.
inter_outputs_in_cpu (dict): Specify the features to be
recorded.
modules_to_record (mmcv.Config, list): Index or name of modules which
will be recorded for output. It is necessary to specify output for
static graph of model training or inference.
"""
def __init__(self,
model,
inputs_manager,
outputs_manager,
inter_outputs_in_cpu,
modules_to_record=None):
super().__init__()
self.model = model
self.inputs_manager = inputs_manager
self.outputs_manager = outputs_manager
self.training = model.training
# Register a hook function to capture the intermediate features
# generated by the network to align the outputs between ipu and cpu
# Used to confirm whether the implementation of CPU is consistent
# with the implementation of IPU
self.inter_outputs_in_cpu = inter_outputs_in_cpu
if modules_to_record is None:
modules_to_record = []
for idx, (name, module) in enumerate(model.named_modules()):
if name in modules_to_record or idx in modules_to_record:
features_hook = self.get_input_output_hook(
name, idx, self.inter_outputs_in_cpu)
module.register_forward_hook(hook=features_hook)
def get_input_output_hook(self, name, idx, save_dict):
def input_output_hook(module, fea_in, fea_out):
if isinstance(fea_in, tuple):
fea_in = list(fea_in)
if isinstance(fea_out, tuple):
fea_out = list(fea_out)
save_dict[name] = {
'fea_in': fea_in,
'fea_out': fea_out,
'idx': idx
}
return None
return input_output_hook
def forward(self, inputs_tuple):
"""This function is used to be compiled to ipu, the inputs and outputs
need to be tuples, so here we need to restore the input back to a
dictionary and convert the output to a tuple."""
self.inputs_manager.update_all_tensors(inputs_tuple)
kwargs = {**(self.inputs_manager.hierarchical_data)}
if self.training:
outputs = self.forward_train(kwargs)
# tell poptorch which loss will be used finally
identity_loss(outputs['loss'], reduction='none')
else:
outputs = self.forward_eval(kwargs)
if isinstance(outputs, torch.Tensor):
# a single tensor output is currently not supported, so wrap it
# in a dictionary and use a keyword to identify this case
outputs = {'output of WrappedNet: single tensor': outputs}
# if some features need to be recorded, add extra outputs
for name in self.inter_outputs_in_cpu:
outputs[name] = self.inter_outputs_in_cpu[name]
# record where the returned tensors live during the conversion stage;
# in the real run stage, all tensors are changed in-place,
# which means the output can be obtained directly outside this function
self.outputs_manager.record_hierarchical_data(outputs)
plain_outputs = self.outputs_manager.collect_all_tensors()
return plain_outputs
def forward_train(self, kwargs):
optimizer = kwargs.pop('optimizer')
outputs = self.train_step(kwargs, optimizer)
return outputs
def train_step(self, data, optimizer=None, **kwargs):
"""The iteration step during training.
This method defines an iteration step during training, except for the
back propagation and optimizer updating, which are done in an optimizer
hook. Note that in some complicated cases or models, the whole process
including back propagation and optimizer updating are also defined in
this method, such as GAN.
Args:
data (dict): The output of dataloader.
optimizer (:obj:`torch.optim.Optimizer`, optional): The
optimizer of runner is passed to ``train_step()``. This
argument is unused and reserved.
Returns:
dict: Dict of outputs. The following fields are contained.
- loss (torch.Tensor): A tensor for back propagation, which \
can be a weighted sum of multiple losses.
- log_vars (dict): Dict contains all the variables to be sent \
to the logger.
- num_samples (int): Indicates the batch size (when the model \
is DDP, it means the batch size on each GPU), which is \
used for averaging the logs.
"""
losses = self.model(**data)
loss, log_vars = self._parse_losses(losses)
outputs = dict(
loss=loss, log_vars=log_vars, num_samples=len(data['img'].data))
return outputs
def _parse_losses(self, losses):
log_vars = OrderedDict()
for loss_name, loss_value in losses.items():
if isinstance(loss_value, torch.Tensor):
log_vars[loss_name] = loss_value.mean()
elif isinstance(loss_value, list):
log_vars[loss_name] = sum(loss.mean() for loss in loss_value)
elif isinstance(loss_value, dict):
for name, value in loss_value.items():
log_vars[name] = value
else:
raise TypeError(
f'{loss_name} is not a tensor or list of tensors')
loss = sum(value for key, value in log_vars.items() if 'loss' in key)
log_vars['loss'] = loss
return loss, log_vars
def forward_eval(self, kwargs):
img = kwargs.pop('img')
img_metas = kwargs.pop('img_metas', None)
return_loss = kwargs.pop('return_loss')
assert not return_loss
# TODO Temporarily hard-code to close post_process,
# otherwise, in the third trace(_check_trace),
# post_process will convert output tensor to numpy array automatically,
# resulting in _check_trace failure
outputs = self.model(
img,
img_metas=img_metas,
return_loss=return_loss,
post_process=False)
return outputs
class MMPoplarExecutor(PoplarExecutor):
"""An executor for inputs/outputs parsing, model compilation, data
alignment and IPU upload/download.
Args:
model (:obj:`nn.Module`): The model to be compiled.
logger (:obj:`logging.Logger`): Logger used during running.
Defaults to None.
training (bool): Model in training mode or eval mode.
modules_to_record (mmcv.Config, list): Index or name of modules which
will be recorded for output. It is necessary to specify output for
static graph of model training or inference.
args (argument list): Arguments passed to the `__init__`
method of PoplarExecutor.
kwargs (keyword arguments): Keyword arguments passed to the `__init__`
method of PoplarExecutor.
"""
def __init__(self,
model,
logger=None,
training=True,
modules_to_record=None,
*args,
**kwargs):
# self.model == self._user_model: the input pytorch model
# self._model: the wrapped model used for compilation and weight
# updates; the two models share the same weights.
# The wrapped model only accepts and returns tuples, so
# HierarchicalDataManager converts dictionaries to tuples and back.
self.inputs_manager = HierarchicalDataManager(logger=logger)
self.outputs_manager = HierarchicalDataManager(logger=logger)
self.logger = logger
# the features calculated by CPU
self.inter_outputs_in_cpu = {}
# the features calculated by IPU
self.inter_outputs_in_ipu = {}
if modules_to_record is None:
# The IPU implementation of some operators may be inconsistent
# with the expected (CPU) one; this mechanism can be used to
# confirm whether there is such a problem
self.compare_with_cpu = False
else:
self.compare_with_cpu = True
# move model.fp16_enabled to self.fp16_enabled,
# modify the position where the input is automatically casted to half
if getattr(model, 'fp16_enabled', False):
model.fp16_enabled = False
self.fp16_enabled = True
# make torch.jit.trace convert self._model
model = WrappedNet(
model,
self.inputs_manager,
self.outputs_manager,
self.inter_outputs_in_cpu,
modules_to_record=modules_to_record)
super().__init__(model, training=training, *args, **kwargs)
# overwrite self._args_parser in train_step or val_step
self._args_parser = None
if training:
assert self.training
else:
assert not self.training
@property
def training(self):
# Accessing the attribute ``training`` of self would normally fall
# through to self.model.training, since this class defines no such
# attribute. However, the attribute we actually want is self._training;
# self.model.training and self._training are often inconsistent.
# It is not clear whether this is a poptorch bug or a deliberate design,
# so temporarily use this property to work around the problem
return self._training # comes from self.model._training
@auto_fp16(supported_types=(PoplarExecutor, ))
def run_model(self, data_dict):
# this function is used to parse input_dict
# and convert to output_dict
if self.isCompiled():
self.inputs_manager.record_hierarchical_data(data_dict)
inputs_tuple = tuple(self.inputs_manager.collect_all_tensors())
else:
# get tensors out of data and put them in a tuple
self.inputs_manager.record_hierarchical_data(data_dict)
inputs_tuple = tuple(self.inputs_manager.collect_all_tensors())
# turn logger in data manager off after compilation
self.inputs_manager.quick()
self.outputs_manager.quick()
# parser args in the first iter
if self._args_parser is None:
self._args_parser = DictArgsParser({'args': inputs_tuple})
# run or convert model
# the plain_outputs will be used in converting stage
plain_outputs = self(inputs_tuple)
self.inputs_manager.clean_all_tensors()
# put list of tensors back to the output dict
# according to the same order
self.outputs_manager.update_all_tensors(plain_outputs)
# get the real output dictionary from self.outputs_manager
output_dict = self.outputs_manager.hierarchical_data
# split output_dict into inter_outputs_in_ipu
# and output of the torch model
torch_model_output = {}
for name in output_dict:
if name in self.inter_outputs_in_cpu:
self.inter_outputs_in_ipu[name] = output_dict[name]
else:
torch_model_output[name] = output_dict[name]
if 'output of WrappedNet: single tensor' in output_dict:
assert len(torch_model_output) == 1
assert isinstance(
torch_model_output['output of WrappedNet: single tensor'],
torch.Tensor)
torch_model_output = \
torch_model_output['output of WrappedNet: single tensor']
return torch_model_output
def train_step(self, data, optimizer=None, **kwargs):
# arguments from mmcls/models/classifiers/base.py:
# BaseClassifier.train_step
assert self.training
assert len(kwargs) == 0 # TODO, support later if necessary
# TODO support datacontainer as input
# currently, auto_fp16 and HierarchicalDataManager take too much
# time on traversing datacontainer
data['img_metas'] = None
num_samples = len(data['img'].data)
# TODO we will ignore optimizer because it will not be used in model,
# support later if necessary
data['optimizer'] = None
output_dict = self.run_model(data)
# outputs contained loss, log_vars, num_samples,
# only loss(torch.tensor) has been updated
# remove all unchanged vars, left torch.tensor
neat_output_dict = {'loss': output_dict['loss']}
# re-parse outputs, get back log_vars and num_samples
loss, log_vars = self.model._parse_losses(neat_output_dict)
final_output_dict = dict(
loss=loss, log_vars=log_vars, num_samples=num_samples)
return final_output_dict
def eval_call(self, img, img_metas=None, return_loss=True, **kwargs):
# arguments from mmdet/models/detectors/base.py:BaseDetector.forward
# temporary usage for eval mode
assert not self.training
assert len(kwargs) == 0 # TODO, support later if necessary
assert not return_loss
data = {'img': img, 'img_metas': img_metas, 'return_loss': return_loss}
output_dict = self.run_model(data)
return output_dict
def detachFromDevice(self):
if self.isCompiled() and self._is_attached:
super().detachFromDevice()
def attachToDevice(self):
if self.isCompiled() and not self._is_attached:
super().attachToDevice()
class TrainEvalModel:
"""A class maintaining training MMPoplarExecutor and inference
MMPoplarExecutor.
Args:
train_model (:obj:`nn.Module`): The training model to be compiled.
``train_model`` can be None if only executing validation.
eval_model (:obj:`nn.Module`): The inference model to be compiled.
options (mmcv.Config, dict): Options that will be used to compile
and run the model.
optimizer (:obj:`torch.optim.Optimizer`, optional): torch
optimizer, necessary if in training mode
logger (:obj:`logging.Logger`): Logger used during running.
Defaults to None.
modules_to_record (mmcv.Config, list): Index or name of modules which
will be recorded for output. It is necessary to specify output for
static graph of model training or inference.
"""
def __init__(self,
train_model,
eval_model,
options,
optimizer,
modules_to_record=None,
logger=None):
if train_model is None:
self._train_executor = None
self.training = False
else:
self._train_executor = get_training_model(
train_model,
options=options['training'],
optimizer=optimizer,
logger=logger,
modules_to_record=modules_to_record)
self.training = True
self._eval_executor = get_inference_model(
eval_model, options=options['inference'], logger=logger)
@property
def executor(self):
if self.training:
return self._train_executor
else:
return self._eval_executor
def train(self, mode: bool = True):
"""Sets the module in training mode.
This has any effect only on certain modules. See documentations of
particular modules for details of their behaviors in
training/evaluation mode, if they are affected,
e.g. :class:`Dropout`, :class:`BatchNorm`, etc.
Args:
mode (bool): whether to set training mode (``True``) or evaluation
mode (``False``). Default: ``True``.
Returns:
Module: self
"""
if not isinstance(mode, bool):
raise ValueError('training mode is expected to be boolean, '
f'but got {type(mode)}')
if self._train_executor is None and mode:
raise RuntimeError(
'The train_executor is not initialized. '
'If you want to initialize the train_executor, '
'you need to pass an optimizer when converting the pytorch model')
if mode == self.training:
self.model.train(mode)
return self
else:
if self.isCompiled():
# copy weights from IPU to CPU before off-loading the current session
self.copyWeightsToHost()
# detach the current session before changing the mode;
# if in training mode and the weights have been updated,
# poptorch will copy the weights from IPU to host
self.detachFromDevice()
self.training = mode  # the session changes along with the mode
self.model.train(mode)
# after changing the mode, attach the new session;
# this call copies the model weights to the device
self.attachToDevice()
return self
def eval(self):
"""Sets the module in evaluation mode.
This has any effect only on certain modules.
See documentations of particular modules
for details of their behaviors in training/evaluation mode,
if they are affected, e.g. :class:`Dropout`, :class:`BatchNorm`, etc.
This is equivalent with :meth:`self.train(False)
<nn.Module.train>`.
See :ref:`locally-disable-grad-doc` for a comparison between
`.eval()` and several similar mechanisms that may be confused with it.
Returns:
Module: self
"""
return self.train(False)
def compare_data_between_ipu_and_cpu(self, inter_outputs_in_cpu,
inter_outputs_in_ipu):
for key, val in inter_outputs_in_cpu.items():
is_tensor = isinstance(val['fea_in'], torch.Tensor)
fea_in_cpu = val['fea_in']
fea_in_cpu_list = [fea_in_cpu] if is_tensor else fea_in_cpu
fea_in_ipu = inter_outputs_in_ipu[key]['fea_in']
fea_in_ipu_list = [fea_in_ipu] if is_tensor else fea_in_ipu
is_tensor = isinstance(val['fea_out'], torch.Tensor)
fea_out_cpu = val['fea_out']
fea_out_cpu_list = [fea_out_cpu] if is_tensor else fea_out_cpu
fea_out_ipu = inter_outputs_in_ipu[key]['fea_out']
fea_out_ipu_list = [fea_out_ipu] if is_tensor else fea_out_ipu
print('comparing layer:', key)
for idx, (featA, featB) in \
enumerate(zip(fea_in_cpu_list, fea_in_ipu_list)):
print('fea_in, tensor ', idx)
compare_ndarray(featA.detach().numpy(), featB.detach().numpy())
for idx, (featA, featB) in \
enumerate(zip(fea_out_cpu_list, fea_out_ipu_list)):
print('fea_out, tensor', idx)
compare_ndarray(featA.detach().numpy(), featB.detach().numpy())
# TODO Unified training and eval interface,
# merge train_step(train) and __call__(eval) together
def train_step(self, data, optimizer=None, **kwargs):
assert self.training, 'not supported train_step on eval mode'
inter_outputs_in_cpu = {}
if (self._train_executor.isCompiled()
and self._train_executor.compare_with_cpu):
self.copyWeightsToHost()
# run in CPU mode
self._train_executor.model.train_step(data, optimizer, **kwargs)
inter_outputs_in_cpu = {
**(self._train_executor.inter_outputs_in_cpu)
}
# run in IPU mode
result = self._train_executor.train_step(data, optimizer, **kwargs)
if (self._train_executor.isCompiled()
and self._train_executor.compare_with_cpu
and len(inter_outputs_in_cpu) > 0):
self.compare_data_between_ipu_and_cpu(
inter_outputs_in_cpu,
self._train_executor.inter_outputs_in_ipu)
return result
# TODO Unified training and eval interface,
# merge train_step(train) and __call__(eval) together
def __call__(self, *args, **kwargs):
if self.training:
raise NotImplementedError('use train_step rather than __call__')
else:
return self._eval_executor.eval_call(*args, **kwargs)
def __getattr__(self, attr):
return getattr(self.executor, attr)
def get_training_model(model: nn.Module,
options: Optional[poptorch.Options] = None,
optimizer: Optional[torch.optim.Optimizer] = None,
logger=None,
modules_to_record=None) -> poptorch.PoplarExecutor:
"""Create a PopTorch training model from a PyTorch model, running on IPU
hardware in training mode.
Note:
PopTorch makes a shallow copy of the model. Changes to the
parameters in the returned training model affect the original model
and vice versa. However, primitive variable types are not synced: for
example calling ``model.train()`` on the original model, which
changes the ``training`` bool of the model instance, will not alter the
model returned by this function. You may need to call ``model.train()``
on your model before you call this function for correct behavior.
Args:
model (:obj:`nn.Module`): The model to run.
options (poptorch.Options): Options that will be used to compile
and run the model.
optimizer (:obj:`torch.optim.Optimizer`, optional): The optimizers
to apply during training.
logger (:obj:`logging.Logger`): Logger used during running.
Defaults to None.
modules_to_record (mmcv.Config, list): Index or name of modules which
will be recorded for output. It is necessary to specify output for
static graph of model training or inference.
Returns:
The :class:`poptorch.PoplarExecutor` wrapper to use in place
of ``model``.
"""
# Create a copy of the original model in case it needs to be wrapped
maybe_wrapped_model = copy.copy(model)
return MMPoplarExecutor(
model=maybe_wrapped_model,
logger=logger,
options=options,
training=True,
optimizer=optimizer,
user_model=model,
modules_to_record=modules_to_record,
poptorch_version=__version__)
def get_inference_model(model: Union[nn.Module, poptorch.PoplarExecutor],
options: Optional[poptorch.Options] = None,
logger=None) -> poptorch.PoplarExecutor:
"""Create a PopTorch inference model from a PyTorch model, running on IPU
hardware in inference mode.
Note:
PopTorch makes a shallow copy of the model. Changes to the
parameters in the returned inference model affect the original model
and vice versa. However, primitive variable types are not synced: for
example calling ``model.eval()`` on the original model will not alter
the model returned by this function. You may need to call
``model.eval()`` on your model before you call this function for
correct behavior.
Args:
model (:obj:`nn.Module`): The model to run.
options (poptorch.Options): Options that will be used to compile
and run the model.
logger (:obj:`logging.Logger`): Logger used during running.
Defaults to None.
Returns:
The :class:`poptorch.PoplarExecutor` wrapper to use in place of
``model``.
"""
return MMPoplarExecutor(
model=copy.copy(model),
logger=logger,
options=options,
training=False,
poptorch_version=__version__)
def ipu_model_wrapper(model,
options,
optimizer=None,
logger=None,
modules_to_record=None,
ipu_model_cfg=None,
fp16_cfg=None):
"""Convert torch model to IPU model.
Args:
model (nn.Module): The target model to be converted.
options (dict[str, poptorch.Options]): IPU options, generated
by :func:`cfg2options`.
optimizer (:obj:`torch.optim.Optimizer`, optional): torch
optimizer, necessary if in training mode
logger (:obj:`logging.Logger`): Logger used during training.
modules_to_record (mmcv.Config, list): Index or name of modules which
will be recorded for output. It is necessary to specify output for
static graph of model training or inference.
ipu_model_cfg (dict): A dictionary contains train_split_edges and
train_ckpt_nodes, See details in :func:`model_sharding` and
:func:`recomputation_checkpoint` functions.
fp16_cfg (dict): Config for IPU fp16 training. Currently supports
configs: `loss_scale`, `velocity_accum_type` and `accum_type`.
See details in
https://docs.graphcore.ai/projects/poptorch-user-guide/en/latest/index.html
Returns:
TrainEvalModel: IPU wrapped model.
"""
if ipu_model_cfg is None:
ipu_model_cfg = {}
training = model.training if optimizer is not None else False
# set mixed-precision
if fp16_cfg is not None:
from mmcv.runner import wrap_fp16_model
loss_scale = fp16_cfg['loss_scale']
wrap_fp16_model(model)
model.half()
# TODO temporary usage to set loss scaling for the original torch optimizer
if optimizer is not None:
optimizer.loss_scaling = loss_scale
if fp16_cfg.get('velocity_accum_type', False):
if fp16_cfg['velocity_accum_type'] == 'half':
optimizer.velocity_accum_type = torch.half
else:
optimizer.velocity_accum_type = torch.float32
if fp16_cfg.get('accum_type', False):
if fp16_cfg['accum_type'] == 'half':
optimizer.accum_type = torch.half
else:
optimizer.accum_type = torch.float32
# TODO support feature alignment for fp16
if modules_to_record is not None:
raise NotImplementedError(
'Feature alignment for fp16 is not implemented')
# set model partition
if optimizer is None:
train_model = None
else:
# split model into multi-IPUs if specified
train_model = model_sharding(
copy.copy(model).train(),
ipu_model_cfg.get('train_split_edges', []))
recomputation_checkpoint(train_model,
ipu_model_cfg.get('train_ckpt_nodes', []))
# TODO support feature alignment for gradient accumulation mode
gradient_accumulation = \
getattr(options['training'].Training, 'gradient_accumulation', 1)
if gradient_accumulation > 1:
assert modules_to_record is None, \
'Feature alignment for grad-accumulation mode not implemented'
# TODO support feature alignment for multi-replica mode
replication_factor = \
getattr(options['training'], 'replication_factor', 1)
if replication_factor > 1:
assert modules_to_record is None, \
'Feature alignment for multi-replica mode not implemented'
# TODO supports different model partitions between train and eval mode
assert len(ipu_model_cfg.get('eval_split_edges', [])) == 0,\
'Currently, BeginBlock can only be used once on the same model'
eval_model = copy.copy(model).eval()
# wrap model for compilation
model = TrainEvalModel(
train_model,
eval_model,
options=options,
optimizer=optimizer,
logger=logger,
modules_to_record=modules_to_record)
model.train(training)
return model
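A hedged end-to-end sketch of the conversion; ``toy_model`` and ``data_batch`` are placeholders, ``cfg2options`` is assumed to be imported from ``.utils``, and real usage normally goes through ``IPUBaseRunner``:

options = cfg2options({'train_cfg': {}, 'eval_cfg': {}})
optimizer = torch.optim.SGD(toy_model.parameters(), lr=0.1)
ipu_model = ipu_model_wrapper(toy_model, options, optimizer=optimizer)
outputs = ipu_model.train_step(data_batch)  # compiles on the first call, then runs on IPU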
# Copyright (c) OpenMMLab. All rights reserved.
from mmcv.runner import (HOOKS, RUNNERS, BaseRunner, EpochBasedRunner,
IterBasedRunner)
from mmcv.utils import IS_IPU_AVAILABLE
if IS_IPU_AVAILABLE:
from .dataloader import IPUDataLoader
from .hook_wrapper import (IPUFp16OptimizerHook, wrap_lr_updater_hook,
wrap_optimizer_hook)
from .model_wrapper import ipu_model_wrapper
from .utils import build_from_cfg_with_wrapper, cfg2options
class IPUBaseRunner(BaseRunner):
"""A base runner for IPU.
This runner has some extra processes for IPU which are shown below:
1. Parse options for IPU.
2. Wrap the pytorch model for IPU.
3. Raise errors on illegal usage.
4. Input IPU options and initialize the dataloader if an instance of
IPUDataLoader is found.
Args:
model (:obj:`nn.Module`): The model to run.
options_cfg (mmcv.Config, dict): Options that will be used to compile
and run the model.
modules_to_record (mmcv.Config, list): Index or name of modules which
will be recorded for output. It is necessary to specify output for
static graph of model training or inference.
ipu_model_cfg (mmcv.Config, dict): Config of model partition and
recomputing checkpoint
fp16_cfg (mmcv.Config): Config for fp16 training.
batch_processor (callable): A callable method that processes a data
batch. Should be None for the IPU runner.
kwargs (Dict[str, Any], optional): Keyword arguments will be passed to
``base_runner.BaseRunner``.
"""
def __init__(self,
model,
options_cfg=None,
modules_to_record=None,
ipu_model_cfg=None,
fp16_cfg=None,
batch_processor=None,
**kwargs):
assert hasattr(model, 'train_step') and batch_processor is None,\
'only support model with train_step'
if options_cfg is None:
options_cfg = {}
# call BaseRunner.__init__() here
super().__init__(model, **kwargs)
# process options of ipu
if IS_IPU_AVAILABLE:
self.options = cfg2options(options_cfg)
self.model = ipu_model_wrapper(
self.model,
self.options,
self.optimizer,
self.logger,
modules_to_record=modules_to_record,
ipu_model_cfg=ipu_model_cfg,
fp16_cfg=fp16_cfg)
else:
raise NotImplementedError('cpu mode on IPURunner is not supported')
def register_lr_hook(self, lr_config):
if lr_config is None:
return
assert isinstance(lr_config, dict)
assert 'policy' in lr_config
policy_type = lr_config.pop('policy')
# If the type of policy is all in lower case,
# e.g., 'cyclic', then its first letter will be capitalized,
# e.g., to be 'Cyclic'.
# This is for the convenient usage of Lr updater.
# Since this is not applicable for `CosineAnnealingLrUpdater`,
# the string will not be changed if it contains capital letters.
if policy_type == policy_type.lower():
policy_type = policy_type.title()
hook_type = policy_type + 'LrUpdaterHook'
lr_config['type'] = hook_type
hook = build_from_cfg_with_wrapper(lr_config, HOOKS,
wrap_lr_updater_hook)
self.register_hook(hook, priority='VERY_HIGH')
def register_optimizer_hook(self, optimizer_config):
if optimizer_config is None:
return
assert isinstance(optimizer_config, (dict, IPUFp16OptimizerHook))
if isinstance(optimizer_config, dict):
optimizer_config.setdefault('type', 'OptimizerHook')
hook = build_from_cfg_with_wrapper(optimizer_config, HOOKS,
wrap_optimizer_hook)
else:
hook = optimizer_config
self.register_hook(hook, priority='ABOVE_NORMAL')
def run(self, data_loaders, workflow, *args, **kwargs):
for i, flow in enumerate(workflow):
mode, _ = flow
# initialize IPU dataloader if not initialized
assert isinstance(data_loaders[i], IPUDataLoader),\
'IPU runner can only work with `IPUDataLoader`'
data_loaders[i].init(options=self.get_options(mode))
super().run(data_loaders, workflow, *args, **kwargs)
def get_options(self, mode):
if mode == 'train':
return self.options['training']
elif mode == 'val':
return self.options['inference']
else:
raise ValueError(f'mode should be train or val but got {mode}')
@RUNNERS.register_module()
class IPUEpochBasedRunner(IPUBaseRunner, EpochBasedRunner):
"""Epoch-based Runner for IPU.
The inheritance order (MRO) is: IPUEpochBasedRunner -> IPUBaseRunner ->
EpochBasedRunner -> BaseRunner. This runner trains models epoch by epoch.
pass
@RUNNERS.register_module()
class IPUIterBasedRunner(IPUBaseRunner, IterBasedRunner):
"""Iteration-based Runner for IPU.
The inheritance order (MRO) is: IPUIterBasedRunner -> IPUBaseRunner ->
IterBasedRunner -> BaseRunner. This runner trains models iteration by
iteration.
"""
pass
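A hedged construction sketch with placeholder model, optimizer and logger; the data loaders passed to ``run`` must be ``IPUDataLoader`` instances:

runner = IPUEpochBasedRunner(
    model=toy_model,
    options_cfg=dict(train_cfg=dict(), eval_cfg=dict()),
    optimizer=optimizer,
    logger=logger,
    work_dir='./work_dir',
    max_epochs=12)
runner.run([ipu_train_loader], [('train', 1)])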
# Copyright (c) OpenMMLab. All rights reserved.
import inspect
import numpy as np
import popart
import poptorch
import torch
import torch.nn as nn
from mmcv.utils import Registry
def _options_assigner(cfg, options_node):
# set popart.options by config
# cfg: dict, python data type
# options_node: python module or function
if isinstance(cfg, dict):
for key in cfg:
_options_assigner(cfg[key], getattr(options_node, key))
elif isinstance(cfg, (int, float, str, list)):
if callable(options_node):
options_node(cfg)
else:
error_msg = f'options_node type {type(options_node)} not supported'
raise NotImplementedError(error_msg)
else:
error_msg = f'cfg type {type(cfg)} not supported'
raise NotImplementedError(error_msg)
def cfg2options(cfg):
"""Parse dictionary to ipu options.
Args:
cfg (dict): A dictionary of ipu settings.
Returns:
dict[str, poptorch.Options]: Training options and inference options
of IPU.
"""
# set ipu options for inference and training by config
train_cfg = cfg.pop('train_cfg', {})
eval_cfg = cfg.pop('eval_cfg', {})
eval_cfg['replicationFactor'] = 1 # eval mode only use one replica
eval_cfg['executionStrategy'] = 'ShardedExecution'
# overwrite default ipu cfg with specified train cfgs
training_ipu_cfg = {**cfg, **train_cfg}
# overwrite default ipu cfg with specified eval cfgs
inference_ipu_cfg = {**cfg, **eval_cfg}
ipu_options = {
'training': _cast_to_options(training_ipu_cfg),
'inference': _cast_to_options(inference_ipu_cfg)
}
# TODO make these settings configurable
ipu_options['training']._Popart.set('disableGradAccumulationTensorStreams',
True)
ipu_options['training']._Popart.set(
'accumulateOuterFragmentSettings.schedule',
int(popart.AccumulateOuterFragmentSchedule.OverlapMemoryOptimized))
ipu_options['training'].Precision.enableStochasticRounding(True)
return ipu_options
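A hedged sketch of an ``options_cfg`` dict that ``cfg2options`` could consume; the field names map onto ``poptorch.Options`` attributes through ``_cast_to_options`` and ``_options_assigner`` below, and the particular values are illustrative only:

options_cfg = dict(
    randomSeed=888,
    train_cfg=dict(
        executionStrategy='SameAsIpu',
        Training=dict(gradientAccumulation=8),
        availableMemoryProportion=[0.3, 0.3, 0.3, 0.3]),
    eval_cfg=dict(deviceIterations=1))
ipu_options = cfg2options(options_cfg)  # {'training': Options, 'inference': Options}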
def _cast_to_options(cfg):
# Options that cannot be assigned directly are parsed with explicit
# if-branches; the rest are assigned via _options_assigner
options = poptorch.Options()
if 'availableMemoryProportion' in cfg:
available_memory_proportion = cfg.pop('availableMemoryProportion')
mem_props = {}
for i, mem_prop in enumerate(available_memory_proportion):
mem_props[f'IPU{i}'] = mem_prop
options.setAvailableMemoryProportion(mem_props)
if 'executionStrategy' in cfg:
execution_strategy = cfg.pop('executionStrategy')
if execution_strategy == 'SameAsIpu':
options.setExecutionStrategy(
poptorch.PipelinedExecution(
getattr(poptorch.AutoStage, execution_strategy)))
elif execution_strategy == 'ShardedExecution':
options.setExecutionStrategy(poptorch.ShardedExecution())
else:
raise NotImplementedError(
'executionStrategy should be "SameAsIpu" or "ShardedExecution"'
f', but got {execution_strategy}')
if 'partialsType' in cfg:
partials_type = cfg.pop('partialsType')
options.Precision.setPartialsType(getattr(
torch, partials_type)) # half or float
_options_assigner(cfg, options)
return options
def model_sharding(model, split_edges):
"""split models in-place into multi-IPUs.
Args:
model (nn.Module): The target model to be split.
split_edges (list of dict): Model layer names or layer numbers
of split edge. Each item of ``split_edges`` is a dictionary,
which may contain the following key-pairs:
- layer_to_call: PyTorch module to assign to the block
- user_id (optional): A user defined identifier for the block.
- ipu_id: The id of the IPU to run on.
Examples:
>>> split_edges = [
... dict(layer_to_call='model.conv1', ipu_id=0),
... dict(layer_to_call='model.conv3', ipu_id=1)]
>>> sharding_model = model_sharding(torch_model, split_edges)
Returns:
nn.Module: Split model.
"""
if len(split_edges) == 0:
return model
assert isinstance(split_edges, list)
spilt_edges_dict = {edge['layer_to_call']: edge for edge in split_edges}
for idx, (name, module) in enumerate(model.named_modules()):
if idx in spilt_edges_dict and name in spilt_edges_dict:
raise ValueError(
'The same layer is referenced twice while doing model'
f' partition: idx is {idx} and name is {name}')
edge = spilt_edges_dict.pop(name, None)
edge = spilt_edges_dict.pop(idx, edge)
if edge is not None:
poptorch.BeginBlock(module, edge.get('user_id', name),
edge['ipu_id'])
# ensure all split_edges are used
if len(spilt_edges_dict) > 0:
split_edge_names = list(spilt_edges_dict.keys())
raise RuntimeError(
f'split_edges: {split_edge_names} are not contained in the model')
return model
def recomputation_checkpoint(model: nn.Module, module_names: list):
"""Annotates the output of a module to be checkpointed instead of
recomputed.
If recomputation mode is enabled, the IPU releases the activations of
intermediate layers to save memory and recalculates them during the
backward pass. This function declares the activations of selected
intermediate layers that should be saved instead, so that their
recomputation can be skipped.
Args:
model (nn.Module): The target model to apply recomputation
checkpoint.
module_names (list): Layer names of module.
"""
def recompute_outputs(module, inputs, outputs):
if isinstance(outputs, tuple):
return tuple(poptorch.recomputationCheckpoint(y) for y in outputs)
else:
return poptorch.recomputationCheckpoint(outputs)
for name, module in model.named_modules():
if name in module_names:
module.register_forward_hook(recompute_outputs)
module_names.remove(name)
# check all module_names are used
assert len(module_names) == 0,\
f'recomputed nodes: {module_names} are not contained in the model'
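For illustration, a hedged combined usage of ``model_sharding`` and ``recomputation_checkpoint``; the layer names are placeholders that must match ``model.named_modules()``:

model = model_sharding(model, [dict(layer_to_call='backbone.layer1', ipu_id=0),
                               dict(layer_to_call='backbone.layer3', ipu_id=1)])
recomputation_checkpoint(model, ['backbone.layer2'])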
def compare_ndarray(featA, featB, rtol=1e-3, atol=1e-5):
"""Align data between two activations or weights."""
try:
np.testing.assert_allclose(featA, featB, rtol=rtol, atol=atol)
except AssertionError as e:
print(e)
def build_from_cfg_with_wrapper(cfg,
registry,
wrapper_func=None,
default_args=None):
"""Build a module from config dict and wrap module with "wrapper_func".
Args:
cfg (dict): Config dict. It should at least contain the key "type".
registry (:obj:`Registry`): The registry to search the type from.
default_args (dict, optional): Default initialization arguments.
wrapper_func (function): Used to wrap class
Returns:
object: The constructed object.
"""
if not isinstance(cfg, dict):
raise TypeError(f'cfg must be a dict, but got {type(cfg)}')
if 'type' not in cfg:
if default_args is None or 'type' not in default_args:
raise KeyError(
'`cfg` or `default_args` must contain the key "type", '
f'but got {cfg}\n{default_args}')
if not isinstance(registry, Registry):
raise TypeError('registry must be an mmcv.Registry object, '
f'but got {type(registry)}')
if not (isinstance(default_args, dict) or default_args is None):
raise TypeError('default_args must be a dict or None, '
f'but got {type(default_args)}')
args = cfg.copy()
if default_args is not None:
for name, value in default_args.items():
args.setdefault(name, value)
obj_type = args.pop('type')
if isinstance(obj_type, str):
obj_cls = registry.get(obj_type)
if obj_cls is None:
raise KeyError(
f'{obj_type} is not in the {registry.name} registry')
elif inspect.isclass(obj_type):
obj_cls = obj_type
else:
raise TypeError(
f'type must be a str or valid type, but got {type(obj_type)}')
if wrapper_func is None:
wrapped_obj_cls = obj_cls
else:
wrapped_obj_cls = wrapper_func(obj_cls)
try:
return wrapped_obj_cls(**args)
except Exception as e:
# Normal TypeError does not print class name.
raise type(e)(f'{wrapped_obj_cls.__name__}: {e}')
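A hedged sketch combining this builder with the hook wrapper defined earlier, mirroring how ``IPUBaseRunner.register_optimizer_hook`` uses it:

from mmcv.runner import HOOKS
from mmcv.device.ipu.hook_wrapper import wrap_optimizer_hook
hook = build_from_cfg_with_wrapper(
    dict(type='OptimizerHook', grad_clip=None), HOOKS, wrap_optimizer_hook)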
......@@ -40,6 +40,9 @@ from .optimizer import (OPTIMIZER_BUILDERS, OPTIMIZERS,
from .priority import Priority, get_priority
from .utils import get_host_info, get_time_str, obj_from_dict, set_random_seed
# initialize ipu to register the ipu runners to RUNNERS
from mmcv.device import ipu # isort:skip # noqa
__all__ = [
'BaseRunner', 'Runner', 'EpochBasedRunner', 'IterBasedRunner', 'LogBuffer',
'HOOKS', 'Hook', 'CheckpointHook', 'ClosureHook', 'LrUpdaterHook',
......
......@@ -63,7 +63,7 @@ def cast_tensor_type(inputs, src_type, dst_type):
return inputs
def auto_fp16(apply_to=None, out_fp32=False):
def auto_fp16(apply_to=None, out_fp32=False, supported_types=(nn.Module, )):
"""Decorator to enable fp16 training automatically.
This decorator is useful when you write custom modules and want to support
......@@ -76,7 +76,8 @@ def auto_fp16(apply_to=None, out_fp32=False):
apply_to (Iterable, optional): The argument names to be converted.
`None` indicates all arguments.
out_fp32 (bool): Whether to convert the output back to fp32.
supported_types (tuple): Classes can be decorated by ``auto_fp16``.
`New in version 1.5.0.`
Example:
>>> import torch.nn as nn
......@@ -102,9 +103,9 @@ def auto_fp16(apply_to=None, out_fp32=False):
def new_func(*args, **kwargs):
# check if the module has set the attribute `fp16_enabled`, if not,
# just fallback to the original method.
if not isinstance(args[0], torch.nn.Module):
if not isinstance(args[0], supported_types):
raise TypeError('@auto_fp16 can only be used to decorate the '
'method of nn.Module')
f'method of those classes {supported_types}')
if not (hasattr(args[0], 'fp16_enabled') and args[0].fp16_enabled):
return old_func(*args, **kwargs)
......
......@@ -36,7 +36,7 @@ except ImportError:
'is_method_overridden', 'has_method'
]
else:
from .device_type import IS_MLU_AVAILABLE
from .device_type import IS_IPU_AVAILABLE, IS_MLU_AVAILABLE
from .env import collect_env
from .hub import load_url
from .logging import get_logger, print_log
......@@ -74,5 +74,5 @@ else:
'assert_params_all_zeros', 'check_python_script',
'is_method_overridden', 'is_jit_tracing', 'is_rocm_pytorch',
'_get_cuda_home', 'load_url', 'has_method', 'IS_CUDA_AVAILABLE',
'worker_init_fn', 'IS_MLU_AVAILABLE'
'worker_init_fn', 'IS_MLU_AVAILABLE', 'IS_IPU_AVAILABLE'
]
# Copyright (c) OpenMMLab. All rights reserved.
def is_ipu_available():
try:
import poptorch
return poptorch.ipuHardwareIsAvailable()
except ImportError:
return False
IS_IPU_AVAILABLE = is_ipu_available()
def is_mlu_available():
try:
import torch
......
# Copyright (c) OpenMMLab. All rights reserved.
import logging
import numpy as np
import pytest
import torch
from mmcv.parallel.data_container import DataContainer
from mmcv.utils import IS_IPU_AVAILABLE
if IS_IPU_AVAILABLE:
from mmcv.device.ipu.hierarchical_data_manager import \
HierarchicalDataManager
skip_no_ipu = pytest.mark.skipif(
not IS_IPU_AVAILABLE, reason='test case under ipu environment')
@skip_no_ipu
def test_HierarchicalData():
# test hierarchical data
hierarchical_data_sample = {
'a': torch.rand(3, 4),
'b': np.random.rand(3, 4),
'c': DataContainer({
'a': torch.rand(3, 4),
'b': 4,
'c': 'd'
}),
'd': 123,
'e': [1, 3, torch.rand(3, 4),
np.random.rand(3, 4)],
'f': {
'a': torch.rand(3, 4),
'b': np.random.rand(3, 4),
'c': [1, 'asd']
}
}
all_tensors = []
all_tensors.append(hierarchical_data_sample['a'])
all_tensors.append(hierarchical_data_sample['c'].data['a'])
all_tensors.append(hierarchical_data_sample['e'][2])
all_tensors.append(hierarchical_data_sample['f']['a'])
all_tensors_id = [id(ele) for ele in all_tensors]
hd = HierarchicalDataManager(logging.getLogger())
hd.record_hierarchical_data(hierarchical_data_sample)
tensors = hd.collect_all_tensors()
for t in tensors:
assert id(t) in all_tensors_id
tensors[0].add_(1)
hd.update_all_tensors(tensors)
data = hd.hierarchical_data
data['c'].data['a'].sub_(1)
hd.record_hierarchical_data(data)
tensors = hd.collect_all_tensors()
for t in tensors:
assert id(t) in all_tensors_id
hd.quick()
with pytest.raises(
AssertionError,
match='original hierarchical data is not torch.tensor'):
hd.record_hierarchical_data(torch.rand(3, 4))
class AuxClass:
pass
with pytest.raises(NotImplementedError, match='not supported datatype:'):
hd.record_hierarchical_data(AuxClass())
with pytest.raises(NotImplementedError, match='not supported datatype:'):
hierarchical_data_sample['a'] = AuxClass()
hd.update_all_tensors(tensors)
with pytest.raises(NotImplementedError, match='not supported datatype:'):
hierarchical_data_sample['a'] = AuxClass()
hd.collect_all_tensors()
with pytest.raises(NotImplementedError, match='not supported datatype:'):
hierarchical_data_sample['a'] = AuxClass()
hd.clean_all_tensors()
hd = HierarchicalDataManager(logging.getLogger())
hd.record_hierarchical_data(hierarchical_data_sample)
hierarchical_data_sample['a'] = torch.rand(3, 4)
with pytest.raises(ValueError, match='all data except torch.Tensor'):
new_hierarchical_data_sample = {
**hierarchical_data_sample, 'b': np.random.rand(3, 4)
}
hd.update_hierarchical_data(new_hierarchical_data_sample)
hd.update_hierarchical_data(new_hierarchical_data_sample, strict=False)
hd.clean_all_tensors()
# test single tensor
single_tensor = torch.rand(3, 4)
hd = HierarchicalDataManager(logging.getLogger())
hd.record_hierarchical_data(single_tensor)
tensors = hd.collect_all_tensors()
assert len(tensors) == 1 and single_tensor in tensors
single_tensor_to_update = [torch.rand(3, 4)]
hd.update_all_tensors(single_tensor_to_update)
new_tensors = hd.collect_all_tensors()
assert new_tensors == single_tensor_to_update
# Copyright (c) OpenMMLab. All rights reserved.
import numpy as np
import pytest
import torch
from torch.utils.data import Dataset
from mmcv.parallel.data_container import DataContainer
from mmcv.utils import IS_IPU_AVAILABLE
if IS_IPU_AVAILABLE:
from mmcv.device.ipu import IPUDataLoader, cfg2options
from mmcv.device.ipu.dataloader import collate
skip_no_ipu = pytest.mark.skipif(
not IS_IPU_AVAILABLE, reason='test case under ipu environment')
class ToyDataset(Dataset):
def __getitem__(self, index):
return 111
def __len__(self, ):
return 3
@skip_no_ipu
def test_ipu_dataloader():
# test lazy initialization
dataloader = IPUDataLoader(
ToyDataset(), None, batch_size=256, num_workers=1, mode='async')
options_cfg = {'train_cfg': {}, 'eval_cfg': {}}
ipu_options = cfg2options(options_cfg)
dataloader.init(ipu_options['training'])
# test normal initialization
options_cfg = {'train_cfg': {}, 'eval_cfg': {}}
ipu_options = cfg2options(options_cfg)['training']
dataloader = IPUDataLoader(
ToyDataset(), ipu_options, batch_size=256, num_workers=1, mode='async')
@skip_no_ipu
def test_ipu_collate():
with pytest.raises(TypeError, match='`batch` should be a sequence'):
collate(123)
with pytest.raises(TypeError, match='DataContainer is not supported'):
collate([DataContainer(666)])
data_list = [[1, 2, 3], [2, 3, 4], DataContainer(666)]
batch0 = {
'tensor': torch.rand(3, 4, 5),
'arr': np.random.rand(3, 4, 5, 6),
'data_list': data_list
}
batch1 = {
'tensor': torch.rand(3, 4, 5),
'arr': np.random.rand(3, 4, 5, 6),
'data_list': data_list
}
batch = [batch1, batch0]
results = collate(batch)
assert results['tensor'].shape == (2, 3, 4, 5)
assert results['arr'].shape == (2, 3, 4, 5, 6)
for data in results['data_list']:
for tensor in data:
assert not isinstance(tensor, DataContainer)
assert tensor.shape == (2, )
# Copyright (c) OpenMMLab. All rights reserved.
import logging
import os.path as osp
import pytest
import torch
import torch.nn as nn
from mmcv.runner import build_runner
from mmcv.runner.fp16_utils import auto_fp16
from mmcv.utils import IS_IPU_AVAILABLE
if IS_IPU_AVAILABLE:
from mmcv.device.ipu.hook_wrapper import IPUFp16OptimizerHook
skip_no_ipu = pytest.mark.skipif(
not IS_IPU_AVAILABLE, reason='test case under ipu environment')
# TODO Once the model training and inference interfaces
# of MMCLS and MMDET are unified,
# construct the model according to the unified standards
class ToyModel(nn.Module):
def __init__(self):
super().__init__()
self.conv = nn.Conv2d(3, 3, 1)
self.bn = nn.BatchNorm2d(3)
self.relu = nn.ReLU6()
self.fp16_enabled = False
@auto_fp16(apply_to=('img', ))
def forward(self, img, return_loss=True, **kwargs):
x = self.conv(img)
x = self.bn(x)
x = self.relu(x)
if return_loss:
loss = ((x - kwargs['gt_label'])**2).sum()
return {
'loss': loss,
'loss_list': [loss, loss],
'loss_dict': {
'loss1': loss
}
}
return x
def _parse_losses(self, losses):
return losses['loss'], losses['loss']
def train_step(self, data, optimizer=None, **kwargs):
losses = self(**data)
loss, log_vars = self._parse_losses(losses)
outputs = dict(
loss=loss, log_vars=log_vars, num_samples=len(data['img'].data))
return outputs
@skip_no_ipu
def test_ipu_hook_wrapper(tmp_path):
model = ToyModel()
dummy_input = {
'data': {
'img': torch.rand((16, 3, 10, 10)),
'gt_label': torch.rand((16, 3, 10, 10))
}
}
dir_name = 'a_tmp_dir'
working_dir = osp.join(tmp_path, dir_name)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
default_args = dict(
model=model,
work_dir=working_dir,
optimizer=optimizer,
logger=logging.getLogger())
cfg = dict(type='IPUEpochBasedRunner', max_epochs=1)
dummy_runner = build_runner(cfg, default_args=default_args)
# learning policy
lr_config = dict(policy='step', step=[1, 150])
# test optimizer config
optimizer_config = dict(
grad_clip=dict(max_norm=2), detect_anomalous_params=True)
# test building ipu_lr_hook_class
dummy_runner.register_training_hooks(
lr_config=lr_config, optimizer_config=None, timer_config=None)
# test _set_lr()
output = dummy_runner.model.train_step(**dummy_input)
dummy_runner.outputs = output
dummy_runner.call_hook('before_train_epoch')
# test building ipu_optimizer_hook_class
with pytest.raises(
NotImplementedError, match='IPU does not support gradient clip'):
dummy_runner.register_training_hooks(
lr_config=None,
optimizer_config=optimizer_config,
timer_config=None)
# test fp16 optimizer hook
lr_config = dict(policy='step', step=[1, 150])
optimizer_config = dict(grad_clip=dict(max_norm=2))
dummy_runner.hooks.pop(0)
with pytest.raises(NotImplementedError, match='IPU mode does not support'):
optimizer_config = IPUFp16OptimizerHook(
loss_scale='dynamic', distributed=False)
with pytest.raises(NotImplementedError, match='IPU mode supports single'):
optimizer_config = IPUFp16OptimizerHook(
loss_scale={}, distributed=False)
with pytest.raises(ValueError, match='loss_scale should be float'):
optimizer_config = IPUFp16OptimizerHook(
loss_scale=[], distributed=False)
optimizer_config = IPUFp16OptimizerHook(loss_scale=2.0, distributed=False)
dummy_runner.register_training_hooks(
lr_config=lr_config,
optimizer_config=optimizer_config,
timer_config=None)
dummy_runner.call_hook('after_train_iter')
# Copyright (c) OpenMMLab. All rights reserved.
import logging
import numpy as np
import pytest
import torch
import torch.nn as nn
from mmcv.runner.fp16_utils import auto_fp16
from mmcv.utils import IS_IPU_AVAILABLE
if IS_IPU_AVAILABLE:
from mmcv.device.ipu import cfg2options, ipu_model_wrapper
from mmcv.device.ipu.utils import compare_ndarray
skip_no_ipu = pytest.mark.skipif(
not IS_IPU_AVAILABLE, reason='test case under ipu environment')
class MyBN(nn.BatchNorm2d):
def forward(self, *args, **kwargs):
result = super().forward(*args, **kwargs)
return result, self.running_mean
# TODO Once the model training and inference interfaces
# of MMCLS and MMDET are unified,
# construct the model according to the unified standards
class ToyModel(nn.Module):
def __init__(self):
super().__init__()
self.conv = nn.Conv2d(3, 3, 1)
self.bn = MyBN(3)
self.relu = nn.ReLU6()
self.fp16_enabled = False
@auto_fp16(apply_to=('img', ))
def forward(self, img, return_loss=True, **kwargs):
x = self.conv(img)
x, running_mean = self.bn(x)
x = self.relu(x)
if return_loss:
loss = ((x - kwargs['gt_label'])**2).sum()
return {
'loss': loss,
'loss_list': [loss, loss],
'loss_dict': {
'loss1': loss
}
}
return x
def _parse_losses(self, losses):
return losses['loss'], losses['loss']
def train_step(self, data, optimizer=None, **kwargs):
losses = self(**data)
loss, log_vars = self._parse_losses(losses)
outputs = dict(
loss=loss, log_vars=log_vars, num_samples=len(data['img'].data))
return outputs
@skip_no_ipu
def test_build_model():
for execution_strategy in \
['SameAsIpu', 'ShardedExecution', 'error_strategy']:
if execution_strategy == 'error_strategy':
def maybe_catch_error(_error):
return pytest.raises(_error)
else:
class NullContextManager:
def __enter__(self, ):
pass
def __exit__(self, exc_type, exc_value, exc_traceback):
pass
def maybe_catch_error(_error):
return NullContextManager()
with maybe_catch_error(NotImplementedError):
options_cfg = dict(
randomSeed=888,
enableExecutableCaching='cache_engine',
train_cfg=dict(
executionStrategy=execution_strategy,
Training=dict(gradientAccumulation=8),
availableMemoryProportion=[0.3, 0.3, 0.3, 0.3]),
eval_cfg=dict(deviceIterations=1, ),
partialsType='half')
ipu_options = cfg2options(options_cfg)
model = ToyModel()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
logger = logging.getLogger()
modules_to_record = None
ipu_model_cfg = dict(
train_split_edges=[dict(layer_to_call='conv', ipu_id=0)],
train_ckpt_nodes=['bn', 'conv'])
fp16_cfg = {'loss_scale': 0.5}
ipu_model = ipu_model_wrapper(
model,
ipu_options,
optimizer,
logger,
modules_to_record=modules_to_record,
ipu_model_cfg=ipu_model_cfg,
fp16_cfg=fp16_cfg)
ipu_model.train()
ipu_model.eval()
ipu_model.train()
def run_model(ipu_options,
fp16_cfg,
modules_to_record,
ipu_model_wrapper_func,
only_eval=False):
model = ToyModel()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)\
if not only_eval else None
logger = logging.getLogger()
ipu_model_cfg = dict(
train_split_edges=[dict(layer_to_call='conv', ipu_id=0)],
train_ckpt_nodes=['bn', 'conv'])
ipu_model = ipu_model_wrapper_func(
model,
ipu_options,
optimizer,
logger,
modules_to_record=modules_to_record,
ipu_model_cfg=ipu_model_cfg,
fp16_cfg=fp16_cfg)
def get_dummy_input(training):
if training:
return {
'data': {
'img': torch.rand((16, 3, 10, 10)),
'gt_label': torch.rand((16, 3, 10, 10))
}
}
else:
return {
'img': torch.rand((16, 3, 10, 10)),
'img_metas': {
'img': torch.rand((16, 3, 10, 10))
},
'return_loss': False
}
if not only_eval:
training = True
ipu_model.train()
for _ in range(3):
dummy_input = get_dummy_input(training)
output = ipu_model.train_step(**dummy_input)
training = False
ipu_model.eval()
for _ in range(3):
dummy_input = get_dummy_input(training)
output = ipu_model(**dummy_input)
return output, ipu_model
@skip_no_ipu
def test_run_model():
# test that feature alignment does not support gradientAccumulation mode
options_cfg = dict(
randomSeed=888,
enableExecutableCaching='cache_engine',
train_cfg=dict(
executionStrategy='SameAsIpu',
Training=dict(gradientAccumulation=8),
availableMemoryProportion=[0.3, 0.3, 0.3, 0.3],
),
eval_cfg=dict(deviceIterations=1, ),
partialsType='half')
ipu_options = cfg2options(options_cfg)
modules_to_record = ['bn']
with pytest.raises(AssertionError, match='Feature alignment'):
run_model(ipu_options, None, modules_to_record, ipu_model_wrapper)
# test that feature alignment does not support multi-replica mode
options_cfg = dict(
randomSeed=888,
replicationFactor=2,
enableExecutableCaching='cache_engine',
train_cfg=dict(
executionStrategy='SameAsIpu',
availableMemoryProportion=[0.3, 0.3, 0.3, 0.3],
),
eval_cfg=dict(deviceIterations=1, ),
partialsType='half')
ipu_options = cfg2options(options_cfg)
modules_to_record = ['bn']
with pytest.raises(AssertionError, match='Feature alignment'):
run_model(ipu_options, None, modules_to_record, ipu_model_wrapper)
# test that feature alignment does not support fp16 mode
options_cfg = dict(
randomSeed=888,
enableExecutableCaching='cache_engine',
train_cfg=dict(
executionStrategy='SameAsIpu',
availableMemoryProportion=[0.3, 0.3, 0.3, 0.3],
),
eval_cfg=dict(deviceIterations=1, ),
partialsType='half')
ipu_options = cfg2options(options_cfg)
fp16_cfg = {
'loss_scale': 0.5,
'velocity_accum_type': 'half',
'accum_type': 'half'
}
modules_to_record = ['bn']
with pytest.raises(NotImplementedError):
run_model(ipu_options, fp16_cfg, modules_to_record, ipu_model_wrapper)
# test velocity_accum_type and accum_type
fp16_cfg = {
'loss_scale': 0.5,
'velocity_accum_type': 'float',
'accum_type': 'float'
}
run_model(ipu_options, fp16_cfg, None, ipu_model_wrapper)
# test compile and run
options_cfg = dict(
randomSeed=888,
enableExecutableCaching='cache_engine',
train_cfg=dict(
executionStrategy='SameAsIpu',
availableMemoryProportion=[0.3, 0.3, 0.3, 0.3],
),
eval_cfg=dict(deviceIterations=1, ),
partialsType='half')
ipu_options = cfg2options(options_cfg)
modules_to_record = ['bn']
run_model(ipu_options, None, modules_to_record, ipu_model_wrapper)
# test feature alignment
options_cfg = dict(
randomSeed=888,
enableExecutableCaching='cache_engine',
train_cfg=dict(
executionStrategy='SameAsIpu',
availableMemoryProportion=[0.3, 0.3, 0.3, 0.3],
),
eval_cfg=dict(deviceIterations=1, ))
ipu_options = cfg2options(options_cfg)
modules_to_record = None
run_model(ipu_options, None, modules_to_record, ipu_model_wrapper)
# test inference mode
options_cfg = dict(
randomSeed=888,
enableExecutableCaching='cache_engine',
train_cfg=dict(
executionStrategy='SameAsIpu',
availableMemoryProportion=[0.3, 0.3, 0.3, 0.3],
),
eval_cfg=dict(deviceIterations=1, ),
partialsType='half')
ipu_options = cfg2options(options_cfg)
fp16_cfg = {'loss_scale': 0.5}
modules_to_record = None
_, ipu_model = run_model(
ipu_options,
fp16_cfg,
modules_to_record,
ipu_model_wrapper,
only_eval=True)
with pytest.raises(RuntimeError):
ipu_model.train()
with pytest.raises(ValueError):
ipu_model.train(123)
_, ipu_model = run_model(ipu_options, None, modules_to_record,
ipu_model_wrapper)
# test NotImplementedError in __call__
ipu_model.train()
with pytest.raises(NotImplementedError):
ipu_model()
# test parse_losses
with pytest.raises(TypeError):
ipu_model._model.model._parse_losses({'loss': None})
@skip_no_ipu
def test_compare_tensor():
compare_ndarray(np.random.rand(3, 4), np.random.rand(3, 4))
# Copyright (c) OpenMMLab. All rights reserved.
import logging
import os.path as osp
import pytest
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from mmcv.runner import build_runner
from mmcv.utils import IS_IPU_AVAILABLE
if IS_IPU_AVAILABLE:
from mmcv.device.ipu import IPUDataLoader, runner
skip_no_ipu = pytest.mark.skipif(
not IS_IPU_AVAILABLE, reason='test case under ipu environment')
# Most of the IPU runners' behavior is inherited from EpochBasedRunner and
# IterBasedRunner, so only the overridden methods get incremental tests here.
# Compared with the base runners, the overridden functions are:
# __init__, register_lr_hook, register_optimizer_hook
# register_lr_hook and register_optimizer_hook are tested in test_runner.py
class OldStyleModel(nn.Module):
def __init__(self):
super().__init__()
self.conv = nn.Conv2d(3, 3, 1)
class Model(OldStyleModel):
def train_step(self):
pass
def val_step(self):
pass
class ToyModel(nn.Module):
def __init__(self):
super().__init__()
self.conv = nn.Conv2d(3, 3, 1)
self.bn = nn.BatchNorm2d(3)
self.relu = nn.ReLU6()
self.fp16_enabled = False
def forward(self, img, return_loss=True, **kwargs):
x = self.conv(img)
x = self.bn(x)
x = self.relu(x)
if return_loss:
loss = ((x - kwargs['gt_label'])**2).sum()
return {'loss': loss, 'loss1': loss + 1}
return x
def _parse_losses(self, losses):
return losses['loss'], {'loss1': losses['loss']}
def train_step(self, data, optimizer=None, **kwargs):
losses = self(**data)
loss, log_vars = self._parse_losses(losses)
outputs = dict(
loss=loss, log_vars=log_vars, num_samples=len(data['img'].data))
return outputs
class ToyDataset(Dataset):
def __getitem__(self, index):
return {
'img': torch.rand((3, 10, 10)),
'gt_label': torch.rand((3, 10, 10))
}
def __len__(self, ):
return 3
@skip_no_ipu
def test_build_runner(tmp_path):
# __init__
dir_name = 'a_tmp_dir'
default_args = dict(
model=Model(),
work_dir=osp.join(tmp_path, dir_name),
logger=logging.getLogger())
cfg = dict(type='IPUEpochBasedRunner', max_epochs=1)
ipu_runner = build_runner(cfg, default_args=default_args)
assert ipu_runner._max_epochs == 1
cfg = dict(type='IPUIterBasedRunner', max_iters=1)
ipu_runner = build_runner(cfg, default_args=default_args)
assert ipu_runner._max_iters == 1
runner.IS_IPU_AVAILABLE = False
cfg = dict(type='IPUIterBasedRunner', max_iters=1)
with pytest.raises(
NotImplementedError,
match='cpu mode on IPURunner is not supported'):
ipu_runner = build_runner(cfg, default_args=default_args)
runner.IS_IPU_AVAILABLE = True
with pytest.raises(ValueError, match='Only one of'):
cfg = dict(type='IPUIterBasedRunner', max_epochs=1, max_iters=1)
ipu_runner = build_runner(cfg, default_args=default_args)
model = ToyModel()
options_cfg = {'train_cfg': {}, 'eval_cfg': {}}
dataloader = IPUDataLoader(ToyDataset(), None, num_workers=1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
cfg = dict(type='IPUIterBasedRunner', max_iters=2, options_cfg=options_cfg)
default_args = dict(
model=model,
optimizer=optimizer,
work_dir=osp.join(tmp_path, dir_name),
logger=logging.getLogger())
ipu_runner = build_runner(cfg, default_args=default_args)
ipu_runner.run([dataloader], [('train', 2)])
ipu_runner.get_options('val')
with pytest.raises(ValueError, match='mode should be train or val'):
ipu_runner.get_options('666')
# Copyright (c) OpenMMLab. All rights reserved.
import copy
import pytest
import torch.nn as nn
import mmcv
from mmcv.utils import IS_IPU_AVAILABLE
if IS_IPU_AVAILABLE:
from poptorch.options import _IExecutionStrategy
from mmcv.device.ipu import cfg2options
from mmcv.device.ipu.utils import (build_from_cfg_with_wrapper,
model_sharding)
skip_no_ipu = pytest.mark.skipif(
not IS_IPU_AVAILABLE, reason='test case under ipu environment')
class ToyModel(nn.Module):
def __init__(self):
super().__init__()
self.conv = nn.Conv2d(3, 3, 1)
self.bn = nn.BatchNorm2d(3)
self.relu = nn.ReLU6()
@skip_no_ipu
def test_build_from_cfg():
BACKBONES = mmcv.Registry('backbone')
@BACKBONES.register_module()
class ResNet:
def __init__(self, depth, stages=4):
self.depth = depth
self.stages = stages
@BACKBONES.register_module()
class ResNeXt:
def __init__(self, depth, stages=4):
self.depth = depth
self.stages = stages
cfg = dict(type='ResNet', depth=50)
model = build_from_cfg_with_wrapper(cfg, BACKBONES)
assert isinstance(model, ResNet)
assert model.depth == 50 and model.stages == 4
cfg = dict(type='ResNet', depth=50)
model = build_from_cfg_with_wrapper(
cfg, BACKBONES, default_args={'stages': 3})
assert isinstance(model, ResNet)
assert model.depth == 50 and model.stages == 3
cfg = dict(type='ResNeXt', depth=50, stages=3)
model = build_from_cfg_with_wrapper(cfg, BACKBONES)
assert isinstance(model, ResNeXt)
assert model.depth == 50 and model.stages == 3
cfg = dict(type=ResNet, depth=50)
model = build_from_cfg_with_wrapper(cfg, BACKBONES)
assert isinstance(model, ResNet)
assert model.depth == 50 and model.stages == 4
# type defined using default_args
cfg = dict(depth=50)
model = build_from_cfg_with_wrapper(
cfg, BACKBONES, default_args=dict(type='ResNet'))
assert isinstance(model, ResNet)
assert model.depth == 50 and model.stages == 4
cfg = dict(depth=50)
model = build_from_cfg_with_wrapper(
cfg, BACKBONES, default_args=dict(type=ResNet))
assert isinstance(model, ResNet)
assert model.depth == 50 and model.stages == 4
# not a registry
with pytest.raises(TypeError):
cfg = dict(type='VGG')
model = build_from_cfg_with_wrapper(cfg, 'BACKBONES')
# non-registered class
with pytest.raises(KeyError):
cfg = dict(type='VGG')
model = build_from_cfg_with_wrapper(cfg, BACKBONES)
# default_args must be a dict or None
with pytest.raises(TypeError):
cfg = dict(type='ResNet', depth=50)
model = build_from_cfg_with_wrapper(cfg, BACKBONES, default_args=1)
# cfg['type'] should be a str or class
with pytest.raises(TypeError):
cfg = dict(type=1000)
model = build_from_cfg_with_wrapper(cfg, BACKBONES)
# cfg should contain the key "type"
with pytest.raises(KeyError, match='must contain the key "type"'):
cfg = dict(depth=50, stages=4)
model = build_from_cfg_with_wrapper(cfg, BACKBONES)
# cfg or default_args should contain the key "type"
with pytest.raises(KeyError, match='must contain the key "type"'):
cfg = dict(depth=50)
model = build_from_cfg_with_wrapper(
cfg, BACKBONES, default_args=dict(stages=4))
# incorrect registry type
with pytest.raises(TypeError):
cfg = dict(type='ResNet', depth=50)
model = build_from_cfg_with_wrapper(cfg, 'BACKBONES')
# incorrect default_args type
with pytest.raises(TypeError):
cfg = dict(type='ResNet', depth=50)
model = build_from_cfg_with_wrapper(cfg, BACKBONES, default_args=0)
# incorrect arguments
with pytest.raises(TypeError):
cfg = dict(type='ResNet', non_existing_arg=50)
model = build_from_cfg_with_wrapper(cfg, BACKBONES)
# cfg not dict
with pytest.raises(TypeError):
cfg = []
model = build_from_cfg_with_wrapper(cfg, BACKBONES)
@skip_no_ipu
def test_cast_to_options():
options_cfg = dict(
randomSeed=888,
enableExecutableCaching='cache_engine',
train_cfg=dict(
executionStrategy='SameAsIpu',
Training=dict(gradientAccumulation=8),
availableMemoryProportion=[0.3, 0.3, 0.3, 0.3],
),
eval_cfg=dict(deviceIterations=1, ),
)
ipu_options = cfg2options(copy.deepcopy(options_cfg))
assert 'training' in ipu_options
assert 'inference' in ipu_options
assert ipu_options['training']._values['random_seed'] == 888
assert ipu_options['training']._values['replication_factor'] == 1
assert ipu_options['training']._values['available_memory_proportion'] == {
0: 0.3,
1: 0.3,
2: 0.3,
3: 0.3
}
assert ipu_options['training']._popart.options[
'cachePath'] == 'cache_engine'
assert isinstance(ipu_options['training']._execution_strategy,
_IExecutionStrategy)
assert ipu_options['inference']._values['device_iterations'] == 1
with pytest.raises(NotImplementedError, match='cfg type'):
_options_cfg = copy.deepcopy(options_cfg)
_options_cfg['randomSeed'] = (1, 3)
cfg2options(_options_cfg)
with pytest.raises(NotImplementedError, match='options_node type'):
_options_cfg = copy.deepcopy(options_cfg)
_options_cfg['train_cfg']['Precision'] = {'autocast_policy': 123}
cfg2options(_options_cfg)
@skip_no_ipu
def test_model_sharding():
model = ToyModel()
split_edges = [dict(layer_to_call='666', ipu_id=0)]
with pytest.raises(RuntimeError, match='split_edges:'):
model_sharding(model, split_edges)
model = ToyModel()
split_edges = [
dict(layer_to_call='conv', ipu_id=0),
dict(layer_to_call=1, ipu_id=0)
]
with pytest.raises(ValueError, match='The same layer is referenced'):
model_sharding(model, split_edges)
model = ToyModel()
split_edges = [dict(layer_to_call='conv', ipu_id=0)]
model_sharding(model, split_edges)
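For reference, a minimal sketch of the sharding helper exercised above, assuming an IPU environment (importing `mmcv.device.ipu.utils` pulls in `poptorch`); the toy model and the `ipu_id` values are placeholders:
import torch.nn as nn
from mmcv.device.ipu.utils import model_sharding
class TinyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv = nn.Conv2d(3, 3, 1)
        self.bn = nn.BatchNorm2d(3)
model = TinyModel()
# pin each named submodule to an IPU; called for its side effect on `model`
model_sharding(model, [
    dict(layer_to_call='conv', ipu_id=0),
    dict(layer_to_call='bn', ipu_id=1),
])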