Unverified commit 7f8e1cf2, authored by kylasa and committed by GitHub

[Distributed] Change for the new input format for distributed partitioning (#4273)

* Code changes to support the updated file format for massively large graphs.

1. Updated the docstring of the entry-point function `gen_dist_partitions` to describe the newly proposed file format for the input dataset.
2. Code that depended on the structure of the old metadata JSON object has been updated to read from the newly proposed metadata file.
3. Fixed errors where calling functions expected return values that the invoked functions did not return.
4. The modified code has been tested on the "mag" dataset with 4-way partitioning, and the results were verified.

* Code changes to address the CI review comments

1. Improved docstrings for some functions.
2. Added a new function in utils.py to compute the id ranges; it is used in multiple places.

* Added TODO to indicate the redundant data structure.

Because of the new file format, one of the two dictionaries (node_feature_tids, node_tids) becomes redundant. Added a TODO note so that it can be removed in the next iteration of code changes.
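For reference, a minimal sketch of the newly proposed metadata layout (hypothetical graph name, paths and chunk counts, chosen only for illustration; the keys mirror the format described in the updated docstrings below):

# Hypothetical metadata, written here as a Python dict for illustration (the real input is a JSON file).
metadata = {
    "graph_name": "mag",
    "node_type": ["paper", "author"],
    "num_nodes_per_chunk": [[100, 150],    # "paper" nodes per chunk, p=2 partitions
                            [200, 50]],    # "author" nodes per chunk
    "edge_type": ["author:writes:paper"],
    "num_edges_per_chunk": [[400, 350]],
    "node_data": {
        "paper": {
            "feat": {"format": {"name": "numpy"},
                     "data": ["node_data/paper-feat-0.npy", "node_data/paper-feat-1.npy"]}
        }
    },
    "edges": {
        "author:writes:paper": {
            "format": {"name": "csv", "delimiter": " "},
            "data": ["edges/writes-0.txt", "edges/writes-1.txt"]
        }
    }
}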
parent dad3606a
@@ -18,3 +18,19 @@ SHUFFLE_GLOBAL_DST_ID = "shuffle_global_dst_id"
OWNER_PROCESS = "owner_proc_id"
PART_LOCAL_NID = "part_local_nid"
+GLOO_MESSAGING_TIMEOUT = 60*60 #seconds
+STR_NODE_TYPE = "node_type"
+STR_NUM_NODES_PER_CHUNK = "num_nodes_per_chunk"
+STR_EDGE_TYPE = "edge_type"
+STR_NUM_EDGES_PER_CHUNK = "num_edges_per_chunk"
+STR_EDGES = "edges"
+STR_FORMAT = "format"
+STR_DATA = "data"
+STR_NODE_DATA = "node_data"
+STR_EDGE_DATA = "edge_data"
+STR_NUMPY = "numpy"
+STR_CSV = "csv"
+STR_NAME = "name"
@@ -9,7 +9,7 @@ import pyarrow
import pandas as pd
import constants
from pyarrow import csv
-from utils import read_json
+from utils import read_json, get_idranges
def create_dgl_object(graph_name, num_parts, \
        schema, part_id, node_data, \
@@ -95,19 +95,11 @@ def create_dgl_object(graph_name, num_parts, \
        map between edge type(string) and edge_type_id(int)
    """
    #create auxiliary data structures from the schema object
-    node_info = schema["nid"]
-    offset = 0
-    global_nid_ranges = {}
-    for k, v in node_info.items():
-        global_nid_ranges[k] = np.array([offset + int(v["data"][0][1]), offset + int(v["data"][-1][2])]).reshape(1,2)
-        offset += int(v["data"][-1][2])
-    edge_info = schema["eid"]
-    offset = 0
-    global_eid_ranges = {}
-    for k, v in edge_info.items():
-        global_eid_ranges[k] = np.array([offset + int(v["data"][0][1]), offset + int(v["data"][-1][2])]).reshape(1,2)
-        offset += int(v["data"][-1][2])
+    ntid_dict, global_nid_ranges = get_idranges(schema[constants.STR_NODE_TYPE],
+                                                schema[constants.STR_NUM_NODES_PER_CHUNK])
+    etid_dict, global_eid_ranges = get_idranges(schema[constants.STR_EDGE_TYPE],
+                                                schema[constants.STR_NUM_EDGES_PER_CHUNK])

    id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
@@ -120,10 +112,10 @@ def create_dgl_object(graph_name, num_parts, \
    etypes.sort(key=lambda e: e[1])
    etype_offset_np = np.array([e[1] for e in etypes])
    etypes = [e[0] for e in etypes]
-    etypes_map = {e: i for i, e in enumerate(etypes)}
+    etypes_map = {e.split(":")[1]: i for i, e in enumerate(etypes)}

    node_map_val = {ntype: [] for ntype in ntypes}
-    edge_map_val = {etype: [] for etype in etypes}
+    edge_map_val = {etype.split(":")[1]: [] for etype in etypes}

    shuffle_global_nids, ntype_ids, global_type_nid = node_data[constants.SHUFFLE_GLOBAL_NID], \
        node_data[constants.NTYPE_ID], node_data[constants.GLOBAL_TYPE_NID]
@@ -158,8 +150,10 @@ def create_dgl_object(graph_name, num_parts, \
    # Determine the edge ID range of different edge types.
    edge_id_start = edgeid_offset
    for etype_name in global_eid_ranges:
-        etype_id = etypes_map[etype_name]
-        edge_map_val[etype_name].append([edge_id_start,
+        tokens = etype_name.split(":")
+        assert len(tokens) == 3
+        etype_id = etypes_map[tokens[1]]
+        edge_map_val[tokens[1]].append([edge_id_start,
            edge_id_start + np.sum(etype_ids == etype_id)])
        edge_id_start += np.sum(etype_ids == etype_id)
...
@@ -36,12 +36,12 @@ if __name__ == "__main__":
        help='The number of partitions')
    parser.add_argument('--output', required=True, type=str,
        help='The output directory of the partitioned results')
+    parser.add_argument('--partitions-dir', help='directory of the partition-ids for each node type',
+        default=None, type=str)

    #arguments needed for the distributed implementation
    parser.add_argument('--world-size', help='no. of processes to spawn',
        default=1, type=int, required=True)
-    parser.add_argument('--partitions-file', help='filename of the output of dgl_part2 (metis partitions)',
-        default=None, type=str)

    params = parser.parse_args()
    #invoke the pipeline function
...
@@ -11,9 +11,10 @@ import dgl
from timeit import default_timer as timer
from datetime import timedelta
from dataset_utils import get_dataset
-from utils import read_partitions_file, read_json, get_node_types, \
+from utils import read_ntype_partition_files, read_json, get_node_types, \
    augment_edge_data, get_gnid_range_map, \
-    write_dgl_objects, write_metadata_json
+    write_dgl_objects, write_metadata_json, get_ntype_featnames, \
+    get_idranges
from gloo_wrapper import alltoall_cpu_object_lst, alltoallv_cpu, \
    alltoall_cpu, allgather_sizes, gather_metadata_json
from globalids import assign_shuffle_global_nids_nodes, \
@@ -21,7 +22,7 @@ from globalids import assign_shuffle_global_nids_nodes, \
    get_shuffle_global_nids_edges
from convert_partition import create_dgl_object, create_metadata_json
-def gen_node_data(rank, world_size, node_part_ids, ntid_ntype_map, nid_schema_map):
+def gen_node_data(rank, world_size, node_part_ids, ntid_ntype_map, schema_map):
    '''
    For this data processing pipeline, reading node files is not needed. All the needed information about
    the nodes can be found in the metadata json file. This function generates the nodes owned by a given
@@ -37,46 +38,47 @@
        numpy array, whose length is same as no. of nodes in the graph. Index in this array is the global_nid
        and value is the partition-id which owns the node
    ntid_ntype_map :
-        a dictionary where keys are node_type ids and values are node_type names
-    nid_schema_map:
-        a dictionary, which is extracted from the input graph metadata json file for node information,
-        using the key "nid". This dictionary, as described below, has information about all the node types
-        present in the input graph.
+        a dictionary where keys are node_type ids (integers) and values are node_type names (strings).
+    schema_map:
+        dictionary formed by reading the input metadata json file for the input dataset.

    Please note that, it is assumed that for the input graph files, the nodes of a particular node-type are
    split into `p` files (because of `p` partitions to be generated). On a similar note, edges of a particular
    edge-type are split into `p` files as well.
For instance, a generic dictionaries for "nid" keys are as follows:
"ntype0-name", "ntype1-name" etc... are the user supplied names for the node types present in the input graph.
"format" specifies the structure of the files' content. And "data" has a value which is a list of lists.
Each list has 3 entries which are file-name (including either an absolute path or relative), start and end ids
which are type ids of the nodes read from the corresponding files.
"nid" : { #m : no. of node types
"ntype0-name": {
"format": "csv",
"data" : [ #list of lists
["<path>/ntype0-name-0.txt", 0, id_end0], # These are type_nids for the nodes
["<path>/ntype0-name-1.txt", id_start1, id_end1],
...,
["<path>/ntype0-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
]
},
....,
"ntype<m-1>-name" : {
"format" : "csv",
"data" : [
["<path>/ntype<m-1>-name-0.txt", 0, id_end0],
["<path>/ntype<m-1>-name-1.txt", id_start1, id_end1],
...
["<path>/ntype<m-1>-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
]
}
}
#assuming m nodetypes present in the input graph
"num_nodes_per_chunk" : [
[a0, a1, a2, ... a<p-1>],
[b0, b1, b2, ... b<p-1>],
...
[m0, m1, m2, ... m<p-1>]
]
Here, each sub-list, corresponding to a nodetype in the input graph, has `p` elements, for instance [a0, a1, ... a<p-1>],
where each element represents the number of nodes which are to be processed by a process during distributed partitioning.
In addition to the above key-value pair for the nodes in the graph, the node-features are captured in the
"node_data" key-value pair. In this dictionary the keys will be nodetype names and value will be a dictionary which
is used to capture all the features present for that particular node-type. This is shown in the following example:
"node_data" : {
"paper": { # node type
"feat": { # feature key
"format": {"name": "numpy"},
"data": ["node_data/paper-feat-part1.npy", "node_data/paper-feat-part2.npy"]
},
"label": { # feature key
"format": {"name": "numpy"},
"data": ["node_data/paper-label-part1.npy", "node_data/paper-label-part2.npy"]
},
"year": { # feature key
"format": {"name": "numpy"},
"data": ["node_data/paper-year-part1.npy", "node_data/paper-year-part2.npy"]
}
}
}
In the above example we have one node-type, paper, which has 3 features, namely feat, label and year.
Each feature has `p` files whose locations in the filesystem are listed under the key "data", and "format" is used to
describe the storage format.
    Returns:
    --------
@@ -89,15 +91,13 @@
        constants.NTYPE_ID : [],
        constants.GLOBAL_TYPE_NID : []
    }

-    gnid_start = 0
-    gnid_end = 0
-    for ntypeid in range(len(ntid_ntype_map)):
-        ntype_name = ntid_ntype_map[str(ntypeid)]
-        ntype_info = nid_schema_map[ntype_name]
-        type_start = int(ntype_info["data"][0][1])
-        type_end = int(ntype_info["data"][-1][2])
-        gnid_end += type_end
+    type_nid_dict, global_nid_dict = get_idranges(schema_map[constants.STR_NODE_TYPE],
+                                                  schema_map[constants.STR_NUM_NODES_PER_CHUNK])
+    for ntype_id, ntype_name in ntid_ntype_map.items():
+        type_start, type_end = type_nid_dict[ntype_name][0][0], type_nid_dict[ntype_name][-1][1]
+        gnid_start, gnid_end = global_nid_dict[ntype_name][0, 0], global_nid_dict[ntype_name][0, 1]

        node_partid_slice = node_part_ids[gnid_start:gnid_end]
        cond = node_partid_slice == rank
@@ -107,10 +107,9 @@
        own_tnids = np.arange(type_start, type_end, dtype=np.int64)
        own_tnids = own_tnids[cond]

-        local_node_data[constants.NTYPE_ID].append(np.ones(own_gnids.shape, dtype=np.int64)*ntypeid)
+        local_node_data[constants.NTYPE_ID].append(np.ones(own_gnids.shape, dtype=np.int64)*ntype_id)
        local_node_data[constants.GLOBAL_NID].append(own_gnids)
        local_node_data[constants.GLOBAL_TYPE_NID].append(own_tnids)
-        gnid_start = gnid_end

    for k in local_node_data.keys():
        local_node_data[k] = np.concatenate(local_node_data[k])
@@ -292,10 +291,10 @@ def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map,
        own_node_features[feat_key] = []
        own_global_nids[feat_key] = []
        for idx, x in enumerate(output_feat_list):
-            own_node_features.append(x[feat_key])
+            own_node_features[feat_key].append(x[feat_key])
            own_global_nids[feat_key].append(output_nid_list[idx][feat_key])

    for k in own_node_features.keys():
-        own_node_features[k] = th.cat(own_node_features[k])
+        own_node_features[k] = torch.cat(own_node_features[k])
        own_global_nids[k] = np.concatenate(own_global_nids[k])

    end = timer()
@@ -303,7 +302,7 @@
    return own_node_features, own_global_nids

def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_data,
-        node_part_ids, ntypes_map, ntypes_gnid_range_map, ntid_ntype_map, schema_map):
+        node_part_ids, ntypes_ntypeid_map, ntypes_gnid_range_map, ntid_ntype_map, schema_map):
    """
    Wrapper function which is used to shuffle graph data on all the processes.
@@ -326,7 +325,7 @@
        to each process.
    node_part_ids : numpy array
        numpy array which store the partition-ids and indexed by global_nids
-    ntypes_map : dictionary
+    ntypes_ntypeid_map : dictionary
        mappings between node type names and node type ids
    ntypes_gnid_range_map : dictionary
        mapping between node type names and global_nids which belong to the keys in this dictionary
@@ -354,7 +353,7 @@
        ntypes_gnid_range_map, node_part_ids, node_features)
    print( 'Rank: ', rank, ' Done with node features exchange.')

-    node_data = gen_node_data(rank, world_size, node_part_ids, ntid_ntype_map, schema_map["nid"])
+    node_data = gen_node_data(rank, world_size, node_part_ids, ntid_ntype_map, schema_map)
    edge_data = exchange_edge_data(rank, world_size, edge_data)
    return node_data, rcvd_node_features, rcvd_global_nids, edge_data
@@ -370,7 +369,7 @@ def read_dataset(rank, world_size, node_part_ids, params, schema_map):
    -----------
    rank : int
        rank of the current process
-    worls_size : int
+    world_size : int
        total no. of processes instantiated
    node_part_ids : numpy array
        metis partitions which are the output of partitioning algorithm
@@ -399,10 +398,11 @@
        edge features which is also a dictionary, similar to node features dictionary
    """
    edge_features = {}
+    #node_tids, node_features, edge_datadict, edge_tids
    node_tids, node_features, node_feat_tids, edge_data, edge_tids = \
        get_dataset(params.input_dir, params.graph_name, rank, world_size, schema_map)
-    augment_edge_data(edge_data, node_part_ids, prefix_sum_edges[rank])
+    augment_edge_data(edge_data, node_part_ids, edge_tids, rank, world_size)
    print('[Rank: ', rank, '] Done augmenting edge_data: ', len(edge_data), edge_data[constants.GLOBAL_SRC_ID].shape)
    return node_tids, node_features, node_feat_tids, edge_data, edge_features
@@ -441,46 +441,46 @@ def gen_dist_partitions(rank, world_size, params):
    the nodes and edges in any given file, their global_ids can be easily computed as well.
    {
-    "nid" : { #m : no. of node types
-        "ntype0-name": {
-            "format": "csv",
-            "data" : [ #list of lists
-                ["<path>/ntype0-name-0.txt", 0, id_end0], # These are type_nids for the nodes
-                ["<path>/ntype0-name-1.txt", id_start1, id_end1],
-                ...,
-                ["<path>/ntype0-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
-            ]
-        },
-        ....,
-        "ntype<m-1>-name" : {
-            "format" : "csv",
-            "data" : [
-                ["<path>/user-sup-ntype<m-1>-name-0.txt", 0, id_end0],
-                ["<path>/user-sup-ntype<m-1>-name-1.txt", id_start1, id_end1],
-                ...
-                ["<path>/user-sup-ntype<m-1>-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
-            ]
-        }
-    },
+    "graph_name" : xyz,
+    "node_type" : ["ntype0-name", "ntype1-name", ....], #m node types
+    "num_nodes_per_chunk" : [
+        [a0, a1, ...a<p-1>], #p partitions
+        [b0, b1, ... b<p-1>],
+        ....
+        [c0, c1, ..., c<p-1>] #no. of node types
+    ],
+    "edge_type" : ["src_ntype:edge_type:dst_ntype", ....], #k edge types
+    "num_edges_per_chunk" : [
+        [a0, a1, ...a<p-1>], #p partitions
+        [b0, b1, ... b<p-1>],
+        ....
+        [c0, c1, ..., c<p-1>] #no. of edge types
+    ],
    "node_data" : {
        "ntype0-name" : {
-            "feat0-name" : [ #list of lists
-                ["<path>/feat-0.npy", 0, id_end0],
-                ["<path>/feat-1.npy", id_start1, id_end1],
-                ....
-                ["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
-            ]
-            "feat1-name" : [ #list of lists
-                ["<path>/feat-0.npy", 0, id_end0],
-                ["<path>/feat-1.npy", id_start1, id_end1],
-                ....
-                ["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
-            ]
+            "feat0-name" : {
+                "format" : {"name": "numpy"},
+                "data" : [ #list of lists
+                    ["<path>/feat-0.npy", 0, id_end0],
+                    ["<path>/feat-1.npy", id_start1, id_end1],
+                    ....
+                    ["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
+                ]
+            },
+            "feat1-name" : {
+                "format" : {"name": "numpy"},
+                "data" : [ #list of lists
+                    ["<path>/feat-0.npy", 0, id_end0],
+                    ["<path>/feat-1.npy", id_start1, id_end1],
+                    ....
+                    ["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
+                ]
+            }
        }
    },
-    "eid": { #k edge types
+    "edges": { #k edge types
        "src_ntype:etype0-name:dst_ntype" : {
-            "format": "csv",
+            "format": {"name" : "csv", "delimiter" : " "},
            "data" : [
                ["<path>/etype0-name-0.txt", 0, id_end0], #These are type_edge_ids for edges of this type
                ["<path>/etype0-name-1.txt", id_start1, id_end1],
@@ -490,7 +490,7 @@
        },
        ...,
        "src_ntype:etype<k-1>-name:dst_ntype" : {
-            "format": "csv",
+            "format": {"name" : "csv", "delimiter" : " "},
            "data" : [
                ["<path>/etype<k-1>-name-0.txt", 0, id_end0],
                ["<path>/etype<k-1>-name-1.txt", id_start1, id_end1],
@@ -499,6 +499,7 @@
            ]
        },
    },
+    }
    The function performs the following steps:
    1. Reads the metis partitions to identify the owner process of all the nodes in the entire graph.
@@ -534,12 +535,14 @@
    print('[Rank: ', rank, '] Starting distributed data processing pipeline...')

    #init processing
+    schema_map = read_json(os.path.join(params.input_dir, params.schema))

    #TODO: For large graphs, this mapping function can be memory intensive. This needs to be changed to
    #processes owning a set of global-nids, per partitioning algorithm, and messaging will be used to
    #identify the ownership instead of mem. lookups.
-    node_part_ids = read_partitions_file(params.input_dir+'/'+params.partitions_file)
-    schema_map = read_json(params.input_dir+'/'+params.schema)
-    ntypes_map, ntypes, ntypeid_ntypes_map = get_node_types(schema_map)
+    node_part_ids = read_ntype_partition_files(schema_map, os.path.join(params.input_dir, params.partitions_dir))
+    ntypes_ntypeid_map, ntypes, ntypeid_ntypes_map = get_node_types(schema_map)
    print('[Rank: ', rank, '] Initialized metis partitions and node_types map...')

    #read input graph files and augment these datastructures with
@@ -551,9 +554,9 @@
    #this function will also stitch the data recvd from other processes
    #and return the aggregated data
    ntypes_gnid_range_map = get_gnid_range_map(node_tids)
-    node_data, rcvd_node_features, rcvd_global_nids = \
+    node_data, rcvd_node_features, rcvd_global_nids, edge_data = \
        exchange_graph_data(rank, world_size, node_features, node_feat_tids, \
-            edge_data, node_part_ids, ntypes_map, ntypes_gnid_range_map, \
+            edge_data, node_part_ids, ntypes_ntypeid_map, ntypes_gnid_range_map, \
            ntypeid_ntypes_map, schema_map)
    print('[Rank: ', rank, '] Done with data shuffling...')
@@ -597,7 +600,7 @@
    start = timer()
    num_nodes = 0
    num_edges = shuffle_global_eid_start
-    graph_obj, ntypes_map_val, etypes_map_val, ntypes_map, etypes_map = create_dgl_object(\
+    graph_obj, ntypes_map_val, etypes_map_val, ntypes_ntypeid_map, etypes_map = create_dgl_object(\
        params.graph_name, params.num_parts, \
        schema_map, rank, node_data, edge_data, num_nodes, num_edges)
    write_dgl_objects(graph_obj, rcvd_node_features, edge_features, params.output, rank)
@@ -605,7 +608,7 @@
    #get the meta-data
    json_metadata = create_metadata_json(params.graph_name, len(node_data[constants.NTYPE_ID]), len(edge_data[constants.ETYPE_ID]), \
        rank, world_size, ntypes_map_val, \
-        etypes_map_val, ntypes_map, etypes_map, params.output)
+        etypes_map_val, ntypes_ntypeid_map, etypes_map, params.output)

    if (rank == 0):
        #get meta-data from all partitions and merge them on rank-0
@@ -682,7 +685,7 @@ def multi_machine_run(params):
    rank = int(os.environ["RANK"])
    #init the gloo process group here.
-    dist.init_process_group("gloo", rank=rank, world_size=params.world_size, timeout=timedelta(seconds=5*60))
+    dist.init_process_group("gloo", rank=rank, world_size=params.world_size, timeout=timedelta(seconds=constants.GLOO_MESSAGING_TIMEOUT))
    print('[Rank: ', rank, '] Done with process group initialization...')
    #invoke the main function here.
...
@@ -5,6 +5,7 @@ import torch
import pyarrow
from pyarrow import csv
+from utils import get_idranges

def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
    """
@@ -48,60 +49,204 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
        edge-type may have several edge features and associated tensor data.
    """
    #node features dictionary
    #TODO: With the new file format, it is guaranteed that the no. of nodes in any node-feature file
    #and the no. of nodes in the corresponding nodes metadata will always be the same. With this
    #guarantee, we can eliminate the `node_feature_tids` dictionary since the same information is also
    #populated in the `node_tids` dictionary. This will be removed in the next iteration of code changes.
    node_features = {}
    node_feature_tids = {}
'''
The structure of the node_data is as follows, which is present in the input metadata json file.
"node_data" : {
"ntype0-name" : {
"feat0-name" : {
"format" : {"name": "numpy"},
"data" : [ #list
"<path>/feat-0.npy",
"<path>/feat-1.npy",
....
"<path>/feat-<p-1>.npy"
]
},
"feat1-name" : {
"format" : {"name": "numpy"},
"data" : [ #list
"<path>/feat-0.npy",
"<path>/feat-1.npy",
....
"<path>/feat-<p-1>.npy"
]
}
}
}
As shown above, the value for the key "node_data" is a dictionary object, which is
used to describe the feature data for each of the node-type names. Keys in this top-level
dictionary are node-type names and value is a dictionary which captures all the features
for the current node-type. Feature data is captured with keys being the feature-names and
value is a dictionary object which has 2 keys namely format and data. Format entry is used
to mention the format of the storage used by the node features themselves and "data" is used
to mention all the files present for this given node feature.
Data read from each node-feature file is a multi-dimensional tensor and is read in numpy format,
which is also the storage format of the node features on permanent storage.
'''
#iterate over the "node_data" dictionary in the schema_map #iterate over the "node_data" dictionary in the schema_map
#read the node features if exists #read the node features if exists
#also keep track of the type_nids for which the node_features are read. #also keep track of the type_nids for which the node_features are read.
dataset_features = schema_map["node_data"] dataset_features = schema_map[constants.STR_NODE_DATA]
for ntype_name, ntype_feature_data in dataset_features.items(): if((dataset_features is not None) and (len(dataset_features) > 0)):
#ntype_feature_data is a dictionary for ntype_name, ntype_feature_data in dataset_features.items():
#where key: feature_name, value: list of lists #ntype_feature_data is a dictionary
node_feature_tids[ntype_name] = [] #where key: feature_name, value: dictionary in which keys are "format", "data"
for feat_name, feat_data in ntype_feature_data.items(): node_feature_tids[ntype_name] = []
assert len(feat_data) == world_size for feat_name, feat_data in ntype_feature_data.items():
my_feat_data = feat_data[rank] assert len(feat_data[constants.STR_DATA]) == world_size
if (os.path.isabs(my_feat_data[0])): assert feat_data[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_NUMPY
node_features[ntype_name+'/'+feat_name] = torch.from_numpy(np.load(my_feat_data[0])) my_feat_data_fname = feat_data[constants.STR_DATA][rank] #this will be just the file name
else: if (os.path.isabs(my_feat_data_fname)):
node_features[ntype_name+'/'+feat_name] = torch.from_numpy(np.load(input_dir+my_feat_data[0])) node_features[ntype_name+'/'+feat_name] = \
torch.from_numpy(np.load(my_feat_data_fname))
node_feature_tids[ntype_name].append([feat_name, my_feat_data[1], my_feat_data[2]]) else:
node_features[ntype_name+'/'+feat_name] = \
torch.from_numpy(np.load(os.path.join(input_dir, my_feat_data_fname)))
node_feature_tids[ntype_name].append([feat_name, -1, -1])
'''
"node_type" : ["ntype0-name", "ntype1-name", ....], #m node types
"num_nodes_per_chunk" : [
    [a0, a1, ...a<p-1>], #p partitions
    [b0, b1, ... b<p-1>],
    ....
    [c0, c1, ..., c<p-1>] #no. of node types
],
The "node_type" key points to a list of all the node type names present in the graph,
and "num_nodes_per_chunk" specifies the no. of nodes present in each of the input node chunks.
These node counts are used to compute the type_node_ids as well as the global node-ids with a
simple cumulative summation, maintaining an offset counter to store the end of the current type's range.

Since nodes are NOT associated with any additional metadata relevant to the processing in this
pipeline, this information does not need to be stored in files. This optimization saves a
considerable amount of time when loading massively large datasets for partitioning.
Instead of reading node files and shuffling their contents, each process/rank generates the
nodes owned by that rank, and using the "num_nodes_per_chunk" information each process can
easily compute any node's per-type node_id and global node_id.

The node-ids are treated as int64's in order to support billions of nodes in the input graph.
'''
    #read my nodes for each node type
-    node_tids = {}
-    node_data = schema_map["nid"]
-    for ntype_name, ntype_info in node_data.items():
-        v = []
-        node_file_info = ntype_info["data"]
-        for idx in range(len(node_file_info)):
-            v.append((node_file_info[idx][1], node_file_info[idx][2]))
-        node_tids[ntype_name] = v
+    node_tids, ntype_gnid_offset = get_idranges(schema_map[constants.STR_NODE_TYPE],
+                                                schema_map[constants.STR_NUM_NODES_PER_CHUNK])
+    for ntype_name in schema_map[constants.STR_NODE_TYPE]:
+        if ntype_name in node_feature_tids:
+            for item in node_feature_tids[ntype_name]:
+                item[1] = node_tids[ntype_name][rank][0]
+                item[2] = node_tids[ntype_name][rank][1]
+
+    #done build node_features locally.
+    if len(node_features) <= 0:
+        print('[Rank: ', rank, '] This dataset does not have any node features')
+    else:
+        for k, v in node_features.items():
+            print('[Rank: ', rank, '] node feature name: ', k, ', feature data shape: ', v.size())
'''
Code below is used to read edges from the input dataset with the help of the metadata json file
for the input graph dataset.
In the metadata json file, we expect the following key-value pairs to help read the edges of the
input graph.
"edge_type" : [ # a total of n edge types
canonical_etype_0,
canonical_etype_1,
...,
canonical_etype_n-1
]
The value for the key is a list of strings, each string is associated with an edgetype in the input graph.
Note that these strings are in canonical edgetypes format. This means, these edge type strings follow the
following naming convention: src_ntype:etype:dst_ntype. src_ntype and dst_ntype are node type names of the
src and dst end points of this edge type, and etype is the relation name between src and dst ntypes.
The files in which edges are present and their storage format are present in the following key-value pair:
"edges" : {
"canonical_etype_0" : {
"format" : { "name" : "csv", "delimiter" : " " },
"data" : [
filename_0,
filename_1,
filename_2,
....
filename_<p-1>
]
},
}
As shown above, the "edges" dictionary has canonical edgetypes as keys, and for each canonical edgetype
we have "format" and "data", which describe the storage format of the edge files and the actual filenames respectively.
Please note that each edgetype's data is split into `p` files, where p is the no. of partitions to be made of
the input graph.
Each edge file contains two columns representing the source per-type node_ids and destination per-type node_ids
of any given edge. Since these are node-ids as well they are read in as int64's.
'''
    #read my edges for each edge type
-    edge_tids = {}
+    etype_names = schema_map[constants.STR_EDGE_TYPE]
+    etype_name_idmap = {e : idx for idx, e in enumerate(etype_names)}
+    edge_tids, _ = get_idranges(schema_map[constants.STR_EDGE_TYPE],
+                                schema_map[constants.STR_NUM_EDGES_PER_CHUNK])
    edge_datadict = {}
-    edge_data = schema_map["eid"]
+    edge_data = schema_map[constants.STR_EDGES]
+
+    #read the edge files and store this data in memory.
+    for col in [constants.GLOBAL_SRC_ID, constants.GLOBAL_DST_ID, \
+                constants.GLOBAL_TYPE_EID, constants.ETYPE_ID]:
+        edge_datadict[col] = []

    for etype_name, etype_info in edge_data.items():
-        assert etype_info["format"] == "csv"
-        edge_info = etype_info["data"]
+        assert etype_info[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_CSV
+        edge_info = etype_info[constants.STR_DATA]
        assert len(edge_info) == world_size

-        data_df = csv.read_csv(edge_info[rank][0], read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
+        #edgetype strings are in canonical format, src_node_type:edge_type:dst_node_type
+        tokens = etype_name.split(":")
+        assert len(tokens) == 3
+        src_ntype_name = tokens[0]
+        rel_name = tokens[1]
+        dst_ntype_name = tokens[2]
+        data_df = csv.read_csv(edge_info[rank], read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
                                parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
-        edge_datadict[constants.GLOBAL_SRC_ID] = data_df['f0'].to_numpy()
-        edge_datadict[constants.GLOBAL_DST_ID] = data_df['f1'].to_numpy()
-        edge_datadict[constants.GLOBAL_TYPE_EID] = data_df['f2'].to_numpy()
-        edge_datadict[constants.ETYPE_ID] = data_df['f3'].to_numpy()
-
-        v = []
-        edge_file_info = etype_info["data"]
-        for idx in range(len(edge_file_info)):
-            v.append((edge_file_info[idx][1], edge_file_info[idx][2]))
-        edge_tids[etype_name] = v
+        #currently these are just type_edge_ids... which will be converted to global ids
+        edge_datadict[constants.GLOBAL_SRC_ID].append(data_df['f0'].to_numpy() + ntype_gnid_offset[src_ntype_name][0, 0])
+        edge_datadict[constants.GLOBAL_DST_ID].append(data_df['f1'].to_numpy() + ntype_gnid_offset[dst_ntype_name][0, 0])
+        edge_datadict[constants.GLOBAL_TYPE_EID].append(np.arange(edge_tids[etype_name][rank][0],\
+            edge_tids[etype_name][rank][1] ,dtype=np.int64))
+        edge_datadict[constants.ETYPE_ID].append(etype_name_idmap[etype_name] * \
+            np.ones(shape=(data_df['f0'].to_numpy().shape), dtype=np.int64))
+
+    #stitch together to create the final data on the local machine
+    for col in [constants.GLOBAL_SRC_ID, constants.GLOBAL_DST_ID, constants.GLOBAL_TYPE_EID, constants.ETYPE_ID]:
+        edge_datadict[col] = np.concatenate(edge_datadict[col])
+
+    assert edge_datadict[constants.GLOBAL_SRC_ID].shape == edge_datadict[constants.GLOBAL_DST_ID].shape
+    assert edge_datadict[constants.GLOBAL_DST_ID].shape == edge_datadict[constants.GLOBAL_TYPE_EID].shape
+    assert edge_datadict[constants.GLOBAL_TYPE_EID].shape == edge_datadict[constants.ETYPE_ID].shape

    print('[Rank: ', rank, '] Done reading edge_file: ', len(edge_datadict), edge_datadict[constants.GLOBAL_SRC_ID].shape)
    return node_tids, node_features, node_feature_tids, edge_datadict, edge_tids
...
@@ -8,29 +8,42 @@ import constants
import pyarrow
from pyarrow import csv

-def read_partitions_file(part_file):
+def read_ntype_partition_files(schema_map, input_dir):
    """
-    Utility method to read metis partitions, which is the output of
-    pm_dglpart2
+    Utility method to read the partition id mapping for each node.
+    For each node type, there will be a file in the input directory argument
+    containing the partition id mapping for a given nodeid.

    Parameters:
    -----------
-    part_file : string
-        file name which is the output of metis partitioning
-        algorithm (pm_dglpart2, in the METIS installation).
-        This function expects each line in `part_file` to be formatted as
-        <global_nid> <part_id>
-        and the contents of this file are sorted by <global_nid>.
+    schema_map : dictionary
+        dictionary created by reading the input metadata json file
+    input_dir : string
+        directory in which the node-id to partition-id mapping files are
+        located for each of the node types in the input graph

    Returns:
    --------
-    numpy array
-        array of part_ids and the idx is the <global_nid>
+    numpy array :
+        array of integers representing mapped partition-ids for a given node-id.
+        The line numbers in these files are used as the type_node_id in each of
+        the files. The index into this array will be the homogenized node-id and
+        the value will be the partition-id for that node-id (index). Please note that
+        the partition-ids of each node-type are stacked together vertically and
+        in this way heterogeneous node-ids are converted to homogeneous node-ids.
    """
-    partitions_map = np.loadtxt(part_file, delimiter=' ', dtype=np.int64)
-    #as a precaution sort the lines based on the <global_nid>
-    partitions_map = partitions_map[partitions_map[:,0].argsort()]
-    return partitions_map[:,1]
+    assert os.path.isdir(input_dir)
+
+    #iterate over the node types and extract the partition id mappings
+    part_ids = []
+    ntype_names = schema_map[constants.STR_NODE_TYPE]
+    for ntype in ntype_names:
+        df = csv.read_csv(os.path.join(input_dir, '{}.txt'.format(ntype)), \
+                read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True), \
+                parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
+        ntype_partids = df['f0'].to_numpy()
+        part_ids.append(ntype_partids)
+    return np.concatenate(part_ids)
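A brief sketch of the expected per-node-type partition files (hypothetical node types and values), illustrating how the per-type arrays are stacked into one homogeneous node-id space:

# Assumed layout under <input_dir>: one file per node type, named after schema_map["node_type"].
#   paper.txt  : one partition-id per line; the line number is the per-type node-id of "paper"
#   author.txt : one partition-id per line; the line number is the per-type node-id of "author"
# If paper.txt holds [0, 1, 1] and author.txt holds [1, 0], the returned array is
# [0, 1, 1, 1, 0]: indices 0..2 are "paper" nodes and indices 3..4 are "author" nodes,
# i.e. homogeneous node-ids obtained by stacking the node types in schema order.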
def read_json(json_file):
    """
@@ -50,7 +63,7 @@ def read_json(json_file):
    return val

-def get_ntype_featnames(ntype_name, schema):
+def get_ntype_featnames(ntype_name, schema_map):
    """
    Retrieves node feature names for a given node_type
@@ -68,24 +81,24 @@ def get_ntype_featnames(ntype_name, schema_map):
    list :
        a list of feature names for a given node_type
    """
-    ntype_dict = schema["node_data"]
-    if (ntype_name in ntype_dict):
+    ntype_featdict = schema_map[constants.STR_NODE_DATA]
+    if (ntype_name in ntype_featdict):
        featnames = []
-        ntype_info = ntype_dict[ntype_name]
+        ntype_info = ntype_featdict[ntype_name]
        for k, v in ntype_info.items():
            featnames.append(k)
        return featnames
    else:
        return []

-def get_node_types(schema):
+def get_node_types(schema_map):
    """
    Utility method to extract node_typename -> node_type mappings
    as defined by the input schema

    Parameters:
    -----------
-    schema : dictionary
+    schema_map : dictionary
        Input schema from which the node_typename -> node_type
        dictionary is created.
@@ -98,12 +111,9 @@ def get_node_types(schema_map):
    dictionary
        with keys as ntype ids (integers) and values as node type names
    """
-    ntype_info = schema["nid"]
-    ntypes = []
-    for k in ntype_info.keys():
-        ntypes.append(k)
-    ntype_ntypeid_map = {e: i for i, e in enumerate(ntypes)}
-    ntypeid_ntype_map = {str(i): e for i, e in enumerate(ntypes)}
+    ntypes = schema_map[constants.STR_NODE_TYPE]
+    ntype_ntypeid_map = {e : i for i, e in enumerate(ntypes)}
+    ntypeid_ntype_map = {i : e for i, e in enumerate(ntypes)}
    return ntype_ntypeid_map, ntypes, ntypeid_ntype_map
def get_gnid_range_map(node_tids):
@@ -181,8 +191,8 @@ def write_metadata_json(metadata_list, output_dir, graph_name):
    for i in range(len(metadata_list)):
        graph_metadata["part-{}".format(i)] = metadata_list[i]["part-{}".format(i)]

-    with open('{}/{}.json'.format(output_dir, graph_name), 'w') as outfile:
-        json.dump(graph_metadata, outfile, sort_keys=True, indent=4)
+    with open('{}/metadata.json'.format(output_dir), 'w') as outfile:
+        json.dump(graph_metadata, outfile, sort_keys=False, indent=4)

def augment_edge_data(edge_data, part_ids, edge_tids, rank, world_size):
    """
@@ -326,14 +336,19 @@ def write_graph_dgl(graph_file, graph_obj):
def write_dgl_objects(graph_obj, node_features, edge_features, output_dir, part_id):
    """
    Wrapper function to create dgl objects for graph, node-features and edge-features

    Parameters:
    -----------
    graph_obj : dgl object
        graph dgl object as created in convert_partition.py file
    node_features : dgl object
        Tensor data for node features
    edge_features : dgl object
        Tensor data for edge features
    output_dir : string
        location where the output files will be located
    part_id : int
        integer indicating the partition-id
    """
    part_dir = output_dir + '/part' + str(part_id)
@@ -345,3 +360,47 @@ def write_dgl_objects(graph_obj, node_features, edge_features, output_dir, part_id):
    if (edge_features != None):
        write_edge_features(edge_features, os.path.join(part_dir, "edge_feat.dgl"))
def get_idranges(names, counts):
"""
Utility function to compute type_id/global_id ranges for both nodes and edges.
Parameters:
-----------
names : list of strings
list of node/edge types as strings
counts : list of lists
each list contains no. of nodes/edges in a given chunk
Returns:
--------
dictionary
dictionary where the keys are node-/edge-type names and values are
list of tuples where each tuple indicates the range of values for
corresponding type-ids.
dictionary
dictionary where the keys are node-/edge-type names and the value is a 1x2 numpy array.
This array indicates the range of global-ids for the associated node-/edge-type.
"""
gnid_start = 0
gnid_end = gnid_start
tid_dict = {}
gid_dict = {}
for idx, typename in enumerate(names):
type_counts = counts[idx]
tid_start = np.cumsum([0] + type_counts[:-1])
tid_end = np.cumsum(type_counts)
tid_ranges = list(zip(tid_start, tid_end))
type_start = tid_ranges[0][0]
type_end = tid_ranges[-1][1]
gnid_end += tid_ranges[-1][1]
tid_dict[typename] = tid_ranges
gid_dict[typename] = np.array([gnid_start, gnid_end]).reshape([1,2])
gnid_start = gnid_end
return tid_dict, gid_dict
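A small usage sketch of get_idranges (hypothetical node type names and chunk counts, chosen only to illustrate the returned ranges):

# Two node types, each split into p=2 chunks (values assumed for illustration).
names = ["paper", "author"]
counts = [[100, 150], [200, 50]]   # num_nodes_per_chunk-style counts

tid_dict, gid_dict = get_idranges(names, counts)

# Per-type id ranges, one (start, end) pair per chunk:
#   tid_dict["paper"]  -> [(0, 100), (100, 250)]
#   tid_dict["author"] -> [(0, 200), (200, 250)]
# Global-id ranges, one [start, end] pair per type (types stacked in schema order):
#   gid_dict["paper"]  -> array([[  0, 250]])
#   gid_dict["author"] -> array([[250, 500]])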