Commit 01a03aab authored by Kai Chen

move parallel module to mmcv

parent 63396a32
mmdet/core/__init__.py
from .anchor import *  # noqa: F401, F403
from .bbox import *  # noqa: F401, F403
from .mask import *  # noqa: F401, F403
-from .losses import *  # noqa: F401, F403
-from .eval import *  # noqa: F401, F403
-from .parallel import *  # noqa: F401, F403
+from .loss import *  # noqa: F401, F403
+from .evaluation import *  # noqa: F401, F403
+from .post_processing import *  # noqa: F401, F403
from .utils import *  # noqa: F401, F403

mmdet/core/parallel/__init__.py (deleted; moved to mmcv.parallel)
from .data_parallel import MMDataParallel
from .distributed import MMDistributedDataParallel
from .scatter_gather import scatter, scatter_kwargs

__all__ = [
    'MMDataParallel', 'MMDistributedDataParallel', 'scatter', 'scatter_kwargs'
]

mmdet/core/parallel/_functions.py (deleted; moved to mmcv.parallel)
import torch
from torch.nn.parallel._functions import _get_stream


def scatter(input, devices, streams=None):
    """Scatters a tensor (or a list of tensors) across multiple GPUs."""
    if streams is None:
        streams = [None] * len(devices)
    if isinstance(input, list):
        chunk_size = (len(input) - 1) // len(devices) + 1
        outputs = [
            scatter(input[i], [devices[i // chunk_size]],
                    [streams[i // chunk_size]]) for i in range(len(input))
        ]
        return outputs
    elif isinstance(input, torch.Tensor):
        output = input.contiguous()
        # TODO: copy to a pinned buffer first (if copying from CPU)
        stream = streams[0] if output.numel() > 0 else None
        with torch.cuda.device(devices[0]), torch.cuda.stream(stream):
            output = output.cuda(devices[0], non_blocking=True)
        return output
    else:
        raise Exception('Unknown type {}.'.format(type(input)))
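
# Illustrative sketch (not part of this commit): how the list branch above
# chunks inputs across devices. With 4 tensors and 2 GPUs, chunk_size is 2,
# so inputs[0:2] land on devices[0] and inputs[2:4] on devices[1]:
#
#   inputs = [torch.rand(2, 3) for _ in range(4)]
#   outputs = scatter(inputs, devices=[0, 1])
#   # outputs[0], outputs[1] on cuda:0; outputs[2], outputs[3] on cuda:1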


def synchronize_stream(output, devices, streams):
    if isinstance(output, list):
        chunk_size = len(output) // len(devices)
        for i in range(len(devices)):
            for j in range(chunk_size):
                synchronize_stream(output[i * chunk_size + j], [devices[i]],
                                   [streams[i]])
    elif isinstance(output, torch.Tensor):
        if output.numel() != 0:
            with torch.cuda.device(devices[0]):
                main_stream = torch.cuda.current_stream()
                main_stream.wait_stream(streams[0])
                output.record_stream(main_stream)
    else:
        raise Exception('Unknown type {}.'.format(type(output)))


def get_input_device(input):
    if isinstance(input, list):
        for item in input:
            input_device = get_input_device(item)
            if input_device != -1:
                return input_device
        return -1
    elif isinstance(input, torch.Tensor):
        return input.get_device() if input.is_cuda else -1
    else:
        raise Exception('Unknown type {}.'.format(type(input)))


class Scatter(object):

    @staticmethod
    def forward(target_gpus, input):
        input_device = get_input_device(input)
        streams = None
        if input_device == -1:
            # Perform CPU to GPU copies in a background stream
            streams = [_get_stream(device) for device in target_gpus]

        outputs = scatter(input, target_gpus, streams)
        # Synchronize with the copy stream
        if streams is not None:
            synchronize_stream(outputs, target_gpus, streams)

        return tuple(outputs)
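
# Illustrative sketch (not part of this commit): Scatter.forward copies CPU
# inputs to the target GPUs on background streams, then synchronizes so the
# main stream on each device can safely consume the results:
#
#   cpu_tensors = [torch.ones(3, 32, 32) for _ in range(2)]
#   outs = Scatter.forward(target_gpus=[0, 1], input=cpu_tensors)
#   # outs is a tuple of two tensors, one resident on each GPU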

mmdet/core/parallel/data_parallel.py (deleted; moved to mmcv.parallel)
from torch.nn.parallel import DataParallel

from .scatter_gather import scatter_kwargs


class MMDataParallel(DataParallel):

    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
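
# Illustrative sketch (not part of this commit): MMDataParallel only
# overrides scatter() so the DataContainer-aware scatter_kwargs is used;
# replication and gather are inherited from torch.nn.parallel.DataParallel.
#
#   model = MMDataParallel(detector.cuda(), device_ids=[0, 1])
#   losses = model(**data_batch)  # data_batch may contain DataContainers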

mmdet/core/parallel/distributed.py (deleted; moved to mmcv.parallel)
import torch
import torch.distributed as dist
import torch.nn as nn
from torch._utils import (_flatten_dense_tensors, _unflatten_dense_tensors,
                          _take_tensors)

from .scatter_gather import scatter_kwargs


class MMDistributedDataParallel(nn.Module):

    def __init__(self, module, dim=0, broadcast_buffers=True):
        super(MMDistributedDataParallel, self).__init__()
        self.module = module
        self.dim = dim
        self.broadcast_buffers = broadcast_buffers

        self.broadcast_bucket_size = 32 * 1024 * 1024
        self._sync_params()

    def _dist_broadcast_coalesced(self, tensors, buffer_size):
        for tensors in _take_tensors(tensors, buffer_size):
            flat_tensors = _flatten_dense_tensors(tensors)
            dist.broadcast(flat_tensors, 0)
            for tensor, synced in zip(
                    tensors, _unflatten_dense_tensors(flat_tensors, tensors)):
                tensor.copy_(synced)

    def _sync_params(self):
        module_states = list(self.module.state_dict().values())
        if len(module_states) > 0:
            self._dist_broadcast_coalesced(module_states,
                                           self.broadcast_bucket_size)
        if self.broadcast_buffers:
            buffers = [b.data for b in self.module._all_buffers()]
            if len(buffers) > 0:
                self._dist_broadcast_coalesced(buffers,
                                               self.broadcast_bucket_size)

    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

    def forward(self, *inputs, **kwargs):
        inputs, kwargs = self.scatter(inputs, kwargs,
                                      [torch.cuda.current_device()])
        return self.module(*inputs[0], **kwargs[0])
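
# Illustrative sketch (not part of this commit): unlike torch's DDP, this
# wrapper only broadcasts parameters/buffers from rank 0 at construction
# time and runs the module on the current device; gradient averaging is
# handled separately (e.g. by DistOptimizerHook in tools/train.py below).
#
#   dist.init_process_group(backend='nccl')
#   model = MMDistributedDataParallel(detector.cuda())
#   losses = model(**data_batch)  # inputs are scattered to the current GPU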

mmdet/core/parallel/scatter_gather.py (deleted; moved to mmcv.parallel)
import torch
from torch.nn.parallel._functions import Scatter as OrigScatter

from ._functions import Scatter
from mmdet.datasets.utils import DataContainer


def scatter(inputs, target_gpus, dim=0):
    """Scatter inputs to target gpus.

    The only difference from the original :func:`scatter` is that it adds
    support for :class:`~mmdet.DataContainer`.
    """

    def scatter_map(obj):
        if isinstance(obj, torch.Tensor):
            return OrigScatter.apply(target_gpus, None, dim, obj)
        if isinstance(obj, DataContainer):
            if obj.cpu_only:
                return obj.data
            else:
                return Scatter.forward(target_gpus, obj.data)
        if isinstance(obj, tuple) and len(obj) > 0:
            return list(zip(*map(scatter_map, obj)))
        if isinstance(obj, list) and len(obj) > 0:
            out = list(map(list, zip(*map(scatter_map, obj))))
            return out
        if isinstance(obj, dict) and len(obj) > 0:
            out = list(map(type(obj), zip(*map(scatter_map, obj.items()))))
            return out
        return [obj for targets in target_gpus]

    # After scatter_map is called, a scatter_map cell will exist. This cell
    # has a reference to the actual function scatter_map, which has references
    # to a closure that has a reference to the scatter_map cell (because the
    # fn is recursive). To avoid this reference cycle, we set the function to
    # None, clearing the cell.
    try:
        return scatter_map(inputs)
    finally:
        scatter_map = None


def scatter_kwargs(inputs, kwargs, target_gpus, dim=0):
    """Scatter with support for kwargs dictionary."""
    inputs = scatter(inputs, target_gpus, dim) if inputs else []
    kwargs = scatter(kwargs, target_gpus, dim) if kwargs else []
    if len(inputs) < len(kwargs):
        inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
    elif len(kwargs) < len(inputs):
        kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
    inputs = tuple(inputs)
    kwargs = tuple(kwargs)
    return inputs, kwargs
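
# Illustrative sketch (not part of this commit): scatter_kwargs pads the
# shorter of (inputs, kwargs) so both tuples have one entry per GPU:
#
#   ins, kws = scatter_kwargs((img,), {}, target_gpus=[0, 1])
#   # ins -> ((chunk_for_gpu0,), (chunk_for_gpu1,))
#   # kws -> ({}, {})   # padded with empty dicts to match len(ins)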

mmdet/datasets/__init__.py
from .coco import CocoDataset
-from .loader import (collate, GroupSampler, DistributedGroupSampler,
-                     build_dataloader)
-from .utils import DataContainer, to_tensor, random_scale, show_ann
+from .loader import GroupSampler, DistributedGroupSampler, build_dataloader
+from .utils import to_tensor, random_scale, show_ann

__all__ = [
-    'CocoDataset', 'collate', 'GroupSampler', 'DistributedGroupSampler',
-    'build_dataloader', 'DataContainer', 'to_tensor', 'random_scale',
-    'show_ann'
+    'CocoDataset', 'GroupSampler', 'DistributedGroupSampler',
+    'build_dataloader', 'to_tensor', 'random_scale', 'show_ann'
]

mmdet/datasets/coco.py
@@ -2,13 +2,13 @@ import os.path as osp

import mmcv
import numpy as np
+from mmcv.parallel import DataContainer as DC
from pycocotools.coco import COCO
from torch.utils.data import Dataset

from .transforms import (ImageTransform, BboxTransform, MaskTransform,
                         Numpy2Tensor)
from .utils import to_tensor, show_ann, random_scale
-from .utils import DataContainer as DC


class CocoDataset(Dataset):
    ...

mmdet/datasets/loader/__init__.py
from .build_loader import build_dataloader
-from .collate import collate
from .sampler import GroupSampler, DistributedGroupSampler

__all__ = [
-    'collate', 'GroupSampler', 'DistributedGroupSampler', 'build_dataloader'
+    'GroupSampler', 'DistributedGroupSampler', 'build_dataloader'
]

mmdet/datasets/loader/build_loader.py
from functools import partial

from mmcv.runner import get_dist_info
+from mmcv.parallel import collate
from torch.utils.data import DataLoader

-from .collate import collate
from .sampler import GroupSampler, DistributedGroupSampler

# https://github.com/pytorch/pytorch/issues/973
import resource
rlimit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (4096, rlimit[1]))


def build_dataloader(dataset,
                     imgs_per_gpu,
                     ...
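
# Illustrative sketch (not part of this commit; the remaining parameters of
# build_dataloader are collapsed above): a typical call, assuming the usual
# mmdet arguments (workers_per_gpu, num_gpus, dist) follow imgs_per_gpu.
#
#   loader = build_dataloader(dataset, imgs_per_gpu=2, workers_per_gpu=2,
#                             num_gpus=1, dist=False)
#   for data_batch in loader:
#       ...  # batches come from the DataContainer-aware mmcv collate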

mmdet/datasets/loader/collate.py (deleted; collate moved to mmcv.parallel)
import collections

import torch
import torch.nn.functional as F
from torch.utils.data.dataloader import default_collate

from ..utils import DataContainer

# https://github.com/pytorch/pytorch/issues/973
import resource
rlimit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (4096, rlimit[1]))


def collate(batch, samples_per_gpu=1):
    """Puts each data field into a tensor/DataContainer with outer dimension
    batch size.

    Extends default_collate to add support for
    :class:`~mmdet.DataContainer`. There are 3 cases for data containers:

    1. cpu_only = True, e.g., meta data
    2. cpu_only = False, stack = True, e.g., image tensors
    3. cpu_only = False, stack = False, e.g., gt bboxes
    """
    if not isinstance(batch, collections.Sequence):
        raise TypeError('{} is not supported.'.format(type(batch)))

    if isinstance(batch[0], DataContainer):
        assert len(batch) % samples_per_gpu == 0
        stacked = []
        if batch[0].cpu_only:
            for i in range(0, len(batch), samples_per_gpu):
                stacked.append(
                    [sample.data for sample in batch[i:i + samples_per_gpu]])
            return DataContainer(
                stacked, batch[0].stack, batch[0].padding_value, cpu_only=True)
        elif batch[0].stack:
            for i in range(0, len(batch), samples_per_gpu):
                assert isinstance(batch[i].data, torch.Tensor)
                # TODO: handle tensors other than 3d
                assert batch[i].dim() == 3
                c, h, w = batch[i].size()
                for sample in batch[i:i + samples_per_gpu]:
                    assert c == sample.size(0)
                    h = max(h, sample.size(1))
                    w = max(w, sample.size(2))
                padded_samples = [
                    F.pad(
                        sample.data,
                        (0, w - sample.size(2), 0, h - sample.size(1)),
                        value=sample.padding_value)
                    for sample in batch[i:i + samples_per_gpu]
                ]
                stacked.append(default_collate(padded_samples))
        else:
            for i in range(0, len(batch), samples_per_gpu):
                stacked.append(
                    [sample.data for sample in batch[i:i + samples_per_gpu]])
        return DataContainer(stacked, batch[0].stack, batch[0].padding_value)
    elif isinstance(batch[0], collections.Sequence):
        transposed = zip(*batch)
        return [collate(samples, samples_per_gpu) for samples in transposed]
    elif isinstance(batch[0], collections.Mapping):
        return {
            key: collate([d[key] for d in batch], samples_per_gpu)
            for key in batch[0]
        }
    else:
        return default_collate(batch)
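
# Illustrative sketch (not part of this commit): the three DataContainer
# cases from the docstring, with samples_per_gpu=2:
#
#   batch = [
#       {'img': DataContainer(torch.rand(3, 600, 800), stack=True),
#        'gt_bboxes': DataContainer(torch.rand(4, 4)),  # stack=False
#        'img_meta': DataContainer({'id': i}, cpu_only=True)}
#       for i in range(2)
#   ]
#   result = collate(batch, samples_per_gpu=2)
#   # result['img'].data       -> [one stacked 2x3x600x800 tensor]
#   # result['gt_bboxes'].data -> [[tensor, tensor]]   (kept as a list)
#   # result['img_meta'].data  -> [[{'id': 0}, {'id': 1}]]  (stays on CPU)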

mmdet/datasets/utils/misc.py
@@ -5,7 +5,6 @@ import torch

import matplotlib.pyplot as plt
import numpy as np
-import pycocotools.mask as maskUtils


def to_tensor(data):
    ...

@@ -68,19 +67,3 @@ def show_ann(coco, img, ann_info):
    plt.axis('off')
    coco.showAnns(ann_info)
    plt.show()
-
-
-def draw_bbox_and_segm(img, results, dataset, score_thr=0.5):
-    bbox_results, segm_results = results
-    hi_bboxes = []
-    for cls_bboxes, cls_segms in zip(bbox_results, segm_results):
-        if len(cls_bboxes) == 0:
-            hi_bboxes.append(cls_bboxes)
-            continue
-        inds = np.where(cls_bboxes[:, -1] > score_thr)[0]
-        hi_bboxes.append(cls_bboxes[inds, :])
-        color_mask = np.random.random((1, 3))
-        for i in inds:
-            mask = maskUtils.decode(cls_segms[i]).astype(np.bool)
-            img[mask] = img[mask] * 0.5 + color_mask * 0.5
-    mmcv.draw_bboxes_with_label(np.ascontiguousarray(img), hi_bboxes, dataset)

mmdet/datasets/utils/__init__.py
from .data_container import DataContainer
from .misc import to_tensor, random_scale, show_ann

__all__ = ['DataContainer', 'to_tensor', 'random_scale', 'show_ann']

mmdet/datasets/utils/data_container.py (deleted; DataContainer moved to mmcv.parallel)
import functools

import torch


def assert_tensor_type(func):

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not isinstance(args[0].data, torch.Tensor):
            raise AttributeError('{} has no attribute {} for type {}'.format(
                args[0].__class__.__name__, func.__name__, args[0].datatype))
        return func(*args, **kwargs)

    return wrapper


class DataContainer(object):

    def __init__(self, data, stack=False, padding_value=0, cpu_only=False):
        self._data = data
        self._cpu_only = cpu_only
        self._stack = stack
        self._padding_value = padding_value

    def __repr__(self):
        return '{}({})'.format(self.__class__.__name__, repr(self.data))

    @property
    def data(self):
        return self._data

    @property
    def datatype(self):
        if isinstance(self.data, torch.Tensor):
            return self.data.type()
        else:
            return type(self.data)

    @property
    def cpu_only(self):
        return self._cpu_only

    @property
    def stack(self):
        return self._stack

    @property
    def padding_value(self):
        return self._padding_value

    @assert_tensor_type
    def size(self, *args, **kwargs):
        return self.data.size(*args, **kwargs)

    @assert_tensor_type
    def dim(self):
        return self.data.dim()
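
# Illustrative sketch (not part of this commit): DataContainer is a thin
# wrapper that tags data with collate/scatter hints; tensor-only methods
# are guarded by assert_tensor_type.
#
#   img = DataContainer(torch.rand(3, 600, 800), stack=True)
#   img.size()   # -> torch.Size([3, 600, 800])
#   meta = DataContainer({'filename': 'a.jpg'}, cpu_only=True)
#   meta.size()  # raises AttributeError via assert_tensor_type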

tools/test.py
@@ -3,9 +3,10 @@ import argparse

import torch
import mmcv
from mmcv.runner import load_checkpoint, parallel_test, obj_from_dict
+from mmcv.parallel import scatter, MMDataParallel

from mmdet import datasets
-from mmdet.core import scatter, MMDataParallel, results2json, coco_eval
+from mmdet.core import results2json, coco_eval
from mmdet.datasets import collate, build_dataloader
from mmdet.models import build_detector, detectors
...

tools/train.py
@@ -9,10 +9,10 @@ import numpy as np
import torch
from mmcv import Config
from mmcv.runner import Runner, obj_from_dict, DistSamplerSeedHook
+from mmcv.parallel import MMDataParallel, MMDistributedDataParallel

from mmdet import datasets, __version__
-from mmdet.core import (init_dist, DistOptimizerHook, MMDataParallel,
-                        MMDistributedDataParallel, CocoDistEvalRecallHook,
-                        CocoDistEvalmAPHook)
+from mmdet.core import (init_dist, DistOptimizerHook, CocoDistEvalRecallHook,
+                        CocoDistEvalmAPHook)
from mmdet.datasets import build_dataloader
from mmdet.models import build_detector, RPN
...