import gc
import logging
import math
import os
import sys
from datetime import timedelta
from timeit import default_timer as timer

import constants
import dgl
import numpy as np
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from convert_partition import create_dgl_object, create_metadata_json
from dataset_utils import get_dataset
from dist_lookup import DistLookupService
from globalids import (
    assign_shuffle_global_nids_edges,
    assign_shuffle_global_nids_nodes,
    lookup_shuffle_global_nids_edges,
)
from gloo_wrapper import allgather_sizes, alltoallv_cpu, gather_metadata_json
from utils import (
    augment_edge_data,
    DATA_TYPE_ID,
    get_edge_types,
    get_etype_featnames,
    get_gid_offsets,
    get_gnid_range_map,
    get_idranges,
    get_node_types,
    get_ntype_counts_map,
    get_ntype_featnames,
    map_partid_rank,
    memory_snapshot,
    read_json,
    read_ntype_partition_files,
    REV_DATA_TYPE_ID,
    write_dgl_objects,
    write_metadata_json,
)


def gen_node_data(
    rank, world_size, num_parts, id_lookup, ntid_ntype_map, schema_map
):
    """
    For this data processing pipeline, reading node files is not needed. All the needed information about
    the nodes can be found in the metadata json file. This function generates the nodes owned by a given
    process, using metis partitions.

    Parameters:
    -----------
    rank : int
        rank of the process
    world_size : int
        total no. of processes
    num_parts : int
        total no. of partitions
    id_lookup : instance of class DistLookupService
       Distributed lookup service used to map global-nids to respective partition-ids and
       shuffle-global-nids
    ntid_ntype_map : dictionary
        a dictionary where keys are node_type ids (integers) and values are node_type names (strings).
    schema_map : dictionary
        dictionary formed by reading the input metadata json file for the input dataset.

        Please note that it is assumed that, for the input graph files, the nodes of a particular node-type are
        split into `p` files (because of `p` partitions to be generated). On a similar note, edges of a particular
        edge-type are split into `p` files as well.

        # assuming m node types present in the input graph
        "num_nodes_per_chunk" : [
            [a0, a1, a2, ... a<p-1>],
            [b0, b1, b2, ... b<p-1>],
            ...
            [m0, m1, m2, ... m<p-1>]
        ]
        Here, each sub-list, corresponding to a node type in the input graph, has `p` elements. For instance [a0, a1, ... a<p-1>],
        where each element represents the number of nodes which are to be processed by a process during distributed partitioning.

        In addition to the above key-value pair for the nodes in the graph, the node-features are captured in the
        "node_data" key-value pair. In this dictionary the keys will be node type names and the value will be a dictionary which
        is used to capture all the features present for that particular node-type. This is shown in the following example:

        "node_data" : {
            "paper": {       # node type
                "feat": {   # feature key
                    "format": {"name": "numpy"},
                    "data": ["node_data/paper-feat-part1.npy", "node_data/paper-feat-part2.npy"]
                },
                "label": {   # feature key
                    "format": {"name": "numpy"},
                    "data": ["node_data/paper-label-part1.npy", "node_data/paper-label-part2.npy"]
                },
                "year": {   # feature key
                    "format": {"name": "numpy"},
                    "data": ["node_data/paper-year-part1.npy", "node_data/paper-year-part2.npy"]
                }
            }
        }
        In the above example we have one node-type, paper, which has 3 features, namely feat, label and year.
        Each feature has `p` files whose locations in the filesystem are given by the list for the key "data", and "format" is used to
        describe the storage format.

    Returns:
    --------
    dictionary :
        dictionary where keys are column names and values are numpy arrays; these arrays are generated by
        using information present in the metadata json file
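
        Example (illustrative sketch, assuming world_size == num_parts so that
        there is a single local partition with suffix "0", and assuming
        node-type 0 covers global-nids [0, 10)):

            {
                constants.GLOBAL_NID + "/0"      : np.array([3, 7, 11, ...]),
                constants.NTYPE_ID + "/0"        : np.array([0, 0, 1, ...]),
                constants.GLOBAL_TYPE_NID + "/0" : np.array([3, 7, 1, ...]),
            }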

    """
    local_node_data = {}
    for local_part_id in range(num_parts // world_size):
        local_node_data[constants.GLOBAL_NID + "/" + str(local_part_id)] = []
        local_node_data[constants.NTYPE_ID + "/" + str(local_part_id)] = []
        local_node_data[
            constants.GLOBAL_TYPE_NID + "/" + str(local_part_id)
        ] = []

    # Note that `get_idranges` always returns two dictionaries. Keys in these
    # dictionaries are type names for nodes and edges and values are
    # `num_parts` number of tuples indicating the range of type-ids in first
    # dictionary and range of global-nids in the second dictionary.
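    # Illustrative sketch (assumed shapes): for a node type "paper" with 1000
    # nodes split across `num_parts` chunks, type_nid_dict["paper"] would be a
    # list of per-chunk (start, end) tuples such as [(0, 250), (250, 500), ...],
    # while global_nid_dict["paper"] holds the overall global-nid range, e.g.
    # np.array([[0, 1000]]).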
    type_nid_dict, global_nid_dict = get_idranges(
        schema_map[constants.STR_NODE_TYPE],
        get_ntype_counts_map(
            schema_map[constants.STR_NODE_TYPE],
            schema_map[constants.STR_NUM_NODES_PER_TYPE],
        ),
        num_chunks=num_parts,
    )

    for ntype_id, ntype_name in ntid_ntype_map.items():
        # No. of nodes in each process can differ significantly in lopsided distributions
        # Synchronize on a per ntype basis
        dist.barrier()

        type_start, type_end = (
            type_nid_dict[ntype_name][0][0],
            type_nid_dict[ntype_name][-1][1],
        )
        gnid_start, gnid_end = (
            global_nid_dict[ntype_name][0, 0],
            global_nid_dict[ntype_name][0, 1],
        )

        node_partid_slice = id_lookup.get_partition_ids(
            np.arange(gnid_start, gnid_end, dtype=np.int64)
        )  # exclusive

        for local_part_id in range(num_parts // world_size):
            cond = node_partid_slice == (rank + local_part_id * world_size)
            own_gnids = np.arange(gnid_start, gnid_end, dtype=np.int64)
            own_gnids = own_gnids[cond]

            own_tnids = np.arange(type_start, type_end, dtype=np.int64)
            own_tnids = own_tnids[cond]

            local_node_data[
                constants.NTYPE_ID + "/" + str(local_part_id)
            ].append(np.ones(own_gnids.shape, dtype=np.int64) * ntype_id)
            local_node_data[
                constants.GLOBAL_NID + "/" + str(local_part_id)
            ].append(own_gnids)
            local_node_data[
                constants.GLOBAL_TYPE_NID + "/" + str(local_part_id)
            ].append(own_tnids)

    for k in local_node_data.keys():
        local_node_data[k] = np.concatenate(local_node_data[k])

    return local_node_data


def exchange_edge_data(rank, world_size, num_parts, edge_data, id_lookup):
    """
    Exchange edge_data among processes in the world.
    Prepare a list of sliced data targeting each process and invoke
    alltoallv_cpu to perform the message exchange.

    Parameters:
    -----------
    rank : int
        rank of the process
    world_size : int
        total no. of processes
    num_parts : int
        total no. of partitions
    edge_data : dictionary
        edge information, as a dictionary which stores column names as keys and values
        as column data. This information is read from the edges.txt file.
    id_lookup : DistLookupService instance
        this object will be used to retrieve ownership information of nodes

    Returns:
    --------
198
    dictionary :
        the input argument, edge_data, is updated with the edge data received from other processes
        in the world.
    """

    # Synchronize at the beginning of this function
    dist.barrier()

    # Prepare data for each rank in the cluster.
    timer_start = timer()

    CHUNK_SIZE = 100 * 1000 * 1000  # 100M edges x 5 columns x 8 bytes = 4 GB/message/rank
    num_edges = edge_data[constants.GLOBAL_SRC_ID].shape[0]
    all_counts = allgather_sizes(
        [num_edges], world_size, num_parts, return_sizes=True
    )
    max_edges = np.amax(all_counts)
    all_edges = np.sum(all_counts)
    num_chunks = (max_edges // CHUNK_SIZE) + (
        0 if (max_edges % CHUNK_SIZE == 0) else 1
    )
    LOCAL_CHUNK_SIZE = (num_edges // num_chunks) + (
        0 if (num_edges % num_chunks == 0) else 1
    )
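    # Illustrative example (assumed numbers): with 4 ranks holding
    # [250M, 90M, 10M, 50M] edges, max_edges = 250M, so every rank splits its
    # local edges into num_chunks = ceil(250M / CHUNK_SIZE) = 3 rounds of
    # alltoallv; a rank holding 90M edges then sends LOCAL_CHUNK_SIZE =
    # ceil(90M / 3) = 30M edges per round, which bounds the largest message
    # while keeping all ranks in the same number of collective calls.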
    logging.debug(
        f"[Rank: {rank}] Edge Data Shuffle - max_edges: {max_edges}, \
                        local_edges: {num_edges} and num_chunks: {num_chunks} \
                        Total edges: {all_edges} Local_CHUNK_SIZE: {LOCAL_CHUNK_SIZE}"
    )

    for local_part_id in range(num_parts // world_size):
        local_src_ids = []
        local_dst_ids = []
        local_type_eids = []
        local_etype_ids = []
        local_eids = []

        for chunk in range(num_chunks):
            chunk_start = chunk * LOCAL_CHUNK_SIZE
            chunk_end = (chunk + 1) * LOCAL_CHUNK_SIZE

            logging.debug(
                f"[Rank: {rank}] EdgeData Shuffle: processing \
                    local_part_id: {local_part_id} and chunkid: {chunk}"
            )
            cur_src_id = edge_data[constants.GLOBAL_SRC_ID][
                chunk_start:chunk_end
            ]
            cur_dst_id = edge_data[constants.GLOBAL_DST_ID][
                chunk_start:chunk_end
            ]
            cur_type_eid = edge_data[constants.GLOBAL_TYPE_EID][
                chunk_start:chunk_end
            ]
            cur_etype_id = edge_data[constants.ETYPE_ID][chunk_start:chunk_end]
            cur_eid = edge_data[constants.GLOBAL_EID][chunk_start:chunk_end]

            input_list = []
            owner_ids = id_lookup.get_partition_ids(cur_dst_id)
            for idx in range(world_size):
                send_idx = owner_ids == (idx + local_part_id * world_size)
                send_idx = send_idx.reshape(cur_src_id.shape[0])
                filt_data = np.column_stack(
                    (
                        cur_src_id[send_idx == 1],
                        cur_dst_id[send_idx == 1],
                        cur_type_eid[send_idx == 1],
                        cur_etype_id[send_idx == 1],
                        cur_eid[send_idx == 1],
                    )
                )
                if filt_data.shape[0] <= 0:
                    input_list.append(torch.empty((0, 5), dtype=torch.int64))
                else:
                    input_list.append(torch.from_numpy(filt_data))

            # Now send newly formed chunk to others.
            dist.barrier()
            output_list = alltoallv_cpu(
                rank, world_size, input_list, retain_nones=False
            )

            # Accumulate the edge data received from all the other processes for this chunk.
            rcvd_edge_data = torch.cat(output_list).numpy()
            local_src_ids.append(rcvd_edge_data[:, 0])
            local_dst_ids.append(rcvd_edge_data[:, 1])
            local_type_eids.append(rcvd_edge_data[:, 2])
            local_etype_ids.append(rcvd_edge_data[:, 3])
            local_eids.append(rcvd_edge_data[:, 4])

        edge_data[
            constants.GLOBAL_SRC_ID + "/" + str(local_part_id)
        ] = np.concatenate(local_src_ids)
        edge_data[
            constants.GLOBAL_DST_ID + "/" + str(local_part_id)
        ] = np.concatenate(local_dst_ids)
        edge_data[
            constants.GLOBAL_TYPE_EID + "/" + str(local_part_id)
        ] = np.concatenate(local_type_eids)
        edge_data[
            constants.ETYPE_ID + "/" + str(local_part_id)
        ] = np.concatenate(local_etype_ids)
        edge_data[
            constants.GLOBAL_EID + "/" + str(local_part_id)
        ] = np.concatenate(local_eids)

    # Check if the data was exchanged correctly
    local_edge_count = 0
    for local_part_id in range(num_parts // world_size):
        local_edge_count += edge_data[
            constants.GLOBAL_SRC_ID + "/" + str(local_part_id)
        ].shape[0]
    shuffle_edge_counts = allgather_sizes(
        [local_edge_count], world_size, num_parts, return_sizes=True
    )
    shuffle_edge_total = np.sum(shuffle_edge_counts)
    assert shuffle_edge_total == all_edges

    timer_end = timer()
    logging.info(
        f"[Rank: {rank}] Time to send/rcv edge data: {timedelta(seconds=timer_end-timer_start)}"
    )

    # Clean up.
    edge_data.pop(constants.GLOBAL_SRC_ID)
    edge_data.pop(constants.GLOBAL_DST_ID)
    edge_data.pop(constants.GLOBAL_TYPE_EID)
    edge_data.pop(constants.ETYPE_ID)
    edge_data.pop(constants.GLOBAL_EID)

    return edge_data


def exchange_feature(
    rank,
    data,
    id_lookup,
    feat_type,
    feat_key,
    featdata_key,
    gid_start,
    gid_end,
    type_id_start,
    type_id_end,
    local_part_id,
    world_size,
    num_parts,
    cur_features,
    cur_global_ids,
):
    """This function is used to send/receive one feature for either nodes or
    edges of the input graph dataset.

    Parameters:
    -----------
    rank : int
        integer, unique id assigned to the current process
    data : dictionary
        dictionary in which node or edge features are stored and this information
        is read from the appropriate node features file which belongs to the
        current process
    id_lookup : instance of DistLookupService
        instance of an implementation of dist. lookup service to retrieve values
        for keys
    feat_type : string
        this is used to distinguish which features are being exchanged. Please
        note that for nodes ownership is clearly defined and for edges it is
        always assumed that destination end point of the edge defines the
        ownership of that particular edge
    feat_key : string
        this string is used as a key in the dictionary to store features, as
        tensors, in local dictionaries
    featdata_key : numpy array
        features associated with this feature key being processed
    gid_start : int
        starting global_id, of either node or edge, for the feature data
    gid_end : int
        ending global_id, of either node or edge, for the feature data
    type_id_start : int
        starting type_id for the feature data
    type_id_end : int
        ending type_id for the feature data
    local_part_id : int
        integer used to identify the local partition id used to locate
        data belonging to this partition
    world_size : int
        total number of processes created
    num_parts : int
        total number of partitions
    cur_features : dictionary
388
        dictionary to store the feature data which belongs to the current
389
390
        process
    cur_global_ids : dictionary
        dictionary to store global ids, of either nodes or edges, for which
        the features are stored in the cur_features dictionary

    Returns:
    -------
    dictionary :
        a dictionary is returned where keys are type names and
        feature data are the values
    dictionary :
        a dictionary of global_ids, of either nodes or edges, whose features are
        received during the data shuffle process
    """
    # type_ids for this feature subset on the current rank
    gids_feat = np.arange(gid_start, gid_end)
    tids_feat = np.arange(type_id_start, type_id_end)
    local_idx = np.arange(0, type_id_end - type_id_start)

    feats_per_rank = []
    global_id_per_rank = []

    tokens = feat_key.split("/")
    assert len(tokens) == 3
    local_feat_key = "/".join(tokens[:-1]) + "/" + str(local_part_id)
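    # e.g. (hypothetical key) a feat_key of "paper/feat/0" with
    # local_part_id == 1 yields a local_feat_key of "paper/feat/1".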

    logging.debug(
        f"[Rank: {rank}] feature: {feat_key}, gid_start - {gid_start} and gid_end - {gid_end}"
    )

    # Get the partition ids for the range of global nids.
    if feat_type == constants.STR_NODE_FEATURES:
        # Retrieve the partition ids for the node features.
        # Each partition id will be in the range [0, num_parts).
        partid_slice = id_lookup.get_partition_ids(
            np.arange(gid_start, gid_end, dtype=np.int64)
        )
    else:
        # Edge data case.
        # Ownership is determined by the destination node.
        assert data is not None
        global_eids = np.arange(gid_start, gid_end, dtype=np.int64)
        if data[constants.GLOBAL_EID].shape[0] > 0:
            logging.debug(
                f"[Rank: {rank}] disk read global eids - min - {np.amin(data[constants.GLOBAL_EID])}, max - {np.amax(data[constants.GLOBAL_EID])}, count - {data[constants.GLOBAL_EID].shape}"
            )

        # Now use `data` to extract destination nodes' global id
        # and use that to get the ownership
        common, idx1, idx2 = np.intersect1d(
            data[constants.GLOBAL_EID], global_eids, return_indices=True
        )
        assert (
            common.shape[0] == idx2.shape[0]
        ), f"Rank {rank}: {common.shape[0]} != {idx2.shape[0]}"
        assert (
            common.shape[0] == global_eids.shape[0]
        ), f"Rank {rank}: {common.shape[0]} != {global_eids.shape[0]}"

        global_dst_nids = data[constants.GLOBAL_DST_ID][idx1]
        assert np.all(global_eids == data[constants.GLOBAL_EID][idx1])
        partid_slice = id_lookup.get_partition_ids(global_dst_nids)

    # Determine the shape of the feature-data.
    # This is needed so that ranks where feature-data is not present
    # can use the correct shape for sending the padded vector.
    # Exchange the number of feature-tensor dimensions here.
    feat_dim_len = 0
    if featdata_key is not None:
        feat_dim_len = len(featdata_key.shape)
    all_lens = allgather_sizes(
        [feat_dim_len], world_size, num_parts, return_sizes=True
    )
    if all_lens[0] <= 0:
        logging.debug(
            f"[Rank: {rank}] No process has any feature data to shuffle for {local_feat_key}"
        )
        return cur_features, cur_global_ids

    rank0_shape_len = all_lens[0]
    for idx in range(1, world_size):
        assert (all_lens[idx] == 0) or (all_lens[idx] == rank0_shape_len), (
            f"feature: {local_feat_key} shape does not match "
            f"at rank - {idx} and rank - 0"
        )

    # exchange actual data here.
    if featdata_key is not None:
        feat_dims_dtype = list(featdata_key.shape)
        feat_dims_dtype.append(DATA_TYPE_ID[featdata_key.dtype])
    else:
        feat_dims_dtype = list(np.zeros((rank0_shape_len), dtype=np.int64))
        feat_dims_dtype.append(DATA_TYPE_ID[torch.float32])

    logging.debug(f"Sending the feature shape information - {feat_dims_dtype}")
    all_dims_dtype = allgather_sizes(
        feat_dims_dtype, world_size, num_parts, return_sizes=True
    )

    for idx in range(world_size):
        cond = partid_slice == (idx + local_part_id * world_size)
        gids_per_partid = gids_feat[cond]
        tids_per_partid = tids_feat[cond]
        local_idx_partid = local_idx[cond]

        if gids_per_partid.shape[0] == 0:
            assert len(all_dims_dtype) % world_size == 0
            dim_len = int(len(all_dims_dtype) / world_size)
            rank0_shape = tuple(list(np.zeros((dim_len - 1), dtype=np.int32)))
            rank0_dtype = REV_DATA_TYPE_ID[
                all_dims_dtype[(dim_len - 1) : (dim_len)][0]
            ]
            # Use an empty placeholder tensor here; note this intentionally
            # does not reuse the name of the `data` argument.
            empty_feat = torch.empty(rank0_shape, dtype=rank0_dtype)
            feats_per_rank.append(empty_feat)
            global_id_per_rank.append(torch.empty((0,), dtype=torch.int64))
        else:
            feats_per_rank.append(featdata_key[local_idx_partid])
            global_id_per_rank.append(
                torch.from_numpy(gids_per_partid).type(torch.int64)
            )
    for idx, tt in enumerate(feats_per_rank):
        logging.debug(
            f"[Rank: {rank}] features shape - {tt.shape} and ids - {global_id_per_rank[idx].shape}"
        )

    # features (and global nids) per rank to be sent out are ready
    # for transmission, perform alltoallv here.
    output_feat_list = alltoallv_cpu(
        rank, world_size, feats_per_rank, retain_nones=False
    )
    output_id_list = alltoallv_cpu(
        rank, world_size, global_id_per_rank, retain_nones=False
    )
    logging.debug(
        f"[Rank: {rank}] feats - {output_feat_list}, ids - {output_id_list}"
    )
    assert len(output_feat_list) == len(output_id_list), (
        "Length of feature list and id list are expected to be equal while "
        f"got {len(output_feat_list)} and {len(output_id_list)}."
    )

    # stitch node_features together to form one large feature tensor
    if len(output_feat_list) > 0:
        output_feat_list = torch.cat(output_feat_list)
        output_id_list = torch.cat(output_id_list)
        if local_feat_key in cur_features:
            temp = cur_features[local_feat_key]
            cur_features[local_feat_key] = torch.cat([temp, output_feat_list])
            temp = cur_global_ids[local_feat_key]
            cur_global_ids[local_feat_key] = torch.cat([temp, output_id_list])
        else:
            cur_features[local_feat_key] = output_feat_list
            cur_global_ids[local_feat_key] = output_id_list

    return cur_features, cur_global_ids


def exchange_features(
    rank,
    world_size,
    num_parts,
    feature_tids,
    type_id_map,
    id_lookup,
    feature_data,
    feat_type,
    data,
):
    """
    This function is used to shuffle node features so that each process will receive
    all the node features whose corresponding nodes are owned by that process.
    The mapping procedure to identify the owner process is not straightforward. The
    following steps are used to identify the owner processes for the locally read node-
    features.
    a. Compute the global_nids for the locally read node features. Here the metadata json file
        is used to identify the corresponding global_nids. Please note that the initial graph input
        nodes.txt files are sorted based on node_types.
    b. Using global_nids and metis partitions, owner processes can be easily identified.
    c. Now each process sends the global_nids for which shuffle_global_nids need to be
        retrieved.
    d. After receiving the corresponding shuffle_global_nids, these ids are added to the
        node_data and edge_data dictionaries.

    This pipeline assumes all the input data is in numpy format, except node/edge features which
    are maintained as tensors throughout the various stages of the pipeline execution.

    Parameters:
    -----------
    rank : int
        rank of the current process
    world_size : int
        total no. of participating processes.
    num_parts : int
        total no. of graph partitions
    feature_tids : dictionary
        dictionary with keys as node-type names with suffixes as feature names
        and value is a dictionary. This dictionary contains information about
        node-features associated with a given node-type and value is a list.
        This list contains a pair of indexes, like [starting-idx, ending-idx), which
        can be used to index into the node feature tensors read from
        corresponding input files.
    type_id_map : dictionary
        mapping between type names and global_ids, of either nodes or edges,
        which belong to the keys in this dictionary
    id_lookup : instance of class DistLookupService
       Distributed lookup service used to map global-nids to respective
       partition-ids and shuffle-global-nids
    feat_type : string
        this is used to distinguish which features are being exchanged. Please
        note that for nodes ownership is clearly defined and for edges it is
        always assumed that destination end point of the edge defines the
        ownership of that particular edge
    data : dictionary
        dictionary in which node or edge features are stored and this information
        is read from the appropriate node features file which belongs to the
        current process

    Returns:
    --------
    dictionary :
        a dictionary is returned where keys are type names and
        feature data are the values
    dictionary :
        a dictionary of global_ids, of either nodes or edges, whose features are
        received during the data shuffle process
    """
    start = timer()
    own_features = {}
    own_global_ids = {}

    # To iterate over the node_types and associated node_features
    for feat_key, type_info in feature_tids.items():
        # To iterate over the feature data, of a given (node or edge) type
        # type_info is a list of 3 elements (as shown below):
        #   [feature-name, starting-idx, ending-idx]
        #       feature-name is the name given to the feature-data,
        #       read from the input metadata file
        #       [starting-idx, ending-idx) specifies the range of indexes
        #        associated with the features data
        # Determine the owner process for these features.
        # Note that the keys in the node features (and similarly edge features)
        # dictionary is of the following format:
        #   `node_type/feature_name/local_part_id`:
        #    where node_type and feature_name are self-explanatory and
        #    local_part_id denotes the partition-id, in the local process,
        #    which will be used as a suffix to store all the information of a
        #    given partition which is processed by the current process. Its
        #    values start from 0 onwards, for instance 0, 1, 2 ... etc.
        #    local_part_id can be mapped to a global partition id very
        #    easily, using cyclic ordering. All local_part_ids = 0 from all
        #    processes will form global partition-ids between 0 and world_size-1.
        #    Similarly all local_part_ids = 1 from all processes will form
        #    global partition ids in the range [world_size, 2*world_size-1] and
        #    so on.
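        # e.g. (illustrative) with world_size = 2 and num_parts = 4, rank 0
        # handles global partitions 0 (local_part_id 0) and 2 (local_part_id 1),
        # while rank 1 handles partitions 1 and 3, matching the
        # `rank + local_part_id * world_size` expression used elsewhere in
        # this module.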
        tokens = feat_key.split("/")
        assert len(tokens) == 3
        type_name = tokens[0]
        feat_name = tokens[1]
645
        logging.debug(f"[Rank: {rank}] processing feature: {feat_key}")

        for feat_info in type_info:
            # Compute the global_id range for this feature data
            type_id_start = int(feat_info[0])
            type_id_end = int(feat_info[1])
            begin_global_id = type_id_map[type_name][0]
            gid_start = begin_global_id + type_id_start
            gid_end = begin_global_id + type_id_end

            # Check if features exist for this type_name + feat_name.
            # This check should always pass, because feature_tids are built
            # by reading the input metadata json file for existing features.
            assert feat_key in feature_data

            for local_part_id in range(num_parts // world_size):
                featdata_key = feature_data[feat_key]

                # Synchronize for each feature
                dist.barrier()
                own_features, own_global_ids = exchange_feature(
                    rank,
                    data,
                    id_lookup,
                    feat_type,
                    feat_key,
                    featdata_key,
                    gid_start,
                    gid_end,
                    type_id_start,
                    type_id_end,
                    local_part_id,
                    world_size,
                    num_parts,
                    own_features,
                    own_global_ids,
                )

    end = timer()
    logging.info(
        f"[Rank: {rank}] Total time for feature exchange: {timedelta(seconds = end - start)}"
    )
    for k, v in own_features.items():
        logging.debug(f"[Rank: {rank}] Key - {k} Value - {v.shape}")
    return own_features, own_global_ids


def exchange_graph_data(
    rank,
    world_size,
    num_parts,
    node_features,
    edge_features,
    node_feat_tids,
    edge_feat_tids,
    edge_data,
    id_lookup,
    ntypes_ntypeid_map,
    ntypes_gnid_range_map,
    etypes_geid_range_map,
    ntid_ntype_map,
    schema_map,
):
    """
    Wrapper function which is used to shuffle graph data on all the processes.

    Parameters:
    -----------
    rank : int
        rank of the current process
    world_size : int
        total no. of participating processes.
    num_parts : int
        total no. of graph partitions.
    node_features : dictionary
        dictionary where node_features are stored and this information is read from the appropriate
        node features file which belongs to the current process
    edge_features : dictionary
        dictionary where edge_features are stored. This information is read from the appropriate
        edge feature files whose ownership is assigned to the current process
    node_feat_tids: dictionary
        in which keys are node-type names and values are triplets. Each triplet has node-feature name
        and the starting and ending type ids of the node-feature data read from the corresponding
        node feature data file read by current process. Each node type may have several features and
        hence each key may have several triplets.
    edge_feat_tids : dictionary
        a dictionary in which keys are edge-type names and values are triplets of the format
        <feat-name, start-per-type-idx, end-per-type-idx>. This triplet is used to identify
        the chunk of feature data for which the current process is responsible
    edge_data : dictionary
        dictionary which is used to store edge information as read from appropriate files assigned
        to each process.
    id_lookup : instance of class DistLookupService
       Distributed lookup service used to map global-nids to respective partition-ids and
       shuffle-global-nids
    ntypes_ntypeid_map : dictionary
        mappings between node type names and node type ids
    ntypes_gnid_range_map : dictionary
        mapping between node type names and global_nids which belong to the keys in this dictionary
    etypes_geid_range_map : dictionary
        mapping between edge type names and global_eids which are assigned to the edges of this
        edge_type
    ntid_ntype_map : dictionary
        mapping between node type ids and node type names
    schema_map : dictionary
        is the data structure read from the metadata json file for the input graph

    Returns:
    --------
    dictionary :
        the input argument, node_data dictionary, is updated with the node data received from other processes
        in the world. The node data is received by each rank in the process of data shuffling.
    dictionary :
        node features dictionary which has node features for the nodes which are owned by the current
        process
    dictionary :
        list of global_nids for the nodes whose node features are received when node features shuffling was
        performed in the `exchange_features` function call
    dictionary :
        the input argument, edge_data dictionary, is updated with the edge data received from other processes
        in the world. The edge data is received by each rank in the process of data shuffling.
    dictionary :
        edge features dictionary which has edge features. The destination end points of these edges
        are owned by the current process
    dictionary :
        list of global_eids for the edges whose edge features are received when edge features shuffling
        was performed in the `exchange_features` function call
    """
    memory_snapshot("ShuffleNodeFeaturesBegin: ", rank)
    logging.debug(f"[Rank: {rank}] node_feat_tids - {node_feat_tids}")
    rcvd_node_features, rcvd_global_nids = exchange_features(
        rank,
        world_size,
        num_parts,
        node_feat_tids,
        ntypes_gnid_range_map,
        id_lookup,
        node_features,
        constants.STR_NODE_FEATURES,
        None,
    )
    dist.barrier()
    memory_snapshot("ShuffleNodeFeaturesComplete: ", rank)
    logging.debug(f"[Rank: {rank}] Done with node features exchange.")

    rcvd_edge_features, rcvd_global_eids = exchange_features(
        rank,
        world_size,
        num_parts,
        edge_feat_tids,
        etypes_geid_range_map,
        id_lookup,
        edge_features,
        constants.STR_EDGE_FEATURES,
        edge_data,
    )
    dist.barrier()
    logging.debug(f"[Rank: {rank}] Done with edge features exchange.")

    node_data = gen_node_data(
        rank, world_size, num_parts, id_lookup, ntid_ntype_map, schema_map
    )
    dist.barrier()
    memory_snapshot("NodeDataGenerationComplete: ", rank)

    edge_data = exchange_edge_data(
        rank, world_size, num_parts, edge_data, id_lookup
    )
    dist.barrier()
    memory_snapshot("ShuffleEdgeDataComplete: ", rank)
    return (
        node_data,
        rcvd_node_features,
        rcvd_global_nids,
        edge_data,
        rcvd_edge_features,
        rcvd_global_eids,
    )


def read_dataset(rank, world_size, id_lookup, params, schema_map, ntype_counts):
    """
    This function gets the dataset and performs post-processing on the data which is read from files.
    Additional information (columns) is added to the nodes metadata, like owner_process and global_nid, which
    are later used in processing this information. For edge data, which is now a dictionary, we add new columns
    like global_edge_id and owner_process. Augmenting these data structures helps in processing them
    when data shuffling is performed.

    Parameters:
    -----------
    rank : int
        rank of the current process
    world_size : int
        total no. of processes instantiated
    id_lookup : instance of class DistLookupService
       Distributed lookup service used to map global-nids to respective partition-ids and
       shuffle-global-nids
    params : argparser object
        argument parser object to access command line arguments
    schema_map : dictionary
        dictionary created by reading the input graph metadata json file
    ntype_counts : dictionary
        mapping between node type names and the number of nodes of each type

    Returns :
    ---------
    dictionary
        in which keys are node-type names and values are tuples representing the range of ids
        for nodes to be read by the current process
    dictionary
        node features which is a dictionary where keys are feature names and values are feature
854
        data as multi-dimensional tensors
    dictionary
        in which keys are node-type names and values are triplets. Each triplet has node-feature name
        and the starting and ending type ids of the node-feature data read from the corresponding
        node feature data file read by current process. Each node type may have several features and
        hence each key may have several triplets.
    dictionary
        edge data information is read from edges.txt and additional columns are added such as
        owner process for each edge.
    dictionary
        edge features which is also a dictionary, similar to node features dictionary
    dictionary
        a dictionary in which keys are edge-type names and values are tuples indicating the range of ids
        for edges read by the current process.
    dictionary
        a dictionary in which keys are edge-type names and values are triplets,
        (edge-feature-name, start_type_id, end_type_id). These type_ids are indices in the edge-features
        read by the current process. Note that each edge-type may have several edge-features.
    """
    edge_features = {}
    (
        node_features,
        node_feat_tids,
        edge_data,
        edge_typecounts,
        edge_tids,
        edge_features,
        edge_feat_tids,
    ) = get_dataset(
        params.input_dir,
        params.graph_name,
        rank,
        world_size,
        params.num_parts,
        schema_map,
        ntype_counts,
    )
    # Synchronize so that everybody completes reading dataset from disk
    dist.barrier()
    logging.info(f"[Rank: {rank}] Done reading dataset {params.input_dir}")

    edge_data = augment_edge_data(
        edge_data, id_lookup, edge_tids, rank, world_size, params.num_parts
    )
    dist.barrier()  # SYNCH
    logging.debug(
        f"[Rank: {rank}] Done augmenting edge_data: {len(edge_data)}, {edge_data[constants.GLOBAL_SRC_ID].shape}"
    )

    return (
        node_features,
        node_feat_tids,
        edge_data,
        edge_typecounts,
        edge_features,
        edge_feat_tids,
    )


def reorder_data(num_parts, world_size, data, key):
    """
    Auxiliary function used to sort node and edge data for the input graph.

    Parameters:
    -----------
    num_parts : int
        total no. of partitions
    world_size : int
        total number of nodes used in this execution
    data : dictionary
        which is used to store the node and edge data for the input graph
    key : string
        specifies the column which is used to determine the sort order for
        the remaining columns

    Returns:
    --------
    dictionary
        same as the input dictionary, but with reordered columns (values in
        the dictionary), as per the np.argsort results on the column specified
        by the ``key`` column
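
    Example (illustrative, with a single local partition and a hypothetical
    key string "ntype_id"): for data with
    {"ntype_id/0": np.array([1, 0, 1]), "global_nid/0": np.array([10, 11, 12])}
    and key = "ntype_id", argsort on "ntype_id/0" gives [1, 0, 2], so the
    reordered data is
    {"ntype_id/0": np.array([0, 1, 1]), "global_nid/0": np.array([11, 10, 12])}.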
    """
    for local_part_id in range(num_parts // world_size):
        sorted_idx = data[key + "/" + str(local_part_id)].argsort()
        for k, v in data.items():
            tokens = k.split("/")
            assert len(tokens) == 2
            if tokens[1] == str(local_part_id):
                data[k] = v[sorted_idx]
        sorted_idx = None
    gc.collect()
    return data


def gen_dist_partitions(rank, world_size, params):
    """
    Function which will be executed by all Gloo processes to begin execution of the pipeline.
    This function expects the input dataset to be split across multiple files.

    The input dataset and its file structure are described in the metadata json file which is also part of the
    input dataset. At a high level, this metadata json file contains information about the following items:
    a) Nodes metadata. It is assumed that nodes which belong to each node-type are split into p files
       (where `p` is the no. of partitions).
    b) Similarly, edge metadata contains information about edges which are split into p files.
    c) Node and Edge features. It is also assumed that each node (and edge) feature, if present, is also
       split into `p` files.

    For example, a sample metadata json file might be as follows:
    (In this toy example, we assume that we have "m" node-types, "k" edge types, and for node_type = ntype0-name
     we have two features namely feat0-name and feat1-name. Please note that the node-features are also split into
     `p` files. This will help in load-balancing during the data-shuffling phase).

    Terminology used to identify any particular "id" assigned to nodes, edges or node features: prefix "global" is
    used to indicate that this information is either read from the input dataset or autogenerated based on the information
    read from input dataset files. Prefix "type" is used to indicate a unique id assigned to either nodes or edges.
    For instance, type_node_id means a unique id, within a given node type, assigned to a node. And prefix "shuffle"
    will be used to indicate a unique id, across the entire graph, assigned to either a node or an edge. For instance,
    SHUFFLE_GLOBAL_NID means a unique id which is assigned to a node after the data shuffle is completed.

    Some high-level notes on the structure of the metadata json file:
    1. path(s) mentioned in the entries for nodes, edges and node-features files can be either absolute or relative.
       If these paths are relative, then it is assumed that they are relative to the folder from which the execution is
       launched.
    2. The id_startx and id_endx represent the type_node_id and type_edge_id respectively for nodes and edge data. This
       means that these ids should match the no. of nodes/edges read from any given file. Since these are type_ids for
       the nodes and edges in any given file, their global_ids can be easily computed as well.

    {
        "graph_name" : xyz,
        "node_type" : ["ntype0-name", "ntype1-name", ....], #m node types
        "num_nodes_per_chunk" : [
            [a0, a1, ...a<p-1>], #p partitions
            [b0, b1, ... b<p-1>],
            ....
            [c0, c1, ..., c<p-1>] #no, of node types
        ],
        "edge_type" : ["src_ntype:edge_type:dst_ntype", ....], #k edge types
        "num_edges_per_chunk" : [
            [a0, a1, ...a<p-1>], #p partitions
            [b0, b1, ... b<p-1>],
            ....
            [c0, c1, ..., c<p-1>] #no, of edge types
        ],
        "node_data" : {
            "ntype0-name" : {
                "feat0-name" : {
                    "format" : {"name": "numpy"},
                    "data" :   [ #list of lists
                        ["<path>/feat-0.npy", 0, id_end0],
                        ["<path>/feat-1.npy", id_start1, id_end1],
                        ....
                        ["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
                    ]
                },
                "feat1-name" : {
                    "format" : {"name": "numpy"},
                    "data" : [ #list of lists
                        ["<path>/feat-0.npy", 0, id_end0],
                        ["<path>/feat-1.npy", id_start1, id_end1],
                        ....
                        ["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
                    ]
                }
            }
        },
        "edges": { #k edge types
            "src_ntype:etype0-name:dst_ntype" : {
                "format": {"name" : "csv", "delimiter" : " "},
                "data" : [
                    ["<path>/etype0-name-0.txt", 0, id_end0], #These are type_edge_ids for edges of this type
                    ["<path>/etype0-name-1.txt", id_start1, id_end1],
                    ...,
                    ["<path>/etype0-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
                ]
            },
            ...,
            "src_ntype:etype<k-1>-name:dst_ntype" : {
                "format": {"name" : "csv", "delimiter" : " "},
                "data" : [
                    ["<path>/etype<k-1>-name-0.txt", 0, id_end0],
                    ["<path>/etype<k-1>-name-1.txt", id_start1, id_end1],
                    ...,
                    ["<path>/etype<k-1>-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
                ]
            },
        },
    }

    The function performs the following steps:
    1. Reads the metis partitions to identify the owner process of all the nodes in the entire graph.
    2. Reads the input data set; each participating process will map to a single file for the edges,
        node-features and edge-features for each node-type and edge-types respectively. Using nodes metadata
        information, nodes which are owned by a given process are generated to optimize communication to some
        extent.
    3. Now each process shuffles the data by identifying the respective owner processes using metis
        partitions.
        a. To identify owner processes for nodes, metis partitions will be used.
        b. For edges, the owner process of the destination node will be the owner of the edge as well.
        c. For node and edge features, identifying the owner process is a little bit involved.
            For this purpose, graph metadata json file is used to first map the locally read node features
            to their global_nids. Now owner process is identified using metis partitions for these global_nids
            to retrieve shuffle_global_nids. A similar process is used for edge_features as well.
        d. After all the data shuffling is done, the order of node-features may be different when compared to
            their global_type_nids. Node- and edge-data are ordered by node-type and edge-type respectively.
            And now node features and edge features are re-ordered to match the order of their node- and edge-types.
    4. Last step is to create the DGL objects with the data present on each of the processes.
        a. DGL objects for nodes, edges, node- and edge- features.
        b. Metadata is gathered from each process to create the global metadata json file, by process rank = 0.

    Parameters:
    ----------
    rank : int
        integer representing the rank of the current process in a typical distributed implementation
    world_size : int
        integer representing the total no. of participating processes in a typical distributed implementation
    params : argparser object
        this object, key value pairs, provides access to the command line arguments from the runtime environment
    """
    global_start = timer()
    logging.info(
        f"[Rank: {rank}] Starting distributed data processing pipeline..."
    )
    memory_snapshot("Pipeline Begin: ", rank)

    # init processing
    schema_map = read_json(os.path.join(params.input_dir, params.schema))

    # The resources, which are node-id to partition-id mappings, are split
    # into `world_size` number of parts, where each part can be mapped to
    # each physical node.
    id_lookup = DistLookupService(
        os.path.join(params.input_dir, params.partitions_dir),
        schema_map[constants.STR_NODE_TYPE],
        rank,
        world_size,
        params.num_parts,
    )

    # get the id to name mappings here.
    ntypes_ntypeid_map, ntypes, ntypeid_ntypes_map = get_node_types(schema_map)
    etypes_etypeid_map, etypes, etypeid_etypes_map = get_edge_types(schema_map)
    logging.info(
        f"[Rank: {rank}] Initialized metis partitions and node_types map..."
    )

    # Initialize distributed lookup service for partition-id and shuffle-global-nids mappings
    # for global-nids
    _, global_nid_ranges = get_idranges(
        schema_map[constants.STR_NODE_TYPE],
        get_ntype_counts_map(
            schema_map[constants.STR_NODE_TYPE],
            schema_map[constants.STR_NUM_NODES_PER_TYPE],
        ),
    )
    id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
    id_lookup.set_idMap(id_map)

    # read input graph files and augment these datastructures with
    # appropriate information (global_nid and owner process) for node and edge data
    (
        node_features,
        node_feat_tids,
        edge_data,
        edge_typecounts,
        edge_features,
        edge_feat_tids,
    ) = read_dataset(
        rank,
        world_size,
        id_lookup,
        params,
        schema_map,
        get_ntype_counts_map(
            schema_map[constants.STR_NODE_TYPE],
            schema_map[constants.STR_NUM_NODES_PER_TYPE],
        ),
    )
    logging.info(
        f"[Rank: {rank}] Done augmenting file input data with auxiliary columns"
    )
    memory_snapshot("DatasetReadComplete: ", rank)

    # send out node and edge data --- and appropriate features.
    # this function will also stitch the data recvd from other processes
    # and return the aggregated data
    # ntypes_gnid_range_map = get_gnid_range_map(node_tids)
    # etypes_geid_range_map = get_gnid_range_map(edge_tids)
    ntypes_gnid_range_map = get_gid_offsets(
        schema_map[constants.STR_NODE_TYPE],
        get_ntype_counts_map(
            schema_map[constants.STR_NODE_TYPE],
            schema_map[constants.STR_NUM_NODES_PER_TYPE],
        ),
    )
    etypes_geid_range_map = get_gid_offsets(
        schema_map[constants.STR_EDGE_TYPE], edge_typecounts
    )

    (
        node_data,
        rcvd_node_features,
        rcvd_global_nids,
        edge_data,
        rcvd_edge_features,
        rcvd_global_eids,
    ) = exchange_graph_data(
        rank,
        world_size,
        params.num_parts,
        node_features,
        edge_features,
        node_feat_tids,
        edge_feat_tids,
        edge_data,
        id_lookup,
        ntypes_ntypeid_map,
        ntypes_gnid_range_map,
        etypes_geid_range_map,
        ntypeid_ntypes_map,
        schema_map,
    )
    gc.collect()
    logging.debug(f"[Rank: {rank}] Done with data shuffling...")
    memory_snapshot("DataShuffleComplete: ", rank)

    # sort node_data by ntype
    node_data = reorder_data(
        params.num_parts, world_size, node_data, constants.NTYPE_ID
    )
    logging.debug(f"[Rank: {rank}] Sorted node_data by node_type")
    memory_snapshot("NodeDataSortComplete: ", rank)

    # resolve global_ids for nodes
    # Synchronize before assigning shuffle-global-ids to nodes
    dist.barrier()
    assign_shuffle_global_nids_nodes(
        rank, world_size, params.num_parts, node_data
    )
    logging.debug(f"[Rank: {rank}] Done assigning global-ids to nodes...")
    memory_snapshot("ShuffleGlobalID_Nodes_Complete: ", rank)

    # shuffle node feature according to the node order on each rank.
    for ntype_name in ntypes:
        featnames = get_ntype_featnames(ntype_name, schema_map)
        for featname in featnames:
            # if a feature name exists for a node-type, then it should also have
            # feature data as well. Hence using the assert statement.
            for local_part_id in range(params.num_parts // world_size):
                feature_key = (
                    ntype_name + "/" + featname + "/" + str(local_part_id)
                )
                assert feature_key in rcvd_global_nids
                global_nids = rcvd_global_nids[feature_key]

                _, idx1, _ = np.intersect1d(
                    node_data[constants.GLOBAL_NID + "/" + str(local_part_id)],
                    global_nids,
                    return_indices=True,
                )
                shuffle_global_ids = node_data[
                    constants.SHUFFLE_GLOBAL_NID + "/" + str(local_part_id)
                ][idx1]
                feature_idx = shuffle_global_ids.argsort()
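                # e.g. (illustrative, assuming the received feature rows are
                # ordered by ascending global_nid): if the rows correspond to
                # global_nids [5, 9, 11] whose shuffle-global-nids are
                # [7, 3, 5], then feature_idx = [1, 2, 0] and the rows are
                # reordered to follow increasing shuffle-global-nid order
                # [3, 5, 7].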

                rcvd_node_features[feature_key] = rcvd_node_features[
                    feature_key
                ][feature_idx]
    memory_snapshot("ReorderNodeFeaturesComplete: ", rank)

    # Sort edge_data by etype
    edge_data = reorder_data(
        params.num_parts, world_size, edge_data, constants.ETYPE_ID
    )
    logging.debug(f"[Rank: {rank}] Sorted edge_data by edge_type")
    memory_snapshot("EdgeDataSortComplete: ", rank)

    # Synchronize before assigning shuffle-global-nids for edges end points.
    dist.barrier()
    shuffle_global_eid_offsets = assign_shuffle_global_nids_edges(
        rank, world_size, params.num_parts, edge_data
    )
    logging.debug(f"[Rank: {rank}] Done assigning global_ids to edges ...")

    memory_snapshot("ShuffleGlobalID_Edges_Complete: ", rank)

    # Shuffle edge features according to the edge order on each rank.
    for etype_name in etypes:
        featnames = get_etype_featnames(etype_name, schema_map)
        for featname in featnames:
            for local_part_id in range(params.num_parts // world_size):
                feature_key = (
                    etype_name + "/" + featname + "/" + str(local_part_id)
                )
                assert feature_key in rcvd_global_eids
                global_eids = rcvd_global_eids[feature_key]

                _, idx1, _ = np.intersect1d(
                    edge_data[constants.GLOBAL_EID + "/" + str(local_part_id)],
                    global_eids,
                    return_indices=True,
                )
                shuffle_global_ids = edge_data[
                    constants.SHUFFLE_GLOBAL_EID + "/" + str(local_part_id)
                ][idx1]
                feature_idx = shuffle_global_ids.argsort()

                rcvd_edge_features[feature_key] = rcvd_edge_features[
                    feature_key
                ][feature_idx]

    # determine global-ids for edge end-points
    # Synchronize before retrieving shuffle-global-nids for edges end points.
    dist.barrier()
    edge_data = lookup_shuffle_global_nids_edges(
        rank, world_size, params.num_parts, edge_data, id_lookup, node_data
    )
    logging.debug(
        f"[Rank: {rank}] Done retrieving shuffle-global-nids for edge end-points ..."
    )
    memory_snapshot("ShuffleGlobalID_Lookup_Complete: ", rank)

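    # Helper used below; illustrative example (hypothetical keys/tensors),
    # with local_part_id == 0:
    #   {"paper/feat/0": t0, "paper/feat/1": t1}  ->  {"paper/feat": t0}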
    def prepare_local_data(src_data, local_part_id):
        """Select the entries of `src_data` that belong to `local_part_id` and
        strip the trailing local-partition-id from their keys."""
        local_data = {}
        for k, v in src_data.items():
            tokens = k.split("/")
            if tokens[-1] == str(local_part_id):
                local_data["/".join(tokens[:-1])] = v
        return local_data

    # create dgl objects here
    output_meta_json = {}
    start = timer()

    graph_formats = None
    if params.graph_formats:
        graph_formats = params.graph_formats.split(",")

    for local_part_id in range(params.num_parts // world_size):
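        # Each rank owns (num_parts // world_size) local partitions; local
        # partition `local_part_id` is written out as global partition
        # `rank + local_part_id * world_size`.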
        # Synchronize for each local partition of the graph object.
        dist.barrier()

        num_edges = shuffle_global_eid_offsets[local_part_id]
        node_count = len(
            node_data[constants.NTYPE_ID + "/" + str(local_part_id)]
        )
        edge_count = len(
            edge_data[constants.ETYPE_ID + "/" + str(local_part_id)]
        )
        local_node_data = prepare_local_data(node_data, local_part_id)
        local_edge_data = prepare_local_data(edge_data, local_part_id)
        (
            graph_obj,
            ntypes_map_val,
            etypes_map_val,
            ntypes_map,
            etypes_map,
            orig_nids,
            orig_eids,
        ) = create_dgl_object(
            schema_map,
            rank + local_part_id * world_size,
            local_node_data,
            local_edge_data,
            num_edges,
            get_ntype_counts_map(
                schema_map[constants.STR_NODE_TYPE],
                schema_map[constants.STR_NUM_NODES_PER_TYPE],
            ),
            edge_typecounts,
            params.save_orig_nids,
            params.save_orig_eids,
        )
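        # Re-sorting edges by etype on write is only needed when the graph has
        # more than one edge type.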
        sort_etypes = len(etypes_map) > 1
        local_node_features = prepare_local_data(
            rcvd_node_features, local_part_id
        )
        local_edge_features = prepare_local_data(
            rcvd_edge_features, local_part_id
        )
        write_dgl_objects(
            graph_obj,
            local_node_features,
            local_edge_features,
            params.output,
            rank + (local_part_id * world_size),
            orig_nids,
            orig_eids,
            graph_formats,
            sort_etypes,
        )
        memory_snapshot("DiskWriteDGLObjectsComplete: ", rank)

        # get the meta-data
        json_metadata = create_metadata_json(
            params.graph_name,
            node_count,
            edge_count,
            local_part_id * world_size + rank,
            params.num_parts,
            ntypes_map_val,
            etypes_map_val,
            ntypes_map,
            etypes_map,
            params.output,
        )
        output_meta_json[
            "local-part-id-" + str(local_part_id * world_size + rank)
        ] = json_metadata
        memory_snapshot("MetadataCreateComplete: ", rank)

    if rank == 0:
        # get meta-data from all partitions and merge them on rank-0
        metadata_list = gather_metadata_json(output_meta_json, rank, world_size)
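        # Slot 0 is overwritten with rank-0's own metadata, which is produced
        # locally rather than received from a peer.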
        metadata_list[0] = output_meta_json
        write_metadata_json(
            metadata_list,
            params.output,
            params.graph_name,
            world_size,
            params.num_parts,
        )
    else:
        # send meta-data to Rank-0 process
        gather_metadata_json(output_meta_json, rank, world_size)
    end = timer()
    logging.info(
        f"[Rank: {rank}] Time to create dgl objects: {timedelta(seconds = end - start)}"
    )
    memory_snapshot("MetadataWriteComplete: ", rank)

    global_end = timer()
    logging.info(
        f"[Rank: {rank}] Total execution time of the program: {timedelta(seconds = global_end - global_start)}"
    )
    memory_snapshot("PipelineComplete: ", rank)


def single_machine_run(params):
    """Main function for distributed implementation on a single machine

    Parameters:
    -----------
    params : argparser object
        Argument Parser structure with pre-determined arguments as defined
        at the bottom of this file.
    """
    processes = []
    mp.set_start_method("spawn")

    # Invoke the `target` function from each of the spawned processes for the
    # distributed implementation.
    for rank in range(params.world_size):
        p = mp.Process(
            target=run,
            args=(rank, params.world_size, gen_dist_partitions, params),
        )
        p.start()
        processes.append(p)

    for p in processes:
        p.join()


def run(rank, world_size, func_exec, params, backend="gloo"):
    """
    Init. function which is run by each process in the Gloo ProcessGroup

    Parameters:
    -----------
    rank : integer
        rank of the process
    world_size : integer
        number of processes configured in the Process Group
    func_exec : function
        function invoked by each process in the group; it contains the per-process logic
    params : argparser object
        argument parser object to access the command line arguments
    backend : string
        string specifying the type of backend to use for communication
    """
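    # Single-machine runs assume a fixed loopback rendezvous address and port
    # for the Gloo process group.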
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"

    # create Gloo Process Group
    dist.init_process_group(
        backend,
        rank=rank,
        world_size=world_size,
        timeout=timedelta(seconds=5 * 60),
    )

    # Invoke the main function to kick off each process.
    func_exec(rank, world_size, params)


def multi_machine_run(params):
    """
    Function to be invoked when executing the data loading pipeline on multiple machines.

    Parameters:
    -----------
    params : argparser object
        argparser object providing access to command line arguments.
    """
    rank = int(os.environ["RANK"])

    # Initialize the Gloo process group.
    dist.init_process_group(
        backend="gloo",
        rank=rank,
        world_size=params.world_size,
        timeout=timedelta(seconds=params.process_group_timeout),
    )
    logging.info(f"[Rank: {rank}] Done with process group initialization...")

    # Invoke the main function of the pipeline.
    gen_dist_partitions(rank, params.world_size, params)
    logging.info(
        f"[Rank: {rank}] Done with the distributed data processing pipeline."
    )