"vscode:/vscode.git/clone" did not exist on "2b302b93938c3de5fc98c5149a7ebcce86648051"
data_shuffle.py 46.7 KB
Newer Older
1
2
3
import gc
import logging
import math
import os
import sys
from datetime import timedelta
from timeit import default_timer as timer

import dgl
import numpy as np
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

import constants
from convert_partition import create_dgl_object, create_metadata_json
from dataset_utils import get_dataset
from dist_lookup import DistLookupService
from globalids import (assign_shuffle_global_nids_edges,
                       assign_shuffle_global_nids_nodes,
                       lookup_shuffle_global_nids_edges)
from gloo_wrapper import allgather_sizes, alltoallv_cpu, gather_metadata_json
from utils import (augment_edge_data, get_edge_types, get_etype_featnames,
                   get_gnid_range_map, get_idranges, get_node_types,
                   get_ntype_featnames, memory_snapshot, read_json,
                   read_ntype_partition_files, write_dgl_objects,
                   write_metadata_json, map_partid_rank)

def gen_node_data(rank, world_size, num_parts, id_lookup, ntid_ntype_map, schema_map):
    '''
    For this data processing pipeline, reading node files is not needed. All the needed information about
    the nodes can be found in the metadata json file. This function generates the nodes owned by a given
    process, using metis partitions.

    Parameters:
    -----------
    rank : int
        rank of the process
    world_size : int
        total no. of processes
    num_parts : int
        total no. of partitions
    id_lookup : instance of class DistLookupService
       Distributed lookup service used to map global-nids to respective partition-ids and 
       shuffle-global-nids
    ntid_ntype_map : dictionary
        a dictionary where keys are node_type ids (integers) and values are node_type names (strings).
    schema_map : dictionary
        dictionary formed by reading the input metadata json file for the input dataset.

        Please note that it is assumed that, for the input graph files, the nodes of a particular node-type are
        split into `p` files (because of `p` partitions to be generated). On a similar note, edges of a particular
        edge-type are split into `p` files as well.

        #assuming m nodetypes present in the input graph
        "num_nodes_per_chunk" : [
            [a0, a1, a2, ... a<p-1>],
            [b0, b1, b2, ... b<p-1>],
            ...
            [m0, m1, m2, ... m<p-1>]
        ]
        Here, each sub-list, corresponding to a node-type in the input graph, has `p` elements, for instance [a0, a1, ... a<p-1>],
        where each element represents the number of nodes which are to be processed by a process during distributed partitioning.

        In addition to the above key-value pair for the nodes in the graph, the node-features are captured in the
        "node_data" key-value pair. In this dictionary the keys will be nodetype names and value will be a dictionary which
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
        is used to capture all the features present for that particular node-type. This is shown in the following example:

        "node_data" : {
            "paper": {       # node type
                "feat": {   # feature key
                    "format": {"name": "numpy"},
                    "data": ["node_data/paper-feat-part1.npy", "node_data/paper-feat-part2.npy"]
                },
                "label": {   # feature key
                    "format": {"name": "numpy"},
                    "data": ["node_data/paper-label-part1.npy", "node_data/paper-label-part2.npy"]
                },
                "year": {   # feature key
                    "format": {"name": "numpy"},
                    "data": ["node_data/paper-year-part1.npy", "node_data/paper-year-part2.npy"]
                }
            }
        }
        In the above example we have one node-type, paper, which has 3 features, namely feat, label and year.
        Each feature has `p` files whose locations in the filesystem are listed under the key "data", and "format" is used to
        describe the storage format.

    Returns:
    --------
    dictionary :
        dictionary where keys are column names and values are numpy arrays; these arrays are generated
        using information present in the metadata json file

    '''
    local_node_data = {}
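    # The dictionary keys are suffixed with the local partition id, e.g.
    # constants.GLOBAL_NID + "/0" when each rank owns a single partition.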
    for local_part_id in range(num_parts//world_size):
        local_node_data[constants.GLOBAL_NID+"/"+str(local_part_id)] = []
        local_node_data[constants.NTYPE_ID+"/"+str(local_part_id)] = []
        local_node_data[constants.GLOBAL_TYPE_NID+"/"+str(local_part_id)] = []

    # Note that `get_idranges` always returns two dictionaries. Keys in these
    # dictionaries are type names for nodes and edges and values are 
    # `num_parts` number of tuples indicating the range of type-ids in first
    # dictionary and range of global-nids in the second dictionary. 
    type_nid_dict, global_nid_dict = get_idranges(schema_map[constants.STR_NODE_TYPE],
                                        schema_map[constants.STR_NUM_NODES_PER_CHUNK],
                                        num_chunks=num_parts)

    for ntype_id, ntype_name in ntid_ntype_map.items():
        type_start, type_end = type_nid_dict[ntype_name][0][0], type_nid_dict[ntype_name][-1][1]
        gnid_start, gnid_end = global_nid_dict[ntype_name][0, 0], global_nid_dict[ntype_name][0, 1]

        node_partid_slice = id_lookup.get_partition_ids(np.arange(gnid_start, gnid_end, dtype=np.int64)) #exclusive
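        # Each rank owns `num_parts // world_size` partitions, assigned cyclically:
        # local partition `local_part_id` on this rank corresponds to global
        # partition-id `rank + local_part_id * world_size`. Illustrative example
        # (hypothetical sizes): with world_size=4 and num_parts=8, rank 1 owns
        # global partitions 1 and 5.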

        for local_part_id in range(num_parts//world_size):
            cond = node_partid_slice == (rank + local_part_id*world_size)
            own_gnids = np.arange(gnid_start, gnid_end, dtype=np.int64)
            own_gnids = own_gnids[cond]

            own_tnids = np.arange(type_start, type_end, dtype=np.int64)
            own_tnids = own_tnids[cond]

            local_node_data[constants.NTYPE_ID+"/"+str(local_part_id)].append(np.ones(own_gnids.shape, dtype=np.int64)*ntype_id)
            local_node_data[constants.GLOBAL_NID+"/"+str(local_part_id)].append(own_gnids)
            local_node_data[constants.GLOBAL_TYPE_NID+"/"+str(local_part_id)].append(own_tnids)

    for k in local_node_data.keys():
        local_node_data[k] = np.concatenate(local_node_data[k])

    return local_node_data

def exchange_edge_data(rank, world_size, num_parts, edge_data):
    """
    Exchange edge_data among the processes in the world.
    For each local partition, a list of data slices targeting each process is prepared and
    alltoallv_cpu is invoked to perform the exchange.

    Parameters:
    -----------
    rank : int
        rank of the process
    world_size : int
        total no. of processes
    num_parts : int
        total no. of partitions
    edge_data : dictionary
        edge information, stored as a dictionary with column names as keys and column data as values.
        This information is read from the edges.txt file.

    Returns:
    --------
    dictionary :
        the input argument, edge_data, is updated with the edge data received from the other processes
        in the world.
    """

    # Prepare data for each rank in the cluster. 
    start = timer()
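    # Each rank owns `num_parts // world_size` partitions; one alltoallv round is
    # performed per local partition. Edges whose OWNER_PROCESS equals the global
    # partition-id `idx + local_part_id * world_size` are sent to rank `idx`.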
    for local_part_id in range(num_parts//world_size):

        input_list = []
        for idx in range(world_size):
            send_idx = (edge_data[constants.OWNER_PROCESS] == (idx + local_part_id*world_size))
            send_idx = send_idx.reshape(edge_data[constants.GLOBAL_SRC_ID].shape[0])
            filt_data = np.column_stack((edge_data[constants.GLOBAL_SRC_ID][send_idx == 1], \
                                    edge_data[constants.GLOBAL_DST_ID][send_idx == 1], \
                                    edge_data[constants.GLOBAL_TYPE_EID][send_idx == 1], \
                                    edge_data[constants.ETYPE_ID][send_idx == 1], \
                                    edge_data[constants.GLOBAL_EID][send_idx == 1]))
            if filt_data.shape[0] <= 0:
                input_list.append(torch.empty((0, 5), dtype=torch.int64))
            else:
                input_list.append(torch.from_numpy(filt_data))

        dist.barrier()
        output_list = alltoallv_cpu(rank, world_size, input_list, retain_nones=False)

        #Replace the values of the edge_data, with the received data from all the other processes.
        rcvd_edge_data = torch.cat(output_list).numpy()
        edge_data[constants.GLOBAL_SRC_ID+"/"+str(local_part_id)] = rcvd_edge_data[:,0]
        edge_data[constants.GLOBAL_DST_ID+"/"+str(local_part_id)] = rcvd_edge_data[:,1]
        edge_data[constants.GLOBAL_TYPE_EID+"/"+str(local_part_id)] = rcvd_edge_data[:,2]
        edge_data[constants.ETYPE_ID+"/"+str(local_part_id)] = rcvd_edge_data[:,3]
        edge_data[constants.GLOBAL_EID+"/"+str(local_part_id)] = rcvd_edge_data[:,4]

    end = timer()
    logging.info(f'[Rank: {rank}] Time to send/rcv edge data: {timedelta(seconds=end-start)}')

    # Clean up.
    edge_data.pop(constants.OWNER_PROCESS)
    edge_data.pop(constants.GLOBAL_SRC_ID)
    edge_data.pop(constants.GLOBAL_DST_ID)
    edge_data.pop(constants.GLOBAL_TYPE_EID)
    edge_data.pop(constants.ETYPE_ID)
    edge_data.pop(constants.GLOBAL_EID)

    return edge_data

def exchange_feature(rank, data, id_lookup, feat_type, feat_key, featdata_key, gid_start,
        gid_end, type_id_start, type_id_end, local_part_id, world_size, num_parts, 
        cur_features, cur_global_ids):
    """This function is used to send/receive one feature for either nodes or 
    edges of the input graph dataset.

    Parameters:
    -----------
    rank : int
        unique id assigned to the current process
    data : dictionary
        dictionary in which node or edge features are stored; this information
        is read from the appropriate node features file which belongs to the
        current process
    id_lookup : instance of DistLookupService
        instance of an implementation of dist. lookup service to retrieve values
        for keys
    feat_type : string
        this is used to distinguish which features are being exchanged. Please 
        note that for nodes ownership is clearly defined and for edges it is 
        always assumed that destination end point of the edge defines the 
        ownership of that particular edge
    feat_key : string
        this string is used as a key in the dictionary to store features, as 
        tensors, in local dictionaries
    featdata_key : numpy array
        features associated with this feature key being processed
    gid_start : int
        starting global_id, of either node or edge, for the feature data
    gid_end : int
        ending global_id, of either node or edge, for the feature data
    type_id_start : int
        starting type_id for the feature data
    type_id_end : int
        ending type_id for the feature data
    local_part_id : int
        integer used to identify the local partition, which is used to locate
        data belonging to this partition
    world_size : int
        total number of processes created
    num_parts : int
        total number of partitions
    cur_features : dictionary
        dictionary to store the feature data which belongs to the current 
        process
    cur_global_ids : dictionary
        dictionary to store global ids, of either nodes or edges, for which
        the features are stored in the cur_features dictionary
    
    Returns:
    -------
    dictionary :
        a dictionary is returned where keys are type names and 
        feature data are the values
    dictionary :
        a dictionary of global_ids of either nodes or edges whose features are
        received during the data shuffle process
    """
    #type_ids for this feature subset on the current rank
    gids_feat = np.arange(gid_start, gid_end)
    tids_feat = np.arange(type_id_start, type_id_end)
    local_idx = np.arange(0, type_id_end - type_id_start)

    feats_per_rank = []
    global_id_per_rank = []

    tokens = feat_key.split("/")
    assert len(tokens) == 3
    local_feat_key = "/".join(tokens[:-1]) +"/"+ str(local_part_id)
    for idx in range(world_size):
        # Get the partition ids for the range of global nids.
        if feat_type == constants.STR_NODE_FEATURES:
            # Retrieve the partition ids for the node features.
            # Each partition id will be in the range [0, num_parts).
            partid_slice = id_lookup.get_partition_ids(np.arange(gid_start, gid_end, dtype=np.int64))
        else:
            #Edge data case. 
            #Ownership is determined by the destination node.
            assert data is not None
            global_eids = np.arange(gid_start, gid_end, dtype=np.int64)

            #Now use `data` to extract destination nodes' global id 
            #and use that to get the ownership
            common, idx1, idx2 = np.intersect1d(data[constants.GLOBAL_EID], global_eids, return_indices=True)
            assert common.shape[0] == idx2.shape[0]

            global_dst_nids = data[constants.GLOBAL_DST_ID][idx1]
            assert np.all(global_eids == data[constants.GLOBAL_EID][idx1])
            partid_slice = id_lookup.get_partition_ids(global_dst_nids)

        cond = (partid_slice == (idx + local_part_id*world_size))
        gids_per_partid = gids_feat[cond]
        tids_per_partid = tids_feat[cond]
        local_idx_partid = local_idx[cond]

        if gids_per_partid.shape[0] == 0:
            feats_per_rank.append(torch.empty((0,1), dtype=torch.float))
            global_id_per_rank.append(torch.empty((0,1), dtype=torch.int64))
        else:
            feats_per_rank.append(featdata_key[local_idx_partid])
            global_id_per_rank.append(torch.from_numpy(gids_per_partid).type(torch.int64))

    #features (and global nids) per rank to be sent out are ready
    #for transmission, perform alltoallv here.
    output_feat_list = alltoallv_cpu(rank, world_size, feats_per_rank, retain_nones=False)
    output_id_list = alltoallv_cpu(rank, world_size, global_id_per_rank, retain_nones=False)
    assert len(output_feat_list) == len(output_id_list), (
        "Length of feature list and id list are expected to be equal while "
        f"got {len(output_feat_list)} and {len(output_id_list)}."
    )

    #stitch node_features together to form one large feature tensor
    if len(output_feat_list) > 0:
        output_feat_list = torch.cat(output_feat_list)
        output_id_list = torch.cat(output_id_list)
        if local_feat_key in cur_features: 
            temp = cur_features[local_feat_key]
            cur_features[local_feat_key] = torch.cat([temp, output_feat_list])
            temp = cur_global_ids[local_feat_key]
            cur_global_ids[local_feat_key] = torch.cat([temp, output_id_list])
        else:
            cur_features[local_feat_key] = output_feat_list
            cur_global_ids[local_feat_key] = output_id_list

    return cur_features, cur_global_ids


def exchange_features(rank, world_size, num_parts, feature_tids, type_id_map, id_lookup, feature_data, feat_type, data):
    """
    This function is used to shuffle node features so that each process will receive
    all the node features whose corresponding nodes are owned by the same process.
    The mapping procedure to identify the owner process is not straightforward. The
    following steps are used to identify the owner processes for the locally read
    node-features.
    a. Compute the global_nids for the locally read node features. Here metadata json file
        is used to identify the corresponding global_nids. Please note that initial graph input
        nodes.txt files are sorted based on node_types.
    b. Using global_nids and metis partitions owner processes can be easily identified.
    c. Now each process sends the global_nids for which shuffle_global_nids are needed to be
        retrieved.
    d. After receiving the corresponding shuffle_global_nids these ids are added to the
        node_data and edge_data dictionaries

    This pipeline assumes all the input data is in numpy format, except node/edge features which
    are maintained as tensors throughout the various stages of the pipeline execution.

    Parameters:
    -----------
    rank : int
        rank of the current process
    world_size : int
        total no. of participating processes.
    num_parts : int
        total no. of partitions.
    feature_tids : dictionary
        dictionary with keys as node-type names with suffixes as feature names 
        and value is a dictionary. This dictionary contains information about
        node-features associated with a given node-type and value is a list.
        This list contains a pair of indexes, like [starting-idx, ending-idx), which
        can be used to index into the node feature tensors read from 
        corresponding input files.
    type_id_map : dictionary
        mapping between type names and global_ids, of either nodes or edges, 
        which belong to the keys in this dictionary
    id_lookup : instance of class DistLookupService
       Distributed lookup service used to map global-nids to respective 
       partition-ids and shuffle-global-nids
    feature_data : dictionary
        dictionary containing the node or edge feature tensors read by the current
        process; it is keyed by the same keys as feature_tids
    feat_type : string
        this is used to distinguish which features are being exchanged. Please 
        note that for nodes ownership is clearly defined and for edges it is 
        always assumed that destination end point of the edge defines the 
        ownership of that particular edge
    data : dictionary
        dictionary in which node or edge features are stored; this information
        is read from the appropriate node features file which belongs to the
        current process

    Returns:
    --------
    dictionary :
        a dictionary is returned where keys are type names and 
        feature data are the values
    dictionary :
        a dictionary of global_ids of either nodes or edges whose features are
        received during the data shuffle process
    """
    start = timer()
    own_features = {}
    own_global_ids = {}

    # To iterate over the node_types and associated node_features
    for feat_key, type_info in feature_tids.items():

        # To iterate over the feature data, of a given (node or edge )type
        # type_info is a list of 3 elements (as shown below):
        #   [feature-name, starting-idx, ending-idx]
        #       feature-name is the name given to the feature-data, 
        #       read from the input metadata file
        #       [starting-idx, ending-idx) specifies the range of indexes 
        #        associated with the features data 
        # Determine the owner process for these features.
        # Note that the keys in the node features (and similarly edge features)
        # dictionary is of the following format: 
        #   `node_type/feature_name/local_part_id`:
        #    where node_type and feature_name are self-explanatory and 
        #    local_part_id denotes the partition-id, in the local process, 
        #    which will be used as a suffix to store all the information of a
        #    given partition which is processed by the current process. Its
        #    values start from 0 onwards, for instance 0, 1, 2 ... etc.
        #    local_part_id can be easily mapped to a global partition id
        #    using cyclic ordering. All local_part_ids = 0 from all
        #    processes will form global partition-ids between 0 and world_size-1.
        #    Similarly all local_part_ids = 1 from all processes will form
        #    global partition ids in the range [world_size, 2*world_size-1] and
        #    so on.
        tokens = feat_key.split("/")
        assert len(tokens) == 3
        type_name = tokens[0]
        feat_name = tokens[1]
        logging.info(f'[Rank: {rank}] processing feature: {feat_key}')

        for feat_info in type_info:
            # Compute the global_id range for this feature data
            type_id_start = int(feat_info[0])
            type_id_end = int(feat_info[1])
            begin_global_id = type_id_map[type_name][0]
            gid_start = begin_global_id + type_id_start
            gid_end = begin_global_id + type_id_end
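            # Illustrative example (hypothetical numbers): if nodes of this type
            # occupy global-nids [1000, 2000) and this feature chunk covers
            # type-ids [200, 300), then gid_start = 1200 and gid_end = 1300.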

            # Check if features exist for this type_name + feat_name.
            # This check should always pass, because feature_tids are built
            # by reading the input metadata json file for existing features.
            assert(feat_key in feature_data)

            for local_part_id in range(num_parts//world_size):
                featdata_key = feature_data[feat_key]
                own_features, own_global_ids = exchange_feature(rank, data, id_lookup,
                        feat_type, feat_key, featdata_key, gid_start, gid_end, type_id_start, 
                        type_id_end, local_part_id, world_size, num_parts, own_features, 
                        own_global_ids)

    end = timer()
    logging.info(f'[Rank: {rank}] Total time for feature exchange: {timedelta(seconds = end - start)}')
    return own_features, own_global_ids

def exchange_graph_data(rank, world_size, num_parts, node_features, edge_features, 
        node_feat_tids, edge_feat_tids, 
        edge_data, id_lookup, ntypes_ntypeid_map, 
        ntypes_gnid_range_map, etypes_geid_range_map, 
        ntid_ntype_map, schema_map):
    """
    Wrapper function which is used to shuffle graph data on all the processes.

    Parameters:
    -----------
    rank : int
        rank of the current process
    world_size : int
        total no. of participating processes.
    num_parts : int
        total no. of graph partitions.
    node_features : dictionary
        dictionary where node_features are stored; this information is read from the appropriate
        node features file which belongs to the current process
    edge_features : dictionary
        dictionary where edge_features are stored. This information is read from the appropriate
        edge feature files whose ownership is assigned to the current process
    node_feat_tids: dictionary
        in which keys are node-type names and values are triplets. Each triplet has node-feature name
        and the starting and ending type ids of the node-feature data read from the corresponding
        node feature data file read by current process. Each node type may have several features and
        hence each key may have several triplets.
    edge_feat_tids : dictionary
        a dictionary in which keys are edge-type names and values are triplets of the format
        <feat-name, start-per-type-idx, end-per-type-idx>. This triplet is used to identify
        the chunk of feature data for which the current process is responsible
    edge_data : dictionary
        dictionary which is used to store edge information as read from appropriate files assigned
        to each process.
    id_lookup : instance of class DistLookupService
       Distributed lookup service used to map global-nids to respective partition-ids and 
       shuffle-global-nids
    ntypes_ntypeid_map : dictionary
        mappings between node type names and node type ids
    ntypes_gnid_range_map : dictionary
        mapping between node type names and global_nids which belong to the keys in this dictionary
    etypes_geid_range_map : dictionary
        mapping between edge type names and global_eids which are assigned to the edges of this
        edge_type
    ntid_ntype_map : dictionary
        mapping between node type ids and node type names
    schema_map : dictionary
        is the data structure read from the metadata json file for the input graph

    Returns:
    --------
    dictionary :
        node_data dictionary containing the node data for the partitions owned by the current
        process; this data is generated from the graph metadata (see gen_node_data) during data shuffling.
    dictionary :
        node features dictionary which has node features for the nodes which are owned by the current
        process
    dictionary :
        list of global_nids for the nodes whose node features are received when node features shuffling was
        performed in the `exchange_features` function call
    dictionary :
        the input argument, edge_data dictionary, is updated with the edge data received from other processes
        in the world. The edge data is received by each rank in the process of data shuffling.
    dictionary :
        edge features dictionary which has edge features; the destination end-points of these edges
        are owned by the current process
    dictionary :
        list of global_eids for the edges whose edge features are received when edge features shuffling
        was performed in the `exchange_features` function call
    """
    memory_snapshot("ShuffleNodeFeaturesBegin: ", rank)
    rcvd_node_features, rcvd_global_nids = exchange_features(rank, world_size, num_parts, node_feat_tids,
                                                ntypes_gnid_range_map, id_lookup, node_features,
                                                constants.STR_NODE_FEATURES, None)
    memory_snapshot("ShuffleNodeFeaturesComplete: ", rank)
    logging.info(f'[Rank: {rank}] Done with node features exchange.')

    rcvd_edge_features, rcvd_global_eids = exchange_features(rank, world_size, num_parts, edge_feat_tids,
                                                etypes_geid_range_map, id_lookup, edge_features,
                                                constants.STR_EDGE_FEATURES, edge_data)
    logging.info(f'[Rank: {rank}] Done with edge features exchange.')

    node_data = gen_node_data(rank, world_size, num_parts, id_lookup, ntid_ntype_map, schema_map)
    memory_snapshot("NodeDataGenerationComplete: ", rank)

    edge_data = exchange_edge_data(rank, world_size, num_parts, edge_data)
    memory_snapshot("ShuffleEdgeDataComplete: ", rank)
    return node_data, rcvd_node_features, rcvd_global_nids, edge_data, rcvd_edge_features, rcvd_global_eids

def read_dataset(rank, world_size, id_lookup, params, schema_map):
    """
    This function gets the dataset and performs post-processing on the data which is read from files.
    Additional information (columns), such as owner_process and global_nid, is added to the nodes metadata
    and used later during processing. For edge data, which is now a dictionary, new columns
    such as global_edge_id and owner_process are added. Augmenting these data structures helps when
    data shuffling is performed.

    Parameters:
    -----------
    rank : int
        rank of the current process
    world_size : int
        total no. of processes instantiated
    id_lookup : instance of class DistLookupService
       Distributed lookup service used to map global-nids to respective partition-ids and 
       shuffle-global-nids
    params : argparser object
        argument parser object to access command line arguments
    schema_map : dictionary
        dictionary created by reading the input graph metadata json file

    Returns :
    ---------
    dictionary
        in which keys are node-type names and values are tuples representing the range of ids
        for nodes to be read by the current process
    dictionary
        node features which is a dictionary where keys are feature names and values are feature
        data as multi-dimensional tensors
    dictionary
        in which keys are node-type names and values are triplets. Each triplet has node-feature name
        and the starting and ending type ids of the node-feature data read from the corresponding
        node feature data file read by current process. Each node type may have several features and
        hence each key may have several triplets.
    dictionary
        edge data information is read from edges.txt and additional columns are added such as
        owner process for each edge.
    dictionary
        edge features which is also a dictionary, similar to node features dictionary
    dictionary
        a dictionary in which keys are edge-type names and values are tuples indicating the range of ids
        for edges read by the current process.
    dictionary
        a dictionary in which keys are edge-type names and values are triplets, 
        (edge-feature-name, start_type_id, end_type_id). These type_ids are indices in the edge-features
        read by the current process. Note that each edge-type may have several edge-features.
    """
    edge_features = {}
    #node_tids, node_features, edge_datadict, edge_tids
    node_tids, node_features, node_feat_tids, edge_data, edge_tids, edge_features, edge_feat_tids = \
        get_dataset(params.input_dir, params.graph_name, rank, world_size, params.num_parts, schema_map)
    logging.info(f'[Rank: {rank}] Done reading dataset {params.input_dir}')

    edge_data = augment_edge_data(edge_data, id_lookup, edge_tids, rank, world_size, params.num_parts)
    logging.info(f'[Rank: {rank}] Done augmenting edge_data: {len(edge_data)}, {edge_data[constants.GLOBAL_SRC_ID].shape}')

    return node_tids, node_features, node_feat_tids, edge_data, edge_features, edge_tids, edge_feat_tids

def gen_dist_partitions(rank, world_size, params):
    """
    Function which will be executed by all Gloo processes to begin execution of the pipeline.
    This function expects the input dataset to be split across multiple files.

    Input dataset and its file structure is described in metadata json file which is also part of the
    input dataset. On a high level, this metadata json file contains information about the following items:
    a) Nodes metadata: it is assumed that the nodes which belong to each node-type are split into p files
       (where `p` is the no. of partitions).
    b) Similarly edge metadata contains information about edges which are split into p-files.
    c) Node and Edge features, it is also assumed that each node (and edge) feature, if present, is also
       split into `p` files.

    For example, a sample metadata json file might be as follows:
    (In this toy example, we assume that we have "m" node-types, "k" edge types, and for node_type = ntype0-name
     we have two features namely feat0-name and feat1-name. Please note that the node-features are also split into
     `p` files. This will help in load-balancing during data-shuffling phase).

    Terminology used to identify any particular "id" assigned to nodes, edges or node features. Prefix "global" is
    used to indicate that this information is either read from the input dataset or autogenerated based on the information
    read from input dataset files. Prefix "type" is used to indicate a unique id assigned to either nodes or edges.
    For instance, type_node_id means a unique id, within a given node type, assigned to a node. And prefix "shuffle"
    will be used to indicate a unique id, across entire graph, assigned to either a node or an edge. For instance,
    SHUFFLE_GLOBAL_NID means a unique id which is assigned to a node after the data shuffle is completed.

    Some high-level notes on the structure of the metadata json file.
    1. path(s) mentioned in the entries for nodes, edges and node-features files can be either absolute or relative.
       if these paths are relative, then it is assumed that they are relative to the folder from which the execution is
       launched.
    2. The id_startx and id_endx represent the type_node_id and type_edge_id respectively for nodes and edge data. This
       means that these ids should match the no. of nodes/edges read from any given file. Since these are type_ids for
       the nodes and edges in any given file, their global_ids can be easily computed as well.

    {
        "graph_name" : xyz,
        "node_type" : ["ntype0-name", "ntype1-name", ....], #m node types
        "num_nodes_per_chunk" : [
            [a0, a1, ...a<p-1>], #p partitions
            [b0, b1, ... b<p-1>],
            ....
            [c0, c1, ..., c<p-1>] #no. of node types
        ],
        "edge_type" : ["src_ntype:edge_type:dst_ntype", ....], #k edge types
        "num_edges_per_chunk" : [
            [a0, a1, ...a<p-1>], #p partitions
            [b0, b1, ... b<p-1>],
            ....
            [c0, c1, ..., c<p-1>] #no. of edge types
        ],
        "node_data" : {
            "ntype0-name" : {
                "feat0-name" : {
                    "format" : {"name": "numpy"},
                    "data" :   [ #list of lists
                        ["<path>/feat-0.npy", 0, id_end0],
                        ["<path>/feat-1.npy", id_start1, id_end1],
                        ....
                        ["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
                    ]
                },
                "feat1-name" : {
                    "format" : {"name": "numpy"},
                    "data" : [ #list of lists
                        ["<path>/feat-0.npy", 0, id_end0],
                        ["<path>/feat-1.npy", id_start1, id_end1],
                        ....
                        ["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
                    ]
                }
            }
        },
        "edges": { #k edge types
            "src_ntype:etype0-name:dst_ntype" : {
                "format": {"name" : "csv", "delimiter" : " "},
                "data" : [
                    ["<path>/etype0-name-0.txt", 0, id_end0], #These are type_edge_ids for edges of this type
                    ["<path>/etype0-name-1.txt", id_start1, id_end1],
                    ...,
                    ["<path>/etype0-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
                ]
            },
            ...,
            "src_ntype:etype<k-1>-name:dst_ntype" : {
                "format": {"name" : "csv", "delimiter" : " "},
                "data" : [
                    ["<path>/etype<k-1>-name-0.txt", 0, id_end0],
                    ["<path>/etype<k-1>-name-1.txt", id_start1, id_end1],
                    ...,
                    ["<path>/etype<k-1>-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
                ]
            },
        },
    }

    The function performs the following steps:
    1. Reads the metis partitions to identify the owner process of all the nodes in the entire graph.
    2. Reads the input data set; each participating process will map to a single file for the edges,
        node-features and edge-features for each node-type and edge-types respectively. Using nodes metadata
        information, nodes which are owned by a given process are generated to optimize communication to some
        extent.
    3. Now each process shuffles the data by identifying the respective owner processes using metis
        partitions.
        a. To identify owner processes for nodes, metis partitions will be used.
        b. For edges, the owner process of the destination node will be the owner of the edge as well.
        c. For node and edge features, identifying the owner process is a little bit involved.
            For this purpose, graph metadata json file is used to first map the locally read node features
            to their global_nids. The owner process is then identified using the metis partitions for these global_nids,
            and the corresponding shuffle_global_nids are retrieved. A similar process is used for edge_features as well.
        d. After all the data shuffling is done, the order of node-features may be different when compared to
            their global_type_nids. Node- and edge-data are ordered by node-type and edge-type respectively.
            And now node features and edge features are re-ordered to match the order of their node- and edge-types.
    4. Last step is to create the DGL objects with the data present on each of the processes.
        a. DGL objects for nodes, edges, node- and edge- features.
        b. Metadata is gathered from each process to create the global metadata json file, by process rank = 0.

    Parameters:
    ----------
    rank : int
        integer representing the rank of the current process in a typical distributed implementation
    world_size : int
        integer representing the total no. of participating processes in a typical distributed implementation
    params : argparser object
        this object, key value pairs, provides access to the command line arguments from the runtime environment
    """
    global_start = timer()
    logging.info(f'[Rank: {rank}] Starting distributed data processing pipeline...')
    memory_snapshot("Pipeline Begin: ", rank)
    #init processing
    schema_map = read_json(os.path.join(params.input_dir, params.schema))

    #Initialize distributed lookup service for partition-id and shuffle-global-nids mappings
    #for global-nids
    _, global_nid_ranges = get_idranges(schema_map[constants.STR_NODE_TYPE], 
                                        schema_map[constants.STR_NUM_NODES_PER_CHUNK], params.num_parts)
    id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)

    # The resources, which are node-id to partition-id mappings, are split
    # into `world_size` number of parts, where each part can be mapped to
    # each physical node.
    id_lookup = DistLookupService(os.path.join(params.input_dir, params.partitions_dir),\
                                    schema_map[constants.STR_NODE_TYPE],\
                                    id_map, rank, world_size)

    ntypes_ntypeid_map, ntypes, ntypeid_ntypes_map = get_node_types(schema_map)
    etypes_etypeid_map, etypes, etypeid_etypes_map = get_edge_types(schema_map)
    logging.info(f'[Rank: {rank}] Initialized metis partitions and node_types map...')

    #read input graph files and augment these datastructures with
    #appropriate information (global_nid and owner process) for node and edge data
    node_tids, node_features, node_feat_tids, edge_data, edge_features, edge_tids, edge_feat_tids = \
        read_dataset(rank, world_size, id_lookup, params, schema_map)
    logging.info(f'[Rank: {rank}] Done augmenting file input data with auxiliary columns')
    memory_snapshot("DatasetReadComplete: ", rank)

    #send out node and edge data --- and appropriate features.
    #this function will also stitch the data recvd from other processes
    #and return the aggregated data
    ntypes_gnid_range_map = get_gnid_range_map(node_tids)
    etypes_geid_range_map = get_gnid_range_map(edge_tids)
    node_data, rcvd_node_features, rcvd_global_nids, edge_data, rcvd_edge_features, rcvd_global_eids  = \
                    exchange_graph_data(rank, world_size, params.num_parts, node_features, edge_features, \
                            node_feat_tids, edge_feat_tids, edge_data, id_lookup, ntypes_ntypeid_map, \
                            ntypes_gnid_range_map, etypes_geid_range_map, \
                            ntypeid_ntypes_map, schema_map)
    gc.collect()
    logging.info(f'[Rank: {rank}] Done with data shuffling...')
    memory_snapshot("DataShuffleComplete: ", rank)

    #sort node_data by ntype
    for local_part_id in range(params.num_parts//world_size):
        idx = node_data[constants.NTYPE_ID+"/"+str(local_part_id)].argsort()
        for k, v in node_data.items():
            tokens = k.split("/")
            assert len(tokens) == 2
            if tokens[1] == str(local_part_id):
                node_data[k] = v[idx]
        idx = None
    gc.collect()
    logging.info(f'[Rank: {rank}] Sorted node_data by node_type')

    #resolve global_ids for nodes
    assign_shuffle_global_nids_nodes(rank, world_size, params.num_parts, node_data)
    logging.info(f'[Rank: {rank}] Done assigning global-ids to nodes...')
    memory_snapshot("ShuffleGlobalID_Nodes_Complete: ", rank)

    #shuffle node feature according to the node order on each rank.
    for ntype_name in ntypes:
        featnames = get_ntype_featnames(ntype_name, schema_map)
        for featname in featnames:
            #if a feature name exists for a node-type, then it should also have
            #feature data as well. Hence using the assert statement.
            for local_part_id in range(params.num_parts//world_size):
                feature_key = ntype_name+'/'+featname+"/"+str(local_part_id)
                assert(feature_key in rcvd_global_nids)
                global_nids = rcvd_global_nids[feature_key]
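                # Features arrived in the order given by `global_nids`; below we
                # look up the shuffle_global_nids of those nodes and compute a
                # permutation that re-orders the feature rows to match the node
                # ordering of this partition.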

                _, idx1, _ = np.intersect1d(node_data[constants.GLOBAL_NID+"/"+str(local_part_id)], global_nids, return_indices=True)
                shuffle_global_ids = node_data[constants.SHUFFLE_GLOBAL_NID+"/"+str(local_part_id)][idx1]
                feature_idx = shuffle_global_ids.argsort()

                rcvd_node_features[feature_key] = rcvd_node_features[feature_key][feature_idx]
    memory_snapshot("ReorderNodeFeaturesComplete: ", rank)

    #sort edge_data by etype
    for local_part_id in range(params.num_parts//world_size):
        sorted_idx = edge_data[constants.ETYPE_ID+"/"+str(local_part_id)].argsort()
        for k, v in edge_data.items():
            tokens = k.split("/")
            assert len(tokens) == 2
            if tokens[1] == str(local_part_id):
                edge_data[k] = v[sorted_idx]
        sorted_idx = None
    gc.collect()

    shuffle_global_eid_offsets = assign_shuffle_global_nids_edges(rank, world_size, params.num_parts, edge_data)
    logging.info(f'[Rank: {rank}] Done assigning global_ids to edges ...')
    memory_snapshot("ShuffleGlobalID_Edges_Complete: ", rank)

    #Shuffle edge features according to the edge order on each rank.
    for etype_name in etypes:
        featnames = get_etype_featnames(etype_name, schema_map)
        for featname in featnames:
            for local_part_id in range(params.num_parts//world_size):
                feature_key = etype_name+'/'+featname+"/"+str(local_part_id)
                assert feature_key in rcvd_global_eids
                global_eids = rcvd_global_eids[feature_key]

                _, idx1, _ = np.intersect1d(edge_data[constants.GLOBAL_EID+"/"+str(local_part_id)], global_eids, return_indices=True)
                shuffle_global_ids = edge_data[constants.SHUFFLE_GLOBAL_EID+"/"+str(local_part_id)][idx1]
                feature_idx = shuffle_global_ids.argsort()

                rcvd_edge_features[feature_key] = rcvd_edge_features[feature_key][feature_idx]

    #determine global-ids for edge end-points
    edge_data = lookup_shuffle_global_nids_edges(rank, world_size, params.num_parts, edge_data, id_lookup, node_data)
    logging.info(f'[Rank: {rank}] Done resolving orig_node_id for local node_ids...')
    memory_snapshot("ShuffleGlobalID_Lookup_Complete: ", rank)

    def prepare_local_data(src_data, local_part_id):
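        # Collect the entries of `src_data` whose key suffix matches this
        # local_part_id and strip that suffix, e.g. "ntype/feat/1" -> "ntype/feat"
        # (illustrative key).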
        local_data = {}
        for k, v in src_data.items():
            tokens = k.split("/")
            if tokens[len(tokens)-1] == str(local_part_id):
                local_data["/".join(tokens[:-1])] = v
        return local_data

    #create dgl objects here
    output_meta_json = {}
    start = timer()
    
    graph_formats = None
    if params.graph_formats:
        graph_formats = params.graph_formats.split(',')
    
    for local_part_id in range(params.num_parts//world_size):
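        # Build and persist one DGL graph object per local partition; the global
        # partition id for this chunk is rank + local_part_id * world_size.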
        num_edges = shuffle_global_eid_offsets[local_part_id]
        node_count = len(node_data[constants.NTYPE_ID+"/"+str(local_part_id)])
        edge_count = len(edge_data[constants.ETYPE_ID+"/"+str(local_part_id)])
        local_node_data = prepare_local_data(node_data, local_part_id)
        local_edge_data = prepare_local_data(edge_data, local_part_id)
        graph_obj, ntypes_map_val, etypes_map_val, ntypes_map, etypes_map, \
            orig_nids, orig_eids = create_dgl_object(schema_map, rank+local_part_id*world_size, 
                    local_node_data, local_edge_data, 
                    num_edges, params.save_orig_nids, params.save_orig_eids)
        sort_etypes = len(etypes_map) > 1
        local_node_features = prepare_local_data(rcvd_node_features, local_part_id)
        local_edge_features = prepare_local_data(rcvd_edge_features, local_part_id)
        write_dgl_objects(graph_obj, 
                local_node_features, local_edge_features,
                params.output,
                rank + (local_part_id*world_size), 
                orig_nids, orig_eids, graph_formats, sort_etypes)
        memory_snapshot("DiskWriteDGLObjectsComplete: ", rank)

        #get the meta-data
        json_metadata = create_metadata_json(params.graph_name, node_count, edge_count, \
                            local_part_id * world_size + rank, params.num_parts, ntypes_map_val, \
                            etypes_map_val, ntypes_map, etypes_map, params.output)
        output_meta_json["local-part-id-"+str(local_part_id*world_size + rank)] = json_metadata
        memory_snapshot("MetadataCreateComplete: ", rank)

    if (rank == 0):
        #get meta-data from all partitions and merge them on rank-0
        metadata_list = gather_metadata_json(output_meta_json, rank, world_size)
        metadata_list[0] = output_meta_json
        write_metadata_json(metadata_list, params.output, params.graph_name, world_size, params.num_parts)
    else:
        #send meta-data to Rank-0 process
        gather_metadata_json(output_meta_json, rank, world_size)
    end = timer()
    logging.info(f'[Rank: {rank}] Time to create dgl objects: {timedelta(seconds = end - start)}')
    memory_snapshot("MetadataWriteComplete: ", rank)

    global_end = timer()
    logging.info(f'[Rank: {rank}] Total execution time of the program: {timedelta(seconds = global_end - global_start)}')
    memory_snapshot("PipelineComplete: ", rank)

def single_machine_run(params):
    """ Main function for distributed implementation on a single machine

    Parameters:
    -----------
    params : argparser object
        Argument Parser structure with pre-determined arguments as defined
        at the bottom of this file.
    """
    log_params(params)
    processes = []
    mp.set_start_method("spawn")

    #Invoke `target` function from each of the spawned process for distributed
    #implementation
    for rank in range(params.world_size):
        p = mp.Process(target=run, args=(rank, params.world_size, gen_dist_partitions, params))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

def run(rank, world_size, func_exec, params, backend="gloo"):
    """
    Init. function which is run by each process in the Gloo ProcessGroup

    Parameters:
    -----------
    rank : integer
        rank of the process
    world_size : integer
        number of processes configured in the Process Group
    func_exec : function name
        function which will be invoked which has the logic for each process in the group
    params : argparser object
        argument parser object to access the command line arguments
    backend : string
        string specifying the type of backend to use for communication
    """
    os.environ["MASTER_ADDR"] = '127.0.0.1'
    os.environ["MASTER_PORT"] = '29500'

    #create Gloo Process Group
    dist.init_process_group(backend, rank=rank, world_size=world_size, timeout=timedelta(seconds=5*60))

    #Invoke the main function to kick-off each process
    func_exec(rank, world_size, params)

def multi_machine_run(params):
    """
    Function to be invoked when executing data loading pipeline on multiple machines

    Parameters:
    -----------
    params : argparser object
        argparser object providing access to command line arguments.
    """
    rank = int(os.environ["RANK"])

    #init the gloo process group here.
    dist.init_process_group(
            backend="gloo",
            rank=rank,
            world_size=params.world_size,
            timeout=timedelta(seconds=params.process_group_timeout))
    logging.info(f'[Rank: {rank}] Done with process group initialization...')

    #invoke the main function here.
    gen_dist_partitions(rank, params.world_size, params)
    logging.info(f'[Rank: {rank}] Done with the distributed data processing pipeline.')