"tests/python/vscode:/vscode.git/clone" did not exist on "5ffd2a022c3bc2b37a47d460d5f32d034f61d81e"
Unverified commit 2e8ae9f9 authored by Mufei Li, committed by GitHub

[Dist][CI] Unit test for the new distributed partitioning pipeline (#4394)



* chunked graph data format

* Update

* Update

* Update task_distributed_test.sh

* Update

* Update

* Revert "Update"

This reverts commit 03c461870f19375fb03125b061fc853ab555577f.

* Update

* Update

* ssh-keygen

* CI

* install openssh

* openssh

* Update

* CI

* Update

* Update
Co-authored-by: Ubuntu <ubuntu@ip-172-31-53-142.us-west-2.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-16-87.us-west-2.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-20-21.us-west-2.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-9-26.ap-northeast-1.compute.internal>
parent b039ea99
......@@ -30,7 +30,7 @@ export CUDA_VISIBLE_DEVICES=-1
conda activate ${DGLBACKEND}-ci
-python3 -m pip install pytest psutil pyyaml pydantic pandas rdflib ogb filelock || fail "pip install"
+python3 -m pip install pytest psutil pyyaml pydantic pandas rdflib ogb filelock pyarrow || fail "pip install"
export PYTHONUNBUFFERED=1
export OMP_NUM_THREADS=1
......
import argparse
import dgl
import json
import numpy as np
import os
import sys
import tempfile
import torch
from dgl.data.utils import load_tensors, load_graphs
from chunk_graph import chunk_graph
def test_part_pipeline():
# Step0: prepare chunked graph data format
# A synthetic mini MAG240
num_institutions = 20
num_authors = 100
num_papers = 600
def rand_edges(num_src, num_dst, num_edges):
eids = np.random.choice(num_src * num_dst, num_edges, replace=False)
src = torch.from_numpy(eids // num_dst)
dst = torch.from_numpy(eids % num_dst)
return src, dst
num_cite_edges = 2000
num_write_edges = 1000
num_affiliate_edges = 200
# Structure
data_dict = {
('paper', 'cites', 'paper'): rand_edges(num_papers, num_papers, num_cite_edges),
('author', 'writes', 'paper'): rand_edges(num_authors, num_papers, num_write_edges),
('author', 'affiliated_with', 'institution'): rand_edges(num_authors, num_institutions, num_affiliate_edges)
}
src, dst = data_dict[('author', 'writes', 'paper')]
data_dict[('paper', 'rev_writes', 'author')] = (dst, src)
g = dgl.heterograph(data_dict)
# paper feat, label, year
num_paper_feats = 3
paper_feat = np.random.randn(num_papers, num_paper_feats)
num_classes = 4
paper_label = np.random.choice(num_classes, num_papers)
paper_year = np.random.choice(2022, num_papers)
# edge features
cite_count = np.random.choice(10, num_cite_edges)
write_year = np.random.choice(2022, num_write_edges)
# Save features
with tempfile.TemporaryDirectory() as root_dir:
print('root_dir', root_dir)
input_dir = os.path.join(root_dir, 'data_test')
os.makedirs(input_dir)
for sub_d in ['paper', 'cites', 'writes']:
os.makedirs(os.path.join(input_dir, sub_d))
paper_feat_path = os.path.join(input_dir, 'paper/feat.npy')
with open(paper_feat_path, 'wb') as f:
np.save(f, paper_feat)
paper_label_path = os.path.join(input_dir, 'paper/label.npy')
with open(paper_label_path, 'wb') as f:
np.save(f, paper_label)
paper_year_path = os.path.join(input_dir, 'paper/year.npy')
with open(paper_year_path, 'wb') as f:
np.save(f, paper_year)
cite_count_path = os.path.join(input_dir, 'cites/count.npy')
with open(cite_count_path, 'wb') as f:
np.save(f, cite_count)
write_year_path = os.path.join(input_dir, 'writes/year.npy')
with open(write_year_path, 'wb') as f:
np.save(f, write_year)
output_dir = os.path.join(root_dir, 'chunked-data')
num_chunks = 2
chunk_graph(
g,
'mag240m',
{'paper':
{
'feat': paper_feat_path,
'label': paper_label_path,
'year': paper_year_path
}
},
{
'cites': {'count': cite_count_path},
'writes': {'year': write_year_path},
# you can put the same data file if they indeed share the features.
'rev_writes': {'year': write_year_path}
},
num_chunks=num_chunks,
output_path=output_dir)
# check metadata.json
json_file = os.path.join(output_dir, 'metadata.json')
assert os.path.isfile(json_file)
with open(json_file, 'rb') as f:
meta_data = json.load(f)
assert meta_data['graph_name'] == 'mag240m'
assert len(meta_data['num_nodes_per_chunk'][0]) == num_chunks
# check edge_index
output_edge_index_dir = os.path.join(output_dir, 'edge_index')
for utype, etype, vtype in data_dict.keys():
fname = ':'.join([utype, etype, vtype])
for i in range(num_chunks):
chunk_f_name = os.path.join(output_edge_index_dir, fname + str(i) + '.txt')
assert os.path.isfile(chunk_f_name)
with open(chunk_f_name, 'r') as f:
header = f.readline()
num1, num2 = header.rstrip().split(' ')
assert isinstance(int(num1), int)
assert isinstance(int(num2), int)
# check node_data
output_node_data_dir = os.path.join(output_dir, 'node_data', 'paper')
for feat in ['feat', 'label', 'year']:
for i in range(num_chunks):
chunk_f_name = '{}-{}.npy'.format(feat, i)
chunk_f_name = os.path.join(output_node_data_dir, chunk_f_name)
assert os.path.isfile(chunk_f_name)
feat_array = np.load(chunk_f_name)
assert feat_array.shape[0] == num_papers // num_chunks
# check edge_data
num_edges = {
'paper:cites:paper': num_cite_edges,
'author:writes:paper': num_write_edges,
'paper:rev_writes:author': num_write_edges
}
output_edge_data_dir = os.path.join(output_dir, 'edge_data')
for etype, feat in [
['paper:cites:paper', 'count'],
['author:writes:paper', 'year'],
['paper:rev_writes:author', 'year']
]:
output_edge_sub_dir = os.path.join(output_edge_data_dir, etype)
for i in range(num_chunks):
chunk_f_name = '{}-{}.npy'.format(feat, i)
chunk_f_name = os.path.join(output_edge_sub_dir, chunk_f_name)
assert os.path.isfile(chunk_f_name)
feat_array = np.load(chunk_f_name)
assert feat_array.shape[0] == num_edges[etype] // num_chunks
# Step1: graph partition
in_dir = os.path.join(root_dir, 'chunked-data')
output_dir = os.path.join(root_dir, '2parts')
os.system('python tools/partition_algo/random_partition.py '\
'--metadata {}/metadata.json --output_path {} --num_partitions {}'.format(
in_dir, output_dir, num_chunks))
for ntype in ['author', 'institution', 'paper']:
fname = os.path.join(output_dir, '{}.txt'.format(ntype))
with open(fname, 'r') as f:
header = f.readline().rstrip()
assert isinstance(int(header), int)
# Step2: data dispatch
partition_dir = os.path.join(root_dir, '2parts')
out_dir = os.path.join(root_dir, 'partitioned')
ip_config = os.path.join(root_dir, 'ip_config.txt')
with open(ip_config, 'w') as f:
f.write('127.0.0.1\n')
f.write('127.0.0.2\n')
os.system('python tools/dispatch_data.py '\
'--in-dir {} --partitions-dir {} --out-dir {} --ip-config {}'.format(
in_dir, partition_dir, out_dir, ip_config))
# check metadata.json
meta_fname = os.path.join(out_dir, 'metadata.json')
with open(meta_fname, 'rb') as f:
meta_data = json.load(f)
all_etypes = ['affiliated_with', 'writes', 'cites', 'rev_writes']
for etype in all_etypes:
assert len(meta_data['edge_map'][etype]) == num_chunks
assert meta_data['etypes'].keys() == set(all_etypes)
assert meta_data['graph_name'] == 'mag240m'
all_ntypes = ['author', 'institution', 'paper']
for ntype in all_ntypes:
assert len(meta_data['node_map'][ntype]) == num_chunks
assert meta_data['ntypes'].keys() == set(all_ntypes)
assert meta_data['num_edges'] == 4200
assert meta_data['num_nodes'] == 720
assert meta_data['num_parts'] == num_chunks
for i in range(num_chunks):
sub_dir = 'part-' + str(i)
assert meta_data[sub_dir]['node_feats'] == 'part{}/node_feat.dgl'.format(i)
assert meta_data[sub_dir]['edge_feats'] == 'part{}/edge_feat.dgl'.format(i)
assert meta_data[sub_dir]['part_graph'] == 'part{}/graph.dgl'.format(i)
# check data
sub_dir = os.path.join(out_dir, 'part' + str(i))
# graph.dgl
fname = os.path.join(sub_dir, 'graph.dgl')
assert os.path.isfile(fname)
g_list, data_dict = load_graphs(fname)
g = g_list[0]
assert isinstance(g, dgl.DGLGraph)
# node_feat.dgl
fname = os.path.join(sub_dir, 'node_feat.dgl')
assert os.path.isfile(fname)
tensor_dict = load_tensors(fname)
all_tensors = ['paper/feat', 'paper/label', 'paper/year']
assert tensor_dict.keys() == set(all_tensors)
for key in all_tensors:
assert isinstance(tensor_dict[key], torch.Tensor)
# edge_feat.dgl
fname = os.path.join(sub_dir, 'edge_feat.dgl')
assert os.path.isfile(fname)
tensor_dict = load_tensors(fname)
if __name__ == '__main__':
test_part_pipeline()
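The test drives the whole pipeline end to end (chunking, random partitioning, dispatch), so it has to be launched from the repository root where the relative tools/... paths resolve. A minimal sketch of a local invocation follows; the test-file path below is hypothetical, since the file name is not visible in this view.

# Minimal sketch: run the new distributed-partition unit test locally.
# Assumptions: working directory is the DGL repository root (the test shells
# out to tools/partition_algo/random_partition.py and tools/dispatch_data.py
# via relative paths); the test-file path below is hypothetical.
import subprocess
import sys

subprocess.run(
    [sys.executable, "-m", "pytest", "-v", "tests/tools/test_part_pipeline.py"],
    check=True,
)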
......@@ -111,6 +111,7 @@ def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
reader_fmt_meta = writer_fmt_meta = {"name": "numpy"}
arr = array_readwriter.get_array_parser(**reader_fmt_meta).read(path)
edata_key_meta['format'] = writer_fmt_meta
etype = tuple(etypestr.split(':'))
edata_key_meta['data'] = chunk_numpy_array(
arr, writer_fmt_meta, num_edges_per_chunk_dict[etype],
key + '-%d.npy')
......
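The added etype lookup above feeds chunk_numpy_array, which, judging from the assertions in the new test, writes one .npy file per chunk with each chunk's row count taken from num_edges_per_chunk_dict[etype]. An illustrative sketch of that behavior (not the actual DGL helper):

# Illustrative sketch (not the actual DGL helper): write `arr` out as one
# .npy file per chunk, where chunk_sizes lists how many rows go into each
# chunk, mirroring num_edges_per_chunk_dict[etype] above.
import numpy as np

def chunk_numpy_array_sketch(arr, chunk_sizes, path_fmt="count-%d.npy"):
    offsets = np.cumsum([0] + list(chunk_sizes))
    paths = []
    for i in range(len(chunk_sizes)):
        piece = arr[offsets[i]:offsets[i + 1]]   # rows for chunk i
        path = path_fmt % i
        np.save(path, piece)
        paths.append(path)
    return paths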
......@@ -21,7 +21,7 @@ LARG_PROCS_MACHINE = "num_proc_per_machine"
LARG_IPCONF = "ip_config"
LARG_MASTER_PORT = "master_port"
def get_launch_cmd(args) -> str:
cmd = sys.executable + " " + os.path.join(INSTALL_DIR, LAUNCH_SCRIPT)
cmd = f"{cmd} --{LARG_PROCS_MACHINE} 1 "
cmd = f"{cmd} --{LARG_IPCONF} {args.ip_config} "
......@@ -33,9 +33,9 @@ def get_launch_cmd(args) -> str:
def submit_jobs(args) -> str:
wrapper_command = os.path.join(INSTALL_DIR, LAUNCH_SCRIPT)
-#read the json file and get the remaining argument here.
-schema_path = os.path.join(args.in_dir, "metadata.json")
-with open(schema_path) as schema:
+#read the json file and get the remaining argument here.
+schema_path = "metadata.json"
+with open(os.path.join(args.in_dir, schema_path)) as schema:
schema_map = json.load(schema)
num_parts = len(schema_map["num_nodes_per_chunk"][0])
......@@ -43,13 +43,13 @@ def submit_jobs(args) -> str:
argslist = ""
argslist += "--world-size {} ".format(num_parts)
argslist += "--partitions-dir {} ".format(args.partitions_dir)
argslist += "--input-dir {} ".format(args.in_dir)
argslist += "--partitions-dir {} ".format(os.path.abspath(args.partitions_dir))
argslist += "--input-dir {} ".format(os.path.abspath(args.in_dir))
argslist += "--graph-name {} ".format(graph_name)
argslist += "--schema {} ".format(schema_path)
argslist += "--num-parts {} ".format(num_parts)
argslist += "--output {} ".format(args.out_dir)
argslist += "--output {} ".format(os.path.abspath(args.out_dir))
# (BarclayII) Is it safe to assume all the workers have the Python executable at the same path?
pipeline_cmd = os.path.join(INSTALL_DIR, PIPELINE_SCRIPT)
udf_cmd = f"{args.python_path} {pipeline_cmd} {argslist}"
......
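The switch to os.path.abspath above presumably ensures that workers launched through the wrapper script resolve the same input, partition, and output locations regardless of their working directory. For reference, this is how the new unit test invokes the dispatcher; the scratch directory below is illustrative, the test itself uses a tempfile.TemporaryDirectory.

# How the unit test drives tools/dispatch_data.py (illustrative paths).
import os

root_dir = "/tmp/part_pipeline_test"   # hypothetical scratch directory
cmd = (
    "python tools/dispatch_data.py "
    "--in-dir {} --partitions-dir {} --out-dir {} --ip-config {}".format(
        os.path.join(root_dir, "chunked-data"),
        os.path.join(root_dir, "2parts"),
        os.path.join(root_dir, "partitioned"),
        os.path.join(root_dir, "ip_config.txt"),
    )
)
os.system(cmd)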
......@@ -28,9 +28,9 @@ def gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map):
'''
For this data processing pipeline, reading node files is not needed. All the needed information about
the nodes can be found in the metadata json file. This function generates the nodes owned by a given
process, using metis partitions.
Parameters:
-----------
rank : int
rank of the process
......@@ -41,25 +41,25 @@ def gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map):
shuffle-global-nids
ntid_ntype_map :
a dictionary where keys are node_type ids(integers) and values are node_type names(strings).
schema_map:
dictionary formed by reading the input metadata json file for the input dataset.
Please note that it is assumed that for the input graph files, the nodes of a particular node-type are
split into `p` files (because of `p` partitions to be generated). On a similar note, edges of a particular
edge-type are split into `p` files as well.
#assuming m nodetypes present in the input graph
"num_nodes_per_chunk" : [
[a0, a1, a2, ... a<p-1>],
[b0, b1, b2, ... b<p-1>],
...
[m0, m1, m2, ... m<p-1>]
]
Here, each sub-list, corresponding to a nodetype in the input graph, has `p` elements. For instance [a0, a1, ... a<p-1>]
where each element represents the number of nodes which are to be processed by a process during distributed partitioning.
In addition to the above key-value pair for the nodes in the graph, the node-features are captured in the
"node_data" key-value pair. In this dictionary the keys will be nodetype names and value will be a dictionary which
"node_data" key-value pair. In this dictionary the keys will be nodetype names and value will be a dictionary which
is used to capture all the features present for that particular node-type. This is shown in the following example:
"node_data" : {
......@@ -78,26 +78,26 @@ def gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map):
}
}
}
In the above textual description we have a node-type, which is paper, and it has 3 features namely feat, label and year.
Each feature has `p` files whose location in the filesystem is the list for the key "data" and "format" is used to
describe storage format.
Returns:
--------
dictionary :
dictionary where keys are column names and values are numpy arrays, these arrays are generated by
using information present in the metadata json file
'''
local_node_data = { constants.GLOBAL_NID : [],
constants.NTYPE_ID : [],
constants.GLOBAL_TYPE_NID : []
}
type_nid_dict, global_nid_dict = get_idranges(schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK])
for ntype_id, ntype_name in ntid_ntype_map.items():
type_start, type_end = type_nid_dict[ntype_name][0][0], type_nid_dict[ntype_name][-1][1]
gnid_start, gnid_end = global_nid_dict[ntype_name][0, 0], global_nid_dict[ntype_name][0, 1]
......@@ -136,7 +136,7 @@ def exchange_edge_data(rank, world_size, edge_data):
Returns:
--------
dictionary :
the input argument, edge_data, is updated with the edge data received by other processes
in the world.
"""
......@@ -156,7 +156,7 @@ def exchange_edge_data(rank, world_size, edge_data):
else:
input_list.append(torch.from_numpy(filt_data))
end = timer()
dist.barrier ()
output_list = alltoallv_cpu(rank, world_size, input_list)
end = timer()
......@@ -175,32 +175,32 @@ def exchange_edge_data(rank, world_size, edge_data):
def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map, id_lookup, node_features):
"""
This function is used to shuffle node features so that each process will receive
all the node features whose corresponding nodes are owned by the same process.
The mapping procedure to identify the owner process is not straightforward. The
following steps are used to identify the owner processes for the locally read node-
features.
a. Compute the global_nids for the locally read node features. Here metadata json file
is used to identify the corresponding global_nids. Please note that initial graph input
nodes.txt files are sorted based on node_types.
b. Using global_nids and metis partitions owner processes can be easily identified.
c. Now each process sends the global_nids for which shuffle_global_nids are needed to be
retrieved.
d. After receiving the corresponding shuffle_global_nids these ids are added to the
node_data and edge_data dictionaries
This pipeline assumes all the input data is in numpy format, except node/edge features which
are maintained as tensors throughout the various stages of the pipeline execution.
Parameters:
-----------
rank : int
rank of the current process
world_size : int
total no. of participating processes.
node_feature_tids : dictionary
dictionary with keys as node-type names and value is a dictionary. This dictionary
contains information about node-features associated with a given node-type and value
is a list. This list contains index ranges, like [starting-idx, ending-idx), which
can be used to index into the node feature tensors read from corresponding input files.
ntypes_gnid_map : dictionary
mapping between node type names and global_nids which belong to the keys in this dictionary
......@@ -213,11 +213,11 @@ def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map,
Returns:
--------
dictionary :
node features are returned as a dictionary where keys are node type names and node feature names
and values are tensors
dictionary :
a dictionary of global_nids for the nodes whose node features are received during the data shuffle
process
"""
start = timer()
......@@ -226,7 +226,7 @@ def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map,
#To iterate over the node_types and associated node_features
for ntype_name, ntype_info in node_feature_tids.items():
#To iterate over the node_features of a given node_type
#ntype_info is a list of 3 elements
#[node-feature-name, starting-idx, ending-idx]
#node-feature-name is the name given to the node-feature, read from the input metadata file
......@@ -235,7 +235,7 @@ def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map,
#as specified with this range. So no. of rows = ending-idx - starting-idx.
for feat_info in ntype_info:
#determine the owner process for these node features.
node_feats_per_rank = []
global_nid_per_rank = []
feat_name = feat_info[0]
......@@ -290,14 +290,14 @@ def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map,
def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_data,
id_lookup, ntypes_ntypeid_map, ntypes_gnid_range_map, ntid_ntype_map, schema_map):
"""
Wrapper function which is used to shuffle graph data on all the processes.
Parameters:
-----------
rank : int
rank of the current process
world_size : int
total no. of participating processes.
node_features: dictionary
dictionary where node_features are stored and this information is read from the appropriate
node features file which belongs to the current process
......@@ -323,16 +323,16 @@ def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_da
Returns:
--------
dictionary :
the input argument, node_data dictionary, is updated with the node data received from other processes
in the world. The node data is received by each rank in the process of data shuffling.
dictionary :
node features dictionary which has node features for the nodes which are owned by the current
process
dictionary :
list of global_nids for the nodes whose node features are received when node features shuffling was
performed in the `exchange_node_features` function call
dictionary :
the input argument, edge_data dictionary, is updated with the edge data received from other processes
in the world. The edge data is received by each rank in the process of data shuffling.
"""
......@@ -347,10 +347,10 @@ def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_da
def read_dataset(rank, world_size, id_lookup, params, schema_map):
"""
This function gets the dataset and performs post-processing on the data which is read from files.
Additional information (columns) are added to nodes metadata like owner_process, global_nid which
are later used in processing this information. For edge data, which is now a dictionary, we add new columns
like global_edge_id and owner_process. Augmenting these data structures helps in processing them
when data shuffling is performed.
Parameters:
-----------
......@@ -361,27 +361,27 @@ def read_dataset(rank, world_size, id_lookup, params, schema_map):
id_lookup : instance of class DistLookupService
Distributed lookup service used to map global-nids to respective partition-ids and
shuffle-global-nids
params : argparser object
argument parser object to access command line arguments
schema_map : dictionary
dictionary created by reading the input graph metadata json file
Returns :
---------
dictionary
in which keys are node-type names and values are tuples representing the range of ids
for nodes to be read by the current process
dictionary
node features which is a dictionary where keys are feature names and values are feature
data as multi-dimensional tensors
dictionary
in which keys are node-type names and values are triplets. Each triplet has node-feature name
and the starting and ending type ids of the node-feature data read from the corresponding
node feature data file read by current process. Each node type may have several features and
hence each key may have several triplets.
dictionary
edge data information is read from edges.txt and additional columns are added such as
owner process for each edge.
dictionary
edge features which is also a dictionary, similar to node features dictionary
"""
......@@ -398,50 +398,50 @@ def read_dataset(rank, world_size, id_lookup, params, schema_map):
def gen_dist_partitions(rank, world_size, params):
"""
Function which will be executed by all Gloo processes to begin execution of the pipeline.
This function expects the input dataset to be split across multiple files.
Input dataset and its file structure is described in metadata json file which is also part of the
input dataset. At a high level, this metadata json file contains information about the following items
a) Nodes metadata: it is assumed that nodes which belong to each node-type are split into p files
(where `p` is no. of partitions).
b) Similarly edge metadata contains information about edges which are split into p-files.
c) Node and Edge features, it is also assumed that each node (and edge) feature, if present, is also
split into `p` files.
For example, a sample metadata json file might be as follows:
(In this toy example, we assume that we have "m" node-types, "k" edge types, and for node_type = ntype0-name
we have two features namely feat0-name and feat1-name. Please note that the node-features are also split into
`p` files. This will help in load-balancing during data-shuffling phase).
Terminology used to identify any particular "id" assigned to nodes, edges or node features. Prefix "global" is
used to indicate that this information is either read from the input dataset or autogenerated based on the information
read from input dataset files. Prefix "type" is used to indicate a unique id assigned to either nodes or edges.
For instance, type_node_id means a unique id, within a given node type, assigned to a node. And prefix "shuffle"
will be used to indicate a unique id, across entire graph, assigned to either a node or an edge. For instance,
SHUFFLE_GLOBAL_NID means a unique id which is assigned to a node after the data shuffle is completed.
Some high-level notes on the structure of the metadata json file.
1. path(s) mentioned in the entries for nodes, edges and node-features files can be either absolute or relative.
if these paths are relative, then it is assumed that they are relative to the folder from which the execution is
launched.
2. The id_startx and id_endx represent the type_node_id and type_edge_id respectively for nodes and edge data. This
means that these ids should match the no. of nodes/edges read from any given file. Since these are type_ids for
the nodes and edges in any given file, their global_ids can be easily computed as well.
{
"graph_name" : xyz,
"node_type" : ["ntype0-name", "ntype1-name", ....], #m node types
"num_nodes_per_chunk" : [
[a0, a1, ...a<p-1>], #p partitions
[b0, b1, ... b<p-1>],
....
[c0, c1, ..., c<p-1>] #no. of node types
],
"edge_type" : ["src_ntype:edge_type:dst_ntype", ....], #k edge types
"num_edges_per_chunk" : [
[a0, a1, ...a<p-1>], #p partitions
[b0, b1, ... b<p-1>],
....
[c0, c1, ..., c<p-1>] #no. of edge types
],
......@@ -453,21 +453,21 @@ def gen_dist_partitions(rank, world_size, params):
["<path>/feat-0.npy", 0, id_end0],
["<path>/feat-1.npy", id_start1, id_end1],
....
["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
]
},
"feat1-name" : {
"format" : {"name": "numpy"},
"format" : {"name": "numpy"},
"data" : [ #list of lists
["<path>/feat-0.npy", 0, id_end0],
["<path>/feat-1.npy", id_start1, id_end1],
....
["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
["<path>/feat-<p-1>.npy", id_start<p-1>, id_end<p-1>]
]
}
}
},
"edges": { #k edge types
"edges": { #k edge types
"src_ntype:etype0-name:dst_ntype" : {
"format": {"name" : "csv", "delimiter" : " "},
"data" : [
......@@ -476,8 +476,8 @@ def gen_dist_partitions(rank, world_size, params):
...,
["<path>/etype0-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
]
},
...,
"src_ntype:etype<k-1>-name:dst_ntype" : {
"format": {"name" : "csv", "delimiter" : " "},
"data" : [
......@@ -486,30 +486,30 @@ def gen_dist_partitions(rank, world_size, params):
...,
["<path>/etype<k-1>-name-<p-1>.txt", id_start<p-1>, id_end<p-1>]
]
},
},
}
The function performs the following steps:
1. Reads the metis partitions to identify the owner process of all the nodes in the entire graph.
2. Reads the input data set, each participating process will map to a single file for the edges,
node-features and edge-features for each node-type and edge-types respectively. Using nodes metadata
information, nodes which are owned by a given process are generated to optimize communication to some
extent.
3. Now each process shuffles the data by identifying the respective owner processes using metis
partitions.
a. To identify owner processes for nodes, metis partitions will be used.
b. For edges, the owner process of the destination node will be the owner of the edge as well.
c. For node and edge features, identifying the owner process is a little bit involved.
For this purpose, graph metadata json file is used to first map the locally read node features
to their global_nids. Now owner process is identified using metis partitions for these global_nids
to retrieve shuffle_global_nids. A similar process is used for edge_features as well.
d. After all the data shuffling is done, the order of node-features may be different when compared to
their global_type_nids. Node- and edge-data are ordered by node-type and edge-type respectively.
And now node features and edge features are re-ordered to match the order of their node- and edge-types.
4. Last step is to create the DGL objects with the data present on each of the processes.
a. DGL objects for nodes, edges, node- and edge- features.
b. Metadata is gathered from each process to create the global metadata json file, by process rank = 0.
Parameters:
----------
......@@ -544,7 +544,7 @@ def gen_dist_partitions(rank, world_size, params):
read_dataset(rank, world_size, id_lookup, params, schema_map)
logging.info(f'[Rank: {rank}] Done augmenting file input data with auxilary columns')
#send out node and edge data --- and appropriate features.
#this function will also stitch the data recvd from other processes
#and return the aggregated data
ntypes_gnid_range_map = get_gnid_range_map(node_tids)
......@@ -564,11 +564,11 @@ def gen_dist_partitions(rank, world_size, params):
assign_shuffle_global_nids_nodes(rank, world_size, node_data)
logging.info(f'[Rank: {rank}] Done assigning global-ids to nodes...')
#shuffle node feature according to the node order on each rank.
for ntype_name in ntypes:
featnames = get_ntype_featnames(ntype_name, schema_map)
for featname in featnames:
#if a feature name exists for a node-type, then it should also have
#feature data as well. Hence using the assert statement.
assert(ntype_name+'/'+featname in rcvd_global_nids)
global_nids = rcvd_global_nids[ntype_name+'/'+featname]
......@@ -599,7 +599,7 @@ def gen_dist_partitions(rank, world_size, params):
schema_map, rank, node_data, edge_data, num_nodes, num_edges)
write_dgl_objects(graph_obj, rcvd_node_features, edge_features, params.output, rank)
#get the meta-data
json_metadata = create_metadata_json(params.graph_name, len(node_data[constants.NTYPE_ID]), len(edge_data[constants.ETYPE_ID]), \
rank, world_size, ntypes_map_val, \
etypes_map_val, ntypes_ntypeid_map, etypes_map, params.output)
......
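To make the metadata layout described in the gen_dist_partitions docstring concrete, here is a hypothetical fragment of the chunked-graph metadata.json for the synthetic graph built in the new unit test (600 papers, 100 authors, 20 institutions, 2 chunks), written as a Python dict. The node-type order and the even per-chunk counts are assumptions for illustration; the test only asserts that each inner list has num_chunks entries.

# Hypothetical metadata fragment for the toy graph in the unit test above.
# Assumptions: even per-chunk splits and this particular node-type order.
toy_metadata_fragment = {
    "graph_name": "mag240m",
    "node_type": ["author", "institution", "paper"],
    "num_nodes_per_chunk": [
        [50, 50],     # author: 100 nodes over 2 chunks
        [10, 10],     # institution: 20 nodes over 2 chunks
        [300, 300],   # paper: 600 nodes over 2 chunks
    ],
}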
......@@ -35,11 +35,11 @@ def random_partition(metadata, num_parts, output_path):
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
-'metadata', type=str, help='input metadata file of the chunked graph format')
+'--metadata', type=str, help='input metadata file of the chunked graph format')
parser.add_argument(
-'output_path', type=str, help='output directory')
+'--output_path', type=str, help='output directory')
parser.add_argument(
-'num_partitions', type=int, help='number of partitions')
+'--num_partitions', type=int, help='number of partitions')
logging.basicConfig(level='INFO')
args = parser.parse_args()
with open(args.metadata) as f:
......
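With the positional arguments above changed to flags, the random partitioner is invoked the way the unit test does it; an illustrative call is sketched below (paths are placeholders for the test's temporary directory).

# Illustrative invocation of the random partitioner with the new flag-style
# arguments (paths are placeholders; the unit test uses a temp directory).
import os

os.system(
    "python tools/partition_algo/random_partition.py "
    "--metadata /tmp/part_pipeline_test/chunked-data/metadata.json "
    "--output_path /tmp/part_pipeline_test/2parts "
    "--num_partitions 2"
)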