"git@developer.sourcefind.cn:OpenDAS/mmcv.git" did not exist on "f75f4094f3e6b553f4f1d3cec209a32d187acc58"
network.py 10.8 KB
Newer Older
1
2
3
"""DGL Distributed Training Infrastructure."""
from __future__ import absolute_import

Chao Ma's avatar
Chao Ma committed
4
5
6
7
8
import time
from enum import Enum
from collections import namedtuple

import dgl.backend as F
9
10
11
12
13
14
from ._ffi.function import _init_api
from .nodeflow import NodeFlow
from . import utils

_init_api("dgl.network")

15

16
17
################################ Common Network Components ##################################

Chao Ma's avatar
Chao Ma committed
18
19
20
21
22
23
24
25
# Length of the pause performed by _network_wait(), in seconds.
_WAIT_TIME_SEC = 3


def _network_wait():
    """Block the caller for ``_WAIT_TIME_SEC`` seconds.
    """
    pause = _WAIT_TIME_SEC
    time.sleep(pause)

26

27
def _create_sender(net_type, msg_queue_size=2*1024*1024*1024):
    """Create a Sender communicator via C api

    Parameters
    ----------
    net_type : str
        'socket' or 'mpi'
    msg_queue_size : int
        message queue size (2GB by default)

    Returns
    -------
    The Sender handle returned by ``_CAPI_DGLSenderCreate``.
    """
    assert net_type in ('socket', 'mpi'), 'Unknown network type.'
    return _CAPI_DGLSenderCreate(net_type, msg_queue_size)
39

40

41
def _create_receiver(net_type, msg_queue_size=2*1024*1024*1024):
    """Create a Receiver communicator via C api

    Parameters
    ----------
    net_type : str
        'socket' or 'mpi'
    msg_queue_size : int
        message queue size (2GB by default)

    Returns
    -------
    The Receiver handle returned by ``_CAPI_DGLReceiverCreate``.
    """
    assert net_type in ('socket', 'mpi'), 'Unknown network type.'
    return _CAPI_DGLReceiverCreate(net_type, msg_queue_size)
53

54

55
56
def _finalize_sender(sender):
    """Finalize Sender communicator

    Parameters
    ----------
    sender : ctypes.c_void_p
        C Sender handle
    """
    _CAPI_DGLFinalizeSender(sender)
64

65

66
67
68
69
70
def _finalize_receiver(receiver):
    """Finalize Receiver communicator

    Parameters
    ----------
    receiver : ctypes.c_void_p
        C Receiver handle
    """
    _CAPI_DGLFinalizeReceiver(receiver)

71

72
73
def _add_receiver_addr(sender, ip_addr, port, recv_id):
    """Add Receiver IP address to namebook

    Parameters
    ----------
    sender : ctypes.c_void_p
        C Sender handle
    ip_addr : str
        IP address of Receiver
    port : int
        listening port of Receiver
    recv_id : int
        Receiver ID (must be non-negative)
    """
    assert recv_id >= 0, 'recv_id cannot be a negative number.'
    # port and recv_id are coerced to plain ints before crossing into the C API.
    _CAPI_DGLSenderAddReceiver(sender, ip_addr, int(port), int(recv_id))
88

89

90
91
92
93
94
95
96
def _sender_connect(sender):
    """Connect to all the Receiver

    Parameters
    ----------
    sender : ctypes.c_void_p
        C Sender handle
    """
    _CAPI_DGLSenderConnect(sender)
99

100

101
def _receiver_wait(receiver, ip_addr, port, num_sender):
    """Wait all Sender to connect.

    Parameters
    ----------
    receiver : ctypes.c_void_p
        C Receiver handle
    ip_addr : str
        IP address of Receiver
    port : int
        port of Receiver
    num_sender : int
        total number of Sender (must be non-negative)
    """
    assert num_sender >= 0, 'num_sender cannot be a negative number.'
    # port and num_sender are coerced to plain ints before crossing into the C API.
    _CAPI_DGLReceiverWait(receiver, ip_addr, int(port), int(num_sender))


################################ Distributed Sampler Components ################################


122
123
def _send_nodeflow(sender, nodeflow, recv_id):
    """Send sampled subgraph (Nodeflow) to remote Receiver.

    Parameters
    ----------
    sender : ctypes.c_void_p
        C Sender handle
    nodeflow : NodeFlow
        NodeFlow object
    recv_id : int
        Receiver ID (must be non-negative)
    """
    assert recv_id >= 0, 'recv_id cannot be a negative number.'
    gidx = nodeflow._graph
    # The mapping attributes already expose todgltensor(); the offset
    # attributes are first wrapped with utils.toindex().
    node_mapping = nodeflow._node_mapping.todgltensor()
    edge_mapping = nodeflow._edge_mapping.todgltensor()
    layers_offsets = utils.toindex(nodeflow._layer_offsets).todgltensor()
    flows_offsets = utils.toindex(nodeflow._block_offsets).todgltensor()
    _CAPI_SenderSendNodeFlow(sender,
                             int(recv_id),
                             gidx,
                             node_mapping,
                             edge_mapping,
                             layers_offsets,
                             flows_offsets)

148
def _send_sampler_end_signal(sender, recv_id):
    """Send an epoch-end signal to remote Receiver.

    Parameters
    ----------
    sender : ctypes.c_void_p
        C sender handle
    recv_id : int
        Receiver ID (must be non-negative)
    """
    assert recv_id >= 0, 'recv_id cannot be a negative number.'
    _CAPI_SenderSendSamplerEndSignal(sender, int(recv_id))
160
161

def _recv_nodeflow(receiver, graph):
    """Receive sampled subgraph (NodeFlow) from remote sampler.

    Parameters
    ----------
    receiver : ctypes.c_void_p
        C Receiver handle
    graph : DGLGraph
        The parent graph

    Returns
    -------
    NodeFlow or an end-signal
        When the C API hands back a plain ``int`` it is returned unchanged
        (the end-signal); any other result is wrapped in a ``NodeFlow``
        built on ``graph``.
    """
    res = _CAPI_ReceiverRecvNodeFlow(receiver)
    if isinstance(res, int):
        return res
    else:
        return NodeFlow(graph, res)
Chao Ma's avatar
Chao Ma committed
180
181
182
183
184
185
186
187
188
189
190
191
192
193


################################ Distributed KVStore Components ################################


class KVMsgType(Enum):
    """Type of kvstore message

    The integer value of each member is what is passed to the C API as the
    message type. The per-member notes below list the fields that
    ``_send_kv_msg`` serializes after the sender rank for that type.
    """
    FINAL = 1           # rank only
    INIT = 2            # name, shape
    PUSH = 3            # name, id, data
    PULL = 4            # name, id
    PULL_BACK = 5       # name, id, data
    BARRIER = 6         # rank only
    IP_ID = 7           # name
    GET_SHAPE = 8       # name
    GET_SHAPE_BACK = 9  # name, shape
Chao Ma's avatar
Chao Ma committed
197

198

199
KVStoreMsg = namedtuple("KVStoreMsg", "type rank name id data shape c_ptr")
"""Message of DGL kvstore

Data Field
----------
type : KVMsgType
    Type of DGL kvstore message
rank : int
    sender's ID
name : str
    data name
id : tensor (mx.ndarray or torch.tensor)
    data vector storing the global IDs
data : tensor (mx.ndarray or torch.tensor)
    data matrix with the same row size of id
shape : tensor (mx.ndarray or torch.tensor)
    shape tensor; only carried by INIT and GET_SHAPE_BACK messages
    (presumably the shape of the data registered under ``name``)
c_ptr : void*
    c pointer of message
"""

def _send_kv_msg(sender, msg, recv_id):
    """Send kvstore message.

    Every message starts with the receiver ID, the message type value and
    the sender rank. What follows depends on ``msg.type``:

    * ``PULL`` : name, id
    * ``INIT`` / ``GET_SHAPE_BACK`` : name, shape
    * ``IP_ID`` / ``GET_SHAPE`` : name
    * ``FINAL`` / ``BARRIER`` : nothing
    * any other type (``PUSH``, ``PULL_BACK``) : name, id, data

    Parameters
    ----------
    sender : ctypes.c_void_p
        C sender handle
    msg : KVStoreMsg
        kvstore message
    recv_id : int
        receiver's ID
    """
    if msg.type == KVMsgType.PULL:
        tensor_id = F.zerocopy_to_dgl_ndarray(msg.id)
        _CAPI_SenderSendKVMsg(
            sender,
            int(recv_id),
            msg.type.value,
            msg.rank,
            msg.name,
            tensor_id)
    elif msg.type in (KVMsgType.INIT, KVMsgType.GET_SHAPE_BACK):
        tensor_shape = F.zerocopy_to_dgl_ndarray(msg.shape)
        _CAPI_SenderSendKVMsg(
            sender,
            int(recv_id),
            msg.type.value,
            msg.rank,
            msg.name,
            tensor_shape)
    elif msg.type in (KVMsgType.IP_ID, KVMsgType.GET_SHAPE):
        _CAPI_SenderSendKVMsg(
            sender,
            int(recv_id),
            msg.type.value,
            msg.rank,
            msg.name)
    elif msg.type in (KVMsgType.FINAL, KVMsgType.BARRIER):
        _CAPI_SenderSendKVMsg(
            sender,
            int(recv_id),
            msg.type.value,
            msg.rank)
    else:
        # Remaining types (PUSH, PULL_BACK) carry both the IDs and the data.
        tensor_id = F.zerocopy_to_dgl_ndarray(msg.id)
        data = F.zerocopy_to_dgl_ndarray(msg.data)
        _CAPI_SenderSendKVMsg(
            sender,
            int(recv_id),
            msg.type.value,
            msg.rank,
            msg.name,
            tensor_id,
            data)

273

Chao Ma's avatar
Chao Ma committed
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def _recv_kv_msg(receiver):
    """Receive kvstore message.

    Only the fields that the message type carries are read from the C
    message; every other field of the returned tuple is ``None``:

    * ``FINAL`` / ``BARRIER`` : no extra field
    * ``IP_ID`` / ``GET_SHAPE`` : name
    * ``PULL`` : name, id
    * ``INIT`` / ``GET_SHAPE_BACK`` : name, shape
    * any other type (``PUSH``, ``PULL_BACK``) : name, id, data

    Parameters
    ----------
    receiver : ctypes.c_void_p
        C Receiver handle

    Return
    ------
    KVStoreMsg
        kvstore message; ``c_ptr`` holds the C message handle, which
        ``_clear_kv_msg`` deletes.

    Raises
    ------
    ValueError
        Raised by ``KVMsgType(...)`` when the received type value is not a
        member of ``KVMsgType``.
    """
    # NOTE(review): unlike the other C API bindings used here, this name has
    # no leading underscore. It is kept exactly as found -- confirm that it
    # matches the symbol registered on the C side.
    msg_ptr = CAPI_ReceiverRecvKVMsg(receiver)
    msg_type = KVMsgType(_CAPI_ReceiverGetKVMsgType(msg_ptr))
    rank = _CAPI_ReceiverGetKVMsgRank(msg_ptr)

    if msg_type in (KVMsgType.FINAL, KVMsgType.BARRIER):
        # Control messages carry nothing beyond type and rank.
        return KVStoreMsg(
            type=msg_type,
            rank=rank,
            name=None,
            id=None,
            data=None,
            shape=None,
            c_ptr=msg_ptr)

    # Every remaining message type carries a data name.
    name = _CAPI_ReceiverGetKVMsgName(msg_ptr)
    tensor_id = None
    data = None
    tensor_shape = None
    if msg_type == KVMsgType.PULL:
        tensor_id = F.zerocopy_from_dgl_ndarray(_CAPI_ReceiverGetKVMsgID(msg_ptr))
    elif msg_type in (KVMsgType.INIT, KVMsgType.GET_SHAPE_BACK):
        tensor_shape = F.zerocopy_from_dgl_ndarray(_CAPI_ReceiverGetKVMsgShape(msg_ptr))
    elif msg_type not in (KVMsgType.IP_ID, KVMsgType.GET_SHAPE):
        # Remaining types (PUSH, PULL_BACK) carry both the IDs and the data.
        tensor_id = F.zerocopy_from_dgl_ndarray(_CAPI_ReceiverGetKVMsgID(msg_ptr))
        data = F.zerocopy_from_dgl_ndarray(_CAPI_ReceiverGetKVMsgData(msg_ptr))

    return KVStoreMsg(
        type=msg_type,
        rank=rank,
        name=name,
        id=tensor_id,
        data=data,
        shape=tensor_shape,
        c_ptr=msg_ptr)
349
350


351
def _clear_kv_msg(msg):
    """Clear data of kvstore message

    Parameters
    ----------
    msg : KVStoreMsg
        kvstore message; nothing is deleted when its ``c_ptr`` is None
    """
    # NOTE(review): the backend sync runs before the C message is deleted,
    # presumably so pending work on its zero-copy tensors finishes first --
    # confirm against the backend implementation.
    F.sync()
    if msg.c_ptr is not None:
        _CAPI_DeleteKVMsg(msg.c_ptr)
Chao Ma's avatar
Chao Ma committed
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409


def _fast_pull(name, id_tensor,
               machine_count, group_count, machine_id, client_id,
               partition_book, g2l, local_data,
               sender, receiver):
    """Pull the rows selected by ``id_tensor`` through the C fast-pull API.

    Parameters
    ----------
    name : str
        data name string
    id_tensor : tensor
        tensor of ID
    machine_count : int
        count of total machine
    group_count : int
        count of server group
    machine_id : int
        current machine id
    client_id : int
        current client ID
    partition_book : tensor
        tensor of partition book
    g2l : tensor
        tensor of global2local; may be None
    local_data : tensor
        tensor of local shared data
    sender : ctypes.c_void_p
        C Sender handle
    receiver : ctypes.c_void_p
        C Receiver handle

    Return
    ------
    tensor
        target tensor
    """
    to_nd = F.zerocopy_to_dgl_ndarray
    # Arguments shared by both call forms, in the order the C API expects.
    call_args = [name, machine_id, machine_count, group_count, client_id,
                 to_nd(id_tensor),
                 to_nd(partition_book),
                 to_nd(local_data),
                 sender, receiver]
    if g2l is None:
        call_args.append('no_g2l')
    else:
        # The flag tells the C side that a global2local tensor follows.
        call_args.append('has_g2l')
        call_args.append(to_nd(g2l))

    pulled = _CAPI_FastPull(*call_args)
    return F.zerocopy_from_dgl_ndarray(pulled)