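"""
Distributed data shuffling stage of the graph partitioning pipeline.

Each participating (Gloo) process reads its chunk of the input dataset as described by the
metadata json file, exchanges nodes, edges and their features with the owner processes
determined by the metis partitions, and finally writes out per-partition DGL objects and
the merged metadata json file.
"""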
import gc
import logging
import math
import os
import sys

from datetime import timedelta
from timeit import default_timer as timer

import dgl
import numpy as np
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

import constants
from convert_partition import create_dgl_object, create_metadata_json
from dataset_utils import get_dataset
from dist_lookup import DistLookupService
from globalids import (assign_shuffle_global_nids_edges,
                       assign_shuffle_global_nids_nodes,
                       lookup_shuffle_global_nids_edges)
from gloo_wrapper import allgather_sizes, alltoallv_cpu, gather_metadata_json
from utils import (augment_edge_data, get_edge_types, get_etype_featnames,
                   get_gnid_range_map, get_idranges, get_node_types,
                   get_ntype_featnames, memory_snapshot, read_json,
                   read_ntype_partition_files, write_dgl_objects,
                   write_metadata_json)


def gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map):
    '''
    For this data processing pipeline, reading node files is not needed. All the needed information about
    the nodes can be found in the metadata json file. This function generates the nodes owned by a given
    process, using metis partitions.

    Parameters:
    -----------
    rank : int
        rank of the process
    world_size : int
        total no. of processes
    id_lookup : instance of class DistLookupService
        Distributed lookup service used to map global-nids to respective partition-ids and
        shuffle-global-nids
    ntid_ntype_map : dictionary
        a dictionary where keys are node_type ids (integers) and values are node_type names (strings).
    schema_map : dictionary
        dictionary formed by reading the input metadata json file for the input dataset.

        Please note that it is assumed that, for the input graph files, the nodes of a particular node-type are
        split into `p` files (because of the `p` partitions to be generated). On a similar note, edges of a particular
        edge-type are split into `p` files as well.

        #assuming m node types present in the input graph
        "num_nodes_per_chunk" : [
            [a0, a1, a2, ... a<p-1>],
            [b0, b1, b2, ... b<p-1>],
            ...
            [m0, m1, m2, ... m<p-1>]
        ]
        Here, each sub-list, corresponding to a node type in the input graph, has `p` elements, for instance [a0, a1, ... a<p-1>],
        where each element represents the number of nodes which are to be processed by a process during distributed partitioning.

        In addition to the above key-value pair for the nodes in the graph, the node features are captured in the
        "node_data" key-value pair. In this dictionary the keys are node type names and each value is a dictionary which
        is used to capture all the features present for that particular node type. This is shown in the following example:

        "node_data" : {
            "paper": {       # node type
                "feat": {   # feature key
                    "format": {"name": "numpy"},
                    "data": ["node_data/paper-feat-part1.npy", "node_data/paper-feat-part2.npy"]
                },
                "label": {   # feature key
                    "format": {"name": "numpy"},
                    "data": ["node_data/paper-label-part1.npy", "node_data/paper-label-part2.npy"]
                },
                "year": {   # feature key
                    "format": {"name": "numpy"},
                    "data": ["node_data/paper-year-part1.npy", "node_data/paper-year-part2.npy"]
                }
            }
        }

        In the above example we have one node type, "paper", which has 3 features, namely feat, label and year.
        Each feature has `p` files whose locations in the filesystem are stored in the list for the key "data", and
        "format" describes the storage format.

    Returns:
    --------
    dictionary :
        dictionary where keys are column names and values are numpy arrays; these arrays are generated
        using information present in the metadata json file

    '''
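    #No node files are read here: for every node type the global-nid range comes from the
    #metadata json file, and the distributed lookup service maps those global-nids to metis
    #partition-ids, so each rank can enumerate exactly the nodes it owns.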
    local_node_data = { constants.GLOBAL_NID : [],
                        constants.NTYPE_ID : [],
                        constants.GLOBAL_TYPE_NID : []
                        }

    type_nid_dict, global_nid_dict = get_idranges(schema_map[constants.STR_NODE_TYPE],
                                        schema_map[constants.STR_NUM_NODES_PER_CHUNK],
                                        num_chunks=world_size)

    for ntype_id, ntype_name in ntid_ntype_map.items():
        type_start, type_end = type_nid_dict[ntype_name][0][0], type_nid_dict[ntype_name][-1][1]
        gnid_start, gnid_end = global_nid_dict[ntype_name][0, 0], global_nid_dict[ntype_name][0, 1]

        node_partid_slice = id_lookup.get_partition_ids(np.arange(gnid_start, gnid_end, dtype=np.int64)) #exclusive

        cond = node_partid_slice == rank
        own_gnids = np.arange(gnid_start, gnid_end, dtype=np.int64)
        own_gnids = own_gnids[cond]

        own_tnids = np.arange(type_start, type_end, dtype=np.int64)
        own_tnids = own_tnids[cond]

        local_node_data[constants.NTYPE_ID].append(np.ones(own_gnids.shape, dtype=np.int64)*ntype_id)
        local_node_data[constants.GLOBAL_NID].append(own_gnids)
        local_node_data[constants.GLOBAL_TYPE_NID].append(own_tnids)

    for k in local_node_data.keys():
        local_node_data[k] = np.concatenate(local_node_data[k])

    return local_node_data

def exchange_edge_data(rank, world_size, edge_data):
    """
    Exchange edge_data among the processes in the world.
    Prepares a list of data slices targeting each process and invokes
    alltoallv_cpu to perform the messaging.

    Parameters:
    -----------
    rank : int
        rank of the process
    world_size : int
        total no. of processes
    edge_data : dictionary
        edge information, as a dictionary which stores column names as keys and
        column data as values. This information is read from the edges.txt file.

    Returns:
    --------
    dictionary :
        the input argument, edge_data, is updated with the edge data received from the other
        processes in the world.
    """

    input_list = []
    start = timer()
    for i in np.arange(world_size):
        send_idx = (edge_data[constants.OWNER_PROCESS] == i)
        send_idx = send_idx.reshape(edge_data[constants.GLOBAL_SRC_ID].shape[0])
        filt_data = np.column_stack((edge_data[constants.GLOBAL_SRC_ID][send_idx == 1], \
                                    edge_data[constants.GLOBAL_DST_ID][send_idx == 1], \
                                    edge_data[constants.GLOBAL_TYPE_EID][send_idx == 1], \
                                    edge_data[constants.ETYPE_ID][send_idx == 1], \
                                    edge_data[constants.GLOBAL_EID][send_idx == 1]))
        if filt_data.shape[0] == 0:
            input_list.append(torch.empty((0,5), dtype=torch.int64))
        else:
            input_list.append(torch.from_numpy(filt_data))

    dist.barrier()
    output_list = alltoallv_cpu(rank, world_size, input_list)
    end = timer()
    logging.info(f'[Rank: {rank}] Time to send/rcv edge data: {timedelta(seconds=end-start)}')

    #Replace the values of edge_data with the data received from all the other processes.
    rcvd_edge_data = torch.cat(output_list).numpy()
    edge_data[constants.GLOBAL_SRC_ID] = rcvd_edge_data[:,0]
    edge_data[constants.GLOBAL_DST_ID] = rcvd_edge_data[:,1]
    edge_data[constants.GLOBAL_TYPE_EID] = rcvd_edge_data[:,2]
    edge_data[constants.ETYPE_ID] = rcvd_edge_data[:,3]
    edge_data[constants.GLOBAL_EID] = rcvd_edge_data[:,4]
    edge_data.pop(constants.OWNER_PROCESS)
    return edge_data

def exchange_features(rank, world_size, feature_tids, ntype_gnid_map, id_lookup, feature_data, feat_type, data):
    """
    This function is used to shuffle node or edge features so that each process receives
    all the features whose corresponding nodes (or edges) are owned by that process.

    The mapping procedure to identify the owner process is not straightforward. The
    following steps are used to identify the owner processes for the locally read
    features.

    a. Compute the global_nids for the locally read node features. Here the metadata json file
        is used to identify the corresponding global_nids. Please note that the initial graph input
        nodes.txt files are sorted based on node_types.
    b. Using global_nids and metis partitions, owner processes can be easily identified.
    c. Now each process sends the global_nids for which shuffle_global_nids need to be
        retrieved.
    d. After receiving the corresponding shuffle_global_nids, these ids are added to the
        node_data and edge_data dictionaries.

    This pipeline assumes all the input data is in numpy format, except node/edge features which
    are maintained as tensors throughout the various stages of the pipeline execution.

    Parameters:
    -----------
    rank : int
        rank of the current process
    world_size : int
        total no. of participating processes.
    feature_tids : dictionary
        dictionary with keys as node-type (or edge-type) names and values as lists of triplets. Each triplet
        contains a feature name followed by a range of indexes, [starting-idx, ending-idx), which
        can be used to index into the feature tensors read from the corresponding input files.
    ntype_gnid_map : dictionary
        mapping between node type names and the global_nids which belong to the keys in this dictionary
    id_lookup : instance of class DistLookupService
        Distributed lookup service used to map global-nids to respective partition-ids and
        shuffle-global-nids
    feature_data : dictionary
        dictionary in which node or edge features are stored; this information is read from the appropriate
        feature files which belong to the current process
    feat_type : string
        this is used to distinguish which features are being exchanged. Please note that
        for nodes ownership is clearly defined, while for edges it is always assumed that
        the destination end point of the edge defines the ownership of that particular
        edge
    data : dictionary
        edge data used to look up the destination node of each edge when edge features are
        exchanged; it is None when node features are exchanged

    Returns:
    --------
    dictionary :
        features are returned as a dictionary where keys are "<type-name>/<feature-name>" strings
        and values are tensors
    dictionary :
        a dictionary of global_nids (or global_eids) for the nodes (or edges) whose features are
        received during the data shuffle process
    """
    start = timer()
    own_features = {}
    own_global_nids = {}
    #To iterate over the types and their associated features
    for type_name, type_info in feature_tids.items():

        #To iterate over the features of a given type
        #type_info is a list of triplets, each of the form
        #[feature-name, starting-idx, ending-idx]
        #feature-name is the name given to the feature-data, read from the input metadata file
        #[starting-idx, ending-idx) specifies the range of indexes associated with the features read from
        #the associated input file. Note that the no. of rows of features read from the input file should be
        #the same as specified by this range, i.e. no. of rows = ending-idx - starting-idx.
        for feat_info in type_info:

            #determine the owner process for these features.
            feats_per_rank = []
            global_nid_per_rank = []
            feat_name = feat_info[0]
            feat_key = type_name+'/'+feat_name
            logging.info(f'[Rank: {rank}] processing feature: {feat_key}')

            #compute the global_nid range for these features
            type_nid_start = int(feat_info[1])
            type_nid_end = int(feat_info[2])
            begin_global_nid = ntype_gnid_map[type_name][0]
            gnid_start = begin_global_nid + type_nid_start
            gnid_end = begin_global_nid + type_nid_end

            #type_nids for this feature subset on the current rank
            gnids_feat = np.arange(gnid_start, gnid_end)
            tnids_feat = np.arange(type_nid_start, type_nid_end)
            local_idx = np.arange(0, type_nid_end - type_nid_start)

            #check if features exist for this type_name + feat_name
            #this check should always pass, because feature_tids are built
            #by reading the input metadata json file for existing features.
            assert feat_key in feature_data

            key_feats = feature_data[feat_key]
            for part_id in range(world_size):
                # Get the partition ids for the range of global nids.
                if feat_type == constants.STR_NODE_FEATURES:
                    partid_slice = id_lookup.get_partition_ids(np.arange(gnid_start, gnid_end, dtype=np.int64))
                else:
                    #Edge data case. 
                    #Ownership is determined by the destination node.
                    assert data is not None
                    global_eids = np.arange(gnid_start, gnid_end, dtype=np.int64)

                    #Now use `data` to extract destination nodes' global id 
                    #and use that to get the ownership
                    common, idx1, idx2 = np.intersect1d(data[constants.GLOBAL_EID], global_eids, return_indices=True)
                    assert common.shape[0] == idx2.shape[0]

                    global_dst_nids = data[constants.GLOBAL_DST_ID][idx1]
                    assert np.all(global_eids == data[constants.GLOBAL_EID][idx1])
                    partid_slice = id_lookup.get_partition_ids(global_dst_nids)
                    
                cond = (partid_slice == part_id)
                gnids_per_partid = gnids_feat[cond]
                tnids_per_partid = tnids_feat[cond]
                local_idx_partid = local_idx[cond]

                #every rank must send one bucket per destination rank to alltoallv_cpu,
                #so append empty placeholders when there is nothing destined for part_id
                if gnids_per_partid.shape[0] == 0:
                    feats_per_rank.append(torch.empty((0,1), dtype=torch.float))
                    global_nid_per_rank.append(torch.empty((0,1), dtype=torch.int64))
                else:
                    feats_per_rank.append(key_feats[local_idx_partid])
                    global_nid_per_rank.append(torch.from_numpy(gnids_per_partid).type(torch.int64))

            #features (and global nids) per rank to be sent out are ready
            #for transmission, perform alltoallv here.
            output_feat_list = alltoallv_cpu(rank, world_size, feats_per_rank)
            output_nid_list = alltoallv_cpu(rank, world_size, global_nid_per_rank)

            #stitch the received feature slices together to form one large feature tensor
            own_features[feat_key] = torch.cat(output_feat_list)
            own_global_nids[feat_key] = torch.cat(output_nid_list).numpy()

    end = timer()
    logging.info(f'[Rank: {rank}] Total time for feature exchange: {timedelta(seconds = end - start)}')
    return own_features, own_global_nids

def exchange_graph_data(rank, world_size, node_features, edge_features,
        node_feat_tids, edge_feat_tids,
        edge_data, id_lookup, ntypes_ntypeid_map,
        ntypes_gnid_range_map, etypes_geid_range_map,
        ntid_ntype_map, schema_map):
    """
    Wrapper function which is used to shuffle graph data across all the processes.

    Parameters:
    -----------
    rank : int
        rank of the current process
    world_size : int
        total no. of participating processes.
    node_features : dictionary
        dictionary where node_features are stored; this information is read from the appropriate
        node feature files which belong to the current process
    edge_features : dictionary
        dictionary where edge_features are stored. This information is read from the appropriate
        edge feature files whose ownership is assigned to the current process
    node_feat_tids : dictionary
        in which keys are node-type names and values are triplets. Each triplet has a node-feature name
        and the starting and ending type ids of the node-feature data read from the corresponding
        node feature data file read by the current process. Each node type may have several features and
        hence each key may have several triplets.
    edge_feat_tids : dictionary
        a dictionary in which keys are edge-type names and values are triplets of the format
        <feat-name, start-per-type-idx, end-per-type-idx>. This triplet is used to identify
        the chunk of feature data for which the current process is responsible
    edge_data : dictionary
        dictionary which is used to store edge information as read from the appropriate files assigned
        to each process.
    id_lookup : instance of class DistLookupService
        Distributed lookup service used to map global-nids to respective partition-ids and
        shuffle-global-nids
    ntypes_ntypeid_map : dictionary
        mappings between node type names and node type ids
    ntypes_gnid_range_map : dictionary
        mapping between node type names and the global_nids which belong to the keys in this dictionary
    etypes_geid_range_map : dictionary
        mapping between edge type names and the global_eids which are assigned to the edges of each
        edge_type
    ntid_ntype_map : dictionary
        mapping between node type ids and node type names
    schema_map : dictionary
        the data structure read from the metadata json file for the input graph

    Returns:
    --------
    dictionary :
        the node_data dictionary, generated locally for the nodes owned by the current rank
        using the metis partitions.
    dictionary :
        node features dictionary which has node features for the nodes which are owned by the current
        process
    dictionary :
        list of global_nids for the nodes whose node features are received when node feature shuffling was
        performed in the `exchange_features` function call
    dictionary :
        the input argument, edge_data dictionary, is updated with the edge data received from other processes
        in the world. The edge data is received by each rank in the process of data shuffling.
    dictionary :
        edge features dictionary which has edge features. The destination end points of these edges
        are owned by the current process
    dictionary :
        list of global_eids for the edges whose edge features are received when edge feature shuffling
        was performed in the `exchange_features` function call
    """
    memory_snapshot("ShuffleNodeFeaturesBegin: ", rank)
    rcvd_node_features, rcvd_global_nids = exchange_features(rank, world_size, node_feat_tids,
                                                ntypes_gnid_range_map, id_lookup, node_features,
                                                constants.STR_NODE_FEATURES, None)
    memory_snapshot("ShuffleNodeFeaturesComplete: ", rank)
    logging.info(f'[Rank: {rank}] Done with node features exchange.')

    rcvd_edge_features, rcvd_global_eids = exchange_features(rank, world_size, edge_feat_tids,
                                                etypes_geid_range_map, id_lookup, edge_features,
                                                constants.STR_EDGE_FEATURES, edge_data)
    logging.info(f'[Rank: {rank}] Done with edge features exchange.')

    node_data = gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map)
    memory_snapshot("NodeDataGenerationComplete: ", rank)
    edge_data = exchange_edge_data(rank, world_size, edge_data)

    memory_snapshot("ShuffleEdgeDataComplete: ", rank)
    return node_data, rcvd_node_features, rcvd_global_nids, edge_data, rcvd_edge_features, rcvd_global_eids

def read_dataset(rank, world_size, id_lookup, params, schema_map):
    """
    This function gets the dataset and performs post-processing on the data which is read from files.
    Additional information (columns) is added to the nodes metadata, like owner_process and global_nid, which
    is later used when processing this information. For the edge data, which is now a dictionary, we add new columns
    like global_edge_id and owner_process. Augmenting these data structures helps in processing them
    when data shuffling is performed.

    Parameters:
    -----------
    rank : int
        rank of the current process
    world_size : int
        total no. of processes instantiated
    id_lookup : instance of class DistLookupService
        Distributed lookup service used to map global-nids to respective partition-ids and
        shuffle-global-nids
    params : argparser object
        argument parser object to access command line arguments
    schema_map : dictionary
        dictionary created by reading the input graph metadata json file

    Returns:
    ---------
    dictionary
        in which keys are node-type names and values are tuples representing the range of ids
        for nodes to be read by the current process
    dictionary
        node features, which is a dictionary where keys are feature names and values are feature
        data as multi-dimensional tensors
    dictionary
        in which keys are node-type names and values are triplets. Each triplet has a node-feature name
        and the starting and ending type ids of the node-feature data read from the corresponding
        node feature data file read by the current process. Each node type may have several features and
        hence each key may have several triplets.
    dictionary
        edge data information read from edges.txt with additional columns added, such as the
        owner process of each edge.
    dictionary
        edge features, which is also a dictionary, similar to the node features dictionary
    dictionary
        a dictionary in which keys are edge-type names and values are tuples indicating the range of ids
        for edges read by the current process.
    dictionary
        a dictionary in which keys are edge-type names and values are triplets,
        (edge-feature-name, start_type_id, end_type_id). These type_ids are indices into the edge-features
        read by the current process. Note that each edge-type may have several edge-features.
    """
    edge_features = {}
    node_tids, node_features, node_feat_tids, edge_data, edge_tids, edge_features, edge_feat_tids = \
        get_dataset(params.input_dir, params.graph_name, rank, world_size, schema_map)
    logging.info(f'[Rank: {rank}] Done reading dataset from {params.input_dir}')

    edge_data = augment_edge_data(edge_data, id_lookup, edge_tids, rank, world_size)
    logging.info(f'[Rank: {rank}] Done augmenting edge_data: {len(edge_data)}, {edge_data[constants.GLOBAL_SRC_ID].shape}')

    return node_tids, node_features, node_feat_tids, edge_data, edge_features, edge_tids, edge_feat_tids

def gen_dist_partitions(rank, world_size, params):
    """
    Function which will be executed by all Gloo processes to begin execution of the pipeline.
    This function expects the input dataset to be split across multiple files.

    The input dataset and its file structure are described in a metadata json file which is also part of the
    input dataset. On a high level, this metadata json file contains information about the following items:
    a) Nodes metadata. It is assumed that the nodes which belong to each node-type are split into `p` files
       (where `p` is the no. of partitions).
    b) Similarly, the edges metadata contains information about edges which are split into `p` files.
    c) Node and edge features. It is also assumed that each node (and edge) feature, if present, is also
       split into `p` files.

    For example, a sample metadata json file might be as follows:
    (In this toy example, we assume that we have "m" node-types, "k" edge types, and for node_type = ntype0-name
     we have two features, namely feat0-name and feat1-name. Please note that the node-features are also split into
     `p` files. This will help in load-balancing during the data-shuffling phase).

    Terminology used to identify any particular "id" assigned to nodes, edges or node features: the prefix "global" is
    used to indicate that this information is either read from the input dataset or autogenerated based on the information
    read from the input dataset files. The prefix "type" is used to indicate a unique id assigned to either nodes or edges
    within a given type. For instance, type_node_id means a unique id, within a given node type, assigned to a node. And the
    prefix "shuffle" is used to indicate a unique id, across the entire graph, assigned to either a node or an edge. For instance,
    SHUFFLE_GLOBAL_NID means a unique id which is assigned to a node after the data shuffle is completed.

    Some high-level notes on the structure of the metadata json file:
    1. The path(s) mentioned in the entries for nodes, edges and node-features files can be either absolute or relative.
       If these paths are relative, then it is assumed that they are relative to the folder from which the execution is
       launched.
    2. The id_startx and id_endx represent the type_node_id and type_edge_id ranges for node and edge data respectively. This
       means that these ids should match the no. of nodes/edges read from any given file. Since these are type_ids for
       the nodes and edges in any given file, their global_ids can be easily computed as well.

    {
        "graph_name" : xyz,
        "node_type" : ["ntype0-name", "ntype1-name", ....], #m node types
        "num_nodes_per_chunk" : [
            [a0, a1, ...a<p-1>], #p partitions
            [b0, b1, ... b<p-1>],
            ....
            [c0, c1, ..., c<p-1>] #no. of node types
        ],
        "edge_type" : ["src_ntype:edge_type:dst_ntype", ....], #k edge types
        "num_edges_per_chunk" : [
            [a0, a1, ...a<p-1>], #p partitions
            [b0, b1, ... b<p-1>],
            ....
            [c0, c1, ..., c<p-1>] #no. of edge types
        ],
        "node_data" : {
            "ntype0-name" : {
                "feat0-name" : {
                    "format" : {"name": "numpy"},
                    "data" :   [ #list of lists
                        ["<path>/feat-0.npy", 0, id_end0],
                        ["<path>/feat-1.npy", id_start1, id_end1],
                        ....
                        ["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
                    ]
                },
                "feat1-name" : {
                    "format" : {"name": "numpy"},
                    "data" : [ #list of lists
                        ["<path>/feat-0.npy", 0, id_end0],
                        ["<path>/feat-1.npy", id_start1, id_end1],
                        ....
                        ["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
                    ]
                }
            }
        },
        "edges": { #k edge types
            "src_ntype:etype0-name:dst_ntype" : {
                "format": {"name" : "csv", "delimiter" : " "},
                "data" : [
                    ["<path>/etype0-name-0.txt", 0, id_end0], #These are type_edge_ids for edges of this type
                    ["<path>/etype0-name-1.txt", id_start1, id_end1],
                    ...,
                    ["<path>/etype0-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
                ]
            },
            ...,
            "src_ntype:etype<k-1>-name:dst_ntype" : {
                "format": {"name" : "csv", "delimiter" : " "},
                "data" : [
                    ["<path>/etype<k-1>-name-0.txt", 0, id_end0],
                    ["<path>/etype<k-1>-name-1.txt", id_start1, id_end1],
                    ...,
                    ["<path>/etype<k-1>-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
                ]
            },
        },
    }

    The function performs the following steps:
    1. Reads the metis partitions to identify the owner process of all the nodes in the entire graph.
    2. Reads the input data set; each participating process will map to a single file for the edges,
        node-features and edge-features for each node-type and edge-type respectively. Using the nodes metadata
        information, the nodes which are owned by a given process are generated locally to optimize communication
        to some extent.
    3. Now each process shuffles the data by identifying the respective owner processes using the metis
        partitions.
        a. To identify the owner processes for nodes, metis partitions are used.
        b. For edges, the owner process of the destination node will be the owner of the edge as well.
        c. For node and edge features, identifying the owner process is a little more involved.
            For this purpose, the graph metadata json file is used to first map the locally read node features
            to their global_nids. Now the owner process is identified using the metis partitions for these global_nids
            to retrieve shuffle_global_nids. A similar process is used for edge features as well.
        d. After all the data shuffling is done, the order of node-features may be different when compared to
            their global_type_nids. Node- and edge-data are ordered by node-type and edge-type respectively.
            And now node features and edge features are re-ordered to match the order of their node- and edge-types.
    4. The last step is to create the DGL objects with the data present on each of the processes.
        a. DGL objects for nodes, edges, node- and edge-features.
        b. Metadata is gathered from each process to create the global metadata json file, by the process with rank = 0.

    Parameters:
    ----------
    rank : int
        integer representing the rank of the current process in a typical distributed implementation
    world_size : int
        integer representing the total no. of participating processes in a typical distributed implementation
    params : argparser object
        this object provides access, as key-value pairs, to the command line arguments from the runtime environment
    """
    global_start = timer()
    logging.info(f'[Rank: {rank}] Starting distributed data processing pipeline...')
    memory_snapshot("Pipeline Begin: ", rank)

    #init processing
    schema_map = read_json(os.path.join(params.input_dir, params.schema))

    #Initialize the distributed lookup service, which maps global-nids to partition-ids
    #and shuffle-global-nids
    _, global_nid_ranges = get_idranges(schema_map[constants.STR_NODE_TYPE],
                                        schema_map[constants.STR_NUM_NODES_PER_CHUNK])
    id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
    id_lookup = DistLookupService(os.path.join(params.input_dir, params.partitions_dir),\
                                    schema_map[constants.STR_NODE_TYPE],\
                                    id_map, rank, world_size)

    ntypes_ntypeid_map, ntypes, ntypeid_ntypes_map = get_node_types(schema_map)
    etypes_etypeid_map, etypes, etypeid_etypes_map = get_edge_types(schema_map)
    logging.info(f'[Rank: {rank}] Initialized metis partitions and node_types map...')

    #read the input graph files and augment these data structures with
    #appropriate information (global_nid and owner process) for node and edge data
    node_tids, node_features, node_feat_tids, edge_data, edge_features, edge_tids, edge_feat_tids = \
        read_dataset(rank, world_size, id_lookup, params, schema_map)
    logging.info(f'[Rank: {rank}] Done augmenting file input data with auxiliary columns')
    memory_snapshot("DatasetReadComplete: ", rank)

    #send out node and edge data --- and the appropriate features.
    #this function will also stitch the data received from other processes
    #and return the aggregated data
    ntypes_gnid_range_map = get_gnid_range_map(node_tids)
    etypes_geid_range_map = get_gnid_range_map(edge_tids)
    node_data, rcvd_node_features, rcvd_global_nids, edge_data, rcvd_edge_features, rcvd_global_eids  = \
                    exchange_graph_data(rank, world_size, node_features, edge_features, \
                            node_feat_tids, edge_feat_tids, edge_data, id_lookup, ntypes_ntypeid_map, \
                            ntypes_gnid_range_map, etypes_geid_range_map, \
                            ntypeid_ntypes_map, schema_map)
    gc.collect()
    logging.info(f'[Rank: {rank}] Done with data shuffling...')
    memory_snapshot("DataShuffleComplete: ", rank)

    #sort node_data by ntype
    idx = node_data[constants.NTYPE_ID].argsort()
    for k, v in node_data.items():
        node_data[k] = v[idx]
    idx = None
    gc.collect()
    logging.info(f'[Rank: {rank}] Sorted node_data by node_type')

    #resolve global_ids for nodes
    assign_shuffle_global_nids_nodes(rank, world_size, node_data)
    logging.info(f'[Rank: {rank}] Done assigning global-ids to nodes...')
    memory_snapshot("ShuffleGlobalID_Nodes_Complete: ", rank)

    #shuffle node features according to the node order on each rank.
    for ntype_name in ntypes:
        featnames = get_ntype_featnames(ntype_name, schema_map)
        for featname in featnames:
            #if a feature name exists for a node-type, then it should also have
            #feature data as well. Hence the assert statement.
            feature_key = ntype_name+'/'+featname
            assert feature_key in rcvd_global_nids
            global_nids = rcvd_global_nids[feature_key]

            _, idx1, _ = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True)
            shuffle_global_ids = node_data[constants.SHUFFLE_GLOBAL_NID][idx1]
            feature_idx = shuffle_global_ids.argsort()

            rcvd_node_features[feature_key] = rcvd_node_features[feature_key][feature_idx]
    memory_snapshot("ReorderNodeFeaturesComplete: ", rank)

    #sort edge_data by etype
    sorted_idx = edge_data[constants.ETYPE_ID].argsort()
    for k, v in edge_data.items():
        edge_data[k] = v[sorted_idx]
    sorted_idx = None
    gc.collect()

    shuffle_global_eid_start = assign_shuffle_global_nids_edges(rank, world_size, edge_data)
    logging.info(f'[Rank: {rank}] Done assigning global_ids to edges...')
    memory_snapshot("ShuffleGlobalID_Edges_Complete: ", rank)

    #Shuffle edge features according to the edge order on each rank.
    for etype_name in etypes:
        featnames = get_etype_featnames(etype_name, schema_map)
        for featname in featnames:
            feature_key = etype_name+'/'+featname
            assert feature_key in rcvd_global_eids
            global_eids = rcvd_global_eids[feature_key]

            _, idx1, _ = np.intersect1d(edge_data[constants.GLOBAL_EID], global_eids, return_indices=True)
            shuffle_global_ids = edge_data[constants.SHUFFLE_GLOBAL_EID][idx1]
            feature_idx = shuffle_global_ids.argsort()

            rcvd_edge_features[feature_key] = rcvd_edge_features[feature_key][feature_idx]

    for k, v in rcvd_edge_features.items():
        logging.info(f'[Rank: {rank}] key: {k} v: {v.shape}')

    #determine global-ids for edge end-points
    edge_data = lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, node_data)
    logging.info(f'[Rank: {rank}] Done resolving orig_node_id for local node_ids...')
    memory_snapshot("ShuffleGlobalID_Lookup_Complete: ", rank)

    #create dgl objects here
    start = timer()
    num_nodes = 0
    num_edges = shuffle_global_eid_start
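    #shuffle_global_eid_start is the first shuffle-global edge id assigned to this rank by
    #assign_shuffle_global_nids_edges; it is forwarded to create_dgl_object as num_edges.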
    node_count = len(node_data[constants.NTYPE_ID])
    edge_count = len(edge_data[constants.ETYPE_ID])
    graph_obj, ntypes_map_val, etypes_map_val, ntypes_map, etypes_map, \
        orig_nids, orig_eids = create_dgl_object(schema_map, rank, node_data, \
            edge_data, num_edges, params.save_orig_nids, params.save_orig_eids)
    memory_snapshot("CreateDGLObjectsComplete: ", rank)
    write_dgl_objects(graph_obj, rcvd_node_features, rcvd_edge_features, params.output, \
        rank, orig_nids, orig_eids)
    memory_snapshot("DiskWriteDGLObjectsComplete: ", rank)

    #get the meta-data
    json_metadata = create_metadata_json(params.graph_name, node_count, edge_count, \
                            rank, world_size, ntypes_map_val, \
                            etypes_map_val, ntypes_map, etypes_map, params.output)
    memory_snapshot("MetadataCreateComplete: ", rank)

    if (rank == 0):
        #get meta-data from all partitions and merge them on rank-0
        metadata_list = gather_metadata_json(json_metadata, rank, world_size)
        metadata_list[0] = json_metadata
        write_metadata_json(metadata_list, params.output, params.graph_name)
    else:
        #send meta-data to Rank-0 process
        gather_metadata_json(json_metadata, rank, world_size)
    end = timer()
    logging.info(f'[Rank: {rank}] Time to create dgl objects: {timedelta(seconds = end - start)}')
    memory_snapshot("MetadataWriteComplete: ", rank)

    global_end = timer()
    logging.info(f'[Rank: {rank}] Total execution time of the program: {timedelta(seconds = global_end - global_start)}')
    memory_snapshot("PipelineComplete: ", rank)

def single_machine_run(params):
    """ Main function for distributed implementation on a single machine

    Parameters:
    -----------
    params : argparser object
        Argument Parser structure with pre-determined arguments as defined
        at the bottom of this file.
    """
    log_params(params)
    processes = []
    mp.set_start_method("spawn")

    #Invoke `target` function from each of the spawned process for distributed
    #implementation
    for rank in range(params.world_size):
        p = mp.Process(target=run, args=(rank, params.world_size, gen_dist_partitions, params))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

def run(rank, world_size, func_exec, params, backend="gloo"):
    """
    Init. function which is run by each process in the Gloo ProcessGroup

    Parameters:
    -----------
    rank : integer
        rank of the process
    world_size : integer
        number of processes configured in the Process Group
    func_exec : function name
        function which will be invoked which has the logic for each process in the group
    params : argparser object
        argument parser object to access the command line arguments
    backend : string
        string specifying the type of backend to use for communication
    """
    os.environ["MASTER_ADDR"] = '127.0.0.1'
    os.environ["MASTER_PORT"] = '29500'

    #create Gloo Process Group
    dist.init_process_group(backend, rank=rank, world_size=world_size, timeout=timedelta(seconds=5*60))

    #Invoke the main function to kick-off each process
    func_exec(rank, world_size, params)

def multi_machine_run(params):
    """
    Function to be invoked when executing data loading pipeline on multiple machines

    Parameters:
    -----------
    params : argparser object
        argparser object providing access to command line arguments.
    """
    rank = int(os.environ["RANK"])

    #init the gloo process group here.
    dist.init_process_group(
            backend="gloo",
            rank=rank,
            world_size=params.world_size,
            timeout=timedelta(seconds=params.process_group_timeout))
    logging.info(f'[Rank: {rank}] Done with process group initialization...')

    #invoke the main function here.
    gen_dist_partitions(rank, params.world_size, params)
    logging.info(f'[Rank: {rank}] Done with the distributed data processing pipeline.')