"""Define distributed graph."""

from collections.abc import MutableMapping

from ..graph import DGLGraph
from .. import backend as F
from ..base import NID, EID, DGLError
from .kvstore import KVServer, KVClient
from ..graph_index import from_shared_mem_graph_index
from .._ffi.ndarray import empty_shared_mem
from ..frame import infer_scheme
from .partition import load_partition
from .graph_partition_book import PartitionPolicy, get_shared_mem_partition_book
from .. import utils
from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT
from .rpc_client import connect_to_server
from .server_state import ServerState
from .rpc_server import start_server
from ..transform import as_heterograph

def _get_graph_path(graph_name):
    return "/" + graph_name

def _copy_graph_to_shared_mem(g, graph_name):
    gidx = g._graph.copyto_shared_mem(_get_graph_path(graph_name))
    new_g = DGLGraph(gidx)
    # We should explicitly share the node/edge data with the client instead of putting them
    # in the KVStore because some of the node/edge data may be duplicated.
    local_node_path = _get_ndata_path(graph_name, 'inner_node')
    new_g.ndata['inner_node'] = _to_shared_mem(g.ndata['inner_node'], local_node_path)
    local_edge_path = _get_edata_path(graph_name, 'inner_edge')
    new_g.edata['inner_edge'] = _to_shared_mem(g.edata['inner_edge'], local_edge_path)
    new_g.ndata[NID] = _to_shared_mem(g.ndata[NID], _get_ndata_path(graph_name, NID))
    new_g.edata[EID] = _to_shared_mem(g.edata[EID], _get_edata_path(graph_name, EID))
    return new_g

FIELD_DICT = {'inner_node': F.int64,
              'inner_edge': F.int64,
              NID: F.int64,
              EID: F.int64}

def _get_ndata_name(name):
    ''' This is to get the name of node data in the kvstore.

    KVStore doesn't understand node data or edge data. We'll use a prefix to distinguish them.
    '''
    return 'node:' + name

def _get_edata_name(name):
    ''' This is to get the name of edge data in the kvstore.

    KVStore doesn't understand node data or edge data. We'll use a prefix to distinguish them.
    '''
    return 'edge:' + name

def _is_ndata_name(name):
    ''' Is this node data in the kvstore '''
    return name[:5] == 'node:'

def _is_edata_name(name):
    ''' Is this edge data in the kvstore '''
    return name[:5] == 'edge:'
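
# Hedged illustration (not part of the original module): shows how the 'node:' and
# 'edge:' prefixes map feature names to KVStore keys. The feature names 'feat' and
# 'weight' are hypothetical.
def _example_kvstore_names():
    ''' Illustrative only: demonstrate the KVStore naming convention used above. '''
    assert _get_ndata_name('feat') == 'node:feat'
    assert _get_edata_name('weight') == 'edge:weight'
    assert _is_ndata_name('node:feat') and not _is_ndata_name('edge:weight')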

def _get_shared_mem_ndata(g, graph_name, name):
    ''' Get shared-memory node data from DistGraph server.

    This is called by the DistGraph client to access the node data in the DistGraph server
    with shared memory.
    '''
    shape = (g.number_of_nodes(),)
    dtype = FIELD_DICT[name]
    dtype = DTYPE_DICT[dtype]
    data = empty_shared_mem(_get_ndata_path(graph_name, name), False, shape, dtype)
    dlpack = data.to_dlpack()
    return F.zerocopy_from_dlpack(dlpack)

def _get_shared_mem_edata(g, graph_name, name):
    ''' Get shared-memory edge data from DistGraph server.

    This is called by the DistGraph client to access the edge data in the DistGraph server
    with shared memory.
    '''
    shape = (g.number_of_edges(),)
    dtype = FIELD_DICT[name]
    dtype = DTYPE_DICT[dtype]
    data = empty_shared_mem(_get_edata_path(graph_name, name), False, shape, dtype)
    dlpack = data.to_dlpack()
    return F.zerocopy_from_dlpack(dlpack)

def _get_graph_from_shared_mem(graph_name):
    ''' Get the graph from the DistGraph server.

    The DistGraph server puts the graph structure of the local partition in the shared memory.
    The client can access the graph structure and some metadata on nodes and edges directly
    through shared memory to reduce the overhead of data access.
    '''
    gidx = from_shared_mem_graph_index(_get_graph_path(graph_name))
    if gidx is None:
        return gidx

    g = DGLGraph(gidx)
    g.ndata['inner_node'] = _get_shared_mem_ndata(g, graph_name, 'inner_node')
    g.edata['inner_edge'] = _get_shared_mem_edata(g, graph_name, 'inner_edge')
    g.ndata[NID] = _get_shared_mem_ndata(g, graph_name, NID)
    g.edata[EID] = _get_shared_mem_edata(g, graph_name, EID)
    return g

class DistTensor:
    ''' Distributed tensor.

    This is a wrapper to access a tensor stored across multiple machines.
    It provides an interface similar to a local tensor.

    Parameters
    ----------
    g : DistGraph
        The distributed graph object.
    name : string
        The name of the tensor.
    '''
    def __init__(self, g, name):
        self.kvstore = g._client
        self.name = name
        dtype, shape, _ = g._client.get_data_meta(name)
        self._shape = shape
        self._dtype = dtype

    def __getitem__(self, idx):
        idx = utils.toindex(idx)
        idx = idx.tousertensor()
        return self.kvstore.pull(name=self.name, id_tensor=idx)

    def __setitem__(self, idx, val):
        idx = utils.toindex(idx)
        idx = idx.tousertensor()
        # TODO(zhengda) how do we want to support broadcast (e.g., G.ndata['h'][idx] = 1).
        self.kvstore.push(name=self.name, id_tensor=idx, data_tensor=val)

    def __len__(self):
        return self._shape[0]

    @property
    def shape(self):
        ''' Return the shape of the distributed tensor. '''
        return self._shape

    @property
    def dtype(self):
        ''' Return the data type of the distributed tensor. '''
        return self._dtype
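
# Hedged usage sketch (illustrative, not part of the original API): a DistTensor
# behaves like a local tensor for sliced reads and writes. The feature name 'feat'
# and the DistGraph argument 'g' are hypothetical.
def _example_dist_tensor_access(g):
    ''' Illustrative only: pull and push a slice of a distributed node feature. '''
    feat = g.ndata['feat']      # a DistTensor backed by the KVStore
    nids = F.arange(0, 10)      # IDs of the first 10 nodes
    block = feat[nids]          # pull the rows from the owning servers
    feat[nids] = block          # push the (unchanged) rows back
    return feat.shape, feat.dtype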


class NodeDataView(MutableMapping):
    """The data view class when dist_graph.ndata[...].data is called.
    """
    __slots__ = ['_graph', '_data']

    def __init__(self, g):
        self._graph = g
        # When this is created, the server may have already loaded node data. We need to
        # initialize the node data views in advance.
        names = g._get_all_ndata_names()
        self._data = {name: DistTensor(g, _get_ndata_name(name)) for name in names}

    def _get_names(self):
        return list(self._data.keys())

    def _add(self, name):
        self._data[name] = DistTensor(self._graph, _get_ndata_name(name))

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, val):
        raise DGLError("DGL doesn't support assignment. "
                       + "Please call init_ndata to initialize new node data.")

    def __delitem__(self, key):
        #TODO(zhengda) how to delete data in the kvstore.
        raise NotImplementedError("delete node data isn't supported yet")

    def __len__(self):
        # The number of node data entries may change, so count them on every call.
        # This isn't called frequently, so the overhead should be fine.
        return len(self._data)

    def __iter__(self):
        return iter(self._data)

    def __repr__(self):
        reprs = {}
        for name in self._data:
            dtype = F.dtype(self._data[name])
            shape = F.shape(self._data[name])
            reprs[name] = 'DistTensor(shape={}, dtype={})'.format(str(shape), str(dtype))
        return repr(reprs)

class EdgeDataView(MutableMapping):
    """The data view class when G.edges[...].data is called.
    """
    __slots__ = ['_graph', '_data']

    def __init__(self, g):
        self._graph = g
        # When this is created, the server may have already loaded edge data. We need to
        # initialize the edge data views in advance.
        names = g._get_all_edata_names()
        self._data = {name: DistTensor(g, _get_edata_name(name)) for name in names}

    def _get_names(self):
        return list(self._data.keys())

    def _add(self, name):
        self._data[name] = DistTensor(self._graph, _get_edata_name(name))

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, val):
        raise DGLError("DGL doesn't support assignment. "
                       + "Please call init_edata to initialize new edge data.")

    def __delitem__(self, key):
        #TODO(zhengda) how to delete data in the kvstore.
        raise NotImplementedError("delete edge data isn't supported yet")

    def __len__(self):
        # The number of edge data entries may change, so count them on every call.
        # This isn't called frequently, so the overhead should be fine.
        return len(self._data)

    def __iter__(self):
        return iter(self._data)

    def __repr__(self):
        reprs = {}
        for name in self._data:
            dtype = F.dtype(self._data[name])
            shape = F.shape(self._data[name])
            reprs[name] = 'DistTensor(shape={}, dtype={})'.format(str(shape), str(dtype))
        return repr(reprs)


class DistGraphServer(KVServer):
    ''' The DistGraph server.

    This DistGraph server loads the graph data and sets up a service so that clients can read data
    of a graph partition (graph structure, node data and edge data) from remote machines.
    A server is responsible for one graph partition.

    Currently, each machine runs only one main server with a set of backup servers to handle
    clients' requests. The main server and the backup servers all handle requests for the same
    graph partition and share the partition data (graph structure and node/edge data) through
    shared memory.

    By default, the partition data is shared with the DistGraph clients that run on
    the same machine. However, a user can disable the shared-memory option, which is useful
    when the server and the clients run on different machines.

    Parameters
    ----------
    server_id : int
        The server ID (start from 0).
    ip_config : str
        Path of IP configuration file.
    num_clients : int
        Total number of client nodes.
    graph_name : string
        The name of the graph. The server and the client need to specify the same graph name.
    conf_file : string
        The path of the config file generated by the partition tool.
    disable_shared_mem : bool
        If True, the server does not share the partition data with local clients through
        shared memory.
    '''
    def __init__(self, server_id, ip_config, num_clients, graph_name, conf_file,
                 disable_shared_mem=False):
        super(DistGraphServer, self).__init__(server_id=server_id, ip_config=ip_config,
                                              num_clients=num_clients)
        self.ip_config = ip_config
        # Load graph partition data.
        self.client_g, node_feats, edge_feats, self.gpb = load_partition(conf_file, server_id)
        if not disable_shared_mem:
            self.client_g = _copy_graph_to_shared_mem(self.client_g, graph_name)

        # Init kvstore.
        if not disable_shared_mem:
            self.gpb.shared_memory(graph_name)
        self.add_part_policy(PartitionPolicy('node', server_id, self.gpb))
        self.add_part_policy(PartitionPolicy('edge', server_id, self.gpb))

        if not self.is_backup_server():
            for name in node_feats:
                self.init_data(name=_get_ndata_name(name), policy_str='node',
                               data_tensor=node_feats[name])
            for name in edge_feats:
                self.init_data(name=_get_edata_name(name), policy_str='edge',
                               data_tensor=edge_feats[name])
        else:
            for name in node_feats:
                self.init_data(name=_get_ndata_name(name), policy_str='node')
            for name in edge_feats:
                self.init_data(name=_get_edata_name(name), policy_str='edge')

    def start(self):
        """ Start graph store server.
        """
        # start server
        server_state = ServerState(kv_store=self, local_g=self.client_g, partition_book=self.gpb)
        start_server(server_id=self.server_id, ip_config=self.ip_config,
                     num_clients=self.num_clients, server_state=server_state)
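
# Hedged launch sketch (illustrative only): one way a server process could be started
# on each machine. The ip_config path, partition config path, graph name and client
# count below are placeholders.
def _example_start_server(server_id):
    ''' Illustrative only: create and start a DistGraphServer for one partition. '''
    server = DistGraphServer(server_id=server_id, ip_config='ip_config.txt',
                             num_clients=1, graph_name='example_graph',
                             conf_file='data/example_graph.json')
    server.start()      # serve client requests until shutdown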

def _default_init_data(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())

class DistGraph:
    ''' The DistGraph client.

    This provides the graph interface to access the partitioned graph data for distributed GNN
    training. All partition data is loaded by the DistGraph servers.

    By default, `DistGraph` uses shared memory to access the partition data on the local
    machine. This gives the best performance for distributed training when `DistGraphServer`
    and `DistGraph` run on the same machine. However, a user may want to run them on separate
    machines. In this case, shared memory can be disabled by passing `disable_shared_mem=True`
    when creating `DistGraphServer`. When shared memory is disabled, a user has to pass
    a partition book.

    Parameters
    ----------
    ip_config : str
        Path of IP configuration file.
    graph_name : str
        The name of the graph. This name has to be the same as the one used in DistGraphServer.
    gpb : GraphPartitionBook, optional
        The graph partition book. It is required only when shared memory is disabled.
    '''
    def __init__(self, ip_config, graph_name, gpb=None):
        connect_to_server(ip_config=ip_config)
        self._client = KVClient(ip_config)
        g = _get_graph_from_shared_mem(graph_name)
        if g is not None:
            self._g = as_heterograph(g)
        else:
            self._g = None
        self._gpb = get_shared_mem_partition_book(graph_name, self._g)
        if self._gpb is None:
            self._gpb = gpb
        self._client.barrier()
        self._client.map_shared_data(self._gpb)
        self._ndata = NodeDataView(self)
        self._edata = EdgeDataView(self)
        self._default_init_ndata = _default_init_data
        self._default_init_edata = _default_init_data

        self._num_nodes = 0
        self._num_edges = 0
        for part_md in self._gpb.metadata():
            self._num_nodes += int(part_md['num_nodes'])
            self._num_edges += int(part_md['num_edges'])


    def init_ndata(self, ndata_name, shape, dtype):
        '''Initialize node data

        This initializes the node data in the distributed graph storage.

        Parameters
        ----------
        ndata_name : string
            The name of the node data.
        shape : tuple
            The shape of the node data.
        dtype : dtype
            The data type of the node data.
        '''
        assert shape[0] == self.number_of_nodes()
        self._client.init_data(_get_ndata_name(ndata_name), shape, dtype, 'node', self._gpb,
                               self._default_init_ndata)
        self._ndata._add(ndata_name)

    def init_edata(self, edata_name, shape, dtype):
        '''Initialize edge data

        This initializes the edge data in the distributed graph storage.

        Parameters
        ----------
        edata_name : string
            The name of the edge data.
        shape : tuple
            The shape of the edge data.
        dtype : dtype
            The data type of the edge data.
        '''
        assert shape[0] == self.number_of_edges()
        self._client.init_data(_get_edata_name(edata_name), shape, dtype, 'edge', self._gpb,
                               self._default_init_edata)
        self._edata._add(edata_name)

    def init_node_emb(self, name, shape, dtype, initializer):
        ''' Initialize node embeddings.

        This initializes the node embeddings in the distributed graph storage.

        Parameters
        ----------
        name : string
            The name of the node embeddings.
        shape : tuple
            The shape of the node embeddings.
        dtype : string
            The data type of the node embeddings.
        initializer : callable
            The initializer.
        '''
        # TODO(zhengda)
        raise NotImplementedError("init_node_emb isn't supported yet")

    def get_node_embeddings(self):
        ''' Return node embeddings

        Returns
        -------
        a dict of SparseEmbedding
            All node embeddings in the graph store.
        '''
        # TODO(zhengda)
        raise NotImplementedError("get_node_embeddings isn't supported yet")

    @property
    def local_partition(self):
        ''' Return the local partition on the client

        DistGraph provides a global view of the distributed graph. Internally,
        it may contain a partition of the graph if it is co-located with
        the server. If there is no co-location, this returns None.

        Returns
        -------
        DGLHeteroGraph
            The local partition
        '''
        return self._g

    @property
    def ndata(self):
        """Return the data view of all the nodes.

        Returns
        -------
        NodeDataView
            The data view in the distributed graph storage.
        """
        return self._ndata

    @property
    def edata(self):
        """Return the data view of all the edges.

        Returns
        -------
        EdgeDataView
            The data view in the distributed graph storage.
        """
        return self._edata

    def number_of_nodes(self):
        """Return the number of nodes"""
        return self._num_nodes

    def number_of_edges(self):
        """Return the number of edges"""
        return self._num_edges

    def node_attr_schemes(self):
        """Return the node feature and embedding schemes."""
        schemes = {}
        for key in self.ndata:
            schemes[key] = infer_scheme(self.ndata[key])
        return schemes

    def edge_attr_schemes(self):
        """Return the edge feature and embedding schemes."""
        schemes = {}
        for key in self.edata:
            schemes[key] = infer_scheme(self.edata[key])
        return schemes

    def rank(self):
        ''' The rank of the distributed graph store.

        Returns
        -------
        int
            The rank of the current graph store.
        '''
        if self._g is None:
            return self._client.client_id
        else:
            return self._gpb.partid

    def get_partition_book(self):
        """Get the partition information.

        Returns
        -------
        GraphPartitionBook
            Object that stores all kinds of partition information.
        """
        return self._gpb

    def _get_all_ndata_names(self):
        ''' Get the names of all node data.
        '''
        names = self._client.data_name_list()
        ndata_names = []
        for name in names:
            if _is_ndata_name(name):
                # Remove the prefix "node:"
                ndata_names.append(name[5:])
        return ndata_names

    def _get_all_edata_names(self):
        ''' Get the names of all edge data.
        '''
        names = self._client.data_name_list()
        edata_names = []
        for name in names:
            if _is_edata_name(name):
                # Remove the prefix "edge:"
                edata_names.append(name[5:])
        return edata_names
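
# Hedged client-side sketch (illustrative only): connect to running servers, read
# existing node data and initialize new node data. The ip_config path, graph name
# and feature names are placeholders.
def _example_dist_graph_client():
    ''' Illustrative only: basic DistGraph client usage. '''
    g = DistGraph(ip_config='ip_config.txt', graph_name='example_graph')
    print(g.number_of_nodes(), g.number_of_edges())
    feat = g.ndata['feat']                                    # existing feature as a DistTensor
    g.init_ndata('h', (g.number_of_nodes(), 16), F.float32)   # new 16-dim node data
    return feat, g.ndata['h']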

def _get_overlap(mask_arr, ids):
    """ Select the Ids given a boolean mask array.

    The boolean mask array indicates all of the Ids to be selected. We want to
    find the overlap between the Ids selected by the boolean mask array and
    the Id array.

    Parameters
    ----------
    mask_arr : 1D tensor
        A boolean mask array.
    ids : 1D tensor
        A vector with Ids.

    Returns
    -------
    1D tensor
        The selected Ids.
    """
    if isinstance(mask_arr, DistTensor):
        masks = mask_arr[ids]
        return F.boolean_mask(ids, masks)
    else:
        mask_arr = utils.toindex(mask_arr)
        masks = F.gather_row(mask_arr.tousertensor(), ids)
        return F.boolean_mask(ids, masks)

def node_split(nodes, partition_book, rank):
    ''' Split nodes and return a subset for the local rank.

    This function splits the input nodes based on the partition book and
    returns a subset of nodes for the local rank. This method is used for
    dividing workloads for distributed training.

    The input nodes are given as a boolean mask vector whose length equals the number of
    nodes in the graph; a value of 1 indicates that the node at the corresponding position
    is included.

    Parameters
    ----------
    nodes : 1D tensor or DistTensor
        A boolean mask vector that indicates input nodes.
    partition_book : GraphPartitionBook
        The graph partition book
    rank : int
        The rank of the current process

    Returns
    -------
    1D-tensor
        The vector of node Ids that belong to the rank.
    '''
    num_nodes = 0
    for part in partition_book.metadata():
        num_nodes += part['num_nodes']
    assert len(nodes) == num_nodes, \
            'The length of boolean mask vector should be the number of nodes in the graph.'
    # Get all nodes that belong to the rank.
    local_nids = partition_book.partid2nids(rank)
    return _get_overlap(nodes, local_nids)
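
# Hedged example (illustrative only): each trainer keeps only the training nodes owned
# by its own partition. The boolean node feature 'train_mask' is hypothetical.
def _example_node_split(g, rank):
    ''' Illustrative only: split a boolean training mask across trainers. '''
    train_mask = g.ndata['train_mask']        # 1 for training nodes, 0 otherwise
    local_train_nids = node_split(train_mask, g.get_partition_book(), rank)
    return local_train_nids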

def edge_split(edges, partition_book, rank):
    ''' Split edges and return a subset for the local rank.

    This function splits the input edges based on the partition book and
    returns a subset of edges for the local rank. This method is used for
    dividing workloads for distributed training.

    The input edges are given as a boolean mask vector whose length equals the number of
    edges in the graph; a value of 1 indicates that the edge at the corresponding position
    is included.

    Parameters
    ----------
    edges : 1D tensor or DistTensor
        A boolean mask vector that indicates input edges.
    partition_book : GraphPartitionBook
        The graph partition book
    rank : int
        The rank of the current process

    Returns
    -------
    1D-tensor
        The vector of edge Ids that belong to the rank.
    '''
    num_edges = 0
    for part in partition_book.metadata():
        num_edges += part['num_edges']
    assert len(edges) == num_edges, \
            'The length of boolean mask vector should be the number of edges in the graph.'
    # Get all edges that belong to the rank.
    local_eids = partition_book.partid2eids(rank)
    return _get_overlap(edges, local_eids)
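
# Hedged example (illustrative only): the edge counterpart of the node split above.
# The boolean edge feature 'train_mask' is hypothetical.
def _example_edge_split(g, rank):
    ''' Illustrative only: split a boolean edge mask across trainers. '''
    edge_mask = g.edata['train_mask']
    local_train_eids = edge_split(edge_mask, g.get_partition_book(), rank)
    return local_train_eids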