Unverified Commit c8ea9fa4 authored by kylasa, committed by GitHub

[Dist] Flexible pipeline - Initial commit (#4733)

* Flexible pipeline - Initial commit

1. Implementation of the flexible pipeline feature.
2. With this implementation, the pipeline now supports multiple partitions per process and assumes that num_partitions is always a multiple of num_processes (a rough sketch of the implied partition-to-rank mapping follows below).
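
To make the multiple-of constraint concrete, here is a minimal sketch (not part of this commit) of the cyclic partition-to-rank mapping it implies. The helper name map_partid_rank mirrors the utility referenced in the diff below; its body here is an assumption.

    # Sketch only: assumes cyclic ordering and num_partitions % num_processes == 0.
    def map_partid_rank(part_id, world_size):
        # Assumed behaviour of the map_partid_rank helper referenced in the diff.
        return part_id % world_size

    def local_partitions(rank, world_size, num_parts):
        # Partitions owned by `rank`: local partition i corresponds to
        # global partition id (rank + i * world_size).
        assert num_parts % world_size == 0
        return [rank + i * world_size for i in range(num_parts // world_size)]

    # Example: world_size=2, num_parts=4 -> rank 0 owns [0, 2], rank 1 owns [1, 3].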

* Update test_dist_part.py

* Code changes to address review comments

* Refactored the exchange_features function into two functions for better readability

* Updating test_dist_part to fix merge issues with the master branch

* corrected variable names...

* Fixed code refactoring issues.

* Provide missing function arguments to exchange_feature function

* Providing the missing function argument to fix error.

* Provide missing function argument to 'get_shuffle_nids' function.

* Repositioned a variable within its scope.

* Removed a tab character that was causing the indentation problem

* Fix an issue with the CI test framework, which was the root cause of the CI test failures.

1. We now read files specific to each partition-id and store this data separately in the local process, keyed by the local_part_id.
2. Similarly, we differentiate the node and edge feature type_ids with the same keys.
3. These two changes help us retrieve the appropriate feature data during the feature exchange and send it to the correct destination process (a short sketch of the resulting key layout is given below).
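
As an illustration of the keying scheme described above, a minimal sketch follows. The `<type_name>/<feat_name>/<local_part_id>` key layout matches the diff below; the helper itself is hypothetical and not part of this commit.

    # Sketch only: feature dictionaries are keyed by
    # "<type_name>/<feat_name>/<local_part_id>" within each process.
    def feature_key(type_name, feat_name, part_id, world_size):
        # Global partition part_id is handled as local partition
        # (part_id // world_size) on the rank that owns it
        # (assumed to be part_id % world_size under cyclic ordering).
        local_part_id = part_id // world_size
        return f"{type_name}/{feat_name}/{local_part_id}"

    # Example with world_size=2: partition 3 is local partition 1 on rank 1,
    # so its hypothetical "ntype0/feat0" data is stored under "ntype0/feat0/1".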

* Correct the parametrization for the CI unit test cases.

* Addressing Rui's code review comments.

* Addressing code review comments.
parent ee5f0967
...@@ -136,11 +136,15 @@ def test_chunk_graph(num_chunks):
test_data(sub_dir, feat, data, g.num_edges(c_etype) // num_chunks) test_data(sub_dir, feat, data, g.num_edges(c_etype) // num_chunks)
def _test_pipeline(num_chunks, num_parts, graph_formats=None): def _test_pipeline(num_chunks, num_parts, world_size, graph_formats=None):
if num_chunks < num_parts: if num_chunks < num_parts:
# num_parts should less/equal than num_chunks # num_parts should less/equal than num_chunks
return return
if num_parts % world_size != 0:
# num_parts should be a multiple of world_size
return
with tempfile.TemporaryDirectory() as root_dir: with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(root_dir, num_chunks) g = create_chunked_dataset(root_dir, num_chunks)
...@@ -161,12 +165,12 @@ def _test_pipeline(num_chunks, num_parts, graph_formats=None):
assert isinstance(int(header), int) assert isinstance(int(header), int)
# Step2: data dispatch # Step2: data dispatch
partition_dir = os.path.join(root_dir, "parted_data") partition_dir = os.path.join(root_dir, 'parted_data')
out_dir = os.path.join(root_dir, "partitioned") out_dir = os.path.join(root_dir, 'partitioned')
ip_config = os.path.join(root_dir, "ip_config.txt") ip_config = os.path.join(root_dir, 'ip_config.txt')
with open(ip_config, "w") as f: with open(ip_config, 'w') as f:
for i in range(num_parts): for i in range(world_size):
f.write(f"127.0.0.{i + 1}\n") f.write(f'127.0.0.{i + 1}\n')
cmd = "python3 tools/dispatch_data.py" cmd = "python3 tools/dispatch_data.py"
cmd += f" --in-dir {in_dir}" cmd += f" --in-dir {in_dir}"
...@@ -209,15 +213,14 @@ def _test_pipeline(num_chunks, num_parts, graph_formats=None):
) )
@pytest.mark.parametrize("num_chunks", [1, 3, 4, 8]) @pytest.mark.parametrize("num_chunks, num_parts, world_size", [[8, 4, 2], [9, 6, 3], [11, 11, 1], [11, 4, 2], [5, 3, 1]])
@pytest.mark.parametrize("num_parts", [1, 3, 4, 8]) def test_pipeline_basics(num_chunks, num_parts, world_size):
def test_pipeline_basics(num_chunks, num_parts): _test_pipeline(num_chunks, num_parts, world_size)
_test_pipeline(num_chunks, num_parts)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"graph_formats", [None, "csc", "coo,csc", "coo,csc,csr"] "graph_formats", [None, "csc", "coo,csc", "coo,csc,csr"]
) )
def test_pipeline_formats(graph_formats): def test_pipeline_formats(graph_formats):
_test_pipeline(4, 4, graph_formats) _test_pipeline(4, 4, 4, graph_formats)
...@@ -59,14 +59,12 @@ def submit_jobs(args) -> str:
with open(args.ip_config, "r") as f: with open(args.ip_config, "r") as f:
num_ips = len(f.readlines()) num_ips = len(f.readlines())
assert ( assert (
num_ips == num_parts num_parts % num_ips == 0
), f"The number of lines[{args.ip_config}] should be equal to num_parts[{num_parts}]." ), f"The num_parts[{args.num_parts}] should be a multiple of number of lines(ip addresses)[{args.ip_config}]."
argslist = "" argslist = ""
argslist += "--world-size {} ".format(num_parts) argslist += "--world-size {} ".format(num_ips)
argslist += "--partitions-dir {} ".format( argslist += "--partitions-dir {} ".format(os.path.abspath(args.partitions_dir))
os.path.abspath(args.partitions_dir)
)
argslist += "--input-dir {} ".format(os.path.abspath(args.in_dir)) argslist += "--input-dir {} ".format(os.path.abspath(args.in_dir))
argslist += "--graph-name {} ".format(graph_name) argslist += "--graph-name {} ".format(graph_name)
argslist += "--schema {} ".format(schema_path) argslist += "--schema {} ".format(schema_path)
...
...@@ -110,8 +110,7 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
#create auxiliary data structures from the schema object #create auxiliary data structures from the schema object
memory_snapshot("CreateDGLObj_Begin", part_id) memory_snapshot("CreateDGLObj_Begin", part_id)
_, global_nid_ranges = get_idranges(schema[constants.STR_NODE_TYPE], _, global_nid_ranges = get_idranges(schema[constants.STR_NODE_TYPE],
schema[constants.STR_NUM_NODES_PER_CHUNK]) schema[constants.STR_NUM_NODES_PER_CHUNK])
_, global_eid_ranges = get_idranges(schema[constants.STR_EDGE_TYPE], _, global_eid_ranges = get_idranges(schema[constants.STR_EDGE_TYPE],
schema[constants.STR_NUM_EDGES_PER_CHUNK]) schema[constants.STR_NUM_EDGES_PER_CHUNK])
...
...@@ -24,10 +24,9 @@ from utils import (augment_edge_data, get_edge_types, get_etype_featnames,
get_gnid_range_map, get_idranges, get_node_types, get_gnid_range_map, get_idranges, get_node_types,
get_ntype_featnames, memory_snapshot, read_json, get_ntype_featnames, memory_snapshot, read_json,
read_ntype_partition_files, write_dgl_objects, read_ntype_partition_files, write_dgl_objects,
write_metadata_json) write_metadata_json, map_partid_rank)
def gen_node_data(rank, world_size, num_parts, id_lookup, ntid_ntype_map, schema_map):
def gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map):
''' '''
For this data processing pipeline, reading node files is not needed. All the needed information about For this data processing pipeline, reading node files is not needed. All the needed information about
the nodes can be found in the metadata json file. This function generates the nodes owned by a given the nodes can be found in the metadata json file. This function generates the nodes owned by a given
...@@ -39,6 +38,8 @@ def gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map):
rank of the process rank of the process
world_size : int world_size : int
total no. of processes total no. of processes
num_parts : int
total no. of partitions
id_lookup : instance of class DistLookupService id_lookup : instance of class DistLookupService
Distributed lookup service used to map global-nids to respective partition-ids and Distributed lookup service used to map global-nids to respective partition-ids and
shuffle-global-nids shuffle-global-nids
...@@ -92,37 +93,44 @@ def gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map):
using information present in the metadata json file using information present in the metadata json file
''' '''
local_node_data = { constants.GLOBAL_NID : [], local_node_data = {}
constants.NTYPE_ID : [], for local_part_id in range(num_parts//world_size):
constants.GLOBAL_TYPE_NID : [] local_node_data[constants.GLOBAL_NID+"/"+str(local_part_id)] = []
} local_node_data[constants.NTYPE_ID+"/"+str(local_part_id)] = []
local_node_data[constants.GLOBAL_TYPE_NID+"/"+str(local_part_id)] = []
# Note that `get_idranges` always returns two dictionaries. Keys in these
# dictionaries are type names for nodes and edges and values are
# `num_parts` number of tuples indicating the range of type-ids in first
# dictionary and range of global-nids in the second dictionary.
type_nid_dict, global_nid_dict = get_idranges(schema_map[constants.STR_NODE_TYPE], type_nid_dict, global_nid_dict = get_idranges(schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK], schema_map[constants.STR_NUM_NODES_PER_CHUNK],
num_chunks=world_size) num_chunks=num_parts)
for ntype_id, ntype_name in ntid_ntype_map.items(): for ntype_id, ntype_name in ntid_ntype_map.items():
type_start, type_end = type_nid_dict[ntype_name][0][0], type_nid_dict[ntype_name][-1][1] type_start, type_end = type_nid_dict[ntype_name][0][0], type_nid_dict[ntype_name][-1][1]
gnid_start, gnid_end = global_nid_dict[ntype_name][0, 0], global_nid_dict[ntype_name][0, 1] gnid_start, gnid_end = global_nid_dict[ntype_name][0, 0], global_nid_dict[ntype_name][0, 1]
node_partid_slice = id_lookup.get_partition_ids(np.arange(gnid_start, gnid_end, dtype=np.int64)) #exclusive node_partid_slice = id_lookup.get_partition_ids(np.arange(gnid_start, gnid_end, dtype=np.int64)) #exclusive
cond = node_partid_slice == rank
own_gnids = np.arange(gnid_start, gnid_end, dtype=np.int64)
own_gnids = own_gnids[cond]
own_tnids = np.arange(type_start, type_end, dtype=np.int64) for local_part_id in range(num_parts//world_size):
own_tnids = own_tnids[cond] cond = node_partid_slice == (rank + local_part_id*world_size)
own_gnids = np.arange(gnid_start, gnid_end, dtype=np.int64)
own_gnids = own_gnids[cond]
own_tnids = np.arange(type_start, type_end, dtype=np.int64)
own_tnids = own_tnids[cond]
local_node_data[constants.NTYPE_ID].append(np.ones(own_gnids.shape, dtype=np.int64)*ntype_id) local_node_data[constants.NTYPE_ID+"/"+str(local_part_id)].append(np.ones(own_gnids.shape, dtype=np.int64)*ntype_id)
local_node_data[constants.GLOBAL_NID].append(own_gnids) local_node_data[constants.GLOBAL_NID+"/"+str(local_part_id)].append(own_gnids)
local_node_data[constants.GLOBAL_TYPE_NID].append(own_tnids) local_node_data[constants.GLOBAL_TYPE_NID+"/"+str(local_part_id)].append(own_tnids)
for k in local_node_data.keys(): for k in local_node_data.keys():
local_node_data[k] = np.concatenate(local_node_data[k]) local_node_data[k] = np.concatenate(local_node_data[k])
return local_node_data return local_node_data
def exchange_edge_data(rank, world_size, edge_data): def exchange_edge_data(rank, world_size, num_parts, edge_data):
""" """
Exchange edge_data among processes in the world. Exchange edge_data among processes in the world.
Prepare list of sliced data targeting each process and trigger Prepare list of sliced data targeting each process and trigger
...@@ -145,38 +153,172 @@ def exchange_edge_data(rank, world_size, edge_data):
in the world. in the world.
""" """
input_list = [] # Prepare data for each rank in the cluster.
start = timer() start = timer()
for i in np.arange(world_size): for local_part_id in range(num_parts//world_size):
send_idx = (edge_data[constants.OWNER_PROCESS] == i)
send_idx = send_idx.reshape(edge_data[constants.GLOBAL_SRC_ID].shape[0]) input_list = []
filt_data = np.column_stack((edge_data[constants.GLOBAL_SRC_ID][send_idx == 1], \ for idx in range(world_size):
send_idx = (edge_data[constants.OWNER_PROCESS] == (idx + local_part_id*world_size))
send_idx = send_idx.reshape(edge_data[constants.GLOBAL_SRC_ID].shape[0])
filt_data = np.column_stack((edge_data[constants.GLOBAL_SRC_ID][send_idx == 1], \
edge_data[constants.GLOBAL_DST_ID][send_idx == 1], \ edge_data[constants.GLOBAL_DST_ID][send_idx == 1], \
edge_data[constants.GLOBAL_TYPE_EID][send_idx == 1], \ edge_data[constants.GLOBAL_TYPE_EID][send_idx == 1], \
edge_data[constants.ETYPE_ID][send_idx == 1], \ edge_data[constants.ETYPE_ID][send_idx == 1], \
edge_data[constants.GLOBAL_EID][send_idx == 1])) edge_data[constants.GLOBAL_EID][send_idx == 1]))
if(filt_data.shape[0] <= 0): if(filt_data.shape[0] <= 0):
input_list.append(torch.empty((0,5), dtype=torch.int64)) input_list.append(torch.empty((0,5), dtype=torch.int64))
else: else:
input_list.append(torch.from_numpy(filt_data)) input_list.append(torch.from_numpy(filt_data))
end = timer()
dist.barrier ()
output_list = alltoallv_cpu(rank, world_size, input_list)
#Replace the values of the edge_data, with the received data from all the other processes.
rcvd_edge_data = torch.cat(output_list).numpy()
edge_data[constants.GLOBAL_SRC_ID+"/"+str(local_part_id)] = rcvd_edge_data[:,0]
edge_data[constants.GLOBAL_DST_ID+"/"+str(local_part_id)] = rcvd_edge_data[:,1]
edge_data[constants.GLOBAL_TYPE_EID+"/"+str(local_part_id)] = rcvd_edge_data[:,2]
edge_data[constants.ETYPE_ID+"/"+str(local_part_id)] = rcvd_edge_data[:,3]
edge_data[constants.GLOBAL_EID+"/"+str(local_part_id)] = rcvd_edge_data[:,4]
dist.barrier ()
output_list = alltoallv_cpu(rank, world_size, input_list)
end = timer() end = timer()
logging.info(f'[Rank: {rank}] Time to send/rcv edge data: {timedelta(seconds=end-start)}') logging.info(f'[Rank: {rank}] Time to send/rcv edge data: {timedelta(seconds=end-start)}')
#Replace the values of the edge_data, with the received data from all the other processes. # Clean up.
rcvd_edge_data = torch.cat(output_list).numpy()
edge_data[constants.GLOBAL_SRC_ID] = rcvd_edge_data[:,0]
edge_data[constants.GLOBAL_DST_ID] = rcvd_edge_data[:,1]
edge_data[constants.GLOBAL_TYPE_EID] = rcvd_edge_data[:,2]
edge_data[constants.ETYPE_ID] = rcvd_edge_data[:,3]
edge_data[constants.GLOBAL_EID] = rcvd_edge_data[:,4]
edge_data.pop(constants.OWNER_PROCESS) edge_data.pop(constants.OWNER_PROCESS)
edge_data.pop(constants.GLOBAL_SRC_ID)
edge_data.pop(constants.GLOBAL_DST_ID)
edge_data.pop(constants.GLOBAL_TYPE_EID)
edge_data.pop(constants.ETYPE_ID)
edge_data.pop(constants.GLOBAL_EID)
return edge_data return edge_data
def exchange_features(rank, world_size, feature_tids, ntype_gnid_map, id_lookup, feature_data, feat_type, data): def exchange_feature(rank, data, id_lookup, feat_type, feat_key, featdata_key, gid_start,
gid_end, type_id_start, type_id_end, local_part_id, world_size, num_parts,
cur_features, cur_global_ids):
"""This function is used to send/receive one feature for either nodes or
edges of the input graph dataset.
Parameters:
-----------
rank : int
integer, unique id assigned to the current process
data : dictionary
dictionary in which node or edge features are stored; this information
is read from the appropriate node features file which belongs to the
current process
id_lookup : instance of DistLookupService
instance of an implementation of dist. lookup service to retrieve values
for keys
feat_type : string
this is used to distinguish which features are being exchanged. Please
note that for nodes ownership is clearly defined and for edges it is
always assumed that destination end point of the edge defines the
ownership of that particular edge
feat_key : string
this string is used as a key in the dictionary to store features, as
tensors, in local dictionaries
featdata_key : numpy array
features associated with this feature key being processed
gid_start : int
starting global_id, of either node or edge, for the feature data
gid_end : int
ending global_id, of either node or edge, for the feature data
type_id_start : int
starting type_id for the feature data
type_id_end : int
ending type_id for the feature data
local_part_id : int
integer used to identify the local partition id, which is used to locate
data belonging to this partition
world_size : int
total number of processes created
num_parts : int
total number of partitions
cur_features : dictionary
dictionary to store the feature data which belongs to the current
process
cur_global_ids : dictionary
dictionary to store the global ids, of either nodes or edges, for which
the features are stored in the cur_features dictionary
Returns:
-------
dictionary :
a dictionary is returned where keys are type names and
feature data are the values
dictionary :
a dictionary of global_ids, of either nodes or edges, whose features are
received during the data shuffle process
"""
#type_ids for this feature subset on the current rank
gids_feat = np.arange(gid_start, gid_end)
tids_feat = np.arange(type_id_start, type_id_end)
local_idx = np.arange(0, type_id_end - type_id_start)
feats_per_rank = []
global_id_per_rank = []
tokens = feat_key.split("/")
assert len(tokens) == 3
local_feat_key = "/".join(tokens[:-1]) +"/"+ str(local_part_id)
for idx in range(world_size):
# Get the partition ids for the range of global nids.
if feat_type == constants.STR_NODE_FEATURES:
# Retrieve the partition ids for the node features.
# Each partition id will be in the range [0, num_parts).
partid_slice = id_lookup.get_partition_ids(np.arange(gid_start, gid_end, dtype=np.int64))
else:
#Edge data case.
#Ownership is determined by the destination node.
assert data is not None
global_eids = np.arange(gid_start, gid_end, dtype=np.int64)
#Now use `data` to extract destination nodes' global id
#and use that to get the ownership
common, idx1, idx2 = np.intersect1d(data[constants.GLOBAL_EID], global_eids, return_indices=True)
assert common.shape[0] == idx2.shape[0]
global_dst_nids = data[constants.GLOBAL_DST_ID][idx1]
assert np.all(global_eids == data[constants.GLOBAL_EID][idx1])
partid_slice = id_lookup.get_partition_ids(global_dst_nids)
cond = (partid_slice == (idx + local_part_id*world_size))
gids_per_partid = gids_feat[cond]
tids_per_partid = tids_feat[cond]
local_idx_partid = local_idx[cond]
if (gids_per_partid.shape[0] == 0):
feats_per_rank.append(torch.empty((0,1), dtype=torch.float))
global_id_per_rank.append(np.empty((0,1), dtype=np.int64))
else:
feats_per_rank.append(featdata_key[local_idx_partid])
global_id_per_rank.append(torch.from_numpy(gids_per_partid).type(torch.int64))
#features (and global nids) per rank to be sent out are ready
#for transmission, perform alltoallv here.
output_feat_list = alltoallv_cpu(rank, world_size, feats_per_rank)
output_id_list = alltoallv_cpu(rank, world_size, global_id_per_rank)
#stitch node_features together to form one large feature tensor
output_feat_list = torch.cat(output_feat_list)
output_id_list = torch.cat(output_id_list)
if local_feat_key in cur_features:
temp = cur_features[local_feat_key]
cur_features[local_feat_key] = torch.cat([temp, output_feat_list])
temp = cur_global_ids[local_feat_key]
cur_global_ids[local_feat_key] = torch.cat([temp, output_id_list])
else:
cur_features[local_feat_key] = output_feat_list
cur_global_ids[local_feat_key] = output_id_list
return cur_features, cur_global_ids
def exchange_features(rank, world_size, num_parts, feature_tids, type_id_map, id_lookup, feature_data, feat_type, data):
""" """
This function is used to shuffle node features so that each process will receive This function is used to shuffle node features so that each process will receive
all the node features whose corresponding nodes are owned by the same process. all the node features whose corresponding nodes are owned by the same process.
...@@ -201,119 +343,98 @@ def exchange_features(rank, world_size, feature_tids, ntype_gnid_map, id_lookup,
rank of the current process rank of the current process
world_size : int world_size : int
total no. of participating processes. total no. of participating processes.
node_feature_tids : dictionary feature_tids : dictionary
dictionary with keys as node-type names and value is a dictionary. This dictionary dictionary with keys as node-type names with suffixes as feature names
contains information about node-features associated with a given node-type and value and value is a dictionary. This dictionary contains information about
is a list. This list contains a of indexes, like [starting-idx, ending-idx) which node-features associated with a given node-type and value is a list.
can be used to index into the node feature tensors read from corresponding input files. This list contains a of indexes, like [starting-idx, ending-idx) which
ntypes_gnid_map : dictionary can be used to index into the node feature tensors read from
mapping between node type names and global_nids which belong to the keys in this dictionary corresponding input files.
type_id_map : dictionary
mapping between type names and global_ids, of either nodes or edges,
which belong to the keys in this dictionary
id_lookup : instance of class DistLookupService id_lookup : instance of class DistLookupService
Distributed lookup service used to map global-nids to respective partition-ids and Distributed lookup service used to map global-nids to respective
shuffle-global-nids partition-ids and shuffle-global-nids
feature_data: dicitonary
dictionry in which node or edge features are stored and this information is read from the appropriate
node features file which belongs to the current process
feat_type : string feat_type : string
this is used to distinguish which features are being exchanged. Please note that this is used to distinguish which features are being exchanged. Please
for nodes ownership is clearly defined and for edges it is always assumed that note that for nodes ownership is clearly defined and for edges it is
destination end point of the edge defines the ownership of that particular always assumed that destination end point of the edge defines the
edge ownership of that particular edge
data: dicitonary
dictionry in which node or edge features are stored and this information
is read from the appropriate node features file which belongs to the
current process
Returns: Returns:
-------- --------
dictionary : dictionary :
node features are returned as a dictionary where keys are node type names and node feature names a dictionary is returned where keys are type names and
and values are tensors feature data are the values
dictionary : list :
a dictionary of global_nids for the nodes whose node features are received during the data shuffle a dictionary of global_ids either nodes or edges whose features are
process received during the data shuffle process
""" """
start = timer() start = timer()
own_features = {} own_features = {}
own_global_nids = {} own_global_ids = {}
#To iterate over the node_types and associated node_features
for type_name, type_info in feature_tids.items(): # To iterate over the node_types and associated node_features
for feat_key, type_info in feature_tids.items():
#To iterate over the node_features, of a given node_type
#type_info is a list of 3 elements # To iterate over the feature data, of a given (node or edge )type
#[feature-name, starting-idx, ending-idx] # type_info is a list of 3 elements (as shown below):
#feature-name is the name given to the feature-data, read from the input metadata file # [feature-name, starting-idx, ending-idx]
#[starting-idx, ending-idx) specifies the range of indexes associated with the features read from # feature-name is the name given to the feature-data,
#the associated input file. Note that the rows of features read from the input file should be same # read from the input metadata file
#as specified with this range. So no. of rows = ending-idx - starting-idx. # [starting-idx, ending-idx) specifies the range of indexes
for feat_info in type_info: # associated with the features data
# Determine the owner process for these features.
# Note that the keys in the node features (and similarly edge features)
# dictionary is of the following format:
# `node_type/feature_name/local_part_id`:
# where node_type and feature_name are self-explanatory and
# local_part_id denotes the partition-id, in the local process,
# which will be used as a suffix to store all the information of a
# given partition which is processed by the current process. Its
# values start from 0 onwards, for instance 0, 1, 2 ... etc.
# local_part_id can easily be mapped to the global partition id
# using cyclic ordering. All local_part_ids = 0 from all
# processes will form global partition-ids between 0 and world_size-1.
# Similarly all local_part_ids = 1 from all processes will form
# global partition ids in the range [world_size, 2*world_size-1] and
# so on.
tokens = feat_key.split("/")
assert len(tokens) == 3
type_name = tokens[0]
feat_name = tokens[1]
logging.info(f'[Rank: {rank}] processing feature: {feat_key}')
#determine the owner process for these node features. for feat_info in type_info:
feats_per_rank = [] # Compute the global_id range for this feature data
global_nid_per_rank = [] type_id_start = int(feat_info[0])
feat_name = feat_info[0] type_id_end = int(feat_info[1])
feat_key = type_name+'/'+feat_name begin_global_id = type_id_map[type_name][0]
logging.info(f'[Rank: {rank}] processing node feature: {feat_key}') gid_start = begin_global_id + type_id_start
gid_end = begin_global_id + type_id_end
#compute the global_nid range for this node features
type_nid_start = int(feat_info[1]) # Check if features exist for this type_name + feat_name.
type_nid_end = int(feat_info[2]) # This check should always pass, because feature_tids are built
begin_global_nid = ntype_gnid_map[type_name][0] # by reading the input metadata json file for existing features.
gnid_start = begin_global_nid + type_nid_start
gnid_end = begin_global_nid + type_nid_end
#type_nids for this feature subset on the current rank
gnids_feat = np.arange(gnid_start, gnid_end)
tnids_feat = np.arange(type_nid_start, type_nid_end)
local_idx = np.arange(0, type_nid_end - type_nid_start)
#check if node features exist for this ntype_name + feat_name
#this check should always pass, because node_feature_tids are built
#by reading the input metadata json file for existing node features.
assert(feat_key in feature_data) assert(feat_key in feature_data)
key_feats = feature_data[feat_key] for local_part_id in range(num_parts//world_size):
for part_id in range(world_size): featdata_key = feature_data[feat_key]
# Get the partition ids for the range of global nids. own_features, own_global_ids = exchange_feature(rank, data, id_lookup,
if feat_type == constants.STR_NODE_FEATURES: feat_type, feat_key, featdata_key, gid_start, gid_end, type_id_start,
partid_slice = id_lookup.get_partition_ids(np.arange(gnid_start, gnid_end, dtype=np.int64)) type_id_end, local_part_id, world_size, num_parts, own_features,
else: own_global_ids)
#Edge data case.
#Ownership is determined by the destination node.
assert data is not None
global_eids = np.arange(gnid_start, gnid_end, dtype=np.int64)
#Now use `data` to extract destination nodes' global id
#and use that to get the ownership
common, idx1, idx2 = np.intersect1d(data[constants.GLOBAL_EID], global_eids, return_indices=True)
assert common.shape[0] == idx2.shape[0]
global_dst_nids = data[constants.GLOBAL_DST_ID][idx1]
assert np.all(global_eids == data[constants.GLOBAL_EID][idx1])
partid_slice = id_lookup.get_partition_ids(global_dst_nids)
cond = (partid_slice == part_id)
gnids_per_partid = gnids_feat[cond]
tnids_per_partid = tnids_feat[cond]
local_idx_partid = local_idx[cond]
if (gnids_per_partid.shape[0] == 0):
feats_per_rank.append(torch.empty((0,1), dtype=torch.float))
global_nid_per_rank.append(np.empty((0,1), dtype=np.int64))
else:
feats_per_rank.append(key_feats[local_idx_partid])
global_nid_per_rank.append(torch.from_numpy(gnids_per_partid).type(torch.int64))
#features (and global nids) per rank to be sent out are ready
#for transmission, perform alltoallv here.
output_feat_list = alltoallv_cpu(rank, world_size, feats_per_rank)
output_nid_list = alltoallv_cpu(rank, world_size, global_nid_per_rank)
#stitch node_features together to form one large feature tensor
own_features[feat_key] = torch.cat(output_feat_list)
own_global_nids[feat_key] = torch.cat(output_nid_list).numpy()
end = timer() end = timer()
logging.info(f'[Rank: {rank}] Total time for node feature exchange: {timedelta(seconds = end - start)}') logging.info(f'[Rank: {rank}] Total time for feature exchange {feat_key}: {timedelta(seconds = end - start)}')
return own_features, own_global_nids return own_features, own_global_ids
def exchange_graph_data(rank, world_size, node_features, edge_features, def exchange_graph_data(rank, world_size, num_parts, node_features, edge_features,
node_feat_tids, edge_feat_tids, node_feat_tids, edge_feat_tids,
edge_data, id_lookup, ntypes_ntypeid_map, edge_data, id_lookup, ntypes_ntypeid_map,
ntypes_gnid_range_map, etypes_geid_range_map, ntypes_gnid_range_map, etypes_geid_range_map,
...@@ -327,6 +448,8 @@ def exchange_graph_data(rank, world_size, node_features, edge_features,
rank of the current process rank of the current process
world_size : int world_size : int
total no. of participating processes. total no. of participating processes.
num_parts : int
total no. of graph partitions.
node_feautres : dicitonary node_feautres : dicitonary
dictionry where node_features are stored and this information is read from the appropriate dictionry where node_features are stored and this information is read from the appropriate
node features file which belongs to the current process node features file which belongs to the current process
...@@ -382,21 +505,21 @@ def exchange_graph_data(rank, world_size, node_features, edge_features,
was performed in the `exchange_features` function call was performed in the `exchange_features` function call
""" """
memory_snapshot("ShuffleNodeFeaturesBegin: ", rank) memory_snapshot("ShuffleNodeFeaturesBegin: ", rank)
rcvd_node_features, rcvd_global_nids = exchange_features(rank, world_size, node_feat_tids, rcvd_node_features, rcvd_global_nids = exchange_features(rank, world_size, num_parts, node_feat_tids,
ntypes_gnid_range_map, id_lookup, node_features, ntypes_gnid_range_map, id_lookup, node_features,
constants.STR_NODE_FEATURES, None) constants.STR_NODE_FEATURES, None)
memory_snapshot("ShuffleNodeFeaturesComplete: ", rank) memory_snapshot("ShuffleNodeFeaturesComplete: ", rank)
logging.info(f'[Rank: {rank}] Done with node features exchange.') logging.info(f'[Rank: {rank}] Done with node features exchange.')
rcvd_edge_features, rcvd_global_eids = exchange_features(rank, world_size, edge_feat_tids, rcvd_edge_features, rcvd_global_eids = exchange_features(rank, world_size, num_parts, edge_feat_tids,
etypes_geid_range_map, id_lookup, edge_features, etypes_geid_range_map, id_lookup, edge_features,
constants.STR_EDGE_FEATURES, edge_data) constants.STR_EDGE_FEATURES, edge_data)
logging.info(f'[Rank: {rank}] Done with edge features exchange.') logging.info(f'[Rank: {rank}] Done with edge features exchange.')
node_data = gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map) node_data = gen_node_data(rank, world_size, num_parts, id_lookup, ntid_ntype_map, schema_map)
memory_snapshot("NodeDataGenerationComplete: ", rank) memory_snapshot("NodeDataGenerationComplete: ", rank)
edge_data = exchange_edge_data(rank, world_size, edge_data)
edge_data = exchange_edge_data(rank, world_size, num_parts, edge_data)
memory_snapshot("ShuffleEdgeDataComplete: ", rank) memory_snapshot("ShuffleEdgeDataComplete: ", rank)
return node_data, rcvd_node_features, rcvd_global_nids, edge_data, rcvd_edge_features, rcvd_global_eids return node_data, rcvd_node_features, rcvd_global_nids, edge_data, rcvd_edge_features, rcvd_global_eids
...@@ -451,10 +574,10 @@ def read_dataset(rank, world_size, id_lookup, params, schema_map):
edge_features = {} edge_features = {}
#node_tids, node_features, edge_datadict, edge_tids #node_tids, node_features, edge_datadict, edge_tids
node_tids, node_features, node_feat_tids, edge_data, edge_tids, edge_features, edge_feat_tids = \ node_tids, node_features, node_feat_tids, edge_data, edge_tids, edge_features, edge_feat_tids = \
get_dataset(params.input_dir, params.graph_name, rank, world_size, schema_map) get_dataset(params.input_dir, params.graph_name, rank, world_size, params.num_parts, schema_map)
logging.info(f'[Rank: {rank}] Done reading dataset deom {params.input_dir}') logging.info(f'[Rank: {rank}] Done reading dataset {params.input_dir}')
edge_data = augment_edge_data(edge_data, id_lookup, edge_tids, rank, world_size) edge_data = augment_edge_data(edge_data, id_lookup, edge_tids, rank, world_size, params.num_parts)
logging.info(f'[Rank: {rank}] Done augmenting edge_data: {len(edge_data)}, {edge_data[constants.GLOBAL_SRC_ID].shape}') logging.info(f'[Rank: {rank}] Done augmenting edge_data: {len(edge_data)}, {edge_data[constants.GLOBAL_SRC_ID].shape}')
return node_tids, node_features, node_feat_tids, edge_data, edge_features, edge_tids, edge_feat_tids return node_tids, node_features, node_feat_tids, edge_data, edge_features, edge_tids, edge_feat_tids
...@@ -592,8 +715,12 @@ def gen_dist_partitions(rank, world_size, params):
#Initialize distributed lookup service for partition-id and shuffle-global-nids mappings #Initialize distributed lookup service for partition-id and shuffle-global-nids mappings
#for global-nids #for global-nids
_, global_nid_ranges = get_idranges(schema_map[constants.STR_NODE_TYPE], _, global_nid_ranges = get_idranges(schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK]) schema_map[constants.STR_NUM_NODES_PER_CHUNK], params.num_parts)
id_map = dgl.distributed.id_map.IdMap(global_nid_ranges) id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
# The resources, which are node-id to partition-id mappings, are split
# into `world_size` number of parts, where each part can be mapped to
# each physical node.
id_lookup = DistLookupService(os.path.join(params.input_dir, params.partitions_dir),\ id_lookup = DistLookupService(os.path.join(params.input_dir, params.partitions_dir),\
schema_map[constants.STR_NODE_TYPE],\ schema_map[constants.STR_NODE_TYPE],\
id_map, rank, world_size) id_map, rank, world_size)
...@@ -615,7 +742,7 @@ def gen_dist_partitions(rank, world_size, params):
ntypes_gnid_range_map = get_gnid_range_map(node_tids) ntypes_gnid_range_map = get_gnid_range_map(node_tids)
etypes_geid_range_map = get_gnid_range_map(edge_tids) etypes_geid_range_map = get_gnid_range_map(edge_tids)
node_data, rcvd_node_features, rcvd_global_nids, edge_data, rcvd_edge_features, rcvd_global_eids = \ node_data, rcvd_node_features, rcvd_global_nids, edge_data, rcvd_edge_features, rcvd_global_eids = \
exchange_graph_data(rank, world_size, node_features, edge_features, \ exchange_graph_data(rank, world_size, params.num_parts, node_features, edge_features, \
node_feat_tids, edge_feat_tids, edge_data, id_lookup, ntypes_ntypeid_map, \ node_feat_tids, edge_feat_tids, edge_data, id_lookup, ntypes_ntypeid_map, \
ntypes_gnid_range_map, etypes_geid_range_map, \ ntypes_gnid_range_map, etypes_geid_range_map, \
ntypeid_ntypes_map, schema_map) ntypeid_ntypes_map, schema_map)
...@@ -624,16 +751,19 @@ def gen_dist_partitions(rank, world_size, params):
memory_snapshot("DataShuffleComplete: ", rank) memory_snapshot("DataShuffleComplete: ", rank)
#sort node_data by ntype #sort node_data by ntype
idx = node_data[constants.NTYPE_ID].argsort() for local_part_id in range(params.num_parts//world_size):
for k, v in node_data.items(): idx = node_data[constants.NTYPE_ID+"/"+str(local_part_id)].argsort()
node_data[k] = v[idx] for k, v in node_data.items():
idx = None tokens = k.split("/")
assert len(tokens) == 2
if tokens[1] == str(local_part_id):
node_data[k] = v[idx]
idx = None
gc.collect() gc.collect()
logging.info(f'[Rank: {rank}] Sorted node_data by node_type') logging.info(f'[Rank: {rank}] Sorted node_data by node_type')
#resolve global_ids for nodes #resolve global_ids for nodes
assign_shuffle_global_nids_nodes(rank, world_size, node_data) assign_shuffle_global_nids_nodes(rank, world_size, params.num_parts, node_data)
logging.info(f'[Rank: {rank}] Done assigning global-ids to nodes...') logging.info(f'[Rank: {rank}] Done assigning global-ids to nodes...')
memory_snapshot("ShuffleGlobalID_Nodes_Complete: ", rank) memory_snapshot("ShuffleGlobalID_Nodes_Complete: ", rank)
...@@ -643,25 +773,30 @@ def gen_dist_partitions(rank, world_size, params):
for featname in featnames: for featname in featnames:
#if a feature name exists for a node-type, then it should also have #if a feature name exists for a node-type, then it should also have
#feature data as well. Hence using the assert statement. #feature data as well. Hence using the assert statement.
feature_key = ntype_name+'/'+featname for local_part_id in range(params.num_parts//world_size):
assert(feature_key in rcvd_global_nids) feature_key = ntype_name+'/'+featname+"/"+str(local_part_id)
global_nids = rcvd_global_nids[feature_key] assert(feature_key in rcvd_global_nids)
global_nids = rcvd_global_nids[feature_key]
_, idx1, _ = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True) _, idx1, _ = np.intersect1d(node_data[constants.GLOBAL_NID+"/"+str(local_part_id)], global_nids, return_indices=True)
shuffle_global_ids = node_data[constants.SHUFFLE_GLOBAL_NID][idx1] shuffle_global_ids = node_data[constants.SHUFFLE_GLOBAL_NID+"/"+str(local_part_id)][idx1]
feature_idx = shuffle_global_ids.argsort() feature_idx = shuffle_global_ids.argsort()
rcvd_node_features[feature_key] = rcvd_node_features[feature_key][feature_idx] rcvd_node_features[feature_key] = rcvd_node_features[feature_key][feature_idx]
memory_snapshot("ReorderNodeFeaturesComplete: ", rank) memory_snapshot("ReorderNodeFeaturesComplete: ", rank)
#sort edge_data by etype #sort edge_data by etype
sorted_idx = edge_data[constants.ETYPE_ID].argsort() for local_part_id in range(params.num_parts//world_size):
for k, v in edge_data.items(): sorted_idx = edge_data[constants.ETYPE_ID+"/"+str(local_part_id)].argsort()
edge_data[k] = v[sorted_idx] for k, v in edge_data.items():
sorted_idx = None tokens = k.split("/")
assert len(tokens) == 2
if tokens[1] == str(local_part_id):
edge_data[k] = v[sorted_idx]
sorted_idx = None
gc.collect() gc.collect()
shuffle_global_eid_start = assign_shuffle_global_nids_edges(rank, world_size, edge_data) shuffle_global_eid_offsets = assign_shuffle_global_nids_edges(rank, world_size, params.num_parts, edge_data)
logging.info(f'[Rank: {rank}] Done assigning global_ids to edges ...') logging.info(f'[Rank: {rank}] Done assigning global_ids to edges ...')
memory_snapshot("ShuffleGlobalID_Edges_Complete: ", rank) memory_snapshot("ShuffleGlobalID_Edges_Complete: ", rank)
...@@ -669,56 +804,73 @@ def gen_dist_partitions(rank, world_size, params):
for etype_name in etypes: for etype_name in etypes:
featnames = get_etype_featnames(etype_name, schema_map) featnames = get_etype_featnames(etype_name, schema_map)
for featname in featnames: for featname in featnames:
feature_key = etype_name+'/'+featname for local_part_id in range(params.num_parts//world_size):
assert feature_key in rcvd_global_eids feature_key = etype_name+'/'+featname+"/"+str(local_part_id)
global_eids = rcvd_global_eids[feature_key] assert feature_key in rcvd_global_eids
global_eids = rcvd_global_eids[feature_key]
_, idx1, _ = np.intersect1d(edge_data[constants.GLOBAL_EID], global_eids, return_indices=True) _, idx1, _ = np.intersect1d(edge_data[constants.GLOBAL_EID+"/"+str(local_part_id)], global_eids, return_indices=True)
shuffle_global_ids = edge_data[constants.SHUFFLE_GLOBAL_EID][idx1] shuffle_global_ids = edge_data[constants.SHUFFLE_GLOBAL_EID+"/"+str(local_part_id)][idx1]
feature_idx = shuffle_global_ids.argsort() feature_idx = shuffle_global_ids.argsort()
rcvd_edge_features[feature_key] = rcvd_edge_features[feature_key][feature_idx] rcvd_edge_features[feature_key] = rcvd_edge_features[feature_key][feature_idx]
for k, v in rcvd_edge_features.items():
logging.info(f'[Rank: {rank}] key: {k} v: {v.shape}')
#determine global-ids for edge end-points #determine global-ids for edge end-points
edge_data = lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, node_data) edge_data = lookup_shuffle_global_nids_edges(rank, world_size, params.num_parts, edge_data, id_lookup, node_data)
logging.info(f'[Rank: {rank}] Done resolving orig_node_id for local node_ids...') logging.info(f'[Rank: {rank}] Done resolving orig_node_id for local node_ids...')
memory_snapshot("ShuffleGlobalID_Lookup_Complete: ", rank) memory_snapshot("ShuffleGlobalID_Lookup_Complete: ", rank)
def prepare_local_data(src_data, local_part_id):
local_data = {}
for k, v in src_data.items():
tokens = k.split("/")
if tokens[len(tokens)-1] == str(local_part_id):
local_data["/".join(tokens[:-1])] = v
return local_data
#create dgl objects here #create dgl objects here
output_meta_json = {}
start = timer() start = timer()
num_nodes = 0
num_edges = shuffle_global_eid_start
node_count = len(node_data[constants.NTYPE_ID])
edge_count = len(edge_data[constants.ETYPE_ID])
graph_obj, ntypes_map_val, etypes_map_val, ntypes_map, etypes_map, \
orig_nids, orig_eids = create_dgl_object(schema_map, rank, node_data, \
edge_data, num_edges, params.save_orig_nids, params.save_orig_eids)
memory_snapshot("CreateDGLObjectsComplete: ", rank)
graph_formats = None graph_formats = None
if params.graph_formats: if params.graph_formats:
graph_formats = params.graph_formats.split(',') graph_formats = params.graph_formats.split(',')
sort_etypes = len(etypes_map) > 1
write_dgl_objects(graph_obj, rcvd_node_features, rcvd_edge_features, params.output, \ for local_part_id in range(params.num_parts//world_size):
rank, orig_nids, orig_eids, graph_formats, sort_etypes) num_edges = shuffle_global_eid_offsets[local_part_id]
memory_snapshot("DiskWriteDGLObjectsComplete: ", rank) node_count = len(node_data[constants.NTYPE_ID+"/"+str(local_part_id)])
edge_count = len(edge_data[constants.ETYPE_ID+"/"+str(local_part_id)])
#get the meta-data local_node_data = prepare_local_data(node_data, local_part_id)
json_metadata = create_metadata_json(params.graph_name, node_count, edge_count, \ local_edge_data = prepare_local_data(edge_data, local_part_id)
rank, world_size, ntypes_map_val, \ graph_obj, ntypes_map_val, etypes_map_val, ntypes_map, etypes_map, \
orig_nids, orig_eids = create_dgl_object(schema_map, rank+local_part_id*world_size,
local_node_data, local_edge_data,
num_edges, params.save_orig_nids, params.save_orig_eids)
sort_etypes = len(etypes_map) > 1
local_node_features = prepare_local_data(rcvd_node_features, local_part_id)
local_edge_features = prepare_local_data(rcvd_edge_features, local_part_id)
write_dgl_objects(graph_obj,
local_node_features, local_edge_features,
params.output,
rank + (local_part_id*world_size),
orig_nids, orig_eids, graph_formats, sort_etypes)
memory_snapshot("DiskWriteDGLObjectsComplete: ", rank)
#get the meta-data
json_metadata = create_metadata_json(params.graph_name, node_count, edge_count, \
local_part_id * world_size + rank, params.num_parts, ntypes_map_val, \
etypes_map_val, ntypes_map, etypes_map, params.output) etypes_map_val, ntypes_map, etypes_map, params.output)
memory_snapshot("MetadataCreateComplete: ", rank) output_meta_json["local-part-id-"+str(local_part_id*world_size + rank)] = json_metadata
memory_snapshot("MetadataCreateComplete: ", rank)
if (rank == 0): if (rank == 0):
#get meta-data from all partitions and merge them on rank-0 #get meta-data from all partitions and merge them on rank-0
metadata_list = gather_metadata_json(json_metadata, rank, world_size) metadata_list = gather_metadata_json(output_meta_json, rank, world_size)
metadata_list[0] = json_metadata metadata_list[0] = output_meta_json
write_metadata_json(metadata_list, params.output, params.graph_name) write_metadata_json(metadata_list, params.output, params.graph_name, world_size, params.num_parts)
else: else:
#send meta-data to Rank-0 process #send meta-data to Rank-0 process
gather_metadata_json(json_metadata, rank, world_size) gather_metadata_json(output_meta_json, rank, world_size)
end = timer() end = timer()
logging.info(f'[Rank: {rank}] Time to create dgl objects: {timedelta(seconds = end - start)}') logging.info(f'[Rank: {rank}] Time to create dgl objects: {timedelta(seconds = end - start)}')
memory_snapshot("MetadataWriteComplete: ", rank) memory_snapshot("MetadataWriteComplete: ", rank)
...
...@@ -7,10 +7,10 @@ import torch
from pyarrow import csv from pyarrow import csv
import constants import constants
from utils import get_idranges from utils import get_idranges, map_partid_rank
def get_dataset(input_dir, graph_name, rank, world_size, schema_map): def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
""" """
Function to read the multiple file formatted dataset. Function to read the multiple file formatted dataset.
...@@ -24,6 +24,8 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
rank of the current process rank of the current process
world_size : int world_size : int
total number of process in the current execution total number of process in the current execution
num_parts : int
total number of output graph partitions
schema_map : dictionary schema_map : dictionary
this is the dictionary created by reading the graph metadata json file this is the dictionary created by reading the graph metadata json file
for the input graph dataset for the input graph dataset
...@@ -105,34 +107,7 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
Data read from each of the node features file is a multi-dimensional tensor data and is read Data read from each of the node features file is a multi-dimensional tensor data and is read
in numpy format, which is also the storage format of node features on the permanent storage. in numpy format, which is also the storage format of node features on the permanent storage.
'''
#iterate over the "node_data" dictionary in the schema_map
#read the node features if exists
#also keep track of the type_nids for which the node_features are read.
dataset_features = schema_map[constants.STR_NODE_DATA]
if((dataset_features is not None) and (len(dataset_features) > 0)):
for ntype_name, ntype_feature_data in dataset_features.items():
#ntype_feature_data is a dictionary
#where key: feature_name, value: dictionary in which keys are "format", "data"
node_feature_tids[ntype_name] = []
for feat_name, feat_data in ntype_feature_data.items():
assert feat_data[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_NUMPY
num_chunks = len(feat_data[constants.STR_DATA])
read_list = np.array_split(np.arange(num_chunks), world_size)
nfeat = []
for idx in read_list[rank]:
nfeat_file = feat_data[constants.STR_DATA][idx]
if not os.path.isabs(nfeat_file):
nfeat_file = os.path.join(input_dir, nfeat_file)
logging.info(f'Loading node feature[{feat_name}] of ntype[{ntype_name}] from {nfeat_file}')
nfeat.append(np.load(nfeat_file))
nfeat = np.concatenate(nfeat)
node_features[ntype_name + '/' + feat_name] = torch.from_numpy(nfeat)
node_feature_tids[ntype_name].append([feat_name, -1, -1])
'''
"node_type" : ["ntype0-name", "ntype1-name", ....], #m node types "node_type" : ["ntype0-name", "ntype1-name", ....], #m node types
"num_nodes_per_chunk" : [ "num_nodes_per_chunk" : [
[a0, a1, ...a<p-1>], #p partitions [a0, a1, ...a<p-1>], #p partitions
...@@ -154,25 +129,66 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
which are owned by that particular rank. And using the "num_nodes_per_chunk" information each which are owned by that particular rank. And using the "num_nodes_per_chunk" information each
process can easily compute any nodes per-type node_id and global node_id. process can easily compute any nodes per-type node_id and global node_id.
The node-ids are treated as int64's in order to support billions of nodes in the input graph. The node-ids are treated as int64's in order to support billions of nodes in the input graph.
''' '''
#read my nodes for each node type #read my nodes for each node type
node_tids, ntype_gnid_offset = get_idranges(schema_map[constants.STR_NODE_TYPE], node_tids, ntype_gnid_offset = get_idranges(schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK], schema_map[constants.STR_NUM_NODES_PER_CHUNK],
num_chunks=world_size) num_chunks=num_parts)
for ntype_name in schema_map[constants.STR_NODE_TYPE]:
if ntype_name in node_feature_tids: #iterate over the "node_data" dictionary in the schema_map
for item in node_feature_tids[ntype_name]: #read the node features if exists
item[1] = node_tids[ntype_name][rank][0] #also keep track of the type_nids for which the node_features are read.
item[2] = node_tids[ntype_name][rank][1] dataset_features = schema_map[constants.STR_NODE_DATA]
if((dataset_features is not None) and (len(dataset_features) > 0)):
for ntype_name, ntype_feature_data in dataset_features.items():
for feat_name, feat_data in ntype_feature_data.items():
assert feat_data[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_NUMPY
# It is guaranteed that num_chunks is always greater
# than or equal to num_partitions.
num_chunks = len(feat_data[constants.STR_DATA])
read_list = np.array_split(np.arange(num_chunks), num_parts)
for local_part_id in range(num_parts):
if map_partid_rank(local_part_id, world_size) == rank:
nfeat = []
nfeat_tids = []
for idx in read_list[local_part_id]:
nfeat_file = feat_data[constants.STR_DATA][idx]
if not os.path.isabs(nfeat_file):
nfeat_file = os.path.join(input_dir, nfeat_file)
logging.info(f'Loading node feature[{feat_name}] of ntype[{ntype_name}] from {nfeat_file}')
nfeat.append(np.load(nfeat_file))
nfeat = np.concatenate(nfeat) if len(nfeat) != 0 else np.array([])
node_features[ntype_name+"/"+feat_name+"/"+str(local_part_id//world_size)] = torch.from_numpy(nfeat)
nfeat_tids.append(node_tids[ntype_name][local_part_id])
node_feature_tids[ntype_name+"/"+feat_name+"/"+str(local_part_id//world_size)] = nfeat_tids
#done building node_features locally. #done building node_features locally.
if len(node_features) <= 0: if len(node_features) <= 0:
logging.info(f'[Rank: {rank}] This dataset does not have any node features') logging.info(f'[Rank: {rank}] This dataset does not have any node features')
else: else:
for k, v in node_features.items(): assert len(node_features) == len(node_feature_tids)
logging.info(f'[Rank: {rank}] node feature name: {k}, feature data shape: {v.size()}')
# Note that the keys in the node_features dictionary are as follows:
# `ntype_name/feat_name/local_part_id`.
# where ntype_name and feat_name are self-explanatory, and
# local_part_id indicates the partition-id, in the context of current
# process which take the values 0, 1, 2, ....
for feat_name, feat_info in node_features.items():
logging.info(f'[Rank: {rank}] node feature name: {feat_name}, feature data shape: {feat_info.size()}')
tokens = feat_name.split("/")
assert len(tokens) == 3
# Get the range of type ids which are mapped to the current node.
tids = node_feature_tids[feat_name]
# Iterate over the range of type ids for the current node feature
# and count the number of features for this feature name.
count = tids[0][1] - tids[0][0]
assert count == feat_info.size()[0]
''' '''
Reading edge features now. Reading edge features now.
...@@ -214,50 +230,48 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
edge_features = {} edge_features = {}
edge_feature_tids = {} edge_feature_tids = {}
# Read edges for each edge type that is processed by the current process.
edge_tids, _ = get_idranges(schema_map[constants.STR_EDGE_TYPE],
schema_map[constants.STR_NUM_EDGES_PER_CHUNK], num_parts)
# Iterate over the "edge_data" dictionary in the schema_map. # Iterate over the "edge_data" dictionary in the schema_map.
# Read the edge features if exists. # Read the edge features if exists.
# Also keep track of the type_eids for which the edge_features are read. # Also keep track of the type_eids for which the edge_features are read.
dataset_features = schema_map[constants.STR_EDGE_DATA] dataset_features = schema_map[constants.STR_EDGE_DATA]
if dataset_features and (len(dataset_features) > 0): if dataset_features and (len(dataset_features) > 0):
for etype_name, etype_feature_data in dataset_features.items(): for etype_name, etype_feature_data in dataset_features.items():
#etype_feature_data is a dictionary
#where key: feature_name, value: dictionary in which keys are "format", "data"
edge_feature_tids[etype_name] = []
for feat_name, feat_data in etype_feature_data.items(): for feat_name, feat_data in etype_feature_data.items():
assert feat_data[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_NUMPY assert feat_data[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_NUMPY
num_chunks = len(feat_data[constants.STR_DATA]) num_chunks = len(feat_data[constants.STR_DATA])
read_list = np.array_split(np.arange(num_chunks), world_size) read_list = np.array_split(np.arange(num_chunks), num_parts)
efeat = [] for local_part_id in range(num_parts):
for idx in read_list[rank]: if map_partid_rank(local_part_id, world_size) == rank:
efeat_file = feat_data[constants.STR_DATA][idx] efeats = []
if not os.path.isabs(efeat_file): efeat_tids = []
efeat_file = os.path.join(input_dir, efeat_file) for idx in read_list[local_part_id]:
logging.info( feature_fname = feat_data[constants.STR_DATA][idx]
f'Loading edge feature[{feat_name}] of etype[{etype_name}] from {efeat_file}' if (os.path.isabs(feature_fname)):
) logging.info(f'Loading numpy from {feature_fname}')
efeat.append(np.load(efeat_file)) efeats.append(torch.from_numpy(np.load(feature_fname)))
efeat = np.concatenate(efeat) else:
edge_features[etype_name + '/' + feat_name] = torch.from_numpy(efeat) numpy_path = os.path.join(input_dir, feature_fname)
logging.info(f'Loading numpy from {numpy_path}')
edge_feature_tids[etype_name].append([feat_name, -1, -1]) efeats.append(torch.from_numpy(np.load(numpy_path)))
efeat_tids.append(edge_tids[etype_name][local_part_id])
# Read edges for each node types that are processed by the currnet process. edge_features[etype_name+'/'+feat_name+"/"+str(local_part_id//world_size)] = torch.from_numpy(np.concatenate(efeats))
edge_tids, _ = get_idranges(schema_map[constants.STR_EDGE_TYPE], edge_feature_tids[etype_name+"/"+feat_name+"/"+str(local_part_id//world_size)] = efeat_tids
schema_map[constants.STR_NUM_EDGES_PER_CHUNK],
num_chunks=world_size)
for etype_name in schema_map[constants.STR_EDGE_TYPE]:
if etype_name in edge_feature_tids:
for item in edge_feature_tids[etype_name]:
item[1] = edge_tids[etype_name][rank][0]
item[2] = edge_tids[etype_name][rank][1]
# Done with building node_features locally. # Done with building node_features locally.
if len(edge_features) <= 0: if len(edge_features) <= 0:
logging.info(f'[Rank: {rank}] This dataset does not have any edge features') logging.info(f'[Rank: {rank}] This dataset does not have any edge features')
else: else:
for k, v in edge_features.items(): assert len(edge_features) == len(edge_feature_tids)
logging.info(f'[Rank: {rank}] edge feature name: {k}, feature data shape: {v.size()}')
for k, v in edge_features.items():
logging.info(f'[Rank: {rank}] edge feature name: {k}, feature data shape: {v.shape}')
tids = edge_feature_tids[k]
count = tids[0][1] - tids[0][0]
assert count == v.size()[0]
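The chunk-selection logic above now groups the per-feature chunk files by partition and lets a rank load only the partitions it owns under the cyclic partition-to-rank mapping, keying each feature tensor by the partition's local index. Below is a minimal, standalone sketch of that selection step; the sizes are made-up illustration values and map_partid_rank is re-declared here only so the snippet runs on its own.

    import numpy as np

    # Illustration-only sizes; the real values come from the metadata json.
    num_chunks, num_parts, world_size, rank = 8, 4, 2, 0

    # Chunk indices grouped by partition id: [0, 1], [2, 3], [4, 5], [6, 7].
    read_list = np.array_split(np.arange(num_chunks), num_parts)

    def map_partid_rank(partid, world_size):
        # Same cyclic partition-id -> rank mapping as the helper added in utils.py.
        return partid % world_size

    for part_id in range(num_parts):
        if map_partid_rank(part_id, world_size) != rank:
            continue
        local_part_id = part_id // world_size  # local index of this partition on the rank
        print(f"rank {rank}: partition {part_id} -> chunks {read_list[part_id].tolist()}, "
              f"stored under key suffix '/{local_part_id}'")
    # rank 0 owns partitions 0 and 2, i.e. chunks [0, 1] and [4, 5],
    # stored under key suffixes '/0' and '/1'.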
''' '''
Code below is used to read edges from the input dataset with the help of the metadata json file Code below is used to read edges from the input dataset with the help of the metadata json file
...@@ -306,7 +320,7 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map): ...@@ -306,7 +320,7 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
etype_name_idmap = {e : idx for idx, e in enumerate(etype_names)} etype_name_idmap = {e : idx for idx, e in enumerate(etype_names)}
edge_tids, _ = get_idranges(schema_map[constants.STR_EDGE_TYPE], edge_tids, _ = get_idranges(schema_map[constants.STR_EDGE_TYPE],
schema_map[constants.STR_NUM_EDGES_PER_CHUNK], schema_map[constants.STR_NUM_EDGES_PER_CHUNK],
num_chunks=world_size) num_chunks=num_parts)
edge_datadict = {} edge_datadict = {}
edge_data = schema_map[constants.STR_EDGES] edge_data = schema_map[constants.STR_EDGES]
...@@ -329,10 +343,16 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map): ...@@ -329,10 +343,16 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
dst_ntype_name = tokens[2] dst_ntype_name = tokens[2]
num_chunks = len(edge_info) num_chunks = len(edge_info)
read_list = np.array_split(np.arange(num_chunks), world_size) read_list = np.array_split(np.arange(num_chunks), num_parts)
src_ids = [] src_ids = []
dst_ids = [] dst_ids = []
for idx in read_list[rank]:
curr_partids = []
for part_id in range(num_parts):
if map_partid_rank(part_id, world_size) == rank:
curr_partids.append(read_list[part_id])
for idx in np.concatenate(curr_partids):
edge_file = edge_info[idx] edge_file = edge_info[idx]
if not os.path.isabs(edge_file): if not os.path.isabs(edge_file):
edge_file = os.path.join(input_dir, edge_file) edge_file = os.path.join(input_dir, edge_file)
...@@ -355,10 +375,13 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map): ...@@ -355,10 +375,13 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
#currently these are just type_edge_ids... which will be converted to global ids #currently these are just type_edge_ids... which will be converted to global ids
edge_datadict[constants.GLOBAL_SRC_ID].append(src_ids + ntype_gnid_offset[src_ntype_name][0, 0]) edge_datadict[constants.GLOBAL_SRC_ID].append(src_ids + ntype_gnid_offset[src_ntype_name][0, 0])
edge_datadict[constants.GLOBAL_DST_ID].append(dst_ids + ntype_gnid_offset[dst_ntype_name][0, 0]) edge_datadict[constants.GLOBAL_DST_ID].append(dst_ids + ntype_gnid_offset[dst_ntype_name][0, 0])
edge_datadict[constants.GLOBAL_TYPE_EID].append(np.arange(edge_tids[etype_name][rank][0],\
edge_tids[etype_name][rank][1] ,dtype=np.int64))
edge_datadict[constants.ETYPE_ID].append(etype_name_idmap[etype_name] * \ edge_datadict[constants.ETYPE_ID].append(etype_name_idmap[etype_name] * \
np.ones(shape=(src_ids.shape), dtype=np.int64)) np.ones(shape=(src_ids.shape), dtype=np.int64))
for local_part_id in range(num_parts):
if (map_partid_rank(local_part_id, world_size) == rank):
edge_datadict[constants.GLOBAL_TYPE_EID].append(np.arange(edge_tids[etype_name][local_part_id][0],\
edge_tids[etype_name][local_part_id][1] ,dtype=np.int64))
#stitch together to create the final data on the local machine #stitch together to create the final data on the local machine
for col in [constants.GLOBAL_SRC_ID, constants.GLOBAL_DST_ID, constants.GLOBAL_TYPE_EID, constants.ETYPE_ID]: for col in [constants.GLOBAL_SRC_ID, constants.GLOBAL_DST_ID, constants.GLOBAL_TYPE_EID, constants.ETYPE_ID]:
...@@ -368,6 +391,7 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map): ...@@ -368,6 +391,7 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
assert edge_datadict[constants.GLOBAL_DST_ID].shape == edge_datadict[constants.GLOBAL_TYPE_EID].shape assert edge_datadict[constants.GLOBAL_DST_ID].shape == edge_datadict[constants.GLOBAL_TYPE_EID].shape
assert edge_datadict[constants.GLOBAL_TYPE_EID].shape == edge_datadict[constants.ETYPE_ID].shape assert edge_datadict[constants.GLOBAL_TYPE_EID].shape == edge_datadict[constants.ETYPE_ID].shape
logging.info(f'[Rank: {rank}] Done reading edge_file: {len(edge_datadict)}, {edge_datadict[constants.GLOBAL_SRC_ID].shape}') logging.info(f'[Rank: {rank}] Done reading edge_file: {len(edge_datadict)}, {edge_datadict[constants.GLOBAL_SRC_ID].shape}')
logging.info(f'Rank: {rank} edge_feat_tids: {edge_feature_tids}')
return node_tids, node_features, node_feature_tids, edge_datadict, edge_tids, edge_features, edge_feature_tids return node_tids, node_features, node_feature_tids, edge_datadict, edge_tids, edge_features, edge_feature_tids
...@@ -7,6 +7,7 @@ import copy ...@@ -7,6 +7,7 @@ import copy
from pyarrow import csv from pyarrow import csv
from gloo_wrapper import alltoallv_cpu from gloo_wrapper import alltoallv_cpu
from utils import map_partid_rank
class DistLookupService: class DistLookupService:
...@@ -100,7 +101,7 @@ class DistLookupService: ...@@ -100,7 +101,7 @@ class DistLookupService:
self.ntype_count = np.array(ntype_count, dtype=np.int64) self.ntype_count = np.array(ntype_count, dtype=np.int64)
self.rank = rank self.rank = rank
self.world_size = world_size self.world_size = world_size
def get_partition_ids(self, global_nids): def get_partition_ids(self, global_nids):
''' '''
...@@ -237,7 +238,7 @@ class DistLookupService: ...@@ -237,7 +238,7 @@ class DistLookupService:
# Now return the owner_ids (partition-ids) corresponding to the global_nids. # Now return the owner_ids (partition-ids) corresponding to the global_nids.
return owner_ids return owner_ids
def get_shuffle_nids(self, global_nids, my_global_nids, my_shuffle_global_nids): def get_shuffle_nids(self, global_nids, my_global_nids, my_shuffle_global_nids, world_size):
''' '''
This function is used to retrieve shuffle_global_nids for a given set of incoming This function is used to retrieve shuffle_global_nids for a given set of incoming
global_nids. Note that global_nids are of random order and will contain duplicates global_nids. Note that global_nids are of random order and will contain duplicates
...@@ -267,6 +268,8 @@ class DistLookupService: ...@@ -267,6 +268,8 @@ class DistLookupService:
This process has the node <-> partition id mapping This process has the node <-> partition id mapping
my_shuffle_global_nids : numpy ndarray my_shuffle_global_nids : numpy ndarray
array of shuffle_global_nids which are assigned by the current process/rank array of shuffle_global_nids which are assigned by the current process/rank
world_size : int
total no. of processes in the MPI_WORLD
Returns: Returns:
-------- --------
...@@ -278,6 +281,21 @@ class DistLookupService: ...@@ -278,6 +281,21 @@ class DistLookupService:
# Get the owner_ids (partition-ids or rank). # Get the owner_ids (partition-ids or rank).
owner_ids = self.get_partition_ids(global_nids) owner_ids = self.get_partition_ids(global_nids)
# These owner_ids, which are also the partition ids of the nodes in the
# input graph, are in the range 0 - (num_partitions - 1).
# These ids are generated by a graph partitioning method upstream of
# this pipeline.
# The distributed lookup service, as used by the graph partitioning
# pipeline, stores the ntype-ids (and type_nids) and their mapping to
# the associated partition-id.
# These ids are split into `num_processes` chunks, and the processes of
# the dist. lookup service are assigned ownership of these chunks.
# The pipeline also enforces the following constraint on its input
# parameters: num_partitions must be an integer multiple of
# num_processes, which means each individual node in the cluster runs
# an equal number of processes.
owner_ids = map_partid_rank(owner_ids, world_size)
# Ask these owners to supply for the shuffle_global_nids. # Ask these owners to supply for the shuffle_global_nids.
send_list = [] send_list = []
id_list = [] id_list = []
......
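As the comment block in get_shuffle_nids above explains, get_partition_ids returns partition ids in the range 0 - (num_partitions - 1), and since num_partitions is a multiple of the process count they can be folded onto ranks with a plain modulo before the alltoall exchange. A toy illustration; the partition-id values are invented and the helper is re-declared so the snippet is self-contained.

    import numpy as np

    def map_partid_rank(partid, world_size):
        # Mirrors the helper added in utils.py; works element-wise on numpy arrays.
        return partid % world_size

    world_size = 4
    # Hypothetical owner partition-ids for eight global node ids (num_partitions = 8).
    owner_partids = np.array([0, 5, 2, 7, 7, 1, 4, 6])
    owner_ranks = map_partid_rank(owner_partids, world_size)
    print(owner_ranks)  # [0 1 2 3 3 1 0 2]; each id is then requested from that rank.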
...@@ -59,7 +59,7 @@ def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data): ...@@ -59,7 +59,7 @@ def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
ret_val = np.column_stack([global_nids, shuffle_global_nids]) ret_val = np.column_stack([global_nids, shuffle_global_nids])
return ret_val return ret_val
def lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, node_data): def lookup_shuffle_global_nids_edges(rank, world_size, num_parts, edge_data, id_lookup, node_data):
''' '''
This function is a helper function used to lookup shuffle-global-nids for a given set of This function is a helper function used to lookup shuffle-global-nids for a given set of
global-nids using a distributed lookup service. global-nids using a distributed lookup service.
...@@ -70,6 +70,8 @@ def lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, nod ...@@ -70,6 +70,8 @@ def lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, nod
rank of the process rank of the process
world_size : integer world_size : integer
total number of processes used in the process group total number of processes used in the process group
num_parts : integer
total number of output graph partitions
edge_data : dictionary edge_data : dictionary
edge_data is a dictionary with keys as column names and values as numpy arrays representing edge_data is a dictionary with keys as column names and values as numpy arrays representing
all the edges present in the current graph partition all the edges present in the current graph partition
...@@ -93,40 +95,49 @@ def lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, nod ...@@ -93,40 +95,49 @@ def lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, nod
MILLION = 1000 * 1000 MILLION = 1000 * 1000
BATCH_SIZE = 250 * MILLION BATCH_SIZE = 250 * MILLION
memory_snapshot("GlobalToShuffleIDMapBegin: ", rank) memory_snapshot("GlobalToShuffleIDMapBegin: ", rank)
node_list = edge_data[constants.GLOBAL_SRC_ID]
local_nids = []
# Determine the no. of times each process has to send alltoall messages. local_shuffle_nids = []
all_sizes = allgather_sizes([node_list.shape[0]], world_size, return_sizes=True) for local_part_id in range(num_parts//world_size):
max_count = np.amax(all_sizes) local_nids.append(node_data[constants.GLOBAL_NID+"/"+str(local_part_id)])
num_splits = max_count // BATCH_SIZE + 1 local_shuffle_nids.append(node_data[constants.SHUFFLE_GLOBAL_NID+"/"+str(local_part_id)])
# Split the message into batches and send. local_nids = np.concatenate(local_nids)
splits = np.array_split(node_list, num_splits) local_shuffle_nids = np.concatenate(local_shuffle_nids)
shuffle_mappings = []
for item in splits: for local_part_id in range(num_parts//world_size):
shuffle_ids = id_lookup.get_shuffle_nids(item, node_list = edge_data[constants.GLOBAL_SRC_ID+"/"+str(local_part_id)]
node_data[constants.GLOBAL_NID],
node_data[constants.SHUFFLE_GLOBAL_NID]) # Determine the no. of times each process has to send alltoall messages.
shuffle_mappings.append(shuffle_ids) all_sizes = allgather_sizes([node_list.shape[0]], world_size, num_parts, return_sizes=True)
max_count = np.amax(all_sizes)
shuffle_ids = np.concatenate(shuffle_mappings) num_splits = max_count // BATCH_SIZE + 1
assert shuffle_ids.shape[0] == node_list.shape[0]
edge_data[constants.SHUFFLE_GLOBAL_SRC_ID] = shuffle_ids # Split the message into batches and send.
splits = np.array_split(node_list, num_splits)
# Destination end points of edges are owned by the current node and therefore shuffle_mappings = []
# should have corresponding SHUFFLE_GLOBAL_NODE_IDs. for item in splits:
# Here retrieve SHUFFLE_GLOBAL_NODE_IDs for the destination end points of local edges. shuffle_ids = id_lookup.get_shuffle_nids(item, local_nids, local_shuffle_nids, world_size)
uniq_ids, inverse_idx = np.unique(edge_data[constants.GLOBAL_DST_ID], return_inverse=True) shuffle_mappings.append(shuffle_ids)
common, idx1, idx2 = np.intersect1d(uniq_ids, node_data[constants.GLOBAL_NID], assume_unique=True, return_indices=True)
assert len(common) == len(uniq_ids) shuffle_ids = np.concatenate(shuffle_mappings)
assert shuffle_ids.shape[0] == node_list.shape[0]
edge_data[constants.SHUFFLE_GLOBAL_DST_ID] = node_data[constants.SHUFFLE_GLOBAL_NID][idx2][inverse_idx] edge_data[constants.SHUFFLE_GLOBAL_SRC_ID+"/"+str(local_part_id)] = shuffle_ids
assert len(edge_data[constants.SHUFFLE_GLOBAL_DST_ID]) == len(edge_data[constants.GLOBAL_DST_ID])
# Destination end points of edges are owned by the current node and therefore
# should have corresponding SHUFFLE_GLOBAL_NODE_IDs.
# Here retrieve SHUFFLE_GLOBAL_NODE_IDs for the destination end points of local edges.
uniq_ids, inverse_idx = np.unique(edge_data[constants.GLOBAL_DST_ID+"/"+str(local_part_id)], return_inverse=True)
common, idx1, idx2 = np.intersect1d(uniq_ids, node_data[constants.GLOBAL_NID+"/"+str(local_part_id)], assume_unique=True, return_indices=True)
assert len(common) == len(uniq_ids)
edge_data[constants.SHUFFLE_GLOBAL_DST_ID+"/"+str(local_part_id)] = node_data[constants.SHUFFLE_GLOBAL_NID+"/"+str(local_part_id)][idx2][inverse_idx]
assert len(edge_data[constants.SHUFFLE_GLOBAL_DST_ID+"/"+str(local_part_id)]) == len(edge_data[constants.GLOBAL_DST_ID+"/"+str(local_part_id)])
memory_snapshot("GlobalToShuffleIDMap_AfterLookupServiceCalls: ", rank) memory_snapshot("GlobalToShuffleIDMap_AfterLookupServiceCalls: ", rank)
return edge_data return edge_data
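lookup_shuffle_global_nids_edges now resolves the source end points one local partition at a time, and it caps the size of each distributed lookup by splitting the id list into batches of at most BATCH_SIZE; num_splits is derived from the maximum list length across ranks so that every rank takes part in the same number of collective rounds. A rough sketch of just the batching pattern, using a stand-in for id_lookup.get_shuffle_nids:

    import numpy as np

    MILLION = 1000 * 1000
    BATCH_SIZE = 250 * MILLION

    def fake_lookup(ids):
        # Stand-in for id_lookup.get_shuffle_nids(); here an identity mapping.
        return ids

    node_list = np.arange(10)       # toy GLOBAL_SRC_ID column of one local partition
    max_count = node_list.shape[0]  # in the pipeline: the maximum size over all ranks
    num_splits = max_count // BATCH_SIZE + 1

    shuffle_mappings = [fake_lookup(batch) for batch in np.array_split(node_list, num_splits)]
    shuffle_ids = np.concatenate(shuffle_mappings)
    assert shuffle_ids.shape[0] == node_list.shape[0]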
def assign_shuffle_global_nids_nodes(rank, world_size, node_data): def assign_shuffle_global_nids_nodes(rank, world_size, num_parts, node_data):
""" """
Utility function to assign shuffle global ids to nodes at a given rank Utility function to assign shuffle global ids to nodes at a given rank
node_data gets converted from [ntype, global_type_nid, global_nid] node_data gets converted from [ntype, global_type_nid, global_nid]
...@@ -144,25 +155,27 @@ def assign_shuffle_global_nids_nodes(rank, world_size, node_data): ...@@ -144,25 +155,27 @@ def assign_shuffle_global_nids_nodes(rank, world_size, node_data):
rank of the process rank of the process
world_size : integer world_size : integer
total number of processes used in the process group total number of processes used in the process group
ntype_counts: list of tuples num_parts : integer
list of tuples (x,y), where x=ntype and y=no. of nodes whose shuffle_global_nids are needed total number of output graph partitions
node_data : dictionary node_data : dictionary
node_data is a dictionary with keys as column names and values as numpy arrays node_data is a dictionary with keys as column names and values as numpy arrays
""" """
# Compute prefix sum to determine node-id offsets # Compute prefix sum to determine node-id offsets
prefix_sum_nodes = allgather_sizes([node_data[constants.GLOBAL_NID].shape[0]], world_size) local_row_counts = []
for local_part_id in range(num_parts//world_size):
local_row_counts.append(node_data[constants.GLOBAL_NID+"/"+str(local_part_id)].shape[0])
# assigning node-ids from localNodeStartId to (localNodeEndId - 1) # Perform allgather to compute the local offsets.
# Assuming here that the nodeDataArr is sorted based on the nodeType. prefix_sum_nodes = allgather_sizes(local_row_counts, world_size, num_parts)
shuffle_global_nid_start = prefix_sum_nodes[rank]
shuffle_global_nid_end = prefix_sum_nodes[rank + 1]
# add a column with global-ids (after data shuffle) for local_part_id in range(num_parts//world_size):
shuffle_global_nids = np.arange(shuffle_global_nid_start, shuffle_global_nid_end, dtype=np.int64) shuffle_global_nid_start = prefix_sum_nodes[rank + (local_part_id*world_size)]
node_data[constants.SHUFFLE_GLOBAL_NID] = shuffle_global_nids shuffle_global_nid_end = prefix_sum_nodes[rank + 1 + (local_part_id*world_size)]
shuffle_global_nids = np.arange(shuffle_global_nid_start, shuffle_global_nid_end, dtype=np.int64)
node_data[constants.SHUFFLE_GLOBAL_NID+"/"+str(local_part_id)] = shuffle_global_nids
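With several partitions per rank, the gathered prefix sum holds one slot per output partition, laid out so that slot rank + local_part_id * world_size belongs to the current rank's local_part_id-th partition; each owned partition then receives a contiguous block of shuffle global nids. A worked toy example with invented counts (world_size = 2, num_parts = 4):

    import numpy as np

    world_size, num_parts, rank = 2, 4, 0
    # Prefix sum over per-partition node counts in global-partition order,
    # e.g. partitions 0..3 hold 3, 4, 5 and 6 nodes respectively.
    prefix_sum_nodes = np.array([0, 3, 7, 12, 18])

    for local_part_id in range(num_parts // world_size):
        start = prefix_sum_nodes[rank + local_part_id * world_size]
        end = prefix_sum_nodes[rank + 1 + local_part_id * world_size]
        print(local_part_id, np.arange(start, end, dtype=np.int64))
    # local partition 0 (global partition 0) gets shuffle nids [0 1 2]
    # local partition 1 (global partition 2) gets shuffle nids [7 8 9 10 11]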
def assign_shuffle_global_nids_edges(rank, world_size, edge_data): def assign_shuffle_global_nids_edges(rank, world_size, num_parts, edge_data):
""" """
Utility function to assign shuffle_global_eids to edges Utility function to assign shuffle_global_eids to edges
edge_data gets converted from [global_src_nid, global_dst_nid, global_type_eid, etype] edge_data gets converted from [global_src_nid, global_dst_nid, global_type_eid, etype]
...@@ -174,8 +187,8 @@ def assign_shuffle_global_nids_edges(rank, world_size, edge_data): ...@@ -174,8 +187,8 @@ def assign_shuffle_global_nids_edges(rank, world_size, edge_data):
rank of the current process rank of the current process
world_size : integer world_size : integer
total count of processes in execution total count of processes in execution
etype_counts : list of tuples num_parts : integer
list of tuples (x,y), x = rank, y = no. of edges total number of output graph partitions
edge_data : numpy ndarray edge_data : numpy ndarray
edge data as read from xxx_edges.txt file edge data as read from xxx_edges.txt file
...@@ -187,12 +200,17 @@ def assign_shuffle_global_nids_edges(rank, world_size, edge_data): ...@@ -187,12 +200,17 @@ def assign_shuffle_global_nids_edges(rank, world_size, edge_data):
""" """
#get prefix sum of edge counts per rank to locate the starting point #get prefix sum of edge counts per rank to locate the starting point
#from which global-ids to edges are assigned in the current rank #from which global-ids to edges are assigned in the current rank
prefix_sum_edges = allgather_sizes([edge_data[constants.GLOBAL_SRC_ID].shape[0]], world_size) local_row_counts = []
shuffle_global_eid_start = prefix_sum_edges[rank] for local_part_id in range(num_parts//world_size):
shuffle_global_eid_end = prefix_sum_edges[rank + 1] local_row_counts.append(edge_data[constants.GLOBAL_SRC_ID+"/"+str(local_part_id)].shape[0])
# assigning edge-ids from localEdgeStart to (localEdgeEndId - 1) shuffle_global_eid_offset = []
# Assuming here that the edge_data is sorted by edge_type prefix_sum_edges = allgather_sizes(local_row_counts, world_size, num_parts)
shuffle_global_eids = np.arange(shuffle_global_eid_start, shuffle_global_eid_end, dtype=np.int64) for local_part_id in range(num_parts//world_size):
edge_data[constants.SHUFFLE_GLOBAL_EID] = shuffle_global_eids shuffle_global_eid_start = prefix_sum_edges[rank + (local_part_id*world_size)]
return shuffle_global_eid_start shuffle_global_eid_end = prefix_sum_edges[rank + 1 + (local_part_id*world_size)]
shuffle_global_eids = np.arange(shuffle_global_eid_start, shuffle_global_eid_end, dtype=np.int64)
edge_data[constants.SHUFFLE_GLOBAL_EID+"/"+str(local_part_id)] = shuffle_global_eids
shuffle_global_eid_offset.append(shuffle_global_eid_start)
return shuffle_global_eid_offset
...@@ -2,7 +2,7 @@ import numpy as np ...@@ -2,7 +2,7 @@ import numpy as np
import torch import torch
import torch.distributed as dist import torch.distributed as dist
def allgather_sizes(send_data, world_size, return_sizes=False): def allgather_sizes(send_data, world_size, num_parts, return_sizes=False):
""" """
Perform all gather on list lengths, used to compute prefix sums Perform all gather on list lengths, used to compute prefix sums
to determine the offsets on each ranks. This is used to allocate to determine the offsets on each ranks. This is used to allocate
...@@ -14,6 +14,8 @@ def allgather_sizes(send_data, world_size, return_sizes=False): ...@@ -14,6 +14,8 @@ def allgather_sizes(send_data, world_size, return_sizes=False):
Data on which allgather is performed. Data on which allgather is performed.
world_size : integer world_size : integer
No. of processes configured for execution No. of processes configured for execution
num_parts : integer
No. of output graph partitions
return_sizes : bool return_sizes : bool
Boolean flag to indicate whether to return raw sizes from each process Boolean flag to indicate whether to return raw sizes from each process
or perform prefix sum on the raw sizes. or perform prefix sum on the raw sizes.
...@@ -24,6 +26,9 @@ def allgather_sizes(send_data, world_size, return_sizes=False): ...@@ -24,6 +26,9 @@ def allgather_sizes(send_data, world_size, return_sizes=False):
array with the prefix sum array with the prefix sum
""" """
# Assert on the world_size, num_parts
assert (num_parts % world_size) == 0
#compute the length of the local data #compute the length of the local data
send_length = len(send_data) send_length = len(send_data)
out_tensor = torch.as_tensor(send_data, dtype=torch.int64) out_tensor = torch.as_tensor(send_data, dtype=torch.int64)
...@@ -38,11 +43,16 @@ def allgather_sizes(send_data, world_size, return_sizes=False): ...@@ -38,11 +43,16 @@ def allgather_sizes(send_data, world_size, return_sizes=False):
return torch.cat(in_tensor).numpy() return torch.cat(in_tensor).numpy()
#gather the sizes into one array to return to the invoking function #gather the sizes into one array to return to the invoking function
rank_sizes = np.zeros(world_size + 1, dtype=np.int64) rank_sizes = np.zeros(num_parts + 1, dtype=np.int64)
part_counts = torch.cat(in_tensor).numpy()
count = rank_sizes[0] count = rank_sizes[0]
for i, t in enumerate(in_tensor): idx = 1
count += t.item() for local_part_id in range(num_parts//world_size):
rank_sizes[i+1] = count for r in range(world_size):
count += part_counts[r*(num_parts//world_size) + local_part_id]
rank_sizes[idx] = count
idx += 1
return rank_sizes return rank_sizes
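Each rank now contributes num_parts // world_size counts to the all_gather, and the prefix sum is rebuilt in global-partition order (partition p is owned by rank p % world_size and is its p // world_size-th local partition). The loop below simulates just the reordering with made-up counts, skipping the actual torch.distributed all_gather:

    import numpy as np

    world_size, num_parts = 2, 4
    k = num_parts // world_size
    # What torch.cat(in_tensor) would hold after the all_gather:
    # rank 0 sent [3, 5] (its local partitions, i.e. global partitions 0 and 2),
    # rank 1 sent [4, 6] (global partitions 1 and 3).
    part_counts = np.array([3, 5, 4, 6], dtype=np.int64)

    rank_sizes = np.zeros(num_parts + 1, dtype=np.int64)
    count, idx = 0, 1
    for local_part_id in range(k):
        for r in range(world_size):
            count += part_counts[r * k + local_part_id]
            rank_sizes[idx] = count
            idx += 1
    print(rank_sizes)  # [ 0  3  7 12 18]: prefix sums over global partitions 0, 1, 2, 3.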
......
...@@ -188,7 +188,7 @@ def get_gnid_range_map(node_tids): ...@@ -188,7 +188,7 @@ def get_gnid_range_map(node_tids):
return ntypes_gid_range return ntypes_gid_range
def write_metadata_json(metadata_list, output_dir, graph_name): def write_metadata_json(input_list, output_dir, graph_name, world_size, num_parts):
""" """
Merge the json schemas from each of the ranks on rank-0. Merge the json schemas from each of the ranks on rank-0.
This utility function is used on rank-0 to create the aggregated json file. This utility function is used on rank-0 to create the aggregated json file.
...@@ -202,6 +202,14 @@ def write_metadata_json(metadata_list, output_dir, graph_name): ...@@ -202,6 +202,14 @@ def write_metadata_json(metadata_list, output_dir, graph_name):
graph-name : string graph-name : string
a string specifying the graph name a string specifying the graph name
""" """
# Preprocess the input_list, a list of dictionaries.
# Each dictionary contains num_parts/world_size metadata json objects,
# which correspond to the local partitions on the respective rank.
metadata_list = []
for local_part_id in range(num_parts//world_size):
for idx in range(world_size):
metadata_list.append(input_list[idx]["local-part-id-"+str(local_part_id*world_size + idx)])
#Initialize global metadata #Initialize global metadata
graph_metadata = {} graph_metadata = {}
...@@ -238,7 +246,7 @@ def write_metadata_json(metadata_list, output_dir, graph_name): ...@@ -238,7 +246,7 @@ def write_metadata_json(metadata_list, output_dir, graph_name):
_dump_part_config(f'{output_dir}/metadata.json', graph_metadata) _dump_part_config(f'{output_dir}/metadata.json', graph_metadata)
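On rank 0, input_list now carries one dictionary per rank, each keyed by 'local-part-id-<global partition id>' for the partitions that rank produced; the preprocessing loop flattens these back into global-partition order before the metadata is merged. A sketch of just that reordering with placeholder dictionary contents (world_size = 2, num_parts = 4):

    world_size, num_parts = 2, 4
    # Hypothetical gathered input: one dict per rank, keyed by global partition id.
    input_list = [
        {"local-part-id-0": "meta for part 0", "local-part-id-2": "meta for part 2"},  # rank 0
        {"local-part-id-1": "meta for part 1", "local-part-id-3": "meta for part 3"},  # rank 1
    ]

    metadata_list = []
    for local_part_id in range(num_parts // world_size):
        for idx in range(world_size):
            metadata_list.append(
                input_list[idx]["local-part-id-" + str(local_part_id * world_size + idx)])
    print(metadata_list)
    # ['meta for part 0', 'meta for part 1', 'meta for part 2', 'meta for part 3']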
def augment_edge_data(edge_data, lookup_service, edge_tids, rank, world_size): def augment_edge_data(edge_data, lookup_service, edge_tids, rank, world_size, num_parts):
""" """
Add partition-id (rank which owns an edge) column to the edge_data. Add partition-id (rank which owns an edge) column to the edge_data.
...@@ -256,6 +264,8 @@ def augment_edge_data(edge_data, lookup_service, edge_tids, rank, world_size): ...@@ -256,6 +264,8 @@ def augment_edge_data(edge_data, lookup_service, edge_tids, rank, world_size):
rank of the current process rank of the current process
world_size : integer world_size : integer
total no. of process participating in the communication primitives total no. of process participating in the communication primitives
num_parts : integer
total no. of partitions requested for the input graph
Returns: Returns:
-------- --------
...@@ -269,16 +279,18 @@ def augment_edge_data(edge_data, lookup_service, edge_tids, rank, world_size): ...@@ -269,16 +279,18 @@ def augment_edge_data(edge_data, lookup_service, edge_tids, rank, world_size):
offset = 0 offset = 0
for etype_name, tid_range in edge_tids.items(): for etype_name, tid_range in edge_tids.items():
assert int(tid_range[0][0]) == 0 assert int(tid_range[0][0]) == 0
assert len(tid_range) == world_size assert len(tid_range) == num_parts
etype_offset[etype_name] = offset + int(tid_range[0][0]) etype_offset[etype_name] = offset + int(tid_range[0][0])
offset += int(tid_range[-1][1]) offset += int(tid_range[-1][1])
global_eids = [] global_eids = []
for etype_name, tid_range in edge_tids.items(): for etype_name, tid_range in edge_tids.items():
global_eid_start = etype_offset[etype_name] for idx in range(num_parts):
begin = global_eid_start + int(tid_range[rank][0]) if map_partid_rank(idx, world_size) == rank:
end = global_eid_start + int(tid_range[rank][1]) global_eid_start = etype_offset[etype_name]
global_eids.append(np.arange(begin, end, dtype=np.int64)) begin = global_eid_start + int(tid_range[idx][0])
end = global_eid_start + int(tid_range[idx][1])
global_eids.append(np.arange(begin, end, dtype=np.int64))
global_eids = np.concatenate(global_eids) global_eids = np.concatenate(global_eids)
assert global_eids.shape[0] == edge_data[constants.ETYPE_ID].shape[0] assert global_eids.shape[0] == edge_data[constants.ETYPE_ID].shape[0]
edge_data[constants.GLOBAL_EID] = global_eids edge_data[constants.GLOBAL_EID] = global_eids
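augment_edge_data first lays the edge types out back to back in the global eid space (etype_offset), then, for every partition the rank owns under the cyclic mapping, converts that partition's type-local eid range into global eids. A toy walk-through with invented edge types and ranges:

    import numpy as np

    world_size, rank, num_parts = 2, 0, 4
    # Hypothetical per-partition type-eid ranges: num_parts [start, end) pairs per etype.
    edge_tids = {
        "follows": [[0, 10], [10, 25], [25, 30], [30, 42]],  # 42 edges of this type
        "likes":   [[0, 8],  [8, 11],  [11, 20], [20, 24]],  # 24 edges of this type
    }

    # Offset of each edge type in the global eid space.
    etype_offset, offset = {}, 0
    for etype_name, tid_range in edge_tids.items():
        etype_offset[etype_name] = offset
        offset += int(tid_range[-1][1])
    # -> {'follows': 0, 'likes': 42}

    global_eids = []
    for etype_name, tid_range in edge_tids.items():
        for idx in range(num_parts):
            if idx % world_size == rank:  # partitions owned by this rank (cyclic mapping)
                begin = etype_offset[etype_name] + int(tid_range[idx][0])
                end = etype_offset[etype_name] + int(tid_range[idx][1])
                global_eids.append(np.arange(begin, end, dtype=np.int64))
    global_eids = np.concatenate(global_eids)
    # rank 0 gets eids 0..9 and 25..29 for 'follows', plus 42..49 and 53..61 for 'likes'.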
...@@ -528,3 +540,22 @@ def memory_snapshot(tag, rank): ...@@ -528,3 +540,22 @@ def memory_snapshot(tag, rank):
mem_string = f'{total:.0f} (MB) total, {peak:.0f} (MB) peak, {used:.0f} (MB) used, {avail:.0f} (MB) avail' mem_string = f'{total:.0f} (MB) total, {peak:.0f} (MB) peak, {used:.0f} (MB) used, {avail:.0f} (MB) avail'
logging.debug(f'[Rank: {rank} MEMORY_SNAPSHOT] {mem_string} - {tag}') logging.debug(f'[Rank: {rank} MEMORY_SNAPSHOT] {mem_string} - {tag}')
def map_partid_rank(partid, world_size):
"""Auxiliary function to map a given partition id to one of the rank in the
MPI_WORLD processes. The range of partition ids is assumed to equal or a
multiple of the total size of MPI_WORLD. In this implementation, we use
a cyclical mapping procedure to convert partition ids to ranks.
Parameters:
-----------
partid : int
partition id, as read from the node id to partition id mappings.
world_size : int
total number of processes in MPI_WORLD.
Returns:
--------
int :
rank of the process, which will be responsible for the given partition
id.
"""
return partid % world_size