Unverified Commit 6c1500d4 authored by Rhett Ying, committed by GitHub

[Dist] save original node/edge IDs into separate files (#4649)

* [Dist] save original node/edge IDs into separate files

* separate nids and eids
parent 0b9df9d7
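With this change, `dispatch_data.py` gains two opt-in flags, `--save-orig-nids` and `--save-orig-eids`. When set, each partition directory additionally contains `orig_nids.dgl` and `orig_eids.dgl`: dictionaries of 1D tensors mapping shuffled node/edge IDs back to the original IDs, one entry per type. A minimal sketch of reading them back (the `out_dir/part0` path is a placeholder, not a path from this PR):

```python
# Sketch only: 'out_dir/part0' is a hypothetical partition directory.
from dgl.data.utils import load_tensors

orig_nids = load_tensors('out_dir/part0/orig_nids.dgl')  # dict: ntype -> 1D tensor
orig_eids = load_tensors('out_dir/part0/orig_eids.dgl')  # dict: etype -> 1D tensor
for ntype, ids in orig_nids.items():
    print(ntype, ids.shape)  # original IDs of this partition's inner nodes
```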
......@@ -46,6 +46,7 @@ def test_part_pipeline():
num_classes = 4
paper_label = np.random.choice(num_classes, num_papers)
paper_year = np.random.choice(2022, num_papers)
paper_orig_ids = np.arange(0, num_papers)
# edge features
cite_count = np.random.choice(10, num_cite_edges)
......@@ -71,6 +72,10 @@ def test_part_pipeline():
with open(paper_year_path, 'wb') as f:
np.save(f, paper_year)
paper_orig_ids_path = os.path.join(input_dir, 'paper/orig_ids.npy')
with open(paper_orig_ids_path, 'wb') as f:
np.save(f, paper_orig_ids)
cite_count_path = os.path.join(input_dir, 'cites/count.npy')
with open(cite_count_path, 'wb') as f:
np.save(f, cite_count)
......@@ -88,7 +93,8 @@ def test_part_pipeline():
{
'feat': paper_feat_path,
'label': paper_label_path,
'year': paper_year_path
'year': paper_year_path,
'orig_ids': paper_orig_ids_path
}
},
{
......@@ -123,7 +129,7 @@ def test_part_pipeline():
# check node_data
output_node_data_dir = os.path.join(output_dir, 'node_data', 'paper')
for feat in ['feat', 'label', 'year']:
for feat in ['feat', 'label', 'year', 'orig_ids']:
for i in range(num_chunks):
chunk_f_name = '{}-{}.npy'.format(feat, i)
chunk_f_name = os.path.join(output_node_data_dir, chunk_f_name)
......@@ -154,7 +160,7 @@ def test_part_pipeline():
# Step1: graph partition
in_dir = os.path.join(root_dir, 'chunked-data')
output_dir = os.path.join(root_dir, '2parts')
os.system('python tools/partition_algo/random_partition.py '\
os.system('python3 tools/partition_algo/random_partition.py '\
'--in_dir {} --out_dir {} --num_partitions {}'.format(
in_dir, output_dir, num_chunks))
for ntype in ['author', 'institution', 'paper']:
......@@ -171,13 +177,15 @@ def test_part_pipeline():
f.write('127.0.0.1\n')
f.write('127.0.0.2\n')
cmd = 'python tools/dispatch_data.py'
cmd = 'python3 tools/dispatch_data.py'
cmd += f' --in-dir {in_dir}'
cmd += f' --partitions-dir {partition_dir}'
cmd += f' --out-dir {out_dir}'
cmd += f' --ip-config {ip_config}'
cmd += ' --ssh-port 22'
cmd += ' --process-group-timeout 60'
cmd += ' --save-orig-nids'
cmd += ' --save-orig-eids'
os.system(cmd)
# check metadata.json
......@@ -212,22 +220,36 @@ def test_part_pipeline():
fname = os.path.join(sub_dir, 'graph.dgl')
assert os.path.isfile(fname)
g_list, data_dict = load_graphs(fname)
g = g_list[0]
assert isinstance(g, dgl.DGLGraph)
part_g = g_list[0]
assert isinstance(part_g, dgl.DGLGraph)
# node_feat.dgl
fname = os.path.join(sub_dir, 'node_feat.dgl')
assert os.path.isfile(fname)
tensor_dict = load_tensors(fname)
all_tensors = ['paper/feat', 'paper/label', 'paper/year']
all_tensors = ['paper/feat', 'paper/label', 'paper/year', 'paper/orig_ids']
assert tensor_dict.keys() == set(all_tensors)
for key in all_tensors:
assert isinstance(tensor_dict[key], torch.Tensor)
ndata_paper_orig_ids = tensor_dict['paper/orig_ids']
# edge_feat.dgl
fname = os.path.join(sub_dir, 'edge_feat.dgl')
assert os.path.isfile(fname)
tensor_dict = load_tensors(fname)
# orig_nids.dgl
fname = os.path.join(sub_dir, 'orig_nids.dgl')
assert os.path.isfile(fname)
orig_nids = load_tensors(fname)
assert len(orig_nids.keys()) == 3
assert torch.equal(ndata_paper_orig_ids, orig_nids['paper'])
# orig_eids.dgl
fname = os.path.join(sub_dir, 'orig_eids.dgl')
assert os.path.isfile(fname)
orig_eids = load_tensors(fname)
assert len(orig_eids.keys()) == 4
if __name__ == '__main__':
test_part_pipeline()
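The assertions above check the number of saved ID mappings (three node types, four edge types) and that the saved `paper` mapping agrees with the `orig_ids` node feature. One practical use, sketched here under the same ordering assumption that the `torch.equal` check relies on (`sub_dir`, `paper_feat`, and `orig_nids` are the test's variables):

```python
# Sketch: node_feat.dgl rows are assumed to follow the inner-node order of
# orig_nids['paper'], which is what the torch.equal assertion above verifies.
import os
import torch
from dgl.data.utils import load_tensors

node_feats = load_tensors(os.path.join(sub_dir, 'node_feat.dgl'))
full_feat = torch.as_tensor(paper_feat)      # pre-partition feature matrix
recovered = full_feat[orig_nids['paper']]    # rows owned by this partition
assert torch.equal(recovered, node_feats['paper/feat'])
```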
......@@ -53,6 +53,8 @@ def submit_jobs(args) -> str:
argslist += "--output {} ".format(os.path.abspath(args.out_dir))
argslist += "--process-group-timeout {} ".format(args.process_group_timeout)
argslist += "--log-level {} ".format(args.log_level)
argslist += "--save-orig-nids " if args.save_orig_nids else ""
argslist += "--save-orig-eids " if args.save_orig_eids else ""
# (BarclayII) Is it safe to assume all the workers have the Python executable at the same path?
pipeline_cmd = os.path.join(INSTALL_DIR, PIPELINE_SCRIPT)
......@@ -77,6 +79,10 @@ def main():
parser.add_argument('--ssh-port', type=int, default=22, help='SSH Port.')
parser.add_argument('--process-group-timeout', type=int, default=1800,
help='timeout[seconds] for operations executed against the process group')
parser.add_argument('--save-orig-nids', action='store_true',
help='Save original node IDs into files')
parser.add_argument('--save-orig-eids', action='store_true',
help='Save original edge IDs into files')
args, udf_command = parser.parse_known_args()
......
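Both options are plain `store_true` switches; `submit_jobs` forwards them verbatim to the per-machine pipeline command. Invoking the dispatcher with them looks like this (all directory and file names below are placeholders):

```python
# Sketch only: every path here is hypothetical.
import os

cmd = 'python3 tools/dispatch_data.py'
cmd += ' --in-dir chunked-data --partitions-dir parts --out-dir partitioned'
cmd += ' --ip-config ip_config.txt --ssh-port 22 --process-group-timeout 60'
cmd += ' --save-orig-nids --save-orig-eids'  # the new opt-in flags
os.system(cmd)
```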
......@@ -16,9 +16,8 @@ import constants
from utils import get_idranges, memory_snapshot, read_json
def create_dgl_object(graph_name, num_parts, \
schema, part_id, node_data, \
edge_data, nodeid_offset, edgeid_offset):
def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
return_orig_nids=False, return_orig_eids=False):
"""
This function creates DGL objects for a given graph partition, as described by
the function arguments.
......@@ -67,10 +66,6 @@ def create_dgl_object(graph_name, num_parts, \
Parameters:
-----------
graph_name : string
name of the graph
num_parts : int
total no. of partitions (of the original graph)
schema : json object
json object created by reading the graph metadata json file
part_id : int
......@@ -81,10 +76,10 @@ def create_dgl_object(graph_name, num_parts, \
edge_data : numpy ndarray
edge_data, where each row is of the following format:
<global_src_id> <global_dst_id> <etype_id> <global_type_eid>
nodeid_offset : int
offset to be used when assigning node global ids in the current partition
edgeid_offset : int
offset to be used when assigning edge global ids in the current partition
return_orig_nids : bool, optional
indicates whether to return original node IDs
return_orig_eids : bool, optional
indicates whether to return original edge IDs
Returns:
--------
......@@ -98,6 +93,14 @@ def create_dgl_object(graph_name, num_parts, \
map between node type(string) and node_type_id(int)
dictionary
map between edge type(string) and edge_type_id(int)
dict of tensors
If `return_orig_nids=True`, return a dict of 1D tensors whose key is the node type
and value is a 1D tensor mapping between shuffled node IDs and the original node
IDs for each node type. Otherwise, ``None`` is returned.
dict of tensors
If `return_orig_eids=True`, return a dict of 1D tensors whose key is the edge type
and value is a 1D tensor mapping between shuffled edge IDs and the original edge
IDs for each edge type. Otherwise, ``None`` is returned.
"""
#create auxiliary data structures from the schema object
memory_snapshot("CreateDGLObj_Begin", part_id)
......@@ -283,7 +286,24 @@ def create_dgl_object(graph_name, num_parts, \
part_graph.ndata[dgl.NID] = th.as_tensor(uniq_ids[reshuffle_nodes])
part_graph.ndata['inner_node'] = inner_nodes[reshuffle_nodes]
return part_graph, node_map_val, edge_map_val, ntypes_map, etypes_map
orig_nids = None
orig_eids = None
if return_orig_nids:
orig_nids = {}
for ntype, ntype_id in ntypes_map.items():
mask = th.logical_and(part_graph.ndata[dgl.NTYPE] == ntype_id,
part_graph.ndata['inner_node'])
orig_nids[ntype] = th.as_tensor(per_type_ids[mask])
if return_orig_eids:
orig_eids = {}
for etype, etype_id in etypes_map.items():
mask = th.logical_and(part_graph.edata[dgl.ETYPE] == etype_id,
part_graph.edata['inner_edge'])
orig_eids[etype] = th.as_tensor(global_edge_id[mask])
return part_graph, node_map_val, edge_map_val, ntypes_map, etypes_map, \
orig_nids, orig_eids
def create_metadata_json(graph_name, num_nodes, num_edges, part_id, num_parts, node_map_val, \
edge_map_val, ntypes_map, etypes_map, output_dir ):
......
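The selection above combines a per-type mask with the `inner_node`/`inner_edge` mask and then gathers the original IDs. A standalone toy sketch of the same indexing pattern (all tensor values invented):

```python
import torch as th

ntype_per_node = th.tensor([0, 0, 1, 1, 0])               # like ndata[dgl.NTYPE]
inner_node = th.tensor([True, True, True, False, True])   # one halo node excluded
per_type_ids = th.tensor([10, 11, 20, 21, 12])            # original IDs, node order

mask = th.logical_and(ntype_per_node == 0, inner_node)
print(per_type_ids[mask])  # tensor([10, 11, 12]): inner type-0 nodes' original IDs
```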
......@@ -54,6 +54,10 @@ if __name__ == "__main__":
parser.add_argument('--process-group-timeout', required=True, type=int,
help='timeout[seconds] for operations executed against the process group '
'(see torch.distributed.init_process_group)')
parser.add_argument('--save-orig-nids', action='store_true',
help='Save original node IDs into files')
parser.add_argument('--save-orig-eids', action='store_true',
help='Save original edge IDs into files')
params = parser.parse_args()
#invoke the pipeline function
......
......@@ -614,17 +614,18 @@ def gen_dist_partitions(rank, world_size, params):
num_edges = shuffle_global_eid_start
node_count = len(node_data[constants.NTYPE_ID])
edge_count = len(edge_data[constants.ETYPE_ID])
graph_obj, ntypes_map_val, etypes_map_val, ntypes_ntypeid_map, etypes_map = create_dgl_object(\
params.graph_name, params.num_parts, \
schema_map, rank, node_data, edge_data, num_nodes, num_edges)
graph_obj, ntypes_map_val, etypes_map_val, ntypes_map, etypes_map, \
orig_nids, orig_eids = create_dgl_object(schema_map, rank, node_data, \
edge_data, num_edges, params.save_orig_nids, params.save_orig_eids)
memory_snapshot("CreateDGLObjectsComplete: ", rank)
write_dgl_objects(graph_obj, rcvd_node_features, edge_features, params.output, rank)
write_dgl_objects(graph_obj, rcvd_node_features, edge_features, params.output, \
rank, orig_nids, orig_eids)
memory_snapshot("DiskWriteDGLObjectsComplete: ", rank)
#get the meta-data
json_metadata = create_metadata_json(params.graph_name, node_count, edge_count, \
rank, world_size, ntypes_map_val, \
etypes_map_val, ntypes_ntypeid_map, etypes_map, params.output)
etypes_map_val, ntypes_map, etypes_map, params.output)
memory_snapshot("MetadataCreateComplete: ", rank)
if (rank == 0):
......
......@@ -353,9 +353,10 @@ def write_graph_dgl(graph_file, graph_obj):
"""
dgl.save_graphs(graph_file, [graph_obj])
def write_dgl_objects(graph_obj, node_features, edge_features, output_dir, part_id):
def write_dgl_objects(graph_obj, node_features, edge_features,
output_dir, part_id, orig_nids, orig_eids):
"""
Wrapper function to create dgl objects for graph, node-features and edge-features
Wrapper function to write the graph, node/edge features, and original node/edge IDs.
Parameters:
-----------
......@@ -369,6 +370,10 @@ def write_dgl_objects(graph_obj, node_features, edge_features, output_dir, part_
location where the output files will be located
part_id : int
integer indicating the partition-id
orig_nids : dict
dict of tensors mapping each node type to its original node IDs
orig_eids : dict
dict of tensors mapping each edge type to its original edge IDs
"""
part_dir = output_dir + '/part' + str(part_id)
......@@ -381,6 +386,13 @@ def write_dgl_objects(graph_obj, node_features, edge_features, output_dir, part_
if (edge_features != None):
write_edge_features(edge_features, os.path.join(part_dir, "edge_feat.dgl"))
if orig_nids is not None:
orig_nids_file = os.path.join(part_dir, 'orig_nids.dgl')
dgl.data.utils.save_tensors(orig_nids_file, orig_nids)
if orig_eids is not None:
orig_eids_file = os.path.join(part_dir, 'orig_eids.dgl')
dgl.data.utils.save_tensors(orig_eids_file, orig_eids)
def get_idranges(names, counts):
"""
Utility function to compute type_id/global_id ranges for both nodes and edges.
......
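`write_dgl_objects` persists the two dictionaries with DGL's generic tensor serializer, and `load_tensors` is its inverse, which is how the test reads the files back. A small self-contained round-trip sketch (the temp path is arbitrary):

```python
import torch as th
from dgl.data.utils import save_tensors, load_tensors

tensors = {'paper': th.arange(5), 'author': th.arange(3)}
save_tensors('/tmp/orig_nids.dgl', tensors)   # same call as in write_dgl_objects
loaded = load_tensors('/tmp/orig_nids.dgl')
assert th.equal(loaded['paper'], tensors['paper'])
```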