import argparse
import gc
import json
import logging
import os
import time

import constants

import dgl
import numpy as np
import pandas as pd
import pyarrow
import torch as th
from dgl.distributed.partition import (
    _etype_str_to_tuple,
    _etype_tuple_to_str,
    RESERVED_FIELD_DTYPE,
)
from pyarrow import csv
from utils import get_idranges, memory_snapshot, read_json


def _get_unique_invidx(srcids, dstids, nids):
    """This function is used to compute a list of unique elements,
    and their indices in the input list, which is the concatenation
    of srcids, dstids and uniq_nids. In addition, this function will also
    compute inverse indices, in the list of unique elements, for the
    elements in srcids, dstids and nids arrays. srcids, dstids will be
    over-written to contain the inverse indices. Basically, this function
    is mimicing the functionality of numpy's unique function call.
    The problem with numpy's unique function call is its high memory
    requirement. For an input list of 3 billion edges it consumes about
    550GB of systems memory, which is limiting the capability of the
    partitioning pipeline.

    Current numpy uniques function returns 3 return parameters, which are
        . list of unique elements
        . list of indices, in the input argument list, which are first
            occurance of the corresponding element in the uniques list
        . list of inverse indices, which are indices from the uniques list
            and can be used to rebuild the original input array
    Compared to the above numpy's return parameters, this work around
    solution returns 4 values
        . list of unique elements,
        . list of indices, which may not be the first occurance of the
            corresponding element from the uniques
        . list of inverse indices, here we only build the inverse indices
            for srcids and dstids input arguments. For the current use case,
            only these two inverse indices are needed.

    Parameters:
    -----------
    srcids : numpy array
        a list of numbers, which are the src-ids of the edges
    dstids : numpy array
        a list of numbers, which are the dst-ids of the edges
    nids : numpy array
        a list of numbers, a list of unique shuffle-global-nids.
        This list is guaranteed to be a list of sorted consecutive unique
        list of numbers. Also, this list will be a `super set` for the
        list of dstids. Current implementation of the pipeline guarantees
        this assumption and is used to simplify the current implementation
        of the workaround solution.

    Returns:
    --------
    numpy array :
        a list of unique, sorted elements, computed from the input arguments
    numpy array :
        a list of integers. These are indices in the concatenated list
        [srcids, dstids, uniq_nids], which are the input arguments to this function
    numpy array :
        a list of integers. These are inverse indices, which will be indices
        from the unique elements list specifying the elements from the
        input array, srcids
    numpy array :
        a list of integers. These are inverse indices, which will be indices
        from the unique elements list specifying the elements from the
        input array, dstids
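
    Examples:
    ---------
    A small illustrative call (toy values chosen for clarity; note that
    ``nids`` must be sorted, consecutive and a super set of ``dstids``):

    >>> srcids = np.array([7, 2, 9, 2])
    >>> dstids = np.array([3, 4, 3, 5])
    >>> nids = np.array([2, 3, 4, 5])
    >>> uniques, idxes, src_inv, dst_inv = _get_unique_invidx(
    ...     srcids, dstids, nids
    ... )
    >>> uniques
    array([2, 3, 4, 5, 7, 9])
    >>> uniques[src_inv]  # reconstructs the original srcids
    array([7, 2, 9, 2])
    >>> uniques[dst_inv]  # reconstructs the original dstids
    array([3, 4, 3, 5])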
    """
    assert len(srcids) == len(
        dstids
    ), "srcids and dstids must have the same length."
    assert len(srcids) != 0, "Please provide a non-empty edge-list."

    if np.__version__ < "1.24.0":
        logging.warning(
            f"Numpy version, {np.__version__}, is lower than expected. "
            f"Falling back to numpy's native function `unique`. "
            f"This function's memory overhead will limit the size of the "
            f"partitioned graph objects processed by each node in the cluster."
        )
        uniques, idxes, inv_idxes = np.unique(
            np.concatenate([srcids, dstids, nids]),
            return_index=True,
            return_inverse=True,
        )
        src_len = len(srcids)
        dst_len = len(dstids)
        return (
            uniques,
            idxes,
            inv_idxes[:src_len],
            inv_idxes[src_len : (src_len + dst_len)],
        )

    # find uniques which appear only in the srcids list
    mask = np.isin(srcids, nids, invert=True, kind="table")
    srcids_only = srcids[mask]
    srcids_idxes = np.where(mask == 1)[0]

    # unique (and sort) the srcids that do not appear in nids
    uniques, unique_srcids_idx = np.unique(srcids_only, return_index=True)
    idxes = srcids_idxes[unique_srcids_idx]

    # build uniques and idxes, first and second return parameters
    uniques = np.concatenate([uniques, nids])
    idxes = np.concatenate(
        [idxes, len(srcids) + len(dstids) + np.arange(len(nids))]
    )

    # sort uniques and reorder idxes to match
    sort_idx = np.argsort(uniques)
    uniques = uniques[sort_idx]
    idxes = idxes[sort_idx]

    # uniques and idxes are built
    assert len(uniques) == len(idxes), f"Error building the idxes array."

    # build inverse indices for srcids and dstids.
    # the local srcids and dstids arrays are reused to hold the inverse indices.
    sort_ids = np.argsort(srcids)
    srcids = srcids[sort_ids]

    # TODO: check if wrapping this while loop in a c++ wrapper
    # helps in speeding up the code
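    # Two-pointer merge over the sorted srcids and the sorted uniques:
    # whenever the values match, the current position in uniques is the
    # inverse index of that srcid (duplicates in srcids map to the same
    # position, since only idx1 advances on a match).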
    idx1 = 0
    idx2 = 0
    while (idx1 < len(srcids)) and (idx2 < len(uniques)):
        if srcids[idx1] == uniques[idx2]:
            srcids[idx1] = idx2
            idx1 += 1
        elif srcids[idx1] < uniques[idx2]:
            idx1 += 1
        else:
            idx2 += 1

    assert idx1 >= len(srcids), (
        f"Failed to locate all srcids in the uniques array "
        f" len(srcids) = {len(srcids)}, idx1 = {idx1} "
        f" len(uniques) = {len(uniques)}, idx2 = {idx2}"
    )
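    # scatter the inverse indices back into the original (unsorted) order of srcids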
    srcids[sort_ids] = srcids

    # process dstids now.
    # dstids is guaranteed to be a subset of the `nids` list, and nids are
    # consecutive integers, so all of nids occupy a contiguous block in the
    # sorted uniques list. The index in uniques of each dstid is therefore
    # computed in two steps:
    # 1. locate the position (`offset`) of nids[0] in the list of uniques;
    #       no dstid can appear to the left of this position.
    # 2. dstids = dstids - nids[0] + offset
    #       Subtracting nids[0] turns each dstid into its offset within the
    #       nids block, and adding `offset` shifts it to its absolute
    #       position in uniques. Note that nids will ALWAYS be a SUPERSET
    #       of dstids.
    offset = np.searchsorted(uniques, nids[0], side="left")
    dstids = dstids - nids[0] + offset

    # return the values
    return uniques, idxes, srcids, dstids


def create_dgl_object(
    schema,
    part_id,
    node_data,
    edge_data,
    edgeid_offset,
    node_typecounts,
    edge_typecounts,
    return_orig_nids=False,
    return_orig_eids=False,
):
    """
    This function creates dgl objects for a given graph partition, as in function
191
    arguments.
192

193
194
    The "schema" argument is a dictionary, which contains the metadata related to node ids
    and edge ids. It contains two keys: "nid" and "eid", whose value is also a dictionary
195
    with the following structure.
196
197

    1. The key-value pairs in the "nid" dictionary has the following format.
198
       "ntype-name" is the user assigned name to this node type. "format" describes the
199
200
       format of the contents of the files. and "data" is a list of lists, each list has
       3 elements: file-name, start_id and end_id. File-name can be either absolute or
201
       relative path to this file and starting and ending ids are type ids of the nodes
202
       which are contained in this file. These type ids are later used to compute global ids
203
       of these nodes which are used throughout the processing of this pipeline.
204
        "ntype-name" : {
205
            "format" : "csv",
206
            "data" : [
207
                    [ <path-to-file>/ntype0-name-0.csv, start_id0, end_id0],
208
209
210
211
212
213
214
215
                    [ <path-to-file>/ntype0-name-1.csv, start_id1, end_id1],
                    ...
                    [ <path-to-file>/ntype0-name-<p-1>.csv, start_id<p-1>, end_id<p-1>],
            ]
        }

    2. The key-value pairs in the "eid" dictionary has the following format.
       As described for the "nid" dictionary the "eid" dictionary is similarly structured
216
       except that these entries are for edges.
217
        "etype-name" : {
218
            "format" : "csv",
219
            "data" : [
220
                    [ <path-to-file>/etype0-name-0, start_id0, end_id0],
221
222
223
224
225
226
227
                    [ <path-to-file>/etype0-name-1 start_id1, end_id1],
                    ...
                    [ <path-to-file>/etype0-name-1 start_id<p-1>, end_id<p-1>]
            ]
        }

    In "nid" dictionary, the type_nids are specified that
228
229
    should be assigned to nodes which are read from the corresponding nodes file.
    Along the same lines dictionary for the key "eid" is used for edges in the
230
231
232
233
234
    input graph.

    These type ids, for nodes and edges, are used to compute global ids for nodes
    and edges which are stored in the graph object.

235
236
237
238
239
240
241
242
243
244
    Parameters:
    -----------
    schame : json object
        json object created by reading the graph metadata json file
    part_id : int
        partition id of the graph partition for which dgl object is to be created
    node_data : numpy ndarray
        node_data, where each row is of the following format:
        <global_nid> <ntype_id> <global_type_nid>
    edge_data : numpy ndarray
245
        edge_data, where each row is of the following format:
246
247
248
        <global_src_id> <global_dst_id> <etype_id> <global_type_eid>
    edgeid_offset : int
        offset to be used when assigning edge global ids in the current partition
249
250
    return_orig_ids : bool, optional
        Indicates whether to return original node/edge IDs.
251

252
    Returns:
253
254
255
256
257
258
259
260
261
262
263
    --------
    dgl object
        dgl object created for the current graph partition
    dictionary
        map between node types and the range of global node ids used
    dictionary
        map between edge types and the range of global edge ids used
    dictionary
        map between node type(string)  and node_type_id(int)
    dictionary
        map between edge type(string)  and edge_type_id(int)
264
265
266
267
268
269
270
271
    dict of tensors
        If `return_orig_nids=True`, return a dict of 1D tensors whose key is the node type
        and value is a 1D tensor mapping between shuffled node IDs and the original node
        IDs for each node type. Otherwise, ``None`` is returned.
    dict of tensors
        If `return_orig_eids=True`, return a dict of 1D tensors whose key is the edge type
        and value is a 1D tensor mapping between shuffled edge IDs and the original edge
        IDs for each edge type. Otherwise, ``None`` is returned.
272
    """
    # create auxiliary data structures from the schema object
    memory_snapshot("CreateDGLObj_Begin", part_id)
    _, global_nid_ranges = get_idranges(
        schema[constants.STR_NODE_TYPE], node_typecounts
    )
    _, global_eid_ranges = get_idranges(
        schema[constants.STR_EDGE_TYPE], edge_typecounts
    )

    id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)

    ntypes = [(key, global_nid_ranges[key][0, 0]) for key in global_nid_ranges]
    ntypes.sort(key=lambda e: e[1])
    ntype_offset_np = np.array([e[1] for e in ntypes])
    ntypes = [e[0] for e in ntypes]
    ntypes_map = {e: i for i, e in enumerate(ntypes)}
    etypes = [(key, global_eid_ranges[key][0, 0]) for key in global_eid_ranges]
    etypes.sort(key=lambda e: e[1])
    etypes = [e[0] for e in etypes]
    etypes_map = {_etype_str_to_tuple(e): i for i, e in enumerate(etypes)}

    node_map_val = {ntype: [] for ntype in ntypes}
    edge_map_val = {_etype_str_to_tuple(etype): [] for etype in etypes}

    memory_snapshot("CreateDGLObj_AssignNodeData", part_id)
    shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID]
    node_data.pop(constants.SHUFFLE_GLOBAL_NID)
    gc.collect()

    ntype_ids = node_data[constants.NTYPE_ID]
    node_data.pop(constants.NTYPE_ID)
    gc.collect()

    global_type_nid = node_data[constants.GLOBAL_TYPE_NID]
    node_data.pop(constants.GLOBAL_TYPE_NID)
    node_data = None
    gc.collect()

    global_homo_nid = ntype_offset_np[ntype_ids] + global_type_nid
    assert np.all(shuffle_global_nids[1:] - shuffle_global_nids[:-1] == 1)
    shuffle_global_nid_range = (shuffle_global_nids[0], shuffle_global_nids[-1])

    # Determine the node ID ranges of different node types.
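    # Each entry appended below is a half-open range [start, end) of shuffled
    # global node IDs owned by this partition for that node type.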
    for ntype_name in global_nid_ranges:
        ntype_id = ntypes_map[ntype_name]
        type_nids = shuffle_global_nids[ntype_ids == ntype_id]
        node_map_val[ntype_name].append(
            [int(type_nids[0]), int(type_nids[-1]) + 1]
        )

    # process edges
    memory_snapshot("CreateDGLObj_AssignEdgeData: ", part_id)
    shuffle_global_src_id = edge_data[constants.SHUFFLE_GLOBAL_SRC_ID]
    edge_data.pop(constants.SHUFFLE_GLOBAL_SRC_ID)
    gc.collect()

    shuffle_global_dst_id = edge_data[constants.SHUFFLE_GLOBAL_DST_ID]
    edge_data.pop(constants.SHUFFLE_GLOBAL_DST_ID)
    gc.collect()

    global_src_id = edge_data[constants.GLOBAL_SRC_ID]
    edge_data.pop(constants.GLOBAL_SRC_ID)
    gc.collect()

    global_dst_id = edge_data[constants.GLOBAL_DST_ID]
    edge_data.pop(constants.GLOBAL_DST_ID)
    gc.collect()

    global_edge_id = edge_data[constants.GLOBAL_TYPE_EID]
    edge_data.pop(constants.GLOBAL_TYPE_EID)
    gc.collect()

    etype_ids = edge_data[constants.ETYPE_ID]
    edge_data.pop(constants.ETYPE_ID)
    edge_data = None
    gc.collect()
    logging.info(
        f"There are {len(shuffle_global_src_id)} edges in partition {part_id}"
    )

    # It's not guaranteed that the edges are sorted based on edge type.
    # Let's sort edges and all attributes on the edges.
    if not np.all(np.diff(etype_ids) >= 0):
        sort_idx = np.argsort(etype_ids)
        (
            shuffle_global_src_id,
            shuffle_global_dst_id,
            global_src_id,
            global_dst_id,
            global_edge_id,
            etype_ids,
        ) = (
            shuffle_global_src_id[sort_idx],
            shuffle_global_dst_id[sort_idx],
            global_src_id[sort_idx],
            global_dst_id[sort_idx],
            global_edge_id[sort_idx],
            etype_ids[sort_idx],
        )
        assert np.all(np.diff(etype_ids) >= 0)
    else:
        logging.info(f"[Rank: {part_id}] Edge data is already sorted !!!")

    # Determine the edge ID range of different edge types.
    edge_id_start = edgeid_offset
    for etype_name in global_eid_ranges:
        etype = _etype_str_to_tuple(etype_name)
        assert len(etype) == 3
        etype_id = etypes_map[etype]
        edge_map_val[etype].append(
            [edge_id_start, edge_id_start + np.sum(etype_ids == etype_id)]
        )
        edge_id_start += np.sum(etype_ids == etype_id)
    memory_snapshot("CreateDGLObj_UniqueNodeIds: ", part_id)

    # get the edge list in some order and then reshuffle.
    # Here the order of nodes is defined by the sorted order.
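    # uniq_ids holds the shuffled global IDs of every node touched by this
    # partition's edges (owned nodes plus halo nodes), and part_local_src_id /
    # part_local_dst_id are positions into uniq_ids for each edge endpoint.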
    uniq_ids, idx, part_local_src_id, part_local_dst_id = _get_unique_invidx(
        shuffle_global_src_id,
        shuffle_global_dst_id,
        np.arange(shuffle_global_nid_range[0], shuffle_global_nid_range[1] + 1),
    )

    inner_nodes = th.as_tensor(
        np.logical_and(
            uniq_ids >= shuffle_global_nid_range[0],
            uniq_ids <= shuffle_global_nid_range[1],
        )
    )

    # get the list of indices, from inner_nodes, which will sort inner_nodes as [True, True, ...., False, False, ...]
    # essentially local nodes will be placed before non-local nodes.
    reshuffle_nodes = th.arange(len(uniq_ids))
    reshuffle_nodes = th.cat(
        [reshuffle_nodes[inner_nodes.bool()], reshuffle_nodes[inner_nodes == 0]]
    )

    """
    The following procedure is used to map part_local_src_id and part_local_dst_id to account for
    the reshuffling of nodes (to order locally owned nodes before non-local nodes in a partition):
    1. Form a node_map, in this case a numpy array, which will be used to map old node-ids (pre-reshuffling)
       to post-reshuffling ids.
    2. Once the map is created, use this map to map all the node-ids in the part_local_src_id
       and part_local_dst_id lists to their appropriate `new` node-ids (post-reshuffle order).
    3. Since only the nodes' order is changed, we will have to re-order node related information when
       creating the dgl object: this includes dgl.NTYPE, dgl.NID and inner_node.
    4. The edges' order is not changed. At this point in the execution path edges are still ordered by their etype-ids.
    5. Create the dgl object appropriately and return the dgl object.

    Here is a simple example to understand the above flow better.

    part_local_nids = [0, 1, 2, 3, 4, 5]
    part_local_src_ids = [0, 0, 0, 0, 2, 3, 4]
    part_local_dst_ids = [1, 2, 3, 4, 4, 4, 5]

    Assume that nodes {1, 5} are halo-nodes, which are not owned by this partition.

    reshuffle_nodes = [0, 2, 3, 4, 1, 5]

    A node_map, which maps node-ids from the old to the reshuffled order, is built as follows:
    node_map = np.zeros((len(reshuffle_nodes),))
    node_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))

    Using the above map, part_local_src_ids and part_local_dst_ids are mapped as follows:
    part_local_src_ids = [0, 0, 0, 0, 1, 2, 3]
    part_local_dst_ids = [4, 1, 2, 3, 3, 3, 5]

    In the graph above, note that nodes {0, 1, 2, 3} are inner-nodes and {4, 5} are NON-inner-nodes.

    Since the edges are not re-ordered in any way, no reordering is required for edge related data
    during the DGL object creation.
    """
    # create the mappings to generate mapped part_local_src_id and part_local_dst_id
    # This map will map from unshuffled node-ids to reshuffled-node-ids (which are ordered to prioritize
    # locally owned nodes).
    nid_map = np.zeros((len(reshuffle_nodes),))
    nid_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))

    # Now map the edge end points to reshuffled_values.
    part_local_src_id, part_local_dst_id = (
        nid_map[part_local_src_id],
        nid_map[part_local_dst_id],
    )

    # create the graph here now.
    part_graph = dgl.graph(
        data=(part_local_src_id, part_local_dst_id), num_nodes=len(uniq_ids)
    )
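    # The partition is stored as a homogeneous DGL graph; node/edge types and
    # original IDs are attached as ndata/edata fields below.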
    part_graph.edata[dgl.EID] = th.arange(
        edgeid_offset,
        edgeid_offset + part_graph.number_of_edges(),
        dtype=th.int64,
    )
    part_graph.edata[dgl.ETYPE] = th.as_tensor(
        etype_ids, dtype=RESERVED_FIELD_DTYPE[dgl.ETYPE]
    )
    part_graph.edata["inner_edge"] = th.ones(
        part_graph.number_of_edges(), dtype=RESERVED_FIELD_DTYPE["inner_edge"]
    )

    # compute per_type_ids and ntype for all the nodes in the graph.
    global_ids = np.concatenate([global_src_id, global_dst_id, global_homo_nid])
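    # `idx` indexes into the concatenation [srcids, dstids, owned-nid range] in
    # exactly the order used inside _get_unique_invidx, so global_ids is
    # concatenated in the same order to recover one original (pre-shuffle)
    # homogeneous global ID per unique node.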
    part_global_ids = global_ids[idx]
    part_global_ids = part_global_ids[reshuffle_nodes]
    ntype, per_type_ids = id_map(part_global_ids)

    # continue with the graph creation
    part_graph.ndata[dgl.NTYPE] = th.as_tensor(
        ntype, dtype=RESERVED_FIELD_DTYPE[dgl.NTYPE]
    )
    part_graph.ndata[dgl.NID] = th.as_tensor(uniq_ids[reshuffle_nodes])
    part_graph.ndata["inner_node"] = th.as_tensor(
        inner_nodes[reshuffle_nodes], dtype=RESERVED_FIELD_DTYPE["inner_node"]
    )

    orig_nids = None
    orig_eids = None
    if return_orig_nids:
        orig_nids = {}
        for ntype, ntype_id in ntypes_map.items():
            mask = th.logical_and(
                part_graph.ndata[dgl.NTYPE] == ntype_id,
                part_graph.ndata["inner_node"],
            )
            orig_nids[ntype] = th.as_tensor(per_type_ids[mask])
    if return_orig_eids:
        orig_eids = {}
        for etype, etype_id in etypes_map.items():
            mask = th.logical_and(
                part_graph.edata[dgl.ETYPE] == etype_id,
                part_graph.edata["inner_edge"],
            )
            orig_eids[_etype_tuple_to_str(etype)] = th.as_tensor(
                global_edge_id[mask]
            )

    return (
        part_graph,
        node_map_val,
        edge_map_val,
        ntypes_map,
        etypes_map,
        orig_nids,
        orig_eids,
    )


def create_metadata_json(
    graph_name,
    num_nodes,
    num_edges,
    part_id,
    num_parts,
    node_map_val,
    edge_map_val,
    ntypes_map,
    etypes_map,
    output_dir,
):
    """
    Auxiliary function to create json file for the graph partition metadata

    Parameters:
    -----------
    graph_name : string
        name of the graph
    num_nodes : int
        no. of nodes in the graph partition
    num_edges : int
        no. of edges in the graph partition
    part_id : int
       integer indicating the partition id
    num_parts : int
        total no. of partitions of the original graph
    node_map_val : dictionary
        map between node types and the range of global node ids used
    edge_map_val : dictionary
        map between edge types and the range of global edge ids used
    ntypes_map : dictionary
        map between node type(string)  and node_type_id(int)
    etypes_map : dictionary
        map between edge type(string)  and edge_type_id(int)
    output_dir : string
        directory where the output files are to be stored

    Returns:
    --------
    dictionary
        map describing the graph information

    """
    part_metadata = {
        "graph_name": graph_name,
        "num_nodes": num_nodes,
        "num_edges": num_edges,
        "part_method": "metis",
        "num_parts": num_parts,
        "halo_hops": 1,
        "node_map": node_map_val,
        "edge_map": edge_map_val,
        "ntypes": ntypes_map,
        "etypes": etypes_map,
    }

    part_dir = "part" + str(part_id)
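    # Relative paths of the per-partition artifacts recorded in the metadata
    # (this function only records the paths; it does not write these files).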
    node_feat_file = os.path.join(part_dir, "node_feat.dgl")
    edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
    part_graph_file = os.path.join(part_dir, "graph.dgl")
    part_metadata["part-{}".format(part_id)] = {
        "node_feats": node_feat_file,
        "edge_feats": edge_feat_file,
        "part_graph": part_graph_file,
    }
    return part_metadata