"...models/git@developer.sourcefind.cn:OpenDAS/lmdeploy.git" did not exist on "2f80c556fbdb2e37e93446913101e08ce8bfcc4c"
Unverified Commit f51b31b2 authored by kylasa, committed by GitHub

Distributed Lookup service implementation to retrieve node-level mappings (#4387)

* Distributed Lookup service for retrieving global-nid to shuffle-global-nid/partition-id mappings

1. Implemented a class to provide a distributed lookup service
2. This class can be used to retrieve mappings for global-nids

* Code changes to address CI comments.

1. Removed some unneeded type_casts to numpy.int64
2. Added additional comments when iterating over the partition-ids list.
3. Added a docstring to the class and adjusted comments where relevant.

* Updated code comments and variable names...

1. Changed the variable names to appropriately represent the values stored in these variables.
2. Corrected the docstring.

* Corrected docstring as per the suggestion... and removed all the capital letters for Global nids and Shuffle Global nids...

* Addressing CI review comments.
parent 8d3c5820
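A minimal sketch of how the new service is wired up and used, distilled from the diff below; `schema_map`, `params`, `rank`, `world_size`, `global_nids`, and `node_data` are assumed to come from the surrounding pipeline code, and the `get_idranges` import path is an assumption:

import os
import dgl
import constants                              # repo-local module
from dist_lookup import DistLookupService
from utils import get_idranges                # assumed import path for the helper used below

# Build per-node-type global-nid ranges and an IdMap over them.
_, global_nid_ranges = get_idranges(schema_map[constants.STR_NODE_TYPE],
                                    schema_map[constants.STR_NUM_NODES_PER_CHUNK])
id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)

# Every rank constructs the lookup service; it reads the per-node-type partition-id
# files and keeps a contiguous slice of the node-id -> partition-id mappings.
id_lookup = DistLookupService(os.path.join(params.input_dir, params.partitions_dir),
                              schema_map[constants.STR_NODE_TYPE],
                              id_map, rank, world_size)

# Map global-nids to the partitions that own them ...
owner_ranks = id_lookup.get_partition_ids(global_nids)
# ... or to shuffle-global-nids (node_data holds the mappings owned by this rank).
shuffle_nids = id_lookup.get_shuffle_nids(global_nids,
                                          node_data[constants.GLOBAL_NID],
                                          node_data[constants.SHUFFLE_GLOBAL_NID])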
@@ -20,10 +20,11 @@ from gloo_wrapper import allgather_sizes, gather_metadata_json,\
 alltoallv_cpu
 from globalids import assign_shuffle_global_nids_nodes, \
 assign_shuffle_global_nids_edges, \
-get_shuffle_global_nids_edges
+lookup_shuffle_global_nids_edges
 from convert_partition import create_dgl_object, create_metadata_json
+from dist_lookup import DistLookupService
-def gen_node_data(rank, world_size, node_part_ids, ntid_ntype_map, schema_map):
+def gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map):
 '''
 For this data processing pipeline, reading node files is not needed. All the needed information about
 the nodes can be found in the metadata json file. This function generates the nodes owned by a given
@@ -35,9 +36,9 @@ def gen_node_data(rank, world_size, node_part_ids, ntid_ntype_map, schema_map):
 rank of the process
 world_size : int
 total no. of processes
-node_part_ids :
-numpy array, whose length is same as no. of nodes in the graph. Index in this array is the global_nid
-and value is the partition-id which owns the node
+id_lookup : instance of class DistLookupService
+Distributed lookup service used to map global-nids to respective partition-ids and
+shuffle-global-nids
 ntid_ntype_map :
 a dictionary where keys are node_type ids(integers) and values are node_type names(strings).
 schema_map:
@@ -100,7 +101,7 @@ def gen_node_data(rank, world_size, node_part_ids, ntid_ntype_map, schema_map):
 type_start, type_end = type_nid_dict[ntype_name][0][0], type_nid_dict[ntype_name][-1][1]
 gnid_start, gnid_end = global_nid_dict[ntype_name][0, 0], global_nid_dict[ntype_name][0, 1]
-node_partid_slice = node_part_ids[gnid_start:gnid_end]
+node_partid_slice = id_lookup.get_partition_ids(np.arange(gnid_start, gnid_end, dtype=np.int64)) #exclusive
 cond = node_partid_slice == rank
 own_gnids = np.arange(gnid_start, gnid_end, dtype=np.int64)
 own_gnids = own_gnids[cond]
@@ -171,7 +172,7 @@ def exchange_edge_data(rank, world_size, edge_data):
 edge_data.pop(constants.OWNER_PROCESS)
 return edge_data
-def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map, node_part_ids, node_features):
+def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map, id_lookup, node_features):
 """
 This function is used to shuffle node features so that each process will receive
 all the node features whose corresponding nodes are owned by the same process.
@@ -203,8 +204,9 @@ def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map,
 can be used to index into the node feature tensors read from corresponding input files.
 ntypes_gnid_map : dictionary
 mapping between node type names and global_nids which belong to the keys in this dictionary
-node_part_ids : numpy array
-numpy array which store the partition-ids and indexed by global_nids
+id_lookup : instance of class DistLookupService
+Distributed lookup service used to map global-nids to respective partition-ids and
+shuffle-global-nids
 node_feautres: dicitonary
 dictionry where node_features are stored and this information is read from the appropriate
 node features file which belongs to the current process
@@ -259,7 +261,7 @@ def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map,
 node_feats = node_features[feat_key]
 for part_id in range(world_size):
-partid_slice = node_part_ids[gnid_start:gnid_end]
+partid_slice = id_lookup.get_partition_ids(np.arange(gnid_start, gnid_end, dtype=np.int64))
 cond = (partid_slice == part_id)
 gnids_per_partid = gnids_feat[cond]
 tnids_per_partid = tnids_feat[cond]
@@ -286,7 +288,7 @@ def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map,
 return own_node_features, own_global_nids
 def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_data,
-node_part_ids, ntypes_ntypeid_map, ntypes_gnid_range_map, ntid_ntype_map, schema_map):
+id_lookup, ntypes_ntypeid_map, ntypes_gnid_range_map, ntid_ntype_map, schema_map):
 """
 Wrapper function which is used to shuffle graph data on all the processes.
@@ -307,8 +309,9 @@ def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_da
 edge_data : dictionary
 dictionary which is used to store edge information as read from the edges.txt file assigned
 to each process.
-node_part_ids : numpy array
-numpy array which store the partition-ids and indexed by global_nids
+id_lookup : instance of class DistLookupService
+Distributed lookup service used to map global-nids to respective partition-ids and
+shuffle-global-nids
 ntypes_ntypeid_map : dictionary
 mappings between node type names and node type ids
 ntypes_gnid_range_map : dictionary
@@ -334,14 +337,14 @@ def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_da
 in the world. The edge data is received by each rank in the process of data shuffling.
 """
 rcvd_node_features, rcvd_global_nids = exchange_node_features(rank, world_size, node_feat_tids, \
-ntypes_gnid_range_map, node_part_ids, node_features)
+ntypes_gnid_range_map, id_lookup, node_features)
 logging.info(f'[Rank: {rank}] Done with node features exchange.')
-node_data = gen_node_data(rank, world_size, node_part_ids, ntid_ntype_map, schema_map)
+node_data = gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map)
 edge_data = exchange_edge_data(rank, world_size, edge_data)
 return node_data, rcvd_node_features, rcvd_global_nids, edge_data
-def read_dataset(rank, world_size, node_part_ids, params, schema_map):
+def read_dataset(rank, world_size, id_lookup, params, schema_map):
 """
 This function gets the dataset and performs post-processing on the data which is read from files.
 Additional information(columns) are added to nodes metadata like owner_process, global_nid which
@@ -355,8 +358,9 @@ def read_dataset(rank, world_size, node_part_ids, params, schema_map):
 rank of the current process
 world_size : int
 total no. of processes instantiated
-node_part_ids : numpy array
-metis partitions which are the output of partitioning algorithm
+id_lookup : instance of class DistLookupService
+Distributed lookup service used to map global-nids to respective partition-ids and
+shuffle-global-nids
 params : argparser object
 argument parser object to access command line arguments
 schema_map : dictionary
@@ -387,7 +391,7 @@ def read_dataset(rank, world_size, node_part_ids, params, schema_map):
 get_dataset(params.input_dir, params.graph_name, rank, world_size, schema_map)
 logging.info(f'[Rank: {rank}] Done reading dataset deom {params.input_dir}')
-augment_edge_data(edge_data, node_part_ids, edge_tids, rank, world_size)
+edge_data = augment_edge_data(edge_data, id_lookup, edge_tids, rank, world_size)
 logging.info(f'[Rank: {rank}] Done augmenting edge_data: {len(edge_data)}, {edge_data[constants.GLOBAL_SRC_ID].shape}')
 return node_tids, node_features, node_feat_tids, edge_data, edge_features
@@ -522,17 +526,22 @@ def gen_dist_partitions(rank, world_size, params):
 #init processing
 schema_map = read_json(os.path.join(params.input_dir, params.schema))
-#TODO: For large graphs, this mapping function can be memory intensive. This needs to be changed to
-#processes owning a set of global-nids, per partitioning algorithm, and messaging will be used to
-#identify the ownership instead of mem. lookups.
-node_part_ids = read_ntype_partition_files(schema_map, os.path.join(params.input_dir, params.partitions_dir))
+#Initialize distributed lookup service for partition-id and shuffle-global-nids mappings
+#for global-nids
+_, global_nid_ranges = get_idranges(schema_map[constants.STR_NODE_TYPE],
+schema_map[constants.STR_NUM_NODES_PER_CHUNK])
+id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
+id_lookup = DistLookupService(os.path.join(params.input_dir, params.partitions_dir),\
+schema_map[constants.STR_NODE_TYPE],\
+id_map, rank, world_size)
 ntypes_ntypeid_map, ntypes, ntypeid_ntypes_map = get_node_types(schema_map)
 logging.info(f'[Rank: {rank}] Initialized metis partitions and node_types map...')
 #read input graph files and augment these datastructures with
 #appropriate information (global_nid and owner process) for node and edge data
-node_tids, node_features, node_feat_tids, edge_data, edge_features = read_dataset(rank, world_size, node_part_ids, params, schema_map)
+node_tids, node_features, node_feat_tids, edge_data, edge_features = \
+read_dataset(rank, world_size, id_lookup, params, schema_map)
 logging.info(f'[Rank: {rank}] Done augmenting file input data with auxilary columns')
 #send out node and edge data --- and appropriate features.
@@ -541,7 +550,7 @@ def gen_dist_partitions(rank, world_size, params):
 ntypes_gnid_range_map = get_gnid_range_map(node_tids)
 node_data, rcvd_node_features, rcvd_global_nids, edge_data = \
 exchange_graph_data(rank, world_size, node_features, node_feat_tids, \
-edge_data, node_part_ids, ntypes_ntypeid_map, ntypes_gnid_range_map, \
+edge_data, id_lookup, ntypes_ntypeid_map, ntypes_gnid_range_map, \
 ntypeid_ntypes_map, schema_map)
 logging.info(f'[Rank: {rank}] Done with data shuffling...')
@@ -578,7 +587,7 @@ def gen_dist_partitions(rank, world_size, params):
 logging.info(f'[Rank: {rank}] Done assigning global_ids to edges ...')
 #determine global-ids for edge end-points
-get_shuffle_global_nids_edges(rank, world_size, edge_data, node_part_ids, node_data)
+edge_data = lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, node_data)
 logging.info(f'[Rank: {rank}] Done resolving orig_node_id for local node_ids...')
 #create dgl objects here
...
import numpy as np
import os
import pyarrow
import torch
from pyarrow import csv
from gloo_wrapper import alltoallv_cpu
class DistLookupService:
'''
This is an implementation of a Distributed Lookup Service which provides the following
services to its users: it maps 1) global node-ids to partition-ids, and 2) global node-ids
to shuffle global node-ids (contiguous within each partition for a given node_type and across
all the partitions).
This service initializes itself with the node-id to partition-id mappings, which are inputs
to this service. The node-id to partition-id mappings are assumed to be in one file for each
node type. These node-id-to-partition-id mappings are split across the service processes so that
each process ends up with a contiguous chunk. The service first divides the no. of mappings (node-id to
partition-id) for each node type into equal chunks across all the service processes, so each
service process will be the owner of a set of node-id-to-partition-id mappings. This class
has two functions, which are as follows:
1) `get_partition_ids` function which returns the node-id to partition-id mappings to the user
2) `get_shuffle_nids` function which returns the node-id to shuffle-node-id mapping to the user
Parameters:
-----------
input_dir : string
string representing the input directory where the node-type partition-id
files are located
ntype_names : list of strings
list of strings which are used to read files located within the input_dir
directory; these files' contents are partition-ids for the node-ids which
are of a particular node type
id_map : dgl.distributed.id_map instance
this id_map is used to retrieve ntype-ids (node type ids) and type_nids (per-type
node ids) for any given global node id
rank : integer
integer indicating the rank of a given process
world_size : integer
integer indicating the total no. of processes
'''
def __init__(self, input_dir, ntype_names, id_map, rank, world_size):
assert os.path.isdir(input_dir)
assert ntype_names is not None
assert len(ntype_names) > 0
# These lists are indexed by ntype_ids.
type_nid_begin = []
type_nid_end = []
partid_list = []
ntype_count = []
# Iterate over the node types and extract the partition id mappings.
for ntype in ntype_names:
print('[Rank: ', rank, '] Reading file: ', os.path.join(input_dir, '{}.txt'.format(ntype)))
df = csv.read_csv(os.path.join(input_dir, '{}.txt'.format(ntype)), \
read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True), \
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
ntype_partids = df['f0'].to_numpy()
count = len(ntype_partids)
ntype_count.append(count)
# Each rank assumes a contiguous set of partition-ids which are equally split
# across all the processes.
split_size = np.ceil(count/np.int64(world_size)).astype(np.int64)
start, end = np.int64(rank)*split_size, np.int64(rank+1)*split_size
if rank == (world_size-1):
end = count
type_nid_begin.append(start)
type_nid_end.append(end)
# Slice the partition-ids which belong to the current instance.
partid_list.append(ntype_partids[start:end])
# Store all the information in the object instance variable.
self.id_map = id_map
self.type_nid_begin = np.array(type_nid_begin, dtype=np.int64)
self.type_nid_end = np.array(type_nid_end, dtype=np.int64)
self.partid_list = partid_list
self.ntype_count = np.array(ntype_count, dtype=np.int64)
self.rank = rank
self.world_size = world_size
def get_partition_ids(self, global_nids):
'''
This function is used to get the partition-ids for a given set of global node-ids.
The global_nids <-> partition-ids mappings are deterministically distributed across
all the participating processes within the service. A contiguous range of global-nids
(ntype-ids, per-type-nids) is stored within each process, and this range is determined
by the total no. of nodes of a given ntype-id and the rank of the process.
The process where a global_nid <-> partition-id mapping is stored can therefore be easily computed.
Once this is determined we perform an alltoallv to send the requests.
On the receiving side, each process receives a set of global_nids and retrieves the corresponding
partition-ids using locally stored lookup tables. It builds responses to all the other
processes and performs another alltoallv.
Once the responses, partition-ids, are received, they are re-ordered to match the
incoming global-nids order and returned to the caller.
Parameters:
-----------
self : instance of this class
instance of this class, which is passed by the runtime implicitly
global_nids : numpy array
an array of global node-ids for which partition-ids are to be retrieved by
the distributed lookup service.
Returns:
--------
numpy array :
array of partition-ids for the global node-ids passed in as the
function argument
'''
# Find the process where global_nid --> partition-id(owner) is stored.
ntype_ids, type_nids = self.id_map(global_nids)
ntype_ids, type_nids = ntype_ids.numpy(), type_nids.numpy()
assert len(ntype_ids) == len(global_nids)
# For each node-type, the per-type-node-id <-> partition-id mappings are
# stored as contiguous chunks by this lookup service.
# These mappings are split equally (and deterministically) among all the
# processes in the lookup service.
typeid_counts = self.ntype_count[ntype_ids]
chunk_sizes = np.ceil(typeid_counts/self.world_size).astype(np.int64)
service_owners = np.floor_divide(type_nids, chunk_sizes).astype(np.int64)
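# For example (hypothetical sizes): with 1,000 nodes of some node type and
# world_size = 4, chunk_sizes for that type is ceil(1000/4) = 250, so per-type
# node-id 620 is owned by service rank 620 // 250 = 2.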
# Now `service_owners` is a list of ranks (process-ids) which own the corresponding
# global-nid <-> partition-id mapping.
# Split the input global_nids into a list of lists where each list will be
# sent to the respective rank/process
# We also need to store the indices, in the indices_list, so that we can re-order
# the final result (partition-ids) in the same order as the global-nids (function argument)
send_list = []
indices_list = []
for idx in range(self.world_size):
idxes = np.where(service_owners == idx)
ll = global_nids[idxes[0]]
send_list.append(torch.from_numpy(ll))
indices_list.append(idxes[0])
assert len(np.concatenate(indices_list)) == len(global_nids)
assert np.all(np.sort(np.concatenate(indices_list)) == np.arange(len(global_nids)))
# Send the request to everyone else.
# As a result of this operation, the current process also receives a list of lists
# from all the other processes.
# These lists contain global-node-ids whose global-node-id <-> partition-id mappings
# are owned/stored by the current process.
owner_req_list = alltoallv_cpu(self.rank, self.world_size, send_list)
# Create the response list here for each of the request list received in the previous
# step. Populate the respective partition-ids in this response lists appropriately
out_list = []
for idx in range(self.world_size):
if owner_req_list[idx] is None:
out_list.append(torch.empty((0,), dtype=torch.int64))
continue
# Get the node_type_ids and per_type_nids for the incoming global_nids.
ntype_ids, type_nids = self.id_map(owner_req_list[idx].numpy())
ntype_ids, type_nids = ntype_ids.numpy(), type_nids.numpy()
# Lists to store partition-ids for the incoming global-nids.
type_id_lookups = []
local_order_idx = []
# Now iterate over all the node_types and accumulate all the partition-ids.
# Since the accumulated partition-ids are in node_type order, they
# must be re-ordered as per the order of the input, which may be different.
for tid in range(len(self.partid_list)):
cond = ntype_ids == tid
local_order_idx.append(np.where(cond)[0])
global_type_nids = type_nids[cond]
if len(global_type_nids) <= 0:
continue
local_type_nids = global_type_nids - self.type_nid_begin[tid]
assert np.all(local_type_nids >= 0)
assert np.all(local_type_nids <= (self.type_nid_end[tid] + 1 - self.type_nid_begin[tid]))
cur_owners = self.partid_list[tid][local_type_nids]
type_id_lookups.append(cur_owners)
# Reorder the partition-ids, so that it agrees with the input order --
# which is the order in which the incoming message is received.
if len(type_id_lookups) <= 0:
out_list.append(torch.empty((0,), dtype=torch.int64))
else:
# Now reorder results for each request.
sort_order_idx = np.argsort(np.concatenate(local_order_idx))
lookups = np.concatenate(type_id_lookups)[sort_order_idx]
out_list.append(torch.from_numpy(lookups))
# Send the partition-ids to their respective requesting processes.
owner_resp_list = alltoallv_cpu(self.rank, self.world_size, out_list)
# owner_resp_list is a list of per-rank responses (one entry per rank, possibly
# None), where each entry holds the partition-ids which the current process requested.
# Now we need to re-order them so that the partition-ids correspond to the
# global_nids which were passed into this function, i.e. re-order them
# according to the requesting order.
# The concatenated owner_ids below are the owner-ids for global_nids (function argument).
owner_ids = torch.cat([x for x in owner_resp_list if x is not None]).numpy()
assert len(owner_ids) == len(global_nids)
global_nids_order = np.concatenate(indices_list)
sort_order_idx = np.argsort(global_nids_order)
owner_ids = owner_ids[sort_order_idx]
global_nids_order = global_nids_order[sort_order_idx]
assert np.all(np.arange(len(global_nids)) == global_nids_order)
# owner_ids now holds the partition-ids corresponding to the global_nids.
return owner_ids
def get_shuffle_nids(self, global_nids, my_global_nids, my_shuffle_global_nids):
'''
This function is used to retrieve shuffle_global_nids for a given set of incoming
global_nids. Note that the global_nids are in random order and may contain duplicates.
This function first retrieves the partition-ids of the incoming global_nids.
These partition-ids are also the ranks of the processes which own the respective
global-nids as well as shuffle-global-nids. An alltoallv is performed to send the
global-nids to the respective ranks/partition-ids where the
global-nid <-> shuffle-global-nid mapping is located.
On the receiving side, once the global-nids are received, the associated shuffle-global-nids
are retrieved and an alltoallv is performed to send the responses to all the other
processes.
Once the responses, shuffle-global-nids, are received, they are re-ordered according
to the incoming global-nids order and returned to the caller.
Parameters:
-----------
self : instance of this class
instance of this class, which is passed by the runtime implicitly
global_nids : numpy array
an array of global node-ids for which shuffle-global-nids are to be retrieved by
the distributed lookup service.
my_global_nids: numpy ndarray
array of global_nids which are owned by the current partition/rank/process
This process has the node <-> partition id mapping
my_shuffle_global_nids : numpy ndarray
array of shuffle_global_nids which are assigned by the current process/rank
Returns:
--------
numpy array :
array of shuffle_global_nids which correspond to the incoming node-ids in
global_nids.
'''
# Get the owner_ids (partition-ids or rank).
owner_ids = self.get_partition_ids(global_nids)
# Ask these owners to supply for the shuffle_global_nids.
send_list = []
id_list = []
for idx in range(self.world_size):
cond = owner_ids == idx
idxes = np.where(cond)
ll = global_nids[idxes[0]]
send_list.append(torch.from_numpy(ll))
id_list.append(idxes[0])
assert len(np.concatenate(id_list)) == len(global_nids)
cur_global_nids = alltoallv_cpu(self.rank, self.world_size, send_list)
# At this point, current process received a list of lists each containing
# a list of global-nids whose corresponding shuffle_global_nids are located
# in the current process.
shuffle_nids_list = []
for idx in range(self.world_size):
if cur_global_nids[idx] is None:
shuffle_nids_list.append(torch.empty((0,), dtype=torch.int64))
continue
uniq_ids, inverse_idx = np.unique(cur_global_nids[idx], return_inverse=True)
common, idx1, idx2 = np.intersect1d(uniq_ids, my_global_nids, assume_unique=True, return_indices=True)
assert len(common) == len(uniq_ids)
req_shuffle_global_nids = my_shuffle_global_nids[idx2][inverse_idx]
assert len(req_shuffle_global_nids) == len(cur_global_nids[idx])
shuffle_nids_list.append(torch.from_numpy(req_shuffle_global_nids))
# Send the shuffle-global-nids to their respective ranks.
mapped_global_nids = alltoallv_cpu(self.rank, self.world_size, shuffle_nids_list)
# Reorder to match global_nids (function parameter).
global_nids_order = np.concatenate(id_list)
shuffle_global_nids = torch.cat(mapped_global_nids).numpy()
assert len(shuffle_global_nids) == len(global_nids)
sorted_idx = np.argsort(global_nids_order)
shuffle_global_nids = shuffle_global_nids[ sorted_idx ]
global_nids_ordered = global_nids_order[sorted_idx]
assert np.all(global_nids_ordered == np.arange(len(global_nids)))
return shuffle_global_nids
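Both methods rely on the same index bookkeeping: split the requests per destination rank, remember each request's original position, and undo the permutation with an argsort over the concatenated indices once the responses arrive. That pattern can be exercised in isolation; a small, hypothetical single-process sketch with plain numpy (the modulo ownership rule and the responses are made up for illustration):

import numpy as np

world_size = 3
global_nids = np.array([7, 2, 9, 2, 0], dtype=np.int64)
# Pretend each nid is owned by rank (nid % world_size) -- a made-up ownership rule.
service_owners = global_nids % world_size

send_list, indices_list = [], []
for rank in range(world_size):
    idxes = np.where(service_owners == rank)[0]
    send_list.append(global_nids[idxes])    # the per-rank request
    indices_list.append(idxes)              # remember the original positions

# Pretend each rank answered every request with its own rank id.
responses = [np.full(len(chunk), r, dtype=np.int64) for r, chunk in enumerate(send_list)]

# Responses arrive concatenated in rank order; undo the permutation.
flat = np.concatenate(responses)
order = np.concatenate(indices_list)
result = flat[np.argsort(order)]
assert np.array_equal(result, service_owners)   # aligned with the original request order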
@@ -5,6 +5,8 @@ import itertools
 import constants
 from gloo_wrapper import allgather_sizes, alltoallv_cpu
+from dist_lookup import DistLookupService
 def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
 """
 For nodes which are not owned by the current rank, whose global_nid <-> shuffle_global-nid mapping
@@ -54,69 +56,41 @@ def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
 ret_val = np.column_stack([global_nids, shuffle_global_nids])
 return ret_val
-def get_shuffle_global_nids_edges(rank, world_size, edge_data, node_part_ids, node_data):
-"""
-Edges which are owned by this rank, may have global_nids whose shuffle_global_nids are NOT present locally.
-This function retrieves shuffle_global_nids for such global_nids.
-Parameters:
------------
-rank : integer
-rank of the process
-world_size : integer
-total no. of processes used
-edge_data : numpy ndarray
-edge_data (augmented) as read from the xxx_edges.txt file
-node_part_ids : numpy array
-list of partition ids indexed by global node ids.
-node_data : dictionary
-node_data, is a dictionary with keys as column_names and values as numpy arrays
-"""
-#determine unique node-ids present locally
-global_nids = np.sort(np.unique(np.concatenate((edge_data[constants.GLOBAL_SRC_ID], edge_data[constants.GLOBAL_DST_ID], node_data[constants.GLOBAL_NID]))))
-#determine the rank which owns orig-node-id <-> partition/rank mappings
-part_ids = node_part_ids[global_nids]
-#form list of lists, each list includes global_nids whose mappings (shuffle_global_nids) needs to be retrieved.
-#and rank will be the process which owns mappings of these global_nids
-global_nids_ranks = []
-for i in range(world_size):
-if (i == rank):
-global_nids_ranks.append(np.empty(shape=(0), dtype=np.int64))
-continue
-#not_owned_nodes = part_ids[:,0][part_ids[:,1] == i]
-not_owned_node_ids = np.where(part_ids == i)[0]
-if not_owned_node_ids.shape[0] == 0:
-not_owned_nodes = np.empty(shape=(0), dtype=np.int64)
-else:
-not_owned_nodes = global_nids[not_owned_node_ids]
-global_nids_ranks.append(not_owned_nodes)
-#Retrieve Global-ids for respective node owners
-non_local_nids = get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data)
-#Add global_nid <-> shuffle_global_nid mappings to the received data
-for i in range(world_size):
-if (i == rank):
-own_node_ids = np.where(part_ids == i)[0]
-own_global_nids = global_nids[own_node_ids]
-common, ind1, ind2 = np.intersect1d(node_data[constants.GLOBAL_NID], own_global_nids, return_indices=True)
-my_shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
-local_mappings = np.column_stack((own_global_nids, my_shuffle_global_nids))
-resolved_global_nids = np.concatenate((non_local_nids, local_mappings))
-#form a dictionary of mappings between orig-node-ids and global-ids
-resolved_mappings = dict(zip(resolved_global_nids[:,0], resolved_global_nids[:,1]))
-#determine global-ids for the orig-src-id and orig-dst-id
-shuffle_global_src_id = [resolved_mappings[x] for x in edge_data[constants.GLOBAL_SRC_ID]]
-shuffle_global_dst_id = [resolved_mappings[x] for x in edge_data[constants.GLOBAL_DST_ID]]
-edge_data[constants.SHUFFLE_GLOBAL_SRC_ID] = np.array(shuffle_global_src_id, dtype=np.int64)
-edge_data[constants.SHUFFLE_GLOBAL_DST_ID] = np.array(shuffle_global_dst_id, dtype=np.int64)
+def lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, node_data):
+'''
+This function is a helper function used to lookup shuffle-global-nids for a given set of
+global-nids using a distributed lookup service.
+Parameters:
+-----------
+rank : integer
+rank of the process
+world_size : integer
+total number of processes used in the process group
+edge_data : dictionary
+edge_data is a dicitonary with keys as column names and values as numpy arrays representing
+all the edges present in the current graph partition
+id_lookup : instance of DistLookupService class
+instance of a distributed lookup service class which is used to retrieve partition-ids and
+shuffle-global-nids for any given set of global-nids
+node_data : dictionary
+node_data is a dictionary with keys as column names and values as numpy arrays representing
+all the nodes owned by the current process
+Returns:
+--------
+dictionary :
+dictionary where keys are column names and values are numpy arrays representing all the
+edges present in the current graph partition
+'''
+node_list = np.concatenate([edge_data[constants.GLOBAL_SRC_ID], edge_data[constants.GLOBAL_DST_ID]])
+shuffle_ids = id_lookup.get_shuffle_nids(node_list,
+node_data[constants.GLOBAL_NID],
+node_data[constants.SHUFFLE_GLOBAL_NID])
+edge_data[constants.SHUFFLE_GLOBAL_SRC_ID], edge_data[constants.SHUFFLE_GLOBAL_DST_ID] = np.split(shuffle_ids, 2)
+return edge_data
 def assign_shuffle_global_nids_nodes(rank, world_size, node_data):
 """
...
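One detail worth noting in the new lookup_shuffle_global_nids_edges above: source and destination global-nids are looked up in a single batched request and split back into halves afterwards, which works because the two arrays have equal length. A tiny hypothetical numpy sketch of that pattern (the +1000 stands in for the actual lookup):

import numpy as np

src = np.array([10, 11, 12], dtype=np.int64)   # hypothetical GLOBAL_SRC_ID column
dst = np.array([20, 21, 22], dtype=np.int64)   # hypothetical GLOBAL_DST_ID column

node_list = np.concatenate([src, dst])         # one batched lookup request
shuffle_ids = node_list + 1000                 # stand-in for id_lookup.get_shuffle_nids(...)
shuffle_src, shuffle_dst = np.split(shuffle_ids, 2)   # first half -> src, second half -> dst
assert len(shuffle_src) == len(src) and len(shuffle_dst) == len(dst)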
@@ -194,7 +194,7 @@ def write_metadata_json(metadata_list, output_dir, graph_name):
 with open('{}/metadata.json'.format(output_dir), 'w') as outfile:
 json.dump(graph_metadata, outfile, sort_keys=False, indent=4)
-def augment_edge_data(edge_data, part_ids, edge_tids, rank, world_size):
+def augment_edge_data(edge_data, lookup_service, edge_tids, rank, world_size):
 """
 Add partition-id (rank which owns an edge) column to the edge_data.
@@ -202,8 +202,23 @@ def augment_edge_data(edge_data, part_ids, edge_tids, rank, world_size):
 -----------
 edge_data : numpy ndarray
 Edge information as read from the xxx_edges.txt file
-part_ids : numpy array
-array of part_ids indexed by global_nid
+lookup_service : instance of class DistLookupService
+Distributed lookup service used to map global-nids to respective partition-ids and
+shuffle-global-nids
+edge_tids: dictionary
+dictionary where keys are canonical edge types and values are list of tuples
+which indicate the range of edges assigned to each of the partitions
+rank : integer
+rank of the current process
+world_size : integer
+total no. of process participating in the communication primitives
+Returns:
+--------
+dictionary :
+dictionary with keys as column names and values as numpy arrays and this information is
+loaded from input dataset files. In addition to this we include additional columns which
+aid this pipelines computation, like constants.OWNER_PROCESS
""" """
#add global_nids to the node_data #add global_nids to the node_data
etype_offset = {} etype_offset = {}
...@@ -225,7 +240,8 @@ def augment_edge_data(edge_data, part_ids, edge_tids, rank, world_size): ...@@ -225,7 +240,8 @@ def augment_edge_data(edge_data, part_ids, edge_tids, rank, world_size):
edge_data[constants.GLOBAL_EID] = global_eids edge_data[constants.GLOBAL_EID] = global_eids
#assign the owner process/rank for each edge #assign the owner process/rank for each edge
edge_data[constants.OWNER_PROCESS] = part_ids[edge_data[constants.GLOBAL_DST_ID]] edge_data[constants.OWNER_PROCESS] = lookup_service.get_partition_ids(edge_data[constants.GLOBAL_DST_ID])
return edge_data
def read_edges_file(edge_file, edge_data_dict): def read_edges_file(edge_file, edge_data_dict):
""" """
......