_array.py 6.57 KB
Newer Older
root's avatar
root committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import cupy
import numpy


class _MultiDeviceDummyMemory(cupy.cuda.Memory):
    """Placeholder ``cupy.cuda.Memory`` subclass used by distributed arrays.

    It adds no behavior of its own; it only gives the dummy allocation
    backing a distributed array a distinct type.
    """


class _MultiDeviceDummyPointer(cupy.cuda.MemoryPointer):
    """Memory pointer whose ``device`` is deliberately an invalid device."""

    @property
    def device(self):
        """Return a ``Device`` built from the invalid id ``-1``."""
        # The array does not reside on a single device, so the pointer
        # reports an invalid device id instead of a real one.
        invalid_device_id = -1
        return cupy.cuda.device.Device(invalid_device_id)


class _DistributedArray(cupy.ndarray):
    """Array whose data is stored as per-device chunks.

    The instance itself is backed by a zero-sized dummy allocation; the
    actual data lives in ``_chunks``, a mapping from device id to the
    ``cupy.ndarray`` chunk held for that device.

    Attributes set on every instance:
        _chunks: mapping of device id -> chunk array.
        _tile_size: tile shape the array was created with.
        _axis: axis along which the chunks are concatenated by ``asnumpy``.
        _mem: the dummy memory object, kept referenced by the instance.
    """

    # Explicitly disable indexing, view, broadcast
    def __new__(cls, shape, dtype, chunks, axis, tile_size, devices):
        """Build the array around a dummy, zero-sized memory pointer.

        Args:
            shape: Logical shape of the whole distributed array.
            dtype: Data type of the array.
            chunks: Mapping of device id -> chunk array.
            axis: Axis along which the chunks are split.
            tile_size: Tile shape used to split the array.
            devices: Devices holding the chunks. Currently unused here;
                the device ids are taken from the keys of ``chunks``.
        """
        # A dummy memptr is passed so that creating the array does not
        # allocate real device memory; its device property is overridden
        # as the data does not live on a single device.
        mem = _MultiDeviceDummyMemory(0)
        memptr = _MultiDeviceDummyPointer(mem, 0)
        obj = super().__new__(cls, shape, dtype, memptr=memptr)
        obj._chunks = chunks
        obj._tile_size = tile_size
        obj._mem = mem
        obj._axis = axis
        return obj

    def __array_finalize__(self, obj):
        """Copy the distributed metadata from ``obj`` when one is given."""
        if obj is None:
            return
        # The attributes are stored with a leading underscore; looking them
        # up without it would always fall back to ``None``.
        self._chunks = getattr(obj, '_chunks', None)
        self._tile_size = getattr(obj, '_tile_size', None)
        self._axis = getattr(obj, '_axis', None)
        self._mem = getattr(obj, '_mem', None)

    def _get_chunk(self, i):
        """Return the chunk stored under key ``i`` (a device id)."""
        return self._chunks[i]

    def _prepare_args(self, dist_args, regular_args, device):
        """Collect the chunks of every distributed operand for ``device``.

        Args:
            dist_args: Iterable of ``(key, _DistributedArray)`` pairs, where
                ``key`` is the positional index or keyword name of the
                operand.
            regular_args: Sequence of ``(key, cupy.ndarray)`` pairs for the
                non-distributed operands; must be empty for now.
            device: Device id whose chunks are requested.

        Returns:
            list: ``(key, chunk)`` pairs, one per distributed operand.

        Raises:
            RuntimeError: If the chunks have different shapes, or if any
                non-distributed ``cupy.ndarray`` operand was given.
        """
        # Dist arrays must have chunks of compatible shapes, otherwise
        # hard error.
        # In case that they are of different, but broadcastable shapes
        # Data movement may be needed
        # Currently: Support only same shape chunks
        args = []
        c_shape = None
        for (i, arg) in dist_args:
            chunk = arg._get_chunk(device)
            args.append((i, chunk))
            if c_shape is None:
                c_shape = chunk.shape
            # TODO(ecastill) check if broadcastable, the array must have been
            # split in the same axis?
            if chunk.shape != c_shape:
                raise RuntimeError(
                    'Operating distributed arrays of different chunk sizes'
                    ' together is not supported')

        # Case of X.T and other data movement requiring cases not supported
        # TODO(ecastill) add support for operands being non distributed arrays
        # 1. Check if the regular arrays are on the specified device or
        #    peer access is enabled
        # 2. Check that their shape is compatible with the chunks
        #    distributed arrays
        # 3. Create views of this array and copy to the given device if needed
        #    so that the chunks in the distributed operate with the right slice
        if len(regular_args) > 0:
            raise RuntimeError(
                'Mix `cupy.ndarray` with distributed arrays is not currently'
                ' supported')

        return args

    def _get_execution_devices(self, dist_args):
        """Return the device ids used by the distributed operands.

        The ids are unique and kept in first-seen order. The order matters:
        the result chunks are stored in this order and ``asnumpy``
        concatenates them in storage order, so an unordered container could
        permute the slices of the result.

        Args:
            dist_args: Iterable of ``(key, _DistributedArray)`` pairs.

        Returns:
            list: Unique device ids in first-seen order.
        """
        devices = {}
        for _, arg in dist_args:
            # The key of chunks is the device id
            for dev in arg._chunks:
                devices[dev] = None
        return list(devices)

    def _execute_kernel(self, kernel, args, kwargs):
        """Run ``kernel`` once per device on the matching chunks.

        Args:
            kernel: Callable invoked as ``kernel(*args, **kwargs)`` with the
                distributed operands replaced by their per-device chunks.
            args: Positional arguments of the original call.
            kwargs: Keyword arguments of the original call (not modified).

        Returns:
            _DistributedArray: Array with the shape, axis and tile size of
            ``self`` whose chunks are the per-device kernel outputs.

        Raises:
            RuntimeError: If the kernel returns something other than a
                single ``cupy.ndarray``, or if the operands are not
                supported (see ``_prepare_args``).
        """
        distributed_arrays = []
        regular_arrays = []
        for i, arg in enumerate(args):
            if isinstance(arg, _DistributedArray):
                distributed_arrays.append((i, arg))
            elif isinstance(arg, cupy.ndarray):
                regular_arrays.append((i, arg))

        # Do it for kwargs too
        for k, arg in kwargs.items():
            if isinstance(arg, _DistributedArray):
                distributed_arrays.append((k, arg))
            elif isinstance(arg, cupy.ndarray):
                regular_arrays.append((k, arg))

        # Work on copies so the per-device substitution below never leaks
        # chunks into the containers owned by the caller.
        args = list(args)
        kwargs = dict(kwargs)
        devices = self._get_execution_devices(distributed_arrays)
        dev_outs = {}
        dtype = None
        for dev in devices:
            array_args = self._prepare_args(
                distributed_arrays, regular_arrays, dev)
            for (i, arg) in array_args:
                # Integer keys are positional operands, the rest keywords
                if isinstance(i, int):
                    args[i] = arg
                else:
                    kwargs[i] = arg
            with cupy.cuda.Device(dev):
                out = kernel(*args, **kwargs)
            # Validate before touching ``out.dtype`` so unsupported return
            # values produce the intended error message.
            if not isinstance(out, cupy.ndarray):
                raise RuntimeError(
                    'kernels returning other than single array not supported')
            dtype = out.dtype
            dev_outs[dev] = out

        return _DistributedArray(
            self.shape, dtype, dev_outs,
            self._axis, self._tile_size, devices)

    def __cupy_override_elementwise_kernel__(self, kernel, *args, **kwargs):
        """Protocol hook to run an elementwise kernel chunk by chunk."""
        # This defines a protocol to be called from elementwise kernel
        # to override some of the ops done there
        outs = self._execute_kernel(kernel, args, kwargs)
        return outs

    def asnumpy(self):
        """Gather all chunks on the host and join them into one array.

        Returns:
            numpy.ndarray: The chunks, in storage order, concatenated along
            ``self._axis``.
        """
        # Coalesce it in a single array
        chunks = [cupy.asnumpy(c) for c in self._chunks.values()]
        return numpy.concatenate(chunks, axis=self._axis)


def _split_tiles(array, nr_splits):
    """Split ``array`` along the axis that has the most tiles.

    Args:
        array: Array to split. ``cupy.ndarray`` inputs are split with CuPy
            and each piece is made contiguous; anything else is handed to
            ``numpy.array_split``.
        nr_splits: Per-axis number of tiles (array-like). Only the axis
            with the largest value is split.

    Returns:
        tuple: ``(axis, tiles)`` where ``axis`` is the split axis as a
        Python ``int`` and ``tiles`` is the list of pieces.
    """
    # TODO(ecastill) support arbitrary tiles shape
    nr_splits = numpy.asarray(nr_splits)
    # Normalize to plain ints once so both backends receive the same
    # argument types (``nr_splits`` is typically a float array).
    axis = int(nr_splits.argmax())
    sections = int(nr_splits[axis])
    if isinstance(array, cupy.ndarray):
        tiles = [cupy.ascontiguousarray(tile)
                 for tile in cupy.array_split(array, sections, axis)]
    else:
        tiles = numpy.array_split(array, sections, axis)
    return axis, tiles


def array(array, devices, tile_shape):
    """Create an array that is split across multiple devices in the same host.

    Args:
        array (numpy.ndarray or cupy.ndarray): Data to distribute.
        devices: Sized collection of device ids; one chunk is placed on each
            device, in order.
        tile_shape: Shape of a single tile; must have one entry per
            dimension of ``array``.

    Returns:
        _DistributedArray: Array holding one chunk per device.

    Raises:
        TypeError: If ``array`` is neither a numpy nor a cupy array.
        ValueError: If ``tile_shape`` has a different number of dimensions
            than ``array``, contains non-positive entries, or if
            ``devices`` contains duplicates.
        RuntimeError: If more than one axis would be split, or if the
            number of chunks differs from the number of devices.
    """
    if not isinstance(array, (numpy.ndarray, cupy.ndarray)):
        raise TypeError('`array` needs to be a numpy or cupy array')

    array_shape = numpy.array(array.shape, dtype=numpy.int32)
    tile_shape = numpy.array(tile_shape, dtype=numpy.int32)

    # Validate before dividing: a mismatched ``tile_shape`` could otherwise
    # broadcast silently, and an ``assert`` is stripped under ``-O``.
    if tile_shape.shape != array_shape.shape:
        raise ValueError(
            '`tile_shape` must have the same number of dimensions as'
            ' `array`')
    if (tile_shape <= 0).any():
        raise ValueError('`tile_shape` entries must be positive')

    # 1. Check that array_size / tile_shape == len(devices)
    nr_splits = numpy.ceil(array_shape / tile_shape)
    if nr_splits[nr_splits > 1].size > 1:
        raise RuntimeError(
            'Currently only sharding across a single axis is allowed')

    nr_chunks = numpy.prod(nr_splits)

    if nr_chunks != len(devices):
        raise RuntimeError(
            'Array chunks must match the amount of devices')

    # Chunks are keyed by device id, so a repeated device would silently
    # overwrite a previously stored chunk.
    if len(set(devices)) != len(devices):
        raise ValueError('`devices` must not contain duplicates')

    axis, arrs = _split_tiles(array, nr_splits)
    cp_chunks = {}
    for arr, dev in zip(arrs, devices):
        # Copy (or upload) each piece while its target device is current
        with cupy.cuda.Device(dev):
            cp_chunks[dev] = cupy.array(arr)
    return _DistributedArray(
        array.shape, array.dtype, cp_chunks, axis, tile_shape, devices)