import argparse
import gc
import json
import logging
import os
import time

import constants

import dgl
import numpy as np
import pandas as pd
import pyarrow
import torch as th
from dgl.distributed.partition import (
    _etype_str_to_tuple,
    _etype_tuple_to_str,
    RESERVED_FIELD_DTYPE,
)
from pyarrow import csv
from utils import get_idranges, memory_snapshot, read_json


def create_dgl_object(
    schema,
    part_id,
    node_data,
    edge_data,
    edgeid_offset,
    return_orig_nids=False,
    return_orig_eids=False,
):
    """
    This function creates the dgl objects for a given graph partition, as described
    by the function arguments.

    The "schema" argument is a dictionary, which contains the metadata related to node ids
    and edge ids. It contains two keys: "nid" and "eid", whose value is also a dictionary
39
    with the following structure.
40
41

    1. The key-value pairs in the "nid" dictionary has the following format.
       "ntype-name" is the user-assigned name of this node type. "format" describes the
       format of the contents of the files, and "data" is a list of lists, where each
       list has 3 elements: file-name, start_id and end_id. The file-name can be either
       an absolute or a relative path to this file, and the start and end ids are the
       type ids of the nodes contained in this file. These type ids are later used to
       compute the global ids of these nodes, which are used throughout this pipeline.
        "ntype-name" : {
            "format" : "csv",
            "data" : [
                    [ <path-to-file>/ntype0-name-0.csv, start_id0, end_id0],
                    [ <path-to-file>/ntype0-name-1.csv, start_id1, end_id1],
                    ...
                    [ <path-to-file>/ntype0-name-<p-1>.csv, start_id<p-1>, end_id<p-1>],
            ]
        }

    2. The key-value pairs in the "eid" dictionary have the following format.
       The "eid" dictionary is structured like the "nid" dictionary described above,
       except that its entries are for edges.
        "etype-name" : {
            "format" : "csv",
            "data" : [
                    [ <path-to-file>/etype0-name-0, start_id0, end_id0],
                    [ <path-to-file>/etype0-name-1, start_id1, end_id1],
                    ...
                    [ <path-to-file>/etype0-name-<p-1>, start_id<p-1>, end_id<p-1>],
            ]
        }

    In "nid" dictionary, the type_nids are specified that
72
73
    should be assigned to nodes which are read from the corresponding nodes file.
    Along the same lines dictionary for the key "eid" is used for edges in the
74
75
76
77
78
    input graph.

    These type ids, for nodes and edges, are used to compute the global ids for the
    nodes and edges stored in the graph object.

    Parameters:
    -----------
    schema : json object
        json object created by reading the graph metadata json file
    part_id : int
        partition id of the graph partition for which the dgl object is to be created
    node_data : numpy ndarray
        node_data, where each row is of the following format:
        <global_nid> <ntype_id> <global_type_nid>
    edge_data : numpy ndarray
        edge_data, where each row is of the following format:
        <global_src_id> <global_dst_id> <etype_id> <global_type_eid>
    edgeid_offset : int
        offset to be used when assigning edge global ids in the current partition
    return_orig_nids : bool, optional
        Indicates whether to return the original node IDs.
    return_orig_eids : bool, optional
        Indicates whether to return the original edge IDs.

    Returns:
    --------
    dgl object
        dgl object created for the current graph partition
    dictionary
        map between node types and the range of global node ids used
    dictionary
        map between edge types and the range of global edge ids used
    dictionary
        map between node type (string) and node_type_id (int)
    dictionary
        map between edge type (string) and edge_type_id (int)
    dict of tensors
        If `return_orig_nids=True`, return a dict of 1D tensors whose key is the node type
        and value is a 1D tensor mapping between shuffled node IDs and the original node
        IDs for each node type. Otherwise, ``None`` is returned.
    dict of tensors
        If `return_orig_eids=True`, return a dict of 1D tensors whose key is the edge type
        and value is a 1D tensor mapping between shuffled edge IDs and the original edge
        IDs for each edge type. Otherwise, ``None`` is returned.
    """
    # create auxiliary data structures from the schema object
    memory_snapshot("CreateDGLObj_Begin", part_id)
    _, global_nid_ranges = get_idranges(
        schema[constants.STR_NODE_TYPE],
        schema[constants.STR_NUM_NODES_PER_CHUNK],
    )
    _, global_eid_ranges = get_idranges(
        schema[constants.STR_EDGE_TYPE],
        schema[constants.STR_NUM_EDGES_PER_CHUNK],
    )
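    # As a rough illustration (hypothetical node types and counts, not from a
    # real dataset): for a graph with 1000 "paper" and 500 "author" nodes,
    # global_nid_ranges is expected to hold contiguous [start, end) ranges in
    # the homogeneous global id space, e.g.
    #   global_nid_ranges = {"paper": np.array([[0, 1000]]),
    #                        "author": np.array([[1000, 1500]])}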

    id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)

    ntypes = [(key, global_nid_ranges[key][0, 0]) for key in global_nid_ranges]
    ntypes.sort(key=lambda e: e[1])
    ntype_offset_np = np.array([e[1] for e in ntypes])
    ntypes = [e[0] for e in ntypes]
    ntypes_map = {e: i for i, e in enumerate(ntypes)}
    etypes = [(key, global_eid_ranges[key][0, 0]) for key in global_eid_ranges]
    etypes.sort(key=lambda e: e[1])
    etypes = [e[0] for e in etypes]
    etypes_map = {_etype_str_to_tuple(e): i for i, e in enumerate(etypes)}

    node_map_val = {ntype: [] for ntype in ntypes}
    edge_map_val = {_etype_str_to_tuple(etype): [] for etype in etypes}
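    # For example (hypothetical type names), the maps built above could look
    # like
    #   ntypes_map = {"author": 0, "paper": 1}
    #   etypes_map = {("author", "writes", "paper"): 0}
    # where type ids are assigned in increasing order of each type's starting
    # global id.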

    memory_snapshot("CreateDGLObj_AssignNodeData", part_id)
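    # Extract each column from node_data and drop the dict entry right away;
    # the gc.collect() calls encourage prompt release of memory that is no
    # longer referenced.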
    shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID]
    node_data.pop(constants.SHUFFLE_GLOBAL_NID)
    gc.collect()

    ntype_ids = node_data[constants.NTYPE_ID]
    node_data.pop(constants.NTYPE_ID)
    gc.collect()

    global_type_nid = node_data[constants.GLOBAL_TYPE_NID]
    node_data.pop(constants.GLOBAL_TYPE_NID)
    node_data = None
    gc.collect()

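    # Compute each node's id in the homogeneous global id space: the offset of
    # its node type plus its per-type id.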
    global_homo_nid = ntype_offset_np[ntype_ids] + global_type_nid
    assert np.all(shuffle_global_nids[1:] - shuffle_global_nids[:-1] == 1)
    shuffle_global_nid_range = (shuffle_global_nids[0], shuffle_global_nids[-1])

    # Determine the node ID ranges of different node types.
    for ntype_name in global_nid_ranges:
        ntype_id = ntypes_map[ntype_name]
        type_nids = shuffle_global_nids[ntype_ids == ntype_id]
        node_map_val[ntype_name].append(
            [int(type_nids[0]), int(type_nids[-1]) + 1]
        )

    # process edges
    memory_snapshot("CreateDGLObj_AssignEdgeData: ", part_id)
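    # As with the node data above, extract each edge_data column and drop the
    # dict entry right away to keep peak memory usage down.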
    shuffle_global_src_id = edge_data[constants.SHUFFLE_GLOBAL_SRC_ID]
    edge_data.pop(constants.SHUFFLE_GLOBAL_SRC_ID)
    gc.collect()

    shuffle_global_dst_id = edge_data[constants.SHUFFLE_GLOBAL_DST_ID]
    edge_data.pop(constants.SHUFFLE_GLOBAL_DST_ID)
    gc.collect()

    global_src_id = edge_data[constants.GLOBAL_SRC_ID]
    edge_data.pop(constants.GLOBAL_SRC_ID)
    gc.collect()

    global_dst_id = edge_data[constants.GLOBAL_DST_ID]
    edge_data.pop(constants.GLOBAL_DST_ID)
    gc.collect()

    global_edge_id = edge_data[constants.GLOBAL_TYPE_EID]
    edge_data.pop(constants.GLOBAL_TYPE_EID)
    gc.collect()

    etype_ids = edge_data[constants.ETYPE_ID]
    edge_data.pop(constants.ETYPE_ID)
    edge_data = None
    gc.collect()
    logging.info(
        f"There are {len(shuffle_global_src_id)} edges in partition {part_id}"
    )

    # It's not guaranteed that the edges are sorted based on edge type.
    # Let's sort edges and all attributes on the edges.
    if not np.all(np.diff(etype_ids) >= 0):
        sort_idx = np.argsort(etype_ids)
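        # A small illustration (hypothetical values): etype_ids = [1, 0, 1]
        # yields sort_idx = [1, 0, 2], which groups all edges of etype 0
        # before those of etype 1.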
        (
            shuffle_global_src_id,
            shuffle_global_dst_id,
            global_src_id,
            global_dst_id,
            global_edge_id,
            etype_ids,
        ) = (
            shuffle_global_src_id[sort_idx],
            shuffle_global_dst_id[sort_idx],
            global_src_id[sort_idx],
            global_dst_id[sort_idx],
            global_edge_id[sort_idx],
            etype_ids[sort_idx],
        )
        assert np.all(np.diff(etype_ids) >= 0)
    else:
        print(f"[Rank: {part_id}] Edge data is already sorted !!!")

    # Determine the edge ID range of different edge types.
    edge_id_start = edgeid_offset
    for etype_name in global_eid_ranges:
        etype = _etype_str_to_tuple(etype_name)
        assert len(etype) == 3
        etype_id = etypes_map[etype]
        num_etype_edges = np.sum(etype_ids == etype_id)
        edge_map_val[etype].append(
            [edge_id_start, edge_id_start + num_etype_edges]
        )
        edge_id_start += num_etype_edges
    memory_snapshot("CreateDGLObj_UniqueNodeIds: ", part_id)

    # Get the edge list in some order and then reshuffle.
    # The node order is defined by the `np.unique` call below: nodes are
    # ordered as listed in the uniq_ids array.
    ids = np.concatenate(
        [
            shuffle_global_src_id,
            shuffle_global_dst_id,
            np.arange(
                shuffle_global_nid_range[0], shuffle_global_nid_range[1] + 1
            ),
        ]
    )
    uniq_ids, idx, inverse_idx = np.unique(
        ids, return_index=True, return_inverse=True
    )
    assert len(uniq_ids) == len(idx)
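    # A small illustration of the np.unique outputs (hypothetical values):
    # for ids = [7, 3, 7, 5], uniq_ids = [3, 5, 7], idx = [1, 3, 0] and
    # inverse_idx = [2, 0, 2, 1], i.e. inverse_idx relabels every entry of
    # ids with its index into the sorted uniq_ids array.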

    # Get the edge list with its node IDs mapped to a contiguous ID range.
    part_local_src_id, part_local_dst_id = np.split(
        inverse_idx[: len(shuffle_global_src_id) * 2], 2
    )
    inner_nodes = th.as_tensor(
        np.logical_and(
            uniq_ids >= shuffle_global_nid_range[0],
            uniq_ids <= shuffle_global_nid_range[1],
        )
    )

    # Get the list of indices which will sort inner_nodes as
    # [True, True, ..., False, False, ...], i.e. locally owned nodes are
    # placed before non-local (halo) nodes.
    reshuffle_nodes = th.arange(len(uniq_ids))
    reshuffle_nodes = th.cat(
        [reshuffle_nodes[inner_nodes.bool()], reshuffle_nodes[inner_nodes == 0]]
    )

    """
    The following procedure is used to map part_local_src_id and part_local_dst_id to
    account for the reshuffling of nodes (to order locally owned nodes before
    non-local nodes in a partition):
    1. Form a node_map, in this case a numpy array, which will be used to map old
    node-ids (pre-reshuffling) to post-reshuffling ids.
    2. Once the map is created, use it to map all the node-ids in the
    part_local_src_id and part_local_dst_id lists to their `new` node-ids
    (post-reshuffle order).
    3. Since only the node order is changed, we will have to re-order node-related
    information when creating the dgl object: this includes dgl.NTYPE, dgl.NID and
    inner_node.
    4. The edge order is not changed. At this point in the execution path, edges are
    still ordered by their etype-ids.
    5. Create the dgl object appropriately and return it.

    Here is a simple example to understand the above flow better.

    part_local_nids = [0, 1, 2, 3, 4, 5]
    part_local_src_ids = [0, 0, 0, 0, 2, 3, 4]
    part_local_dst_ids = [1, 2, 3, 4, 4, 4, 5]

    Assume that nodes {1, 5} are halo-nodes, which are not owned by this partition.

    reshuffle_nodes = [0, 2, 3, 4, 1, 5]

    A node_map, which maps node-ids from old to reshuffled order is as follows:
    node_map = np.zeros((len(reshuffle_nodes,)))
    node_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))

    Using the above map, part_local_src_ids and part_local_dst_ids are mapped as follows:
    part_local_src_ids = [0, 0, 0, 0, 1, 2, 3]
    part_local_dst_ids = [4, 1, 2, 3, 3, 3, 5]

    In the graph above, note that nodes {0, 1, 2, 3} are inner nodes and {4, 5} are
    non-inner (halo) nodes.

    Since the edges are not re-ordered in any way, there is no reordering required for
    edge-related data during the DGL object creation.
    """
    # Create the mapping used to generate the mapped part_local_src_id and
    # part_local_dst_id. This map translates unshuffled node-ids to
    # reshuffled node-ids (which are ordered so that locally owned nodes
    # come first).
    nid_map = np.zeros((len(reshuffle_nodes),))
    nid_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))

    # Now map the edge end points to reshuffled_values.
    part_local_src_id, part_local_dst_id = (
        nid_map[part_local_src_id],
        nid_map[part_local_dst_id],
    )

    # Now create the graph.
    part_graph = dgl.graph(
        data=(part_local_src_id, part_local_dst_id), num_nodes=len(uniq_ids)
    )
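    # Edges in this partition get contiguous global edge ids starting at
    # edgeid_offset; their (etype-sorted) relative order is preserved.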
    part_graph.edata[dgl.EID] = th.arange(
        edgeid_offset,
        edgeid_offset + part_graph.number_of_edges(),
        dtype=th.int64,
    )
    part_graph.edata[dgl.ETYPE] = th.as_tensor(
        etype_ids, dtype=RESERVED_FIELD_DTYPE[dgl.ETYPE]
    )
    part_graph.edata["inner_edge"] = th.ones(
        part_graph.number_of_edges(), dtype=RESERVED_FIELD_DTYPE["inner_edge"]
    )

    # compute per_type_ids and ntype for all the nodes in the graph.
    global_ids = np.concatenate([global_src_id, global_dst_id, global_homo_nid])
    part_global_ids = global_ids[idx]
    part_global_ids = part_global_ids[reshuffle_nodes]
    ntype, per_type_ids = id_map(part_global_ids)
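    # IdMap inverts the homogeneous numbering. For example (hypothetical
    # ranges): if "paper" owns global ids [0, 1000) and "author" owns
    # [1000, 1500), then global id 1002 maps to the "author" ntype with
    # per-type id 2.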

    # continue with the graph creation
    part_graph.ndata[dgl.NTYPE] = th.as_tensor(
        ntype, dtype=RESERVED_FIELD_DTYPE[dgl.NTYPE]
    )
    part_graph.ndata[dgl.NID] = th.as_tensor(uniq_ids[reshuffle_nodes])
    part_graph.ndata["inner_node"] = th.as_tensor(
        inner_nodes[reshuffle_nodes], dtype=RESERVED_FIELD_DTYPE["inner_node"]
    )

    orig_nids = None
    orig_eids = None
    if return_orig_nids:
        orig_nids = {}
        for ntype, ntype_id in ntypes_map.items():
            mask = th.logical_and(
                part_graph.ndata[dgl.NTYPE] == ntype_id,
                part_graph.ndata["inner_node"],
            )
            orig_nids[ntype] = th.as_tensor(per_type_ids[mask])
    if return_orig_eids:
        orig_eids = {}
        for etype, etype_id in etypes_map.items():
            mask = th.logical_and(
                part_graph.edata[dgl.ETYPE] == etype_id,
                part_graph.edata["inner_edge"],
            )
            orig_eids[_etype_tuple_to_str(etype)] = th.as_tensor(
                global_edge_id[mask]
            )

    return (
        part_graph,
        node_map_val,
        edge_map_val,
        ntypes_map,
        etypes_map,
        orig_nids,
        orig_eids,
    )


def create_metadata_json(
    graph_name,
    num_nodes,
    num_edges,
    part_id,
    num_parts,
    node_map_val,
    edge_map_val,
    ntypes_map,
    etypes_map,
    output_dir,
):
    """
    Auxiliary function to create the json file for the graph partition metadata.

    Parameters:
    -----------
    graph_name : string
        name of the graph
    num_nodes : int
        no. of nodes in the graph partition
    num_edges : int
        no. of edges in the graph partition
    part_id : int
        integer indicating the partition id
    num_parts : int
        total no. of partitions of the original graph
    node_map_val : dictionary
        map between node types and the range of global node ids used
    edge_map_val : dictionary
        map between edge types and the range of global edge ids used
    ntypes_map : dictionary
        map between node type (string) and node_type_id (int)
    etypes_map : dictionary
        map between edge type (string) and edge_type_id (int)
    output_dir : string
        directory where the output files are to be stored

    Returns:
    --------
    dictionary
        map describing the graph information

    """
    part_metadata = {
        "graph_name": graph_name,
        "num_nodes": num_nodes,
        "num_edges": num_edges,
        "part_method": "metis",
        "num_parts": num_parts,
        "halo_hops": 1,
        "node_map": node_map_val,
        "edge_map": edge_map_val,
        "ntypes": ntypes_map,
        "etypes": etypes_map,
    }

    part_dir = "part" + str(part_id)
    node_feat_file = os.path.join(part_dir, "node_feat.dgl")
    edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
    part_graph_file = os.path.join(part_dir, "graph.dgl")
    part_metadata["part-{}".format(part_id)] = {
        "node_feats": node_feat_file,
        "edge_feats": edge_feat_file,
        "part_graph": part_graph_file,
    }
    return part_metadata
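

# A minimal usage sketch for create_metadata_json (hypothetical values; the
# graph name, counts, and output directory are placeholders, not taken from a
# real run):
#
#   metadata = create_metadata_json(
#       "my_graph", num_nodes=1500, num_edges=5000, part_id=0, num_parts=2,
#       node_map_val=node_map_val, edge_map_val=edge_map_val,
#       ntypes_map=ntypes_map, etypes_map=etypes_map, output_dir="out",
#   )
#   with open(os.path.join("out", "metadata.json"), "w") as f:
#       json.dump(metadata, f, indent=4)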