import numpy as np
import torch

import constants
from dist_lookup import DistLookupService
from gloo_wrapper import allgather_sizes, alltoallv_cpu
from utils import memory_snapshot


def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
    """ 
    For nodes which are not owned by the current rank, whose global_nid <-> shuffle_global-nid mapping
    is not present at the current rank, this function retrieves their shuffle_global_ids from the owner rank

    Parameters: 
    -----------
    rank : integer
        rank of the process
    world_size : integer
        total no. of ranks configured
    global_nids_ranks : list
        list of numpy arrays (of global_nids), index of the list is the rank of the process
                    where global_nid <-> shuffle_global_nid mapping is located. 
    node_data : dictionary
        node_data is a dictionary with keys as column names and values as numpy arrays

    Returns:
    --------
    numpy ndarray
        where the column-0 are global_nids and column-1 are shuffle_global_nids which are retrieved
        from other processes. 
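
    Example:
    --------
    An illustrative sketch with toy values (not from the real pipeline). All ranks must call
    this function together, since it performs collective alltoallv communication. Assuming
    world_size = 2 and that rank-1 owns the mappings for global_nids 5 and 7, rank-0 would
    issue:

        global_nids_ranks = [np.empty(0, dtype=np.int64),       # nothing needed from rank 0
                             np.array([5, 7], dtype=np.int64)]  # request these from rank 1
        mapping = get_shuffle_global_nids(0, 2, global_nids_ranks, node_data)
        # mapping[:, 0] -> requested global_nids, mapping[:, 1] -> shuffle_global_nids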
    """
    # convert the per-rank request lists (numpy arrays) to tensors and exchange
    # them so that each rank receives the global_nids whose mappings it owns
    global_nids_ranks = [torch.from_numpy(x) for x in global_nids_ranks]
    recv_nodes = alltoallv_cpu(rank, world_size, global_nids_ranks)

    # Use node_data to look up the shuffle_global_nids to send back.
    send_nodes = []
    for proc_i_nodes in recv_nodes:
        # list of node-ids to look up on behalf of the requesting rank
        if proc_i_nodes is not None: 
            global_nids = proc_i_nodes.numpy()
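            # NOTE: np.intersect1d returns matches sorted by value, so the reply
            # aligns positionally with the request only when each incoming request
            # array is itself sorted and duplicate-free (assumed to hold here).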
            if len(global_nids) != 0:
                common, ind1, ind2 = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True)
                shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
                send_nodes.append(torch.from_numpy(shuffle_global_nids).type(dtype=torch.int64))
            else:
                send_nodes.append(torch.empty((0,), dtype=torch.int64))
        else:
            send_nodes.append(torch.empty((0,), dtype=torch.int64))

    # send/receive the looked-up shuffle_global_nids back to the requesting ranks
    recv_shuffle_global_nids = alltoallv_cpu(rank, world_size, send_nodes)
    shuffle_global_nids = np.concatenate(
        [x.numpy() if x is not None else np.empty(0, dtype=np.int64) for x in recv_shuffle_global_nids]
    )
    global_nids = np.concatenate([x.numpy() for x in global_nids_ranks])
    ret_val = np.column_stack([global_nids, shuffle_global_nids])
    return ret_val


def lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, node_data):
    '''
    Helper function to look up shuffle_global_nids for a given set of global_nids using a
    distributed lookup service.

    Parameters:
    -----------
    rank : integer
        rank of the current process
    world_size : integer
        total number of processes used in the process group
    edge_data : dictionary
        edge_data is a dictionary with keys as column names and values as numpy arrays representing
        all the edges present in the current graph partition
    id_lookup : instance of DistLookupService class
        instance of a distributed lookup service class which is used to retrieve partition-ids and
        shuffle-global-nids for any given set of global-nids
    node_data : dictionary
        node_data is a dictionary with keys as column names and values as numpy arrays representing
        all the nodes owned by the current process

    Returns:
    --------
    dictionary :
        dictionary where keys are column names and values are numpy arrays representing all the
        edges present in the current graph partition, now including the shuffle_global_src_id
        and shuffle_global_dst_id columns
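
    Example:
    --------
    An illustrative sketch with toy values (not from the real pipeline), assuming id_lookup
    has already been initialized for this process group:

        edge_data = {constants.GLOBAL_SRC_ID: np.array([0, 1], dtype=np.int64),
                     constants.GLOBAL_DST_ID: np.array([1, 2], dtype=np.int64)}
        edge_data = lookup_shuffle_global_nids_edges(rank, world_size, edge_data,
                                                     id_lookup, node_data)
        # edge_data now also maps SHUFFLE_GLOBAL_SRC_ID and SHUFFLE_GLOBAL_DST_ID
        # to the shuffled ids of the corresponding edge end-points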
    '''
    memory_snapshot("GlobalToShuffleIDMapBegin: ", rank)
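
    # Batch the lookup: concatenate the src and dst global nids, resolve them with
    # a single lookup-service call, then split the result back into the two halves.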
    node_list = np.concatenate([edge_data[constants.GLOBAL_SRC_ID], edge_data[constants.GLOBAL_DST_ID]])
    shuffle_ids = id_lookup.get_shuffle_nids(node_list,
                                            node_data[constants.GLOBAL_NID],
                                            node_data[constants.SHUFFLE_GLOBAL_NID])

    edge_data[constants.SHUFFLE_GLOBAL_SRC_ID], edge_data[constants.SHUFFLE_GLOBAL_DST_ID] = np.split(shuffle_ids, 2)
    memory_snapshot("GlobalToShuffleIDMap_AfterLookupServiceCalls: ", rank)
    return edge_data


def assign_shuffle_global_nids_nodes(rank, world_size, node_data):
    """
    Utility function to assign shuffle global ids to nodes at a given rank
    node_data gets converted from [ntype, global_type_nid, global_nid]
    to [shuffle_global_nid, ntype, global_type_nid, global_nid, part_local_type_nid]
    where shuffle_global_nid : global id of the node after data shuffle
            ntype : node-type as read from xxx_nodes.txt
            global_type_nid : node-type-id as read from xxx_nodes.txt
            global_nid : node-id as read from xxx_nodes.txt, implicitly 
                            this is the line no. in the file
            part_local_type_nid : type_nid assigned by the current rank within its scope
            
    Parameters:
    -----------
    rank : integer
        rank of the process
    world_size : integer
        total number of processes used in the process group
    ntype_counts: list of tuples
        list of tuples (x,y), where x=ntype and y=no. of nodes whose shuffle_global_nids are needed
    node_data : dictionary
        node_data is a dictionary with keys as column names and values as numpy arrays
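
    Example:
    --------
    A worked sketch with toy values: with world_size = 3 and per-rank node counts [4, 2, 3],
    the gathered prefix sums are assumed to be [0, 4, 6, 9], so rank 1 assigns
    shuffle_global_nids 4 and 5 to its two local nodes:

        assign_shuffle_global_nids_nodes(1, 3, node_data)
        # node_data[constants.SHUFFLE_GLOBAL_NID] == np.array([4, 5])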
    """
    # Compute prefix sum to determine node-id offsets
    prefix_sum_nodes = allgather_sizes([node_data[constants.GLOBAL_NID].shape[0]], world_size)

    # assign node-ids from shuffle_global_nid_start to (shuffle_global_nid_end - 1);
    # this assumes node_data is sorted by node type
    shuffle_global_nid_start = prefix_sum_nodes[rank]
    shuffle_global_nid_end = prefix_sum_nodes[rank + 1]

    # add a column with global-ids (after data shuffle)
    shuffle_global_nids = np.arange(shuffle_global_nid_start, shuffle_global_nid_end, dtype=np.int64)
    node_data[constants.SHUFFLE_GLOBAL_NID] = shuffle_global_nids


def assign_shuffle_global_nids_edges(rank, world_size, edge_data):
    """
    Utility function to assign shuffle_global_eids to edges
    edge_data gets converted from [global_src_nid, global_dst_nid, global_type_eid, etype]
    to [shuffle_global_src_nid, shuffle_global_dst_nid, global_src_nid, global_dst_nid, global_type_eid, etype]

    Parameters:
    -----------
    rank : integer
        rank of the current process
    world_size : integer
        total count of processes in execution
    etype_counts : list of tuples
        list of tuples (x,y), x = rank, y = no. of edges
    edge_data : numpy ndarray
        edge data as read from xxx_edges.txt file

    Returns:
    --------
    integer
        shuffle_global_eid_start, which indicates the starting value from which shuffle_global-ids are assigned to edges
        on this rank
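
    Example:
    --------
    A worked sketch with toy values: with world_size = 2 and per-rank edge counts [3, 5],
    the gathered prefix sums are assumed to be [0, 3, 8], so rank 1 assigns
    shuffle_global_eids 3 through 7 to its local edges and returns 3:

        start = assign_shuffle_global_nids_edges(1, 2, edge_data)
        # start == 3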
    """
    # get prefix sums of per-rank edge counts to locate the starting point
    # from which global-ids are assigned to edges on the current rank
    prefix_sum_edges = allgather_sizes([edge_data[constants.GLOBAL_SRC_ID].shape[0]], world_size)
    shuffle_global_eid_start = prefix_sum_edges[rank]
    shuffle_global_eid_end = prefix_sum_edges[rank + 1]

    # assign edge-ids from shuffle_global_eid_start to (shuffle_global_eid_end - 1);
    # this assumes edge_data is sorted by edge type
    shuffle_global_eids = np.arange(shuffle_global_eid_start, shuffle_global_eid_end, dtype=np.int64)
    edge_data[constants.SHUFFLE_GLOBAL_EID] = shuffle_global_eids
    return shuffle_global_eid_start