import itertools
import operator

import constants

import numpy as np
import torch
from dist_lookup import DistLookupService
from gloo_wrapper import allgather_sizes, alltoallv_cpu
from utils import memory_snapshot


def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
    """
    For nodes that are not owned by the current rank, i.e. whose global_nid <-> shuffle_global_nid mapping
    is not present at the current rank, this function retrieves their shuffle_global_nids from the owner rank.

    Parameters:
    -----------
    rank : integer
        rank of the process
    world_size : integer
        total no. of ranks configured
    global_nids_ranks : list
        list of numpy arrays (of global_nids), index of the list is the rank of the process
                    where global_nid <-> shuffle_global_nid mapping is located.
    node_data : dictionary
        node_data is a dictionary with keys as column names and values as numpy arrays

    Returns:
    --------
    numpy ndarray
        where column 0 contains the global_nids and column 1 contains the shuffle_global_nids retrieved
        from other processes.
    """
    # Exchange requests: send each rank the global_nids whose mappings it owns.
    global_nids_ranks = [torch.from_numpy(x) for x in global_nids_ranks]
    recv_nodes = alltoallv_cpu(rank, world_size, global_nids_ranks)
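    # After the exchange, recv_nodes[i] holds the global_nids that rank i asked
    # this rank to resolve (alltoallv_cpu may return None for ranks that
    # requested nothing).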

    # Use node_data to look up the shuffle_global_nids to send back.
    send_nodes = []
    for proc_i_nodes in recv_nodes:
        # list of node-ids to lookup
        if proc_i_nodes is not None:
            global_nids = proc_i_nodes.numpy()
            if len(global_nids) != 0:
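                # np.intersect1d with return_indices=True locates the requested
                # global_nids within the local node_data; ind1 then selects the
                # corresponding shuffle_global_nids (ordered by the sorted
                # common global_nids).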
                common, ind1, ind2 = np.intersect1d(
                    node_data[constants.GLOBAL_NID],
                    global_nids,
                    return_indices=True,
                )
                shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][
                    ind1
                ]
                send_nodes.append(
                    torch.from_numpy(shuffle_global_nids).type(
                        dtype=torch.int64
                    )
                )
            else:
                send_nodes.append(torch.empty((0), dtype=torch.int64))
        else:
            send_nodes.append(torch.empty((0), dtype=torch.int64))

    # Send the retrieved shuffle_global_nids back to the requesting ranks.
    recv_shuffle_global_nids = alltoallv_cpu(rank, world_size, send_nodes)
    shuffle_global_nids = np.concatenate(
        [x.numpy() if x is not None else [] for x in recv_shuffle_global_nids]
    )
    global_nids = np.concatenate([x for x in global_nids_ranks])
    ret_val = np.column_stack([global_nids, shuffle_global_nids])
    return ret_val
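
# A minimal sketch (hypothetical, not part of the pipeline) of how
# get_shuffle_global_nids above is expected to be used: each rank buckets the
# global_nids it cannot resolve locally by their owner rank and receives back a
# two-column [global_nid, shuffle_global_nid] mapping. `owner_of` stands in for
# whatever partition-id lookup the caller uses (e.g. DistLookupService).
#
#   buckets = [[] for _ in range(world_size)]
#   for nid in missing_global_nids:
#       buckets[owner_of(nid)].append(nid)
#   global_nids_ranks = [np.array(b, dtype=np.int64) for b in buckets]
#   mapping = get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data)
#   # mapping[:, 0] -> global_nid, mapping[:, 1] -> shuffle_global_nid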


def lookup_shuffle_global_nids_edges(
    rank, world_size, num_parts, edge_data, id_lookup, node_data
):
    """
    This helper function looks up shuffle-global-nids for a given set of
    global-nids using a distributed lookup service.

    Parameters:
    -----------
    rank : integer
        rank of the process
    world_size : integer
        total number of processes used in the process group
    num_parts : integer
        total number of output graph partitions
    edge_data : dictionary
        edge_data is a dictionary with keys as column names and values as numpy arrays representing
        all the edges present in the current graph partition
    id_lookup : instance of DistLookupService class
        instance of a distributed lookup service class which is used to retrieve partition-ids and
        shuffle-global-nids for any given set of global-nids
    node_data : dictionary
        node_data is a dictionary with keys as column names and values as numpy arrays representing
        all the nodes owned by the current process

    Returns:
    --------
    dictionary :
        dictionary where keys are column names and values are numpy arrays representing all the
        edges present in the current graph partition
    """
    # Make sure that the outgoing message size does not exceed 2GB.
    # Even though gloo can handle up to 10GB of data in an outgoing message,
    # it needs additional memory for temporary buffers, which increases
    # the memory footprint of the process.
    MILLION = 1000 * 1000
    BATCH_SIZE = 250 * MILLION
    memory_snapshot("GlobalToShuffleIDMapBegin: ", rank)

    local_nids = []
    local_shuffle_nids = []
    for local_part_id in range(num_parts // world_size):
        local_nids.append(
            node_data[constants.GLOBAL_NID + "/" + str(local_part_id)]
        )
        local_shuffle_nids.append(
            node_data[constants.SHUFFLE_GLOBAL_NID + "/" + str(local_part_id)]
        )

    local_nids = np.concatenate(local_nids)
    local_shuffle_nids = np.concatenate(local_shuffle_nids)
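    # local_nids/local_shuffle_nids hold the mapping already known to this rank;
    # they are handed to the lookup service below, presumably so that locally
    # resolvable ids do not need a remote lookup (see id_lookup.get_shuffle_nids).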

    for local_part_id in range(num_parts // world_size):
        node_list = edge_data[
            constants.GLOBAL_SRC_ID + "/" + str(local_part_id)
        ]

        # Determine the number of alltoall rounds each process has to perform.
        all_sizes = allgather_sizes(
            [node_list.shape[0]], world_size, num_parts, return_sizes=True
        )
        max_count = np.amax(all_sizes)
        num_splits = max_count // BATCH_SIZE + 1
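        # For example, with a maximum of 600 million source node-ids on any rank
        # and BATCH_SIZE = 250 million, every rank performs 3 rounds of lookups;
        # deriving num_splits from the global maximum keeps all ranks in
        # lock-step across the collective calls.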

        # Split the message into batches and send.
        splits = np.array_split(node_list, num_splits)
        shuffle_mappings = []
        for item in splits:
            shuffle_ids = id_lookup.get_shuffle_nids(
                item, local_nids, local_shuffle_nids, world_size
            )
            shuffle_mappings.append(shuffle_ids)

        shuffle_ids = np.concatenate(shuffle_mappings)
        assert shuffle_ids.shape[0] == node_list.shape[0]
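        # The assert above relies on id_lookup.get_shuffle_nids returning exactly
        # one shuffle_global_nid per requested id; results are also expected to be
        # in request order so the concatenated batches line up with node_list
        # element-wise.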
        edge_data[
            constants.SHUFFLE_GLOBAL_SRC_ID + "/" + str(local_part_id)
        ] = shuffle_ids

        # Destination end points of edges are owned by the current rank and
        # therefore already have corresponding SHUFFLE_GLOBAL_NIDs in node_data.
        # Here, retrieve the SHUFFLE_GLOBAL_NIDs for the destination end points
        # of the local edges.
        uniq_ids, inverse_idx = np.unique(
            edge_data[constants.GLOBAL_DST_ID + "/" + str(local_part_id)],
            return_inverse=True,
        )
        common, idx1, idx2 = np.intersect1d(
            uniq_ids,
            node_data[constants.GLOBAL_NID + "/" + str(local_part_id)],
            assume_unique=True,
            return_indices=True,
        )
        assert len(common) == len(uniq_ids)
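        # Because every destination end point of a local edge is owned by this
        # partition, each unique GLOBAL_DST_ID is found in node_data: idx2 gives
        # its row there, and inverse_idx expands the unique mapping back to one
        # SHUFFLE_GLOBAL_NID per edge.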

        edge_data[
            constants.SHUFFLE_GLOBAL_DST_ID + "/" + str(local_part_id)
        ] = node_data[constants.SHUFFLE_GLOBAL_NID + "/" + str(local_part_id)][
            idx2
        ][
            inverse_idx
        ]
        assert len(
            edge_data[
                constants.SHUFFLE_GLOBAL_DST_ID + "/" + str(local_part_id)
            ]
        ) == len(edge_data[constants.GLOBAL_DST_ID + "/" + str(local_part_id)])

    memory_snapshot("GlobalToShuffleIDMap_AfterLookupServiceCalls: ", rank)
    return edge_data


def assign_shuffle_global_nids_nodes(rank, world_size, num_parts, node_data):
    """
    Utility function to assign shuffle global ids to nodes at a given rank
    node_data gets converted from [ntype, global_type_nid, global_nid]
    to [shuffle_global_nid, ntype, global_type_nid, global_nid, part_local_type_nid]
    where shuffle_global_nid : global id of the node after data shuffle
            ntype : node-type as read from xxx_nodes.txt
            global_type_nid : node-type-id as read from xxx_nodes.txt
            global_nid : node-id as read from xxx_nodes.txt, implicitly
                            this is the line no. in the file
            part_local_type_nid : type_nid assigned by the current rank within its scope

    Parameters:
    -----------
    rank : integer
        rank of the process
    world_size : integer
        total number of processes used in the process group
    num_parts : integer
        total number of output graph partitions
    node_data : dictionary
        node_data is a dictionary with keys as column names and values as numpy arrays
    """
    # Compute prefix sum to determine node-id offsets
    local_row_counts = []
    for local_part_id in range(num_parts // world_size):
        local_row_counts.append(
            node_data[constants.GLOBAL_NID + "/" + str(local_part_id)].shape[0]
        )

    # Perform allgather to compute the local offsets.
    prefix_sum_nodes = allgather_sizes(local_row_counts, world_size, num_parts)
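    # prefix_sum_nodes is expected to be a prefix sum over the gathered node
    # counts, laid out so that entries rank + local_part_id * world_size and
    # rank + 1 + local_part_id * world_size delimit the contiguous range of
    # shuffle_global_nids owned by this rank for that local partition.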

    for local_part_id in range(num_parts // world_size):
        shuffle_global_nid_start = prefix_sum_nodes[
            rank + (local_part_id * world_size)
        ]
        shuffle_global_nid_end = prefix_sum_nodes[
            rank + 1 + (local_part_id * world_size)
        ]
        shuffle_global_nids = np.arange(
            shuffle_global_nid_start, shuffle_global_nid_end, dtype=np.int64
        )
        node_data[
            constants.SHUFFLE_GLOBAL_NID + "/" + str(local_part_id)
        ] = shuffle_global_nids


def assign_shuffle_global_nids_edges(rank, world_size, num_parts, edge_data):
    """
    Utility function to assign shuffle_global_eids to edges
    edge_data gets converted from [global_src_nid, global_dst_nid, global_type_eid, etype]
    to [shuffle_global_src_nid, shuffle_global_dst_nid, global_src_nid, global_dst_nid, global_type_eid, etype]

    Parameters:
    -----------
    rank : integer
        rank of the current process
    world_size : integer
        total count of processes in execution
    num_parts : integer
        total number of output graph partitions
    edge_data : dictionary
        edge_data is a dictionary with keys as column names and values as numpy arrays
        representing the edges read from the xxx_edges.txt file

    Returns:
    --------
    list
        list of shuffle_global_eid_start values, one per local partition, each indicating the starting
        value from which shuffle_global_eids are assigned to edges on this rank
    """
    # get prefix sum of edge counts per rank to locate the starting point
    # from which global-ids to edges are assigned in the current rank
    local_row_counts = []
    for local_part_id in range(num_parts // world_size):
        local_row_counts.append(
            edge_data[constants.GLOBAL_SRC_ID + "/" + str(local_part_id)].shape[
                0
            ]
        )

    shuffle_global_eid_offset = []
    prefix_sum_edges = allgather_sizes(local_row_counts, world_size, num_parts)
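    # prefix_sum_edges follows the same layout as the node prefix sum above;
    # each slice taken below yields a contiguous, globally unique range of
    # shuffle_global_eids for this rank's local partition.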
    for local_part_id in range(num_parts // world_size):
        shuffle_global_eid_start = prefix_sum_edges[
            rank + (local_part_id * world_size)
        ]
        shuffle_global_eid_end = prefix_sum_edges[
            rank + 1 + (local_part_id * world_size)
        ]
        shuffle_global_eids = np.arange(
            shuffle_global_eid_start, shuffle_global_eid_end, dtype=np.int64
        )
        edge_data[
            constants.SHUFFLE_GLOBAL_EID + "/" + str(local_part_id)
        ] = shuffle_global_eids
        shuffle_global_eid_offset.append(shuffle_global_eid_start)

    return shuffle_global_eid_offset