Unverified Commit dad3606a authored by kylasa, committed by GitHub

Support new format for multi-file support in distributed partitioning. (#4217)

* Code changes for the following

1. Generating node data at each process
2. Reading csv files using pyarrow
3. Feature-complete code.

* Fixed some typos that were causing unit test failures

1. Use the correct file name when loading edges from a file.
2. When storing node features after shuffling, use the correct key to store the global-nids of the node features received during data transmission.

* Code changes to address CI comments by reviewers

1. Removed some redundant code and added text in the docstrings to describe the functionality of some functions.
2. Function signatures and invocations now match with respect to the argument list.
3. Added a detailed description of the metadata json structure so that users understand the type of information present in this file and how it is used throughout the code.

* Addressing code review comments

1. Addressed all the CI comments; the changes include simplifying the code related to the concatenation of lists and enhancing the docstrings of the functions changed in this process.

* Update the docstrings of two functions in response to code review comments

Removed "todo" from the docstring of the gen_node_data function.
Added "todo" to the gen_dist_partitions function where the node-id to partition-id mappings are read for the first time.

Removed 'num-node-weights' from the docstring of the get_dataset function and added a schema_map entry to the argument list.
parent 9948ef4d
......@@ -18,6 +18,48 @@ def create_dgl_object(graph_name, num_parts, \
This function creates dgl objects for a given graph partition, as described by the function
arguments.
The "schema" argument is a dictionary which contains the metadata related to node ids
and edge ids. It contains two keys, "nid" and "eid", whose values are also dictionaries
with the following structure.
1. The key-value pairs in the "nid" dictionary have the following format.
"ntype-name" is the user-assigned name of a node type, "format" describes the
format of the contents of the files, and "data" is a list of lists, each with
3 elements: file-name, start_id and end_id. file-name can be either an absolute or
relative path to the file, and the starting and ending ids are the type ids of the nodes
contained in that file. These type ids are later used to compute the global ids
of these nodes, which are used throughout this pipeline.
"ntype-name" : {
"format" : "csv",
"data" : [
[ <path-to-file>/ntype0-name-0.csv, start_id0, end_id0],
[ <path-to-file>/ntype0-name-1.csv, start_id1, end_id1],
...
[ <path-to-file>/ntype0-name-<p-1>.csv, start_id<p-1>, end_id<p-1>],
]
}
2. The key-value pairs in the "eid" dictionary have the following format.
The "eid" dictionary is structured like the "nid" dictionary, except that its
entries describe edges.
"etype-name" : {
"format" : "csv",
"data" : [
[ <path-to-file>/etype0-name-0, start_id0, end_id0],
[ <path-to-file>/etype0-name-1, start_id1, end_id1],
...
[ <path-to-file>/etype0-name-<p-1>, start_id<p-1>, end_id<p-1>]
]
}
In "nid" dictionary, the type_nids are specified that
should be assigned to nodes which are read from the corresponding nodes file.
Along the same lines dictionary for the key "eid" is used for edges in the
input graph.
These type ids, for nodes and edges, are used to compute global ids for nodes
and edges which are stored in the graph object.
Parameters:
-----------
graph_name : string
......@@ -39,8 +81,6 @@ def create_dgl_object(graph_name, num_parts, \
edgeid_offset : int
offset to be used when assigning edge global ids in the current partition
return compact_g2, node_map_val, edge_map_val, ntypes_map, etypes_map
Returns:
--------
dgl object
......@@ -54,14 +94,21 @@ def create_dgl_object(graph_name, num_parts, \
dictionary
map between edge type(string) and edge_type_id(int)
"""
#create auxiliary data structures from the schema object
global_nid_ranges = schema['nid']
global_eid_ranges = schema['eid']
global_nid_ranges = {key: np.array(global_nid_ranges[key]).reshape(
1, 2) for key in global_nid_ranges}
global_eid_ranges = {key: np.array(global_eid_ranges[key]).reshape(
1, 2) for key in global_eid_ranges}
node_info = schema["nid"]
offset = 0
global_nid_ranges = {}
for k, v in node_info.items():
global_nid_ranges[k] = np.array([offset + int(v["data"][0][1]), offset + int(v["data"][-1][2])]).reshape(1,2)
offset += int(v["data"][-1][2])
edge_info = schema["eid"]
offset = 0
global_eid_ranges = {}
for k, v in edge_info.items():
global_eid_ranges[k] = np.array([offset + int(v["data"][0][1]), offset + int(v["data"][-1][2])]).reshape(1,2)
offset += int(v["data"][-1][2])
id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
ntypes = [(key, global_nid_ranges[key][0, 0]) for key in global_nid_ranges]
......
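As a sanity check of the range computation above, here is a minimal standalone sketch; the schema below is a hypothetical two-node-type example, not taken from any dataset.

import numpy as np

# Hypothetical "nid" entry in the new metadata format: two node types,
# each split into two files, described by [file-name, start_type_id, end_type_id].
node_info = {
    "paper":  {"format": "csv", "data": [["paper-0.csv", 0, 50], ["paper-1.csv", 50, 100]]},
    "author": {"format": "csv", "data": [["author-0.csv", 0, 30], ["author-1.csv", 30, 60]]},
}

global_nid_ranges = {}
offset = 0
for ntype, info in node_info.items():
    # global range = per-type range shifted by the running offset over earlier types
    global_nid_ranges[ntype] = np.array(
        [offset + int(info["data"][0][1]), offset + int(info["data"][-1][2])]).reshape(1, 2)
    offset += int(info["data"][-1][2])

print(global_nid_ranges)   # {'paper': [[0, 100]], 'author': [[100, 160]]}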
......@@ -15,19 +15,9 @@ def log_params(params):
print('Graph Name: ', params.graph_name)
print('Schema File: ', params.schema)
print('No. partitions: ', params.num_parts)
print('No. node weights: ', params.num_node_weights)
print('Workspace dir: ', params.workspace)
print('Node Attr Type: ', params.node_attr_dtype)
print('Edge Attr Dtype: ', params.edge_attr_dtype)
print('Output Dir: ', params.output)
print('Removed Edges File: ', params.removed_edges)
print('WorldSize: ', params.world_size)
print('Nodes File: ', params.nodes_file)
print('Edges File: ', params.edges_file)
print('Node feats: ', params.node_feats_file)
print('Edge feats: ', params.edge_feats_file)
print('Metis partitions: ', params.partitions_file)
print('Exec Type: ', params.exec_type)
if __name__ == "__main__":
"""
......@@ -44,38 +34,15 @@ if __name__ == "__main__":
help='The schema of the graph')
parser.add_argument('--num-parts', required=True, type=int,
help='The number of partitions')
parser.add_argument('--num-node-weights', required=True, type=int,
help='The number of node weights used by METIS.')
parser.add_argument('--workspace', type=str, default='/tmp',
help='The directory to store the intermediate results')
parser.add_argument('--node-attr-dtype', type=str, default=None,
help='The data type of the node attributes')
parser.add_argument('--edge-attr-dtype', type=str, default=None,
help='The data type of the edge attributes')
parser.add_argument('--output', required=True, type=str,
help='The output directory of the partitioned results')
parser.add_argument('--removed-edges', help='a file that contains the removed self-loops and duplicated edges',
default=None, type=str)
parser.add_argument('--exec-type', type=int, default=0,
help='Use 0 for single machine run and 1 for distributed execution')
#arguments needed for the distributed implementation
parser.add_argument('--world-size', help='no. of processes to spawn',
default=1, type=int, required=True)
parser.add_argument('--nodes-file', help='filename of the nodes metadata',
default=None, type=str, required=True)
parser.add_argument('--edges-file', help='filename of the edges metadata',
default=None, type=str, required=True)
parser.add_argument('--node-feats-file', help='filename of the nodes features',
default=None, type=str, required=True)
parser.add_argument('--edge-feats-file', help='filename of the edge features',
default=None, type=str )
parser.add_argument('--partitions-file', help='filename of the output of dgl_part2 (metis partitions)',
default=None, type=str)
params = parser.parse_args()
#invoke the starting function here.
if(params.exec_type == 0):
single_machine_run(params)
else:
#invoke the pipeline function
multi_machine_run(params)
......@@ -12,82 +12,110 @@ from timeit import default_timer as timer
from datetime import timedelta
from dataset_utils import get_dataset
from utils import read_partitions_file, read_json, get_node_types, \
augment_node_data, augment_edge_data, get_ntypes_map, \
augment_edge_data, get_gnid_range_map, \
write_dgl_objects, write_metadata_json
from gloo_wrapper import alltoall_cpu_object_lst, alltoallv_cpu, \
alltoall_cpu, allgather_sizes, gather_metadata_json
from globalids import assign_shuffle_global_nids_nodes, \
assign_shuffle_global_nids_edges, get_shuffle_global_nids_edges
from convert_partition import create_dgl_object, create_metadata_json, validateDGLObjects
assign_shuffle_global_nids_edges, \
get_shuffle_global_nids_edges
from convert_partition import create_dgl_object, create_metadata_json
def exchange_node_data(rank, world_size, node_data):
"""
Exchange node_data among the processes in the world
Prepare the list of slices targeting each of the process and
trigger alltoallv_cpu for the message exchange.
def gen_node_data(rank, world_size, node_part_ids, ntid_ntype_map, nid_schema_map):
'''
For this data processing pipeline, reading node files is not needed. All the needed information about
the nodes can be found in the metadata json file. This function generates the nodes owned by a given
process, using metis partitions.
Parameters:
-----------
rank : int
rank of the current process
rank of the process
world_size : int
total no. of participating processes
node_data : dictionary
nodes data dictionary with keys as column names and values as
columns from the nodes csv file
total no. of processes
node_part_ids :
numpy array whose length is the same as the no. of nodes in the graph. The index into this array is the global_nid
and the value is the partition-id which owns that node
ntid_ntype_map :
a dictionary where keys are node_type ids and values are node_type names
nid_schema_map:
a dictionary, which is extracted from the input graph metadata json file for node information,
using the key "nid". This dictionary, as described below, has information about all the node types
present in the input graph.
Please note that it is assumed that, in the input graph files, the nodes of a particular node-type are
split into `p` files (because `p` partitions are to be generated). Along the same lines, the edges of a
particular edge-type are split into `p` files as well.
For instance, a generic dictionary for the "nid" key is as follows:
"ntype0-name", "ntype1-name" etc. are the user-supplied names for the node types present in the input graph.
"format" specifies the structure of the files' content, and "data" has a value which is a list of lists.
Each list has 3 entries: a file-name (given as either an absolute or relative path), and start and end ids,
which are the type ids of the nodes read from the corresponding file.
"nid" : { #m : no. of node types
"ntype0-name": {
"format": "csv",
"data" : [ #list of lists
["<path>/ntype0-name-0.txt", 0, id_end0], # These are type_nids for the nodes
["<path>/ntype0-name-1.txt", id_start1, id_end1],
...,
["<path>/ntype0-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
]
},
....,
"ntype<m-1>-name" : {
"format" : "csv",
"data" : [
["<path>/ntype<m-1>-name-0.txt", 0, id_end0],
["<path>/ntype<m-1>-name-1.txt", id_start1, id_end1],
...
["<path>/ntype<m-1>-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
]
}
}
Returns:
--------
dictionary :
the input argument, node_data, is updated with the node data received by other processes
in the world.
"""
input_list = []
send_sizes = []
recv_sizes = []
start = timer()
for i in np.arange(world_size):
send_idx = (node_data[constants.OWNER_PROCESS] == i)
idx = send_idx.reshape(node_data[constants.GLOBAL_NID].shape[0])
filt_data = np.column_stack((node_data[constants.NTYPE_ID][idx == 1], \
node_data[constants.GLOBAL_TYPE_NID][idx == 1], \
node_data[constants.GLOBAL_NID][idx == 1]))
if(filt_data.shape[0] <= 0):
input_list.append(torch.empty((0,), dtype=torch.int64))
send_sizes.append(torch.empty((0,), dtype=torch.int64))
else:
input_list.append(torch.from_numpy(filt_data))
send_sizes.append(torch.tensor(filt_data.shape, dtype=torch.int64))
recv_sizes.append(torch.zeros((2,), dtype=torch.int64))
end = timer()
print('[Rank: ', rank, '] Preparing node_data to send out: ', timedelta(seconds=end - start))
#exchange sizes first followed by data.
dist.barrier()
start = timer()
alltoall_cpu(rank, world_size, recv_sizes, send_sizes)
output_list = []
for s in recv_sizes:
output_list.append(torch.zeros(s.tolist(), dtype=torch.int64))
dist.barrier()
alltoallv_cpu(rank, world_size, output_list, input_list)
end = timer()
print('[Rank: ', rank, '] Time to exchange node data : ', timedelta(seconds=end - start))
#stitch together the received data to form a consolidated data-structure
rcvd_node_data = torch.cat(output_list).numpy()
print('[Rank: ', rank, '] Received node data shape ', rcvd_node_data.shape)
#Replace the node_data values with the received node data and the OWNER_PROCESS key-value
#pair is removed after the data communication
node_data[constants.NTYPE_ID] = rcvd_node_data[:,0]
node_data[constants.GLOBAL_TYPE_NID] = rcvd_node_data[:,1]
node_data[constants.GLOBAL_NID] = rcvd_node_data[:,2]
node_data.pop(constants.OWNER_PROCESS)
return node_data
dictionary where keys are column names and values are numpy arrays; these arrays are generated
using information present in the metadata json file
'''
local_node_data = { constants.GLOBAL_NID : [],
constants.NTYPE_ID : [],
constants.GLOBAL_TYPE_NID : []
}
gnid_start = 0
gnid_end = 0
for ntypeid in range(len(ntid_ntype_map)):
ntype_name = ntid_ntype_map[str(ntypeid)]
ntype_info = nid_schema_map[ntype_name]
type_start = int(ntype_info["data"][0][1])
type_end = int(ntype_info["data"][-1][2])
gnid_end += type_end
node_partid_slice = node_part_ids[gnid_start:gnid_end]
cond = node_partid_slice == rank
own_gnids = np.arange(gnid_start, gnid_end, dtype=np.int64)
own_gnids = own_gnids[cond]
own_tnids = np.arange(type_start, type_end, dtype=np.int64)
own_tnids = own_tnids[cond]
local_node_data[constants.NTYPE_ID].append(np.ones(own_gnids.shape, dtype=np.int64)*ntypeid)
local_node_data[constants.GLOBAL_NID].append(own_gnids)
local_node_data[constants.GLOBAL_TYPE_NID].append(own_tnids)
gnid_start = gnid_end
for k in local_node_data.keys():
local_node_data[k] = np.concatenate(local_node_data[k])
return local_node_data
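As a sanity check of the ownership logic above, a minimal standalone sketch with a toy metis partition array and a single node type (all values are illustrative only):

import numpy as np

rank = 1
# Hypothetical metis output: partition id per global node id (8 nodes, 2 partitions).
node_part_ids = np.array([0, 1, 1, 0, 1, 0, 0, 1], dtype=np.int64)

# A single node type covering global nids [0, 8) and type nids [0, 8).
gnid_start, gnid_end = 0, 8
cond = node_part_ids[gnid_start:gnid_end] == rank

own_gnids = np.arange(gnid_start, gnid_end, dtype=np.int64)[cond]
own_tnids = np.arange(0, gnid_end - gnid_start, dtype=np.int64)[cond]
print(own_gnids)   # [1 2 4 7] -> nodes owned by rank 1
print(own_tnids)   # same values here because this node type starts at global nid 0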
def exchange_edge_data(rank, world_size, edge_data):
"""
......@@ -155,9 +183,7 @@ def exchange_edge_data(rank, world_size, edge_data):
edge_data.pop(constants.OWNER_PROCESS)
return edge_data
def exchange_node_features(rank, world_size, node_data, node_features, ntypes_map, \
ntypes_nid_map, ntype_id_count, node_part_ids):
def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map, node_part_ids, node_features):
"""
This function is used to shuffle node features so that each process receives
all the node features whose corresponding nodes are owned by that process.
......@@ -179,103 +205,105 @@ def exchange_node_features(rank, world_size, node_data, node_features, ntypes_ma
rank of the current process
world_size : int
total no. of participating processes.
node_data : dictionary
dictionary where node data is stored, which is initially read from the nodes txt file mapped
to the current process
node_features: dictionary
dictionary where node_features are stored; this information is read from the appropriate
node features file which belongs to the current process
ntypes_map : dictionary
mappings between node type names and node type ids
ntypes_nid_map : dictionary
node_feature_tids : dictionary
dictionary with keys as node-type names and values as dictionaries. Each such dictionary
contains information about the node-features associated with a given node-type, and its value
is a list. This list contains a range of indexes, [starting-idx, ending-idx), which
can be used to index into the node feature tensors read from the corresponding input files.
ntypes_gnid_map : dictionary
mapping between node type names and global_nids which belong to the keys in this dictionary
ntype_id_count : dictionary
mapping between node type id and no of nodes which belong to each node_type_id
node_part_ids : numpy array
numpy array which store the partition-ids and indexed by global_nids
node_features: dictionary
dictionary where node_features are stored; this information is read from the appropriate
node features file which belongs to the current process
Returns:
--------
dictionary :
node features are returned as a dictionary where keys are node type names and node feature names
and values are tensors
list :
a list of global_nids for the nodes whose node features are received during the data shuffle
process.
dictionary :
a dictionary of global_nids for the nodes whose node features are received during the data shuffle
process
"""
#determine Global_type_nid for the residing features
start = timer()
node_features_rank_lst = []
global_nid_rank_lst = []
for part_id in np.arange(world_size):
#form outgoing features to each process
send_node_features = {}
send_global_nids = {}
for ntype_name, ntype_id in ntypes_map.items():
#check if features exist for this node_type
if (ntype_name+'/feat' in node_features) and (node_features[ntype_name+'/feat'].shape[0] > 0):
feature_count = node_features[ntype_name+'/feat'].shape[0]
global_feature_count = ntype_id_count[str(ntype_id)]
#determine the starting global_nid for this node_type_id
feat_per_proc = math.ceil(global_feature_count / world_size)
global_type_nid_start = feat_per_proc * rank
global_type_nid_end = global_type_nid_start
if((global_type_nid_start + feat_per_proc) > global_feature_count):
global_type_nid_end += (ntype_id_count[str(ntype_id)] - global_type_nid_start)
type_nid = np.arange(0, (ntype_id_count[str(ntype_id)] - global_type_nid_start))
own_node_features = {}
own_global_nids = {}
#To iterate over the node_types and associated node_features
for ntype_name, ntype_info in node_feature_tids.items():
#To iterate over the node_features of a given node_type.
#ntype_info is a list of triplets, each of the form
#[node-feature-name, starting-idx, ending-idx].
#node-feature-name is the name given to the node-feature, read from the input metadata file.
#[starting-idx, ending-idx) specifies the range of indexes associated with the node-features read from
#the associated input file. Note that the no. of rows of node-features read from the input file should match
#this range, i.e. no. of rows = ending-idx - starting-idx.
for feat_info in ntype_info:
#determine the owner process for these node features.
node_feats_per_rank = []
global_nid_per_rank = []
feat_name = feat_info[0]
feat_key = ntype_name+'/'+feat_name
#compute the global_nid range for this node features
type_nid_start = int(feat_info[1])
type_nid_end = int(feat_info[2])
begin_global_nid = ntype_gnid_map[ntype_name][0]
gnid_start = begin_global_nid + type_nid_start
gnid_end = begin_global_nid + type_nid_end
#type_nids for this feature subset on the current rank
gnids_feat = np.arange(gnid_start, gnid_end)
tnids_feat = np.arange(type_nid_start, type_nid_end)
local_idx = np.arange(0, type_nid_end - type_nid_start)
#check if node features exist for this ntype_name + feat_name
#this check should always pass, because node_feature_tids are built
#by reading the input metadata json file for existing node features.
assert(feat_key in node_features)
node_feats = node_features[feat_key]
for part_id in range(world_size):
partid_slice = node_part_ids[gnid_start:gnid_end]
cond = (partid_slice == part_id)
gnids_per_partid = gnids_feat[cond]
tnids_per_partid = tnids_feat[cond]
local_idx_partid = local_idx[cond]
if (gnids_per_partid.shape[0] == 0):
node_feats_per_rank.append({feat_key : torch.empty((0,), dtype=torch.float)})
global_nid_per_rank.append({feat_key : torch.empty((0,), dtype=torch.int64)})
else:
global_type_nid_end += feat_per_proc
type_nid = np.arange(0, feat_per_proc)
#now map the global_ntype_id to global_nid
global_nid_offset = ntypes_nid_map[ntype_name][0]
global_nid_start = global_type_nid_start + global_nid_offset
global_nid_end = global_type_nid_end + global_nid_offset
#assert (global_nid_end - global_nid_start) == feature_count
global_nids = np.arange(global_nid_start, global_nid_end, dtype=np.int64)
node_feats_per_rank.append({feat_key : node_feats[local_idx_partid]})
global_nid_per_rank.append({feat_key : gnids_per_partid})
#determine node feature ownership
part_ids_slice = node_part_ids[global_nid_start:global_nid_end]
idx = (part_ids_slice == part_id)
out_global_nid = global_nids[idx == 1]
out_type_nid = type_nid[idx == 1]
out_features = node_features[ntype_name+'/feat'][out_type_nid]
send_node_features[ntype_name+'/feat'] = out_features
send_global_nids[ntype_name+'/feat'] = out_global_nid
#features (and global nids) per rank to be sent out are ready
#for transmission, perform alltoallv here.
output_feat_list = alltoall_cpu_object_lst(rank, world_size, node_feats_per_rank)
output_feat_list[rank] = node_feats_per_rank[rank]
node_features_rank_lst.append(send_node_features)
global_nid_rank_lst.append(send_global_nids)
dist.barrier ()
output_list = alltoall_cpu_object_lst(rank, world_size, node_features_rank_lst)
output_list[rank] = node_features_rank_lst[rank]
output_nid_list = alltoall_cpu_object_lst(rank, world_size, global_nid_rank_lst)
output_nid_list[rank] = global_nid_rank_lst[rank]
output_nid_list = alltoall_cpu_object_lst(rank, world_size, global_nid_per_rank)
output_nid_list[rank] = global_nid_per_rank[rank]
#stitch node_features together to form one large feature tensor
rcvd_node_features = {}
rcvd_global_nids = {}
for idx in range(world_size):
for ntype_name, ntype_id in ntypes_map.items():
if ((output_list[idx] is not None) and (ntype_name+'/feat' in output_list[idx])):
if (ntype_name+'/feat' not in rcvd_node_features):
rcvd_node_features[ntype_name+'/feat'] = torch.empty((0,), dtype=torch.float)
rcvd_global_nids[ntype_name+'/feat'] = torch.empty((0,), dtype=torch.int64)
rcvd_node_features[ntype_name+'/feat'] = \
torch.cat((rcvd_node_features[ntype_name+'/feat'], output_list[idx][ntype_name+'/feat']))
rcvd_global_nids[ntype_name+'/feat'] = \
np.concatenate((rcvd_global_nids[ntype_name+'/feat'], output_nid_list[idx][ntype_name+'/feat']))
own_node_features[feat_key] = []
own_global_nids[feat_key] = []
for idx, x in enumerate(output_feat_list):
own_node_features[feat_key].append(x[feat_key])
own_global_nids[feat_key].append(output_nid_list[idx][feat_key])
for k in own_node_features.keys():
own_node_features[k] = torch.cat(own_node_features[k])
own_global_nids[k] = np.concatenate(own_global_nids[k])
end = timer()
print('[Rank: ', rank, '] Total time for node feature exchange: ', timedelta(seconds = end - start))
return own_node_features, own_global_nids
return rcvd_node_features, rcvd_global_nids
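The per-partition bucketing used above can be illustrated with a toy feature tensor; no collective calls are made, and every name and value below is made up for illustration.

import numpy as np
import torch

world_size = 2
# Feature chunk read by this rank: 4 rows corresponding to type nids [10, 14).
type_nid_start, type_nid_end = 10, 14
node_feats = torch.arange(8, dtype=torch.float).reshape(4, 2)

begin_global_nid = 100   # hypothetical start of this node type's global nid range
gnid_start = begin_global_nid + type_nid_start
gnid_end = begin_global_nid + type_nid_end

# Hypothetical owners (partition ids) of global nids [110, 114).
node_part_ids = np.zeros(200, dtype=np.int64)
node_part_ids[110:114] = [0, 1, 1, 0]

gnids_feat = np.arange(gnid_start, gnid_end)
local_idx = np.arange(0, type_nid_end - type_nid_start)

for part_id in range(world_size):
    cond = node_part_ids[gnid_start:gnid_end] == part_id
    rows = torch.from_numpy(local_idx[cond])
    # part 0 receives feature rows 0 and 3, part 1 receives rows 1 and 2.
    print(part_id, gnids_feat[cond], node_feats[rows])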
def exchange_graph_data(rank, world_size, node_data, node_features, edge_data,
node_part_ids, ntypes_map, ntypes_nid_map, ntype_id_count):
def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_data,
node_part_ids, ntypes_map, ntypes_gnid_range_map, ntid_ntype_map, schema_map):
"""
Wrapper function which is used to shuffle graph data on all the processes.
......@@ -285,12 +313,14 @@ def exchange_graph_data(rank, world_size, node_data, node_features, edge_data,
rank of the current process
world_size : int
total no. of participating processes.
node_data : dictionary
dictionary where node data is stored, which is initially read from the nodes txt file mapped
to the current process
node_features: dictionary
dictionary where node_features are stored; this information is read from the appropriate
node features file which belongs to the current process
node_feat_tids: dictionary
in which keys are node-type names and values are triplets. Each triplet has node-feature name
and the starting and ending type ids of the node-feature data read from the corresponding
node feature data file read by current process. Each node type may have several features and
hence each key may have several triplets.
edge_data : dictionary
dictionary which is used to store edge information as read from the edges.txt file assigned
to each process.
......@@ -298,10 +328,12 @@ def exchange_graph_data(rank, world_size, node_data, node_features, edge_data,
numpy array which store the partition-ids and indexed by global_nids
ntypes_map : dictionary
mappings between node type names and node type ids
ntypes_nid_map : dictionary
ntypes_gnid_range_map : dictionary
mapping between node type names and global_nids which belong to the keys in this dictionary
ntype_id_count : dictionary
ntid_ntype_map : dictionary
mapping between node type ids and node type names
schema_map : dictionary
the data structure read from the metadata json file of the input graph
Returns:
--------
......@@ -318,15 +350,15 @@ def exchange_graph_data(rank, world_size, node_data, node_features, edge_data,
the input argument, edge_data dictionary, is updated with the edge data received from other processes
in the world. The edge data is received by each rank in the process of data shuffling.
"""
rcvd_node_features, rcvd_global_nids = exchange_node_features(rank, world_size, node_data, \
node_features, ntypes_map, ntypes_nid_map, ntype_id_count, node_part_ids)
rcvd_node_features, rcvd_global_nids = exchange_node_features(rank, world_size, node_feat_tids, \
ntypes_gnid_range_map, node_part_ids, node_features)
print( 'Rank: ', rank, ' Done with node features exchange.')
node_data = exchange_node_data(rank, world_size, node_data)
node_data = gen_node_data(rank, world_size, node_part_ids, ntid_ntype_map, schema_map["nid"])
edge_data = exchange_edge_data(rank, world_size, edge_data)
return node_data, rcvd_node_features, rcvd_global_nids, edge_data
def read_dataset(rank, world_size, node_part_ids, params):
def read_dataset(rank, world_size, node_part_ids, params, schema_map):
"""
This function gets the dataset and performs post-processing on the data which is read from files.
Additional information(columns) are added to nodes metadata like owner_process, global_nid which
......@@ -344,85 +376,136 @@ def read_dataset(rank, world_size, node_part_ids, params):
metis partitions which are the output of partitioning algorithm
params : argparser object
argument parser object to access command line arguments
schema_map : dictionary
dictionary created by reading the input graph metadata json file
Returns :
---------
dictionary
node data information is read from nodes.txt and additional columns are added such as
owner process for each node.
in which keys are node-type names and values are tuples representing the range of ids
for the nodes to be read by the current process
dictionary
node features which is a dictionary where keys are feature names and values are feature
data as multi-dimensional tensors
dictionary
in which keys are node-type names and values are triplets. Each triplet has node-feature name
and the starting and ending type ids of the node-feature data read from the corresponding
node feature data file read by current process. Each node type may have several features and
hence each key may have several triplets.
dictionary
edge data information is read from edges.txt and additional columns are added such as
owner process for each edge.
dictionary
edge features which is also a dictionary, similar to node features dictionary
"""
edge_features = {}
node_data, node_features, edge_data = \
get_dataset(params.input_dir, params.graph_name, rank, params.num_node_weights)
prefix_sum_nodes = allgather_sizes([node_data[constants.NTYPE_ID].shape[0]], world_size)
augment_node_data(node_data, node_part_ids, prefix_sum_nodes[rank])
print('[Rank: ', rank, '] Done augmenting node_data: ', len(node_data), node_data[constants.GLOBAL_TYPE_NID].shape)
print('[Rank: ', rank, '] Done assigning Global_NIDS: ', prefix_sum_nodes[rank], prefix_sum_nodes[rank+1], prefix_sum_nodes[rank]+node_data[constants.GLOBAL_TYPE_NID].shape[0])
node_tids, node_features, node_feat_tids, edge_data, edge_tids = \
get_dataset(params.input_dir, params.graph_name, rank, world_size, schema_map)
prefix_sum_edges = allgather_sizes([edge_data[constants.ETYPE_ID].shape[0]], world_size)
augment_edge_data(edge_data, node_part_ids, prefix_sum_edges[rank])
print('[Rank: ', rank, '] Done augmenting edge_data: ', len(edge_data), edge_data[constants.GLOBAL_SRC_ID].shape)
return node_data, node_features, edge_data, edge_features
return node_tids, node_features, node_feat_tids, edge_data, edge_features
def gen_dist_partitions(rank, world_size, params):
"""
Function which will be executed by all Gloo processes to
begin execution of the pipeline. This function expects the input dataset is split
across multiple file format. Directory structure is described below in detail:
input_dir/
<graph-name>_nodes00.txt
Function which will be executed by all Gloo processes to begin execution of the pipeline.
This function expects the input dataset to be split across multiple files.
The input dataset and its file structure are described in a metadata json file which is also part of the
input dataset. At a high level, this metadata json file contains information about the following items:
a) Nodes metadata. It is assumed that the nodes of each node-type are split into p files
(where `p` is the no. of partitions).
b) Similarly, the edges metadata contains information about edges, which are split into p files.
c) Node and edge features. It is also assumed that each node (and edge) feature, if present, is also
split into `p` files.
For example, a sample metadata json file might be as follows
(in this toy example, we assume that we have "m" node types, "k" edge types, and, for node_type = ntype0-name,
two features named feat0-name and feat1-name. Please note that the node features are also split into
`p` files, which helps with load balancing during the data-shuffling phase).
Terminology used to identify any particular "id" assigned to nodes, edges or node features: the prefix "global" is
used to indicate that the id is either read from the input dataset or autogenerated based on the information
read from the input dataset files. The prefix "type" is used to indicate a unique id assigned to nodes or edges
within their node or edge type; for instance, type_node_id is a unique id assigned to a node within its node type.
The prefix "shuffle" is used to indicate a unique id, across the entire graph, assigned to either a node or an edge;
for instance, SHUFFLE_GLOBAL_NID is the unique id which is assigned to a node after the data shuffle is completed.
Some high-level notes on the structure of the metadata json file:
1. The path(s) mentioned in the entries for the nodes, edges and node-features files can be either absolute or relative.
If these paths are relative, then it is assumed that they are relative to the folder from which the execution is
launched.
2. The id_startx and id_endx entries represent the type_node_id and type_edge_id ranges for the node and edge data,
respectively. This means that these ids should match the no. of nodes/edges read from any given file. Since these are
type_ids for the nodes and edges in any given file, their global_ids can easily be computed as well.
{
"nid" : { #m : no. of node types
"ntype0-name": {
"format": "csv",
"data" : [ #list of lists
["<path>/ntype0-name-0.txt", 0, id_end0], # These are type_nids for the nodes
["<path>/ntype0-name-1.txt", id_start1, id_end1],
...,
["<path>/ntype0-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
]
},
....,
"ntype<m-1>-name" : {
"format" : "csv",
"data" : [
["<path>/user-sup-ntype<m-1>-name-0.txt", 0, id_end0],
["<path>/user-sup-ntype<m-1>-name-1.txt", id_start1, id_end1],
...
["<path>/user-sup-ntype<m-1>-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
]
}
},
"node_data" : {
"ntype0-name" : {
"feat0-name" : [ #list of lists
["<path>/feat-0.npy", 0, id_end0],
["<path>/feat-1.npy", id_start1, id_end1],
....
<graph-name>_nodes<world_size-1>.txt
<graph-name>_edges00.txt
["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
]
"feat1-name" : [ #list of lists
["<path>/feat-0.npy", 0, id_end0],
["<path>/feat-1.npy", id_start1, id_end1],
....
<graph-name>_edges<world_size-1>.txt
<graph-name>_metadata.json
nodes-ntype0-XY/ #XY = no. of features to read for this ntype
node-feat-0/
0.npy
1.npy
....
<world_size-1>.npy
....
node-feat-<XY-1>/
0.npy
1.npy
....
<world_size-1>.npy
nodes-ntype1-XY/ #XY = no. of features to read for this ntype
node-feat-0/
0.npy
1.npy
....
<world_size-1>.npy
....
node-feat-<XY-1>/
0.npy
1.npy
....
<world_size-1>.npy
Basically, each individual file is split into "p" files, where "p" is the no. of processes in the
world. Directory names are encoded strings which consist of prefix and suffix strings. Suffix strings
indicate the no. of items present inside that directory. For instance, "nodes-ntype0-2" directory has
"2" node type sub-directories within it. And each feature file, whether it is node features file or edge
feature file, is split into "p" numpy files named as 0.npy, 1.npy, ..., <p-1>.npy.
["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
]
}
},
"eid": { #k edge types
"src_ntype:etype0-name:dst_ntype" : {
"format": "csv",
"data" : [
["<path>/etype0-name-0.txt", 0, id_end0], #These are type_edge_ids for edges of this type
["<path>/etype0-name-1.txt", id_start1, id_end1],
...,
["<path>/etype0-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
]
},
...,
"src_ntype:etype<k-1>-name:dst_ntype" : {
"format": "csv",
"data" : [
["<path>/etype<k-1>-name-0.txt", 0, id_end0],
["<path>/etype<k-1>-name-1.txt", id_start1, id_end1],
...,
["<path>/etype<k-1>-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
]
},
},
The function performs the following steps:
1. Reads the metis partitions to identify the owner process of all the nodes in the entire graph.
2. Reads the input data set, each partitipating process will map to a single file for the nodes, edges,
node-features and edge-features for each node-type and edge-types respectively.
2. Reads the input dataset; each participating process maps to a single file for the edges,
node features and edge features of each node-type and edge-type, respectively. Using the nodes metadata
information, the nodes owned by a given process are generated locally to reduce communication to some
extent.
3. Now each process shuffles the data by identifying the respective owner processes using metis
partitions.
a. To identify owner processes for nodes, metis partitions will be used.
......@@ -451,22 +534,27 @@ def gen_dist_partitions(rank, world_size, params):
print('[Rank: ', rank, '] Starting distributed data processing pipeline...')
#init processing
#TODO: For large graphs, this mapping function can be memory intensive. This needs to be changed to
#processes owning a set of global-nids, per partitioning algorithm, and messaging will be used to
#identify the ownership instead of mem. lookups.
node_part_ids = read_partitions_file(params.input_dir+'/'+params.partitions_file)
schema_map = read_json(params.input_dir+'/'+params.schema)
ntypes_map, ntypes = get_node_types(schema_map)
ntypes_map, ntypes, ntypeid_ntypes_map = get_node_types(schema_map)
print('[Rank: ', rank, '] Initialized metis partitions and node_types map...')
#read input graph files and augment these datastructures with
#appropriate information (global_nid and owner process) for node and edge data
node_data, node_features, edge_data, edge_features = read_dataset(rank, world_size, node_part_ids, params)
node_tids, node_features, node_feat_tids, edge_data, edge_features = read_dataset(rank, world_size, node_part_ids, params, schema_map)
print('[Rank: ', rank, '] Done augmenting file input data with auxiliary columns')
#send out node and edge data --- and appropriate features.
#this function will also stitch the data recvd from other processes
#and return the aggregated data
ntypes_nid_map, ntype_id_count = get_ntypes_map(schema_map)
node_data, rcvd_node_features, rcvd_global_nids, edge_data = exchange_graph_data(rank, world_size, node_data, \
node_features, edge_data, node_part_ids, ntypes_map, ntypes_nid_map, ntype_id_count)
ntypes_gnid_range_map = get_gnid_range_map(node_tids)
node_data, rcvd_node_features, rcvd_global_nids, edge_data = \
exchange_graph_data(rank, world_size, node_features, node_feat_tids, \
edge_data, node_part_ids, ntypes_map, ntypes_gnid_range_map, \
ntypeid_ntypes_map, schema_map)
print('[Rank: ', rank, '] Done with data shuffling...')
#sort node_data by ntype
......@@ -481,13 +569,17 @@ def gen_dist_partitions(rank, world_size, params):
#shuffle node feature according to the node order on each rank.
for ntype_name in ntypes:
if (ntype_name+'/feat' in rcvd_global_nids):
global_nids = rcvd_global_nids[ntype_name+'/feat']
featnames = get_ntype_featnames(ntype_name, schema_map)
for featname in featnames:
#if a feature name exists for a node-type, then it should also have
#feature data as well. Hence using the assert statement.
assert(ntype_name+'/'+featname in rcvd_global_nids)
global_nids = rcvd_global_nids[ntype_name+'/'+featname]
common, idx1, idx2 = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True)
shuffle_global_ids = node_data[constants.SHUFFLE_GLOBAL_NID][idx1]
feature_idx = shuffle_global_ids.argsort()
rcvd_node_features[ntype_name+'/feat'] = rcvd_node_features[ntype_name+'/feat'][feature_idx]
rcvd_node_features[ntype_name+'/'+featname] = rcvd_node_features[ntype_name+'/'+featname][feature_idx]
#sort edge_data by etype
sorted_idx = edge_data[constants.ETYPE_ID].argsort()
......
......@@ -3,7 +3,10 @@ import numpy as np
import constants
import torch
def get_dataset(input_dir, graph_name, rank, num_node_weights):
import pyarrow
from pyarrow import csv
def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
"""
Function to read a dataset that is split across multiple files.
......@@ -15,64 +18,91 @@ def get_dataset(input_dir, graph_name, rank, num_node_weights):
graph name string
rank : int
rank of the current process
num_node_weights : int
integer indicating the no. of weights each node is attributed with
world_size : int
total number of process in the current execution
schema_map : dictionary
this is the dictionary created by reading the graph metadata json file
for the input graph dataset
Return:
-------
dictionary
Data read from nodes.txt file and used to build a dictionary with keys as column names
and values as columns in the csv file.
where keys are node-type names and values are tuples. Each tuple represents the
range of type ids read from a file by the current process. Please note that node
data for each node type is split into "p" files and each of these "p" files is
read by a process in the distributed graph partitioning pipeline
dictionary
Data read from numpy files for all the node features in this dataset. Dictionary built
using this data has keys as node feature names and values as tensor data representing
node features
dictionary
in which keys are node-type names and values are triplets. Each triplet has a node-feature name
and the range of tids for the node feature data read from files by the current process. Each
node-type may have multiple features and associated tensor data.
dictionary
Data read from edges.txt file and used to build a dictionary with keys as column names
and values as columns in the csv file.
dictionary
in which keys are edge-type names and values are triplets. Each triplet has an edge-feature name
and the range of tids for the edge feature data read from the files by the current process. Each
edge-type may have several edge features and associated tensor data.
"""
#node features dictionary
node_features = {}
node_feature_tids = {}
#iterate over the sub-dirs and extract the nodetypes
#in each nodetype folder read all the features assigned to
#current rank
siblings = os.listdir(input_dir)
for s in siblings:
if s.startswith("nodes-"):
tokens = s.split("-")
ntype = tokens[1]
num_feats = tokens[2]
for idx in range(int(num_feats)):
feat_file = s +'/node-feat-'+'{:02d}'.format(idx) +'/'+ str(rank)+'.npy'
if (os.path.exists(input_dir+'/'+feat_file)):
features = np.load(input_dir+'/'+feat_file)
node_features[ntype+'/feat'] = torch.tensor(features)
#done build node_features locally.
if len(node_features) <= 0:
print('[Rank: ', rank, '] This dataset does not have any node features')
#iterate over the "node_data" dictionary in the schema_map
#read the node features if exists
#also keep track of the type_nids for which the node_features are read.
dataset_features = schema_map["node_data"]
for ntype_name, ntype_feature_data in dataset_features.items():
#ntype_feature_data is a dictionary
#where key: feature_name, value: list of lists
node_feature_tids[ntype_name] = []
for feat_name, feat_data in ntype_feature_data.items():
assert len(feat_data) == world_size
my_feat_data = feat_data[rank]
if (os.path.isabs(my_feat_data[0])):
node_features[ntype_name+'/'+feat_name] = torch.from_numpy(np.load(my_feat_data[0]))
else:
for k, v in node_features.items():
print('[Rank: ', rank, '] node feature name: ', k, ', feature data shape: ', v.size())
node_features[ntype_name+'/'+feat_name] = torch.from_numpy(np.load(input_dir+my_feat_data[0]))
#read (split) xxx_nodes.txt file
node_file = input_dir+'/'+graph_name+'_nodes'+'_{:02d}.txt'.format(rank)
node_data = np.loadtxt(node_file, delimiter=' ', dtype='int64')
nodes_datadict = {}
nodes_datadict[constants.NTYPE_ID] = node_data[:,0]
type_idx = 0 + num_node_weights + 1
nodes_datadict[constants.GLOBAL_TYPE_NID] = node_data[:,type_idx]
print('[Rank: ', rank, '] Done reading node_data: ', len(nodes_datadict), nodes_datadict[constants.NTYPE_ID].shape)
node_feature_tids[ntype_name].append([feat_name, my_feat_data[1], my_feat_data[2]])
#read (split) xxx_edges.txt file
#read my nodes for each node type
node_tids = {}
node_data = schema_map["nid"]
for ntype_name, ntype_info in node_data.items():
v = []
node_file_info = ntype_info["data"]
for idx in range(len(node_file_info)):
v.append((node_file_info[idx][1], node_file_info[idx][2]))
node_tids[ntype_name] = v
#read my edges for each edge type
edge_tids = {}
edge_datadict = {}
edge_file = input_dir+'/'+graph_name+'_edges'+'_{:02d}.txt'.format(rank)
edge_data = np.loadtxt(edge_file, delimiter=' ', dtype='int64')
edge_datadict[constants.GLOBAL_SRC_ID] = edge_data[:,0]
edge_datadict[constants.GLOBAL_DST_ID] = edge_data[:,1]
edge_datadict[constants.GLOBAL_TYPE_EID] = edge_data[:,2]
edge_datadict[constants.ETYPE_ID] = edge_data[:,3]
edge_data = schema_map["eid"]
for etype_name, etype_info in edge_data.items():
assert etype_info["format"] == "csv"
edge_info = etype_info["data"]
assert len(edge_info) == world_size
data_df = csv.read_csv(edge_info[rank][0], read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
edge_datadict[constants.GLOBAL_SRC_ID] = data_df['f0'].to_numpy()
edge_datadict[constants.GLOBAL_DST_ID] = data_df['f1'].to_numpy()
edge_datadict[constants.GLOBAL_TYPE_EID] = data_df['f2'].to_numpy()
edge_datadict[constants.ETYPE_ID] = data_df['f3'].to_numpy()
v = []
edge_file_info = etype_info["data"]
for idx in range(len(edge_file_info)):
v.append((edge_file_info[idx][1], edge_file_info[idx][2]))
edge_tids[etype_name] = v
print('[Rank: ', rank, '] Done reading edge_file: ', len(edge_datadict), edge_datadict[constants.GLOBAL_SRC_ID].shape)
return nodes_datadict, node_features, edge_datadict
return node_tids, node_features, node_feature_tids, edge_datadict, edge_tids
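A minimal, self-contained demonstration of the pyarrow-based reading used above; the file written here is a throwaway two-edge example and its path is arbitrary.

import pyarrow
from pyarrow import csv

# Write a tiny space-delimited edge file: src dst global_type_eid etype_id.
with open("/tmp/toy_edges.txt", "w") as f:
    f.write("0 1 0 0\n2 3 1 0\n")

table = csv.read_csv(
    "/tmp/toy_edges.txt",
    read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
    parse_options=pyarrow.csv.ParseOptions(delimiter=' '))

# With autogenerate_column_names=True the columns are named f0, f1, f2, f3.
src = table['f0'].to_numpy()
dst = table['f1'].to_numpy()
print(src, dst)   # [0 2] [1 3]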
......@@ -45,7 +45,7 @@ def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
#allocate buffers to receive node-ids
recv_nodes = []
for i in recv_counts:
recv_nodes.append(torch.zeros([i.item()], dtype=torch.int64))
recv_nodes.append(torch.zeros(i.tolist(), dtype=torch.int64))
#form the outgoing message
send_nodes = []
......@@ -67,17 +67,18 @@ def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
global_nids = proc_i_nodes.numpy()
if (len(global_nids) != 0):
common, ind1, ind2 = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True)
values = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
send_nodes.append(torch.Tensor(values).type(dtype=torch.int64))
shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
send_nodes.append(torch.from_numpy(shuffle_global_nids).type(dtype=torch.int64))
else:
send_nodes.append(torch.empty((0,), dtype=torch.int64))
send_nodes.append(torch.empty((0), dtype=torch.int64))
#send receive global-ids
alltoallv_cpu(rank, world_size, recv_shuffle_global_nids, send_nodes)
shuffle_global_nids = [x.numpy() for x in recv_shuffle_global_nids]
global_nids = [x for x in global_nids_ranks]
return np.column_stack((np.concatenate(global_nids), np.concatenate(shuffle_global_nids)))
shuffle_global_nids = np.concatenate([x.numpy() for x in recv_shuffle_global_nids])
global_nids = np.concatenate([x for x in global_nids_ranks])
ret_val = np.column_stack([global_nids, shuffle_global_nids])
return ret_val
def get_shuffle_global_nids_edges(rank, world_size, edge_data, node_part_ids, node_data):
......@@ -122,7 +123,7 @@ def get_shuffle_global_nids_edges(rank, world_size, edge_data, node_part_ids, no
global_nids_ranks.append(not_owned_nodes)
#Retrieve Global-ids for respective node owners
resolved_global_nids = get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data)
non_local_nids = get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data)
#Add global_nid <-> shuffle_global_nid mappings to the received data
for i in range(world_size):
......@@ -132,7 +133,7 @@ def get_shuffle_global_nids_edges(rank, world_size, edge_data, node_part_ids, no
common, ind1, ind2 = np.intersect1d(node_data[constants.GLOBAL_NID], own_global_nids, return_indices=True)
my_shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
local_mappings = np.column_stack((own_global_nids, my_shuffle_global_nids))
resolved_global_nids = np.concatenate((resolved_global_nids, local_mappings))
resolved_global_nids = np.concatenate((non_local_nids, local_mappings))
#form a dictionary of mappings between orig-node-ids and global-ids
resolved_mappings = dict(zip(resolved_global_nids[:,0], resolved_global_nids[:,1]))
......
......@@ -5,6 +5,9 @@ import json
import dgl
import constants
import pyarrow
from pyarrow import csv
def read_partitions_file(part_file):
"""
Utility method to read metis partitions, which is the output of
......@@ -47,6 +50,34 @@ def read_json(json_file):
return val
def get_ntype_featnames(ntype_name, schema):
"""
Retrieves node feature names for a given node_type
Parameters:
-----------
ntype_name : string
a string specifying a node_type name
schema : dictionary
metadata json object as a dictionary, which is read from the input
metadata file from the input dataset
Returns:
--------
list :
a list of feature names for a given node_type
"""
ntype_dict = schema["node_data"]
if (ntype_name in ntype_dict):
featnames = []
ntype_info = ntype_dict[ntype_name]
for k, v in ntype_info.items():
featnames.append(k)
return featnames
else:
return []
def get_node_types(schema):
"""
Utility method to extract node_typename -> node_type mappings
......@@ -60,72 +91,47 @@ def get_node_types(schema):
Returns:
--------
dictionary, list
dictionary with ntype <-> type_nid mappings
list of ntype strings
"""
#Get the node_id ranges from the schema
global_nid_ranges = schema['nid']
global_nid_ranges = {key: np.array(global_nid_ranges[key]).reshape(1,2)
for key in global_nid_ranges}
#Create an array with the starting id for each node_type and sort
ntypes = [(key, global_nid_ranges[key][0,0]) for key in global_nid_ranges]
ntypes.sort(key=lambda e: e[1])
#Create node_typename -> node_type dictionary
ntypes = [e[0] for e in ntypes]
ntypes_map = {e: i for i, e in enumerate(ntypes)}
return ntypes_map, ntypes
def get_edge_types(schema):
"""
Utility function to form edges dictionary between edge_type names and ids
Parameters
----------
schema : dictionary
Input schema from which the edge_typename -> edge_type
dictionary is defined
Returns
-------
dictionary:
a map between edgetype_names and ids
list:
list of edgetype_names
dictionary
with keys as node type names and values as ids (integers)
list
list of ntype name strings
dictionary
with keys as ntype ids (integers) and values as node type names
"""
ntype_info = schema["nid"]
ntypes = []
for k in ntype_info.keys():
ntypes.append(k)
ntype_ntypeid_map = {e: i for i, e in enumerate(ntypes)}
ntypeid_ntype_map = {str(i): e for i, e in enumerate(ntypes)}
return ntype_ntypeid_map, ntypes, ntypeid_ntype_map
global_eid_ranges = schema['eid']
global_eid_ranges = {key: np.array(global_eid_ranges[key]).reshape(1,2)
for key in global_eid_ranges}
etypes = [(key, global_eid_ranges[key][0, 0]) for key in global_eid_ranges]
etypes.sort(key=lambda e: e[1])
etypes = [e[0] for e in etypes]
etypes_map = {e: i for i, e in enumerate(etypes)}
return etypes_map, etypes
def get_ntypes_map(schema):
def get_gnid_range_map(node_tids):
"""
Utility function to return nodes global id range from the input schema
as well as node count per each node type
Computes the global_nid range for each node type from the per-type type-id ranges.
Parameters:
-----------
schema : dictionary
Input schema where the requested dictionaries are defined
node_tids: dictionary
This dictionary contains the information about the nodes of each node_type.
Keys in this dictionary are node_type names and each value is a list with p entries,
one per input file. Each entry is a pair of the starting and ending type_node_ids
for the nodes read from the corresponding file.
Returns:
--------
dictionary :
map between the node_types and global_id ranges for each node_type
dictionary :
map between the node_type and total node count for that type
a dictionary where keys are node_type names and values are the corresponding global_nid ranges, each given as a [start, end) pair.
"""
return schema["nid"], schema["node_type_id_count"]
ntypes_gid_range = {}
offset = 0
for k, v in node_tids.items():
ntypes_gid_range[k] = [offset + int(v[0][0]), offset + int(v[-1][1])]
offset += int(v[-1][1])
return ntypes_gid_range
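For illustration, the range computation in get_gnid_range_map applied to a hypothetical node_tids dictionary (two node types, each split across two files); the loop below mirrors the function body so the snippet is self-contained.

# Hypothetical node_tids: per node type, one (start, end) type-id range per file.
node_tids = {
    "paper":  [(0, 50), (50, 100)],
    "author": [(0, 30), (30, 60)],
}

ntypes_gid_range, offset = {}, 0
for ntype, v in node_tids.items():
    ntypes_gid_range[ntype] = [offset + int(v[0][0]), offset + int(v[-1][1])]
    offset += int(v[-1][1])

print(ntypes_gid_range)   # {'paper': [0, 100], 'author': [100, 160]}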
def write_metadata_json(metadata_list, output_dir, graph_name):
"""
......@@ -178,7 +184,7 @@ def write_metadata_json(metadata_list, output_dir, graph_name):
with open('{}/{}.json'.format(output_dir, graph_name), 'w') as outfile:
json.dump(graph_metadata, outfile, sort_keys=True, indent=4)
def augment_edge_data(edge_data, part_ids, id_offset):
def augment_edge_data(edge_data, part_ids, edge_tids, rank, world_size):
"""
Add partition-id (rank which owns an edge) column to the edge_data.
......@@ -190,61 +196,27 @@ def augment_edge_data(edge_data, part_ids, id_offset):
array of part_ids indexed by global_nid
"""
#add global_eids to the edge_data
global_eids = np.arange(id_offset, id_offset + len(edge_data[constants.GLOBAL_TYPE_EID]), dtype=np.int64)
etype_offset = {}
offset = 0
for etype_name, tid_range in edge_tids.items():
assert int(tid_range[0][0]) == 0
assert len(tid_range) == world_size
etype_offset[etype_name] = offset + int(tid_range[0][0])
offset += int(tid_range[-1][1])
global_eids = []
for etype_name, tid_range in edge_tids.items():
global_eid_start = etype_offset[etype_name]
begin = global_eid_start + int(tid_range[rank][0])
end = global_eid_start + int(tid_range[rank][1])
global_eids.append(np.arange(begin, end, dtype=np.int64))
global_eids = np.concatenate(global_eids)
assert global_eids.shape[0] == edge_data[constants.ETYPE_ID].shape[0]
edge_data[constants.GLOBAL_EID] = global_eids
#assign the owner process/rank for each edge
edge_data[constants.OWNER_PROCESS] = part_ids[edge_data[constants.GLOBAL_DST_ID]]
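The per-rank global_eid assignment above can be checked against a toy edge_tids dictionary; the edge types and ranges below are illustrative only.

import numpy as np

rank, world_size = 1, 2
# Hypothetical edge_tids: per edge type, one (start, end) type-eid range per rank/file.
edge_tids = {
    "cites":  [(0, 40), (40, 80)],
    "writes": [(0, 25), (25, 50)],
}

etype_offset, offset = {}, 0
for etype, tid_range in edge_tids.items():
    etype_offset[etype] = offset
    offset += int(tid_range[-1][1])

global_eids = []
for etype, tid_range in edge_tids.items():
    begin = etype_offset[etype] + int(tid_range[rank][0])
    end = etype_offset[etype] + int(tid_range[rank][1])
    global_eids.append(np.arange(begin, end, dtype=np.int64))
global_eids = np.concatenate(global_eids)
# rank 1 owns global eids [40, 80) for "cites" and [105, 130) for "writes".
print(global_eids.shape)   # (65,)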
def augment_node_data(node_data, part_ids, offset):
"""
Utility function to add auxilary columns to the node_data numpy ndarray.
Parameters:
-----------
node_data : dictionary
Node information as read from xxx_nodes.txt file and a dictionary is built using this data
using keys as column names and values as column data from the csv txt file.
part_ids : numpy array
array of part_ids indexed by global_nid
"""
#add global_nids to the node_data
global_nids = np.arange(offset, offset + len(node_data[constants.GLOBAL_TYPE_NID]), dtype=np.int64)
node_data[constants.GLOBAL_NID] = global_nids
#add owner proc_ids to the node_data
proc_ids = part_ids[node_data[constants.GLOBAL_NID]]
node_data[constants.OWNER_PROCESS] = proc_ids
def read_nodes_file(nodes_file):
"""
Utility function to read xxx_nodes.txt file
Parameters:
-----------
nodesfile : string
Graph file for nodes in the input graph
Returns:
--------
dictionary
Nodes data stored in dictionary where keys are column names
and values are the columns from the numpy ndarray as read from the
xxx_nodes.txt file
"""
if nodes_file == "" or nodes_file == None:
return None
# Read the file from here.
# Assuming the nodes file is a numpy file
# nodes.txt file is of the following format
# <node_type> <weight1> <weight2> <weight3> <weight4> <global_type_nid> <attributes>
# For the ogb-mag dataset, nodes.txt is of the above format.
nodes_data = np.loadtxt(nodes_file, delimiter=' ', dtype='int64')
nodes_datadict = {}
nodes_datadict[constants.NTYPE_ID] = nodes_data[:,0]
nodes_datadict[constants.GLOBAL_TYPE_NID] = nodes_data[:,5]
return nodes_datadict
def read_edges_file(edge_file, edge_data_dict):
"""
Utility function to read xxx_edges.txt file
......@@ -268,23 +240,13 @@ def read_edges_file(edge_file, edge_data_dict):
# global_src_id -- global idx for the source node ... line # in the graph_nodes.txt
# global_dst_id -- global idx for the destination id node ... line # in the graph_nodes.txt
edge_data = np.loadtxt(edge_file , delimiter=' ', dtype = 'int64')
if (edge_data_dict == None):
edge_data_df = csv.read_csv(edge_file, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
edge_data_dict = {}
edge_data_dict[constants.GLOBAL_SRC_ID] = edge_data[:,0]
edge_data_dict[constants.GLOBAL_DST_ID] = edge_data[:,1]
edge_data_dict[constants.GLOBAL_TYPE_EID] = edge_data[:,2]
edge_data_dict[constants.ETYPE_ID] = edge_data[:,3]
else:
edge_data_dict[constants.GLOBAL_SRC_ID] = \
np.concatenate((edge_data_dict[constants.GLOBAL_SRC_ID], edge_data[:,0]))
edge_data_dict[constants.GLOBAL_DST_ID] = \
np.concatenate((edge_data_dict[constants.GLOBAL_DST_ID], edge_data[:,1]))
edge_data_dict[constants.GLOBAL_TYPE_EID] = \
np.concatenate((edge_data_dict[constants.GLOBAL_TYPE_EID], edge_data[:,2]))
edge_data_dict[constants.ETYPE_ID] = \
np.concatenate((edge_data_dict[constants.ETYPE_ID], edge_data[:,3]))
edge_data_dict[constants.GLOBAL_SRC_ID] = edge_data_df['f0'].to_numpy()
edge_data_dict[constants.GLOBAL_DST_ID] = edge_data_df['f1'].to_numpy()
edge_data_dict[constants.GLOBAL_TYPE_EID] = edge_data_df['f2'].to_numpy()
edge_data_dict[constants.ETYPE_ID] = edge_data_df['f3'].to_numpy()
return edge_data_dict
def read_node_features_file(nodes_features_file):
......