"examples/vscode:/vscode.git/clone" did not exist on "e82f174624663b0d72f6fcc13a1d3339d54ffc75"
globalids.py 9.95 KB
Newer Older
1
2
3
import itertools
import operator

import numpy as np
import torch

import constants
from dist_lookup import DistLookupService
from gloo_wrapper import allgather_sizes, alltoallv_cpu
from utils import memory_snapshot


def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
    """ 
    For nodes which are not owned by the current rank, whose global_nid <-> shuffle_global-nid mapping
    is not present at the current rank, this function retrieves their shuffle_global_ids from the owner rank

    Parameters: 
    -----------
    rank : integer
        rank of the process
    world_size : integer
        total no. of ranks configured
    global_nids_ranks : list
        list of numpy arrays (of global_nids), index of the list is the rank of the process
                    where global_nid <-> shuffle_global_nid mapping is located. 
    node_data : dictionary
        node_data is a dictionary with keys as column names and values as numpy arrays

    Returns:
    --------
    numpy ndarray
        where the column-0 are global_nids and column-1 are shuffle_global_nids which are retrieved
        from other processes. 
    """
    # Send the global_nids to their owner ranks using all-to-all.
    global_nids_ranks = [torch.from_numpy(x) for x in global_nids_ranks]
    recv_nodes = alltoallv_cpu(rank, world_size, global_nids_ranks)

    # Use node_data to look up the shuffle_global_nids to send back.
    send_nodes = []
    for proc_i_nodes in recv_nodes:
        # list of node-ids to look up
        if proc_i_nodes is not None:
            global_nids = proc_i_nodes.numpy()
            if len(global_nids) != 0:
                common, ind1, ind2 = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True)
                shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
                send_nodes.append(torch.from_numpy(shuffle_global_nids).type(dtype=torch.int64))
            else:
                send_nodes.append(torch.empty((0,), dtype=torch.int64))
        else:
            send_nodes.append(torch.empty((0,), dtype=torch.int64))

    # Send the shuffle_global_nids back and receive the ones requested by this rank.
    recv_shuffle_global_nids = alltoallv_cpu(rank, world_size, send_nodes)
    # Keep the dtype as int64 even when a rank sends back nothing.
    shuffle_global_nids = np.concatenate(
        [x.numpy() if x is not None else np.empty(0, dtype=np.int64) for x in recv_shuffle_global_nids]
    )

    global_nids = np.concatenate([x.numpy() for x in global_nids_ranks])
    ret_val = np.column_stack([global_nids, shuffle_global_nids])
    return ret_val
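
# The owner-side lookup above pairs each requested global_nid with its
# shuffle_global_nid via np.intersect1d. A minimal, self-contained sketch of
# that pairing on toy data (hypothetical helper, not used by the pipeline):
def _example_owner_side_lookup():
    # Owner rank's tables: global ids and the shuffled ids assigned to them.
    owned_global_nids = np.array([10, 11, 12, 13], dtype=np.int64)
    owned_shuffle_nids = np.array([100, 101, 102, 103], dtype=np.int64)
    # Ids requested by some other rank.
    requested = np.array([12, 10], dtype=np.int64)
    # intersect1d returns matches in sorted order together with the positions
    # (ind1) of those matches in the owner's table.
    common, ind1, _ = np.intersect1d(owned_global_nids, requested, return_indices=True)
    # Shuffle ids corresponding to the requested global ids: [100, 102].
    return owned_shuffle_nids[ind1]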

def lookup_shuffle_global_nids_edges(rank, world_size, num_parts, edge_data, id_lookup, node_data):
    '''
    This function is a helper used to look up shuffle_global_nids for a given set of
    global_nids using a distributed lookup service.

    Parameters:
    -----------
    rank : integer
        rank of the process
    world_size : integer
        total number of processes used in the process group
    num_parts : integer
        total number of output graph partitions
    edge_data : dictionary
        edge_data is a dictionary with keys as column names and values as numpy arrays representing
        all the edges present in the current graph partition
    id_lookup : instance of DistLookupService class
        instance of a distributed lookup service class which is used to retrieve partition-ids and
        shuffle_global_nids for any given set of global_nids
    node_data : dictionary
        node_data is a dictionary with keys as column names and values as numpy arrays representing
        all the nodes owned by the current process

    Returns:
    --------
    dictionary :
        dictionary where keys are column names and values are numpy arrays representing all the
        edges present in the current graph partition
    '''
    # Make sure that the outgoing message size does not exceed 2GB.
    # Even though gloo can handle up to 10GB of data in an outgoing message,
    # it needs additional memory to store temporary information in its buffers,
    # which increases the memory footprint of the process.
    MILLION = 1000 * 1000
    BATCH_SIZE = 250 * MILLION
    memory_snapshot("GlobalToShuffleIDMapBegin: ", rank)
    
    local_nids = []
    local_shuffle_nids = []
    for local_part_id in range(num_parts//world_size):
        local_nids.append(node_data[constants.GLOBAL_NID+"/"+str(local_part_id)])
        local_shuffle_nids.append(node_data[constants.SHUFFLE_GLOBAL_NID+"/"+str(local_part_id)])

    local_nids = np.concatenate(local_nids)
    local_shuffle_nids = np.concatenate(local_shuffle_nids)
    
    for local_part_id in range(num_parts//world_size):
        node_list = edge_data[constants.GLOBAL_SRC_ID+"/"+str(local_part_id)]  
        
        # Determine the no. of times each process has to send alltoall messages.
        all_sizes = allgather_sizes([node_list.shape[0]], world_size, num_parts, return_sizes=True)
        max_count = np.amax(all_sizes)
        num_splits = max_count // BATCH_SIZE + 1    
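        # Worked example with hypothetical sizes: if the largest rank holds
        # max_count = 600 million edge end-points, then
        # num_splits = 600M // 250M + 1 = 3 batches of roughly 200 million
        # int64 ids (~1.6GB) each, staying below the 2GB limit noted above.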
    
        # Split the message into batches and send.
        splits = np.array_split(node_list, num_splits)
        shuffle_mappings = []
        for item in splits:
            shuffle_ids = id_lookup.get_shuffle_nids(item, local_nids, local_shuffle_nids, world_size)
            shuffle_mappings.append(shuffle_ids)

        shuffle_ids = np.concatenate(shuffle_mappings)
        assert shuffle_ids.shape[0] == node_list.shape[0]
        edge_data[constants.SHUFFLE_GLOBAL_SRC_ID+"/"+str(local_part_id)] = shuffle_ids
    
        # Destination end points of edges are owned by the current process and therefore
        # already have corresponding SHUFFLE_GLOBAL_NIDs in node_data.
        # Here we retrieve the SHUFFLE_GLOBAL_NIDs for the destination end points of local edges.
        uniq_ids, inverse_idx = np.unique(edge_data[constants.GLOBAL_DST_ID+"/"+str(local_part_id)], return_inverse=True)
        common, idx1, idx2 = np.intersect1d(uniq_ids, node_data[constants.GLOBAL_NID+"/"+str(local_part_id)], assume_unique=True, return_indices=True)
        assert len(common) == len(uniq_ids)

        edge_data[constants.SHUFFLE_GLOBAL_DST_ID+"/"+str(local_part_id)] = node_data[constants.SHUFFLE_GLOBAL_NID+"/"+str(local_part_id)][idx2][inverse_idx]
        assert len(edge_data[constants.SHUFFLE_GLOBAL_DST_ID+"/"+str(local_part_id)]) == len(edge_data[constants.GLOBAL_DST_ID+"/"+str(local_part_id)])
    
    memory_snapshot("GlobalToShuffleIDMap_AfterLookupServiceCalls: ", rank)
    return edge_data
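
# The destination-id remap above uses np.unique(..., return_inverse=True) plus
# np.intersect1d to translate an array of global ids (with repetitions) into
# shuffle ids through a small per-partition table. A self-contained sketch on
# toy data (hypothetical helper, not used by the pipeline):
def _example_dst_id_remap():
    # Per-partition table: global ids owned here and their shuffle ids.
    owned_global = np.array([7, 8, 9], dtype=np.int64)
    owned_shuffle = np.array([70, 80, 90], dtype=np.int64)
    # Destination end points of local edges (with repetitions).
    dst_global = np.array([9, 7, 9, 8], dtype=np.int64)
    # Deduplicate and remember where each original entry maps to.
    uniq_ids, inverse_idx = np.unique(dst_global, return_inverse=True)
    # Locate every unique destination id in the owned table.
    common, idx1, idx2 = np.intersect1d(uniq_ids, owned_global, assume_unique=True, return_indices=True)
    assert len(common) == len(uniq_ids)
    # Expand back to the original ordering: [90, 70, 90, 80].
    return owned_shuffle[idx2][inverse_idx]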

def assign_shuffle_global_nids_nodes(rank, world_size, num_parts, node_data):
    """
    Utility function to assign shuffle global ids to nodes at a given rank
    node_data gets converted from [ntype, global_type_nid, global_nid]
    to [shuffle_global_nid, ntype, global_type_nid, global_nid, part_local_type_nid]
    where shuffle_global_nid : global id of the node after data shuffle
            ntype : node-type as read from xxx_nodes.txt
            global_type_nid : node-type-id as read from xxx_nodes.txt
            global_nid : node-id as read from xxx_nodes.txt, implicitly 
                            this is the line no. in the file
            part_local_type_nid : type_nid assigned by the current rank within its scope
            
    Parameters:
    -----------
    rank : integer
        rank of the process
    world_size : integer
        total number of processes used in the process group
158
159
    num_parts : integer
        total number of output graph partitions
160
161
162
163
    node_data : dictionary
        node_data is a dictionary with keys as column names and values as numpy arrays
    """
    # Compute prefix sum to determine node-id offsets
    local_row_counts = []
    for local_part_id in range(num_parts//world_size):
        local_row_counts.append(node_data[constants.GLOBAL_NID+"/"+str(local_part_id)].shape[0])

    # Perform allgather to compute the local offsets.
    prefix_sum_nodes = allgather_sizes(local_row_counts, world_size, num_parts)
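    # The indexing below assumes prefix_sum_nodes is a cumulative array with one
    # entry per (local_part_id, rank) pair, rank varying fastest
    # (index = local_part_id * world_size + rank), plus a final total.
    # Hypothetical example with world_size = 2, num_parts = 4 and node counts
    # [3, 5, 2, 4]: prefix_sum_nodes = [0, 3, 8, 10, 14], so rank 1 /
    # local_part_id 0 is assigned shuffle_global_nids arange(3, 8).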

    for local_part_id in range(num_parts//world_size):
        shuffle_global_nid_start = prefix_sum_nodes[rank + (local_part_id*world_size)]
        shuffle_global_nid_end = prefix_sum_nodes[rank + 1 + (local_part_id*world_size)]
        shuffle_global_nids = np.arange(shuffle_global_nid_start, shuffle_global_nid_end, dtype=np.int64)
        node_data[constants.SHUFFLE_GLOBAL_NID+"/"+str(local_part_id)] = shuffle_global_nids


def assign_shuffle_global_nids_edges(rank, world_size, num_parts, edge_data):
    """
    Utility function to assign shuffle_global_eids to edges
    edge_data gets converted from [global_src_nid, global_dst_nid, global_type_eid, etype]
    to [shuffle_global_src_nid, shuffle_global_dst_nid, global_src_nid, global_dst_nid, global_type_eid, etype]

    Parameters:
    -----------
    rank : integer
        rank of the current process
    world_size : integer
        total count of processes in execution
190
191
    num_parts : integer
        total number of output graph partitions
192
193
194
195
196
197
198
199
200
201
202
    edge_data : numpy ndarray
        edge data as read from xxx_edges.txt file

    Returns:
    --------
    integer
        shuffle_global_eid_start, which indicates the starting value from which shuffle_global-ids are assigned to edges
        on this rank
    """
    # Get the prefix sum of edge counts per rank to locate the starting point
    # from which shuffle_global_eids are assigned to edges on the current rank.
    local_row_counts = []
    for local_part_id in range(num_parts//world_size):
        local_row_counts.append(edge_data[constants.GLOBAL_SRC_ID+"/"+str(local_part_id)].shape[0])

    shuffle_global_eid_offset = []
    prefix_sum_edges = allgather_sizes(local_row_counts, world_size, num_parts)
    for local_part_id in range(num_parts//world_size):
        shuffle_global_eid_start = prefix_sum_edges[rank + (local_part_id*world_size)]
        shuffle_global_eid_end = prefix_sum_edges[rank + 1 + (local_part_id*world_size)]
        shuffle_global_eids = np.arange(shuffle_global_eid_start, shuffle_global_eid_end, dtype=np.int64)
        edge_data[constants.SHUFFLE_GLOBAL_EID+"/"+str(local_part_id)] = shuffle_global_eids
        shuffle_global_eid_offset.append(shuffle_global_eid_start)

    return shuffle_global_eid_offset