"tests/vscode:/vscode.git/clone" did not exist on "a49c6ddc9c1e584241d98b34efadf82a53c573b8"
parallel.py 6.92 KB
Newer Older
Hang Zhang's avatar
docs  
Hang Zhang committed
1
2
3
4
5
6
7
##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
## Created by: Hang Zhang
## ECE Department, Rutgers University
## Email: zhang.hang@rutgers.edu
## Copyright (c) 2017
##
## This source code is licensed under the MIT-style license found in the
## LICENSE file in the root directory of this source tree
##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

"""Encoding Data Parallel"""
import threading
import torch
from torch.autograd import Variable, Function
import torch.cuda.comm as comm
from torch.nn.parallel.data_parallel import DataParallel
from torch.nn.parallel.parallel_apply import get_a_var
from torch.nn.parallel._functions import ReduceAddCoalesced, Broadcast

__all__ = ['allreduce', 'DataParallelModel', 'DataParallelCriterion']

torch_ver = torch.__version__[:3]

def allreduce(*inputs):
    """Cross GPU all reduce autograd operation for calculate mean and
    variance in SyncBN.
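
    A minimal sketch of the calling convention (the tensor names are
    hypothetical): the first argument is the number of tensors per GPU,
    followed by the tensors themselves grouped per device; the reduced
    sums are broadcast back to every device.

    Example::

        >>> # two stats per GPU (sum and squared sum) on GPUs 0 and 1
        >>> xsum0, xsq0, xsum1, xsq1 = allreduce(2, xsum0, xsq0, xsum1, xsq1)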
    """
    return AllReduce.apply(*inputs)

class AllReduce(Function):
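    """Reduce-add per-GPU tensor groups onto one device, then broadcast the
    sums back so every device ends up with identical reduced tensors."""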
    @staticmethod
    def forward(ctx, num_inputs, *inputs):
        ctx.num_inputs = num_inputs
        ctx.target_gpus = [inputs[i].get_device() for i in range(0, len(inputs), num_inputs)]
        inputs = [inputs[i:i + num_inputs]
                  for i in range(0, len(inputs), num_inputs)]
        # sort the per-GPU groups by device id so the reduction order is deterministic
        inputs = sorted(inputs, key=lambda i: i[0].get_device())
        results = comm.reduce_add_coalesced(inputs, ctx.target_gpus[0])
        outputs = comm.broadcast_coalesced(results, ctx.target_gpus)
        return tuple([t for tensors in outputs for t in tensors])

    @staticmethod
    def backward(ctx, *inputs):
        inputs = [i.data for i in inputs]
        inputs = [inputs[i:i + ctx.num_inputs]
                  for i in range(0, len(inputs), ctx.num_inputs)]
        results = comm.reduce_add_coalesced(inputs, ctx.target_gpus[0])
        outputs = comm.broadcast_coalesced(results, ctx.target_gpus)
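        # the leading None matches the non-tensor num_inputs argument of forward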
        return (None,) + tuple([Variable(t) for tensors in outputs for t in tensors])


class Reduce(Function):
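    """Sum one tensor per GPU into a single tensor; the backward pass
    broadcasts the incoming gradient back to every contributing device."""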
    @staticmethod
    def forward(ctx, *inputs):
        ctx.target_gpus = [inp.get_device() for inp in inputs]
        inputs = sorted(inputs, key=lambda i: i.get_device())
        return comm.reduce_add(inputs)

    @staticmethod
    def backward(ctx, gradOutput):
        return Broadcast.apply(ctx.target_gpus, gradOutput)


class DataParallelModel(DataParallel):
    """Implements data parallelism at the module level.

    This container parallelizes the application of the given module by
    splitting the input across the specified devices by chunking in the
    batch dimension.
    In the forward pass, the module is replicated on each device,
    and each replica handles a portion of the input. During the backward
    pass, gradients from each replica are summed into the original module.
    Note that the outputs are not gathered; please use the compatible
    :class:`encoding.parallel.DataParallelCriterion`.

    The batch size should be larger than the number of GPUs used. It should
    also be an integer multiple of the number of GPUs so that each chunk is
    the same size (so that each GPU processes the same number of samples).

    Args:
        module: module to be parallelized
        device_ids: CUDA devices (default: all devices)

    Reference:
        Hang Zhang, Kristin Dana, Jianping Shi, Zhongyue Zhang, Xiaogang Wang, Ambrish Tyagi,
        Amit Agrawal. "Context Encoding for Semantic Segmentation."
        *The IEEE Conference on Computer Vision and Pattern Recognition (CVPR) 2018*

    Example::

        >>> net = encoding.nn.DataParallelModel(model, device_ids=[0, 1, 2])
        >>> y = net(x)
    """
    def gather(self, outputs, output_device):
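        # override: keep the per-GPU outputs where they are instead of
        # gathering them onto a single device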
        return outputs


class DataParallelCriterion(DataParallel):
    """
    Calculate loss on multiple GPUs, which balances the memory usage for
    semantic segmentation.

    The targets are split across the specified devices by chunking in
    the batch dimension. Please use it together with :class:`encoding.parallel.DataParallelModel`.

    Reference:
        Hang Zhang, Kristin Dana, Jianping Shi, Zhongyue Zhang, Xiaogang Wang, Ambrish Tyagi,
        Amit Agrawal. "Context Encoding for Semantic Segmentation."
        *The IEEE Conference on Computer Vision and Pattern Recognition (CVPR) 2018*

    Example::

        >>> net = encoding.nn.DataParallelModel(model, device_ids=[0, 1, 2])
        >>> criterion = encoding.nn.DataParallelCriterion(criterion, device_ids=[0, 1, 2])
        >>> y = net(x)
        >>> loss = criterion(y, target)
    """
    def forward(self, inputs, *targets, **kwargs):
        # the inputs are already scattered (one chunk per GPU);
        # scatter only the targets here
        if not self.device_ids:
            return self.module(inputs, *targets, **kwargs)
        targets, kwargs = self.scatter(targets, kwargs, self.device_ids)
        if len(self.device_ids) == 1:
            return self.module(inputs, *targets[0], **kwargs[0])
        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
        outputs = _criterion_parallel_apply(replicas, inputs, targets, kwargs)
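        # sum the per-GPU losses on one device and average over the replicas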
        return Reduce.apply(*outputs) / len(outputs)
        #return self.gather(outputs, self.output_device).mean()


def _criterion_parallel_apply(modules, inputs, targets, kwargs_tup=None, devices=None):
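    """Threaded apply: run each criterion replica on its own GPU against the
    matching (input, target) chunk and collect the per-GPU losses."""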
    assert len(modules) == len(inputs)
    assert len(targets) == len(inputs)
    if kwargs_tup:
        assert len(modules) == len(kwargs_tup)
    else:
        kwargs_tup = ({},) * len(modules)
    if devices is not None:
        assert len(modules) == len(devices)
    else:
        devices = [None] * len(modules)

    lock = threading.Lock()
    results = {}
    if torch_ver != "0.3":
        grad_enabled = torch.is_grad_enabled()

    def _worker(i, module, input, target, kwargs, device=None):
        if torch_ver != "0.3":
            torch.set_grad_enabled(grad_enabled)
        if device is None:
            device = get_a_var(input).get_device()
        try:
            with torch.cuda.device(device):
                output = module(*(input + target), **kwargs)
            with lock:
                results[i] = output
        except Exception as e:
            with lock:
                results[i] = e

    if len(modules) > 1:
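        # one worker thread per replica; results are written under the lock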
        threads = [threading.Thread(target=_worker,
                                    args=(i, module, input, target,
                                          kwargs, device),)
                   for i, (module, input, target, kwargs, device) in
                   enumerate(zip(modules, inputs, targets, kwargs_tup, devices))]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    else:
        # pass targets[0] too; the original call dropped it, shifting the arguments
        _worker(0, modules[0], inputs[0], targets[0], kwargs_tup[0], devices[0])

    outputs = []
    for i in range(len(inputs)):
        output = results[i]
        if isinstance(output, Exception):
            raise output
        outputs.append(output)
    return outputs