Unverified Commit 1f471396 authored by kylasa, committed by GitHub

Edge Feature support for input graph datasets for dist. graph partitioning pipeline (#4623)

* Added support for edge features.

* Added comments and removing unnecessary print statements.

* updated data_shuffle.py to remove compile error.

* Replaced python3 with python to match CI test framework.

* Removed unrelated files from the pull request.

* Isort changes.

* black changes on this file.

* Addressing CI review comments.

* Addressing CI comments.

* Removed duplicated and resolved merge conflict code.

* Addressing CI Comments from Rui.

* Addressing CI comments, and fixing merge issues.

* Addressing CI comments, code refactoring, isort and black
parent 72cfb934
import argparse
import json
import logging
import os
import platform
import sys
import tempfile
import dgl
import numpy as np
import torch
from chunk_graph import chunk_graph
from dgl.data.utils import load_graphs, load_tensors
def create_chunked_dataset(
root_dir, num_chunks, include_masks=False, include_edge_data=False
):
"""
    This function creates a sample chunked dataset, based on the MAG240 dataset.

    Parameters:
    -----------
    root_dir : string
        directory in which all the files for the chunked dataset will be stored.
    num_chunks : int
        number of chunks to split the graph and its features into.
    include_masks : bool, optional
        whether to generate train/test/val masks for each node type.
    include_edge_data : bool, optional
        whether to generate edge features ('cites' count and 'writes'/'rev_writes' year).
"""
# Step0: prepare chunked graph data format.
# A synthetic mini MAG240.
num_institutions = 1200
num_authors = 1200
num_papers = 1200
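    # Sample `num_edges` unique positions from the num_src x num_dst grid and
    # decode each flat index into a (src, dst) pair, so the edge list has no duplicates.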
def rand_edges(num_src, num_dst, num_edges):
eids = np.random.choice(num_src * num_dst, num_edges, replace=False)
src = torch.from_numpy(eids // num_dst)
dst = torch.from_numpy(eids % num_dst)
return src, dst
num_cite_edges = 24 * 1000
num_write_edges = 12 * 1000
num_affiliate_edges = 2400
# Structure.
data_dict = {
('paper', 'cites', 'paper'): rand_edges(
num_papers, num_papers, num_cite_edges
),
('author', 'writes', 'paper'): rand_edges(
num_authors, num_papers, num_write_edges
),
('author', 'affiliated_with', 'institution'): rand_edges(
num_authors, num_institutions, num_affiliate_edges
),
}
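    # Add a reverse edge type so the graph also carries paper -> author edges
    # mirroring 'writes'; its edge features later reuse the 'writes' data.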
src, dst = data_dict[('author', 'writes', 'paper')]
data_dict[('paper', 'rev_writes', 'author')] = (dst, src)
g = dgl.heterograph(data_dict)
# paper feat, label, year
num_paper_feats = 3
paper_feat = np.random.randn(num_papers, num_paper_feats)
num_classes = 4
paper_label = np.random.choice(num_classes, num_papers)
paper_year = np.random.choice(2022, num_papers)
paper_orig_ids = np.arange(0, num_papers)
# masks.
if include_masks:
paper_train_mask = np.random.randint(0, 2, num_papers)
paper_test_mask = np.random.randint(0, 2, num_papers)
paper_val_mask = np.random.randint(0, 2, num_papers)
author_train_mask = np.random.randint(0, 2, num_authors)
author_test_mask = np.random.randint(0, 2, num_authors)
author_val_mask = np.random.randint(0, 2, num_authors)
inst_train_mask = np.random.randint(0, 2, num_institutions)
inst_test_mask = np.random.randint(0, 2, num_institutions)
inst_val_mask = np.random.randint(0, 2, num_institutions)
# Edge features.
if include_edge_data:
cite_count = np.random.choice(10, num_cite_edges)
write_year = np.random.choice(2022, num_write_edges)
# Save features.
input_dir = os.path.join(root_dir, 'data_test')
os.makedirs(input_dir)
for sub_d in ['paper', 'cites', 'writes']:
os.makedirs(os.path.join(input_dir, sub_d))
paper_feat_path = os.path.join(input_dir, 'paper/feat.npy')
with open(paper_feat_path, 'wb') as f:
np.save(f, paper_feat)
paper_label_path = os.path.join(input_dir, 'paper/label.npy')
with open(paper_label_path, 'wb') as f:
np.save(f, paper_label)
paper_year_path = os.path.join(input_dir, 'paper/year.npy')
with open(paper_year_path, 'wb') as f:
np.save(f, paper_year)
paper_orig_ids_path = os.path.join(input_dir, 'paper/orig_ids.npy')
with open(paper_orig_ids_path, 'wb') as f:
np.save(f, paper_orig_ids)
if include_edge_data:
cite_count_path = os.path.join(input_dir, 'cites/count.npy')
with open(cite_count_path, 'wb') as f:
np.save(f, cite_count)
write_year_path = os.path.join(input_dir, 'writes/year.npy')
with open(write_year_path, 'wb') as f:
np.save(f, write_year)
node_data = None
if include_masks:
for sub_d in ['author', 'institution']:
os.makedirs(os.path.join(input_dir, sub_d))
paper_train_mask_path = os.path.join(input_dir, 'paper/train_mask.npy')
with open(paper_train_mask_path, 'wb') as f:
np.save(f, paper_train_mask)
paper_test_mask_path = os.path.join(input_dir, 'paper/test_mask.npy')
with open(paper_test_mask_path, 'wb') as f:
np.save(f, paper_test_mask)
paper_val_mask_path = os.path.join(input_dir, 'paper/val_mask.npy')
with open(paper_val_mask_path, 'wb') as f:
np.save(f, paper_val_mask)
author_train_mask_path = os.path.join(
input_dir, 'author/train_mask.npy'
)
with open(author_train_mask_path, 'wb') as f:
np.save(f, author_train_mask)
author_test_mask_path = os.path.join(input_dir, 'author/test_mask.npy')
with open(author_test_mask_path, 'wb') as f:
np.save(f, author_test_mask)
author_val_mask_path = os.path.join(input_dir, 'author/val_mask.npy')
with open(author_val_mask_path, 'wb') as f:
np.save(f, author_val_mask)
inst_train_mask_path = os.path.join(
input_dir, 'institution/train_mask.npy'
)
with open(inst_train_mask_path, 'wb') as f:
np.save(f, inst_train_mask)
inst_test_mask_path = os.path.join(
input_dir, 'institution/test_mask.npy'
)
with open(inst_test_mask_path, 'wb') as f:
np.save(f, inst_test_mask)
inst_val_mask_path = os.path.join(input_dir, 'institution/val_mask.npy')
with open(inst_val_mask_path, 'wb') as f:
np.save(f, inst_val_mask)
node_data = {
'paper': {
'feat': paper_feat_path,
'train_mask': paper_train_mask_path,
'test_mask': paper_test_mask_path,
'val_mask': paper_val_mask_path,
'label': paper_label_path,
'year': paper_year_path,
'orig_ids': paper_orig_ids_path,
},
'author': {
'train_mask': author_train_mask_path,
'test_mask': author_test_mask_path,
'val_mask': author_val_mask_path,
},
'institution': {
'train_mask': inst_train_mask_path,
'test_mask': inst_test_mask_path,
'val_mask': inst_val_mask_path,
},
}
else:
node_data = {
'paper': {
'feat': paper_feat_path,
'label': paper_label_path,
'year': paper_year_path,
'orig_ids': paper_orig_ids_path,
}
}
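    # Map each edge type to {feature-name: path to the .npy file}; 'rev_writes'
    # reuses the 'writes' year file since both directions share the same feature.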
edge_data = {}
if include_edge_data:
edge_data = {
'cites': {'count': cite_count_path},
'writes': {'year': write_year_path},
'rev_writes': {'year': write_year_path},
}
output_dir = os.path.join(root_dir, 'chunked-data')
chunk_graph(
g,
'mag240m',
node_data,
edge_data,
num_chunks=num_chunks,
output_path=output_dir,
)
print('Done with creating chunked graph')
return g
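# Illustrative usage of the helper above (hypothetical, not part of the test suite):
#
#     with tempfile.TemporaryDirectory() as root_dir:
#         g = create_chunked_dataset(root_dir, num_chunks=4, include_edge_data=True)
#         # chunked files are written under <root_dir>/chunked-data, described by metadata.json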
import json
import os
import tempfile
import unittest

import dgl
import numpy as np
import pytest
import torch
from chunk_graph import chunk_graph
from dgl.data.utils import load_graphs, load_tensors
@pytest.mark.parametrize("num_chunks", [1, 2, 3, 4, 8])
@pytest.mark.parametrize("num_parts", [1, 2, 3, 4, 8])
def test_part_pipeline(num_chunks, num_parts):
if num_chunks < num_parts:
        # num_parts should be less than or equal to num_chunks
return
# Step0: prepare chunked graph data format
# A synthetic mini MAG240
num_institutions = 1200
num_authors = 1200
num_papers = 1200
def rand_edges(num_src, num_dst, num_edges):
eids = np.random.choice(num_src * num_dst, num_edges, replace=False)
src = torch.from_numpy(eids // num_dst)
dst = torch.from_numpy(eids % num_dst)
return src, dst
from create_chunked_dataset import create_chunked_dataset
num_cite_edges = 24 * 1000
num_write_edges = 12 * 1000
num_affiliate_edges = 2400
# Structure
data_dict = {
('paper', 'cites', 'paper'): rand_edges(num_papers, num_papers, num_cite_edges),
('author', 'writes', 'paper'): rand_edges(num_authors, num_papers, num_write_edges),
('author', 'affiliated_with', 'institution'): rand_edges(num_authors, num_institutions, num_affiliate_edges)
}
src, dst = data_dict[('author', 'writes', 'paper')]
data_dict[('paper', 'rev_writes', 'author')] = (dst, src)
g = dgl.heterograph(data_dict)
# paper feat, label, year
num_paper_feats = 3
paper_feat = np.random.randn(num_papers, num_paper_feats)
num_classes = 4
paper_label = np.random.choice(num_classes, num_papers)
paper_year = np.random.choice(2022, num_papers)
paper_orig_ids = np.arange(0, num_papers)
# edge features
cite_count = np.random.choice(10, num_cite_edges)
write_year = np.random.choice(2022, num_write_edges)
# Save features
@pytest.mark.parametrize("num_chunks", [1, 8])
def test_chunk_graph(num_chunks):
with tempfile.TemporaryDirectory() as root_dir:
print('root_dir', root_dir)
input_dir = os.path.join(root_dir, 'data_test')
os.makedirs(input_dir)
for sub_d in ['paper', 'cites', 'writes']:
os.makedirs(os.path.join(input_dir, sub_d))
paper_feat_path = os.path.join(input_dir, 'paper/feat.npy')
with open(paper_feat_path, 'wb') as f:
np.save(f, paper_feat)
paper_label_path = os.path.join(input_dir, 'paper/label.npy')
with open(paper_label_path, 'wb') as f:
np.save(f, paper_label)
paper_year_path = os.path.join(input_dir, 'paper/year.npy')
with open(paper_year_path, 'wb') as f:
np.save(f, paper_year)
g = create_chunked_dataset(root_dir, num_chunks, include_edge_data=True)
paper_orig_ids_path = os.path.join(input_dir, 'paper/orig_ids.npy')
with open(paper_orig_ids_path, 'wb') as f:
np.save(f, paper_orig_ids)
num_cite_edges = g.number_of_edges('cites')
num_write_edges = g.number_of_edges('writes')
num_affiliate_edges = g.number_of_edges('affiliated_with')
cite_count_path = os.path.join(input_dir, 'cites/count.npy')
with open(cite_count_path, 'wb') as f:
np.save(f, cite_count)
write_year_path = os.path.join(input_dir, 'writes/year.npy')
with open(write_year_path, 'wb') as f:
np.save(f, write_year)
output_dir = os.path.join(root_dir, 'chunked-data')
chunk_graph(
g,
'mag240m',
{'paper':
{
'feat': paper_feat_path,
'label': paper_label_path,
'year': paper_year_path,
'orig_ids': paper_orig_ids_path
}
},
{
'cites': {'count': cite_count_path},
'writes': {'year': write_year_path},
# you can put the same data file if they indeed share the features.
'rev_writes': {'year': write_year_path}
},
num_chunks=num_chunks,
output_path=output_dir)
num_institutions = g.number_of_nodes('institution')
num_authors = g.number_of_nodes('author')
num_papers = g.number_of_nodes('paper')
# check metadata.json
output_dir = os.path.join(root_dir, 'chunked-data')
json_file = os.path.join(output_dir, 'metadata.json')
assert os.path.isfile(json_file)
with open(json_file, 'rb') as f:
......@@ -118,10 +37,12 @@ def test_part_pipeline(num_chunks, num_parts):
# check edge_index
output_edge_index_dir = os.path.join(output_dir, 'edge_index')
for utype, etype, vtype in data_dict.keys():
for utype, etype, vtype in g.canonical_etypes:
fname = ':'.join([utype, etype, vtype])
for i in range(num_chunks):
chunk_f_name = os.path.join(output_edge_index_dir, fname + str(i) + '.txt')
chunk_f_name = os.path.join(
output_edge_index_dir, fname + str(i) + '.txt'
)
assert os.path.isfile(chunk_f_name)
with open(chunk_f_name, 'r') as f:
header = f.readline()
......@@ -131,7 +52,7 @@ def test_part_pipeline(num_chunks, num_parts):
# check node_data
output_node_data_dir = os.path.join(output_dir, 'node_data', 'paper')
for feat in ['feat', 'label', 'year', 'orig_ids']:
for feat in ['feat', 'label', 'year']:
for i in range(num_chunks):
chunk_f_name = '{}-{}.npy'.format(feat, i)
chunk_f_name = os.path.join(output_node_data_dir, chunk_f_name)
......@@ -140,31 +61,65 @@ def test_part_pipeline(num_chunks, num_parts):
assert feat_array.shape[0] == num_papers // num_chunks
# check edge_data
edge_data_gold = {}
num_edges = {
'paper:cites:paper': num_cite_edges,
'author:writes:paper': num_write_edges,
'paper:rev_writes:author': num_write_edges
'paper:rev_writes:author': num_write_edges,
}
output_edge_data_dir = os.path.join(output_dir, 'edge_data')
for etype, feat in [
['paper:cites:paper', 'count'],
['author:writes:paper', 'year'],
['paper:rev_writes:author', 'year']
['paper:rev_writes:author', 'year'],
]:
output_edge_sub_dir = os.path.join(output_edge_data_dir, etype)
features = []
for i in range(num_chunks):
chunk_f_name = '{}-{}.npy'.format(feat, i)
chunk_f_name = os.path.join(output_edge_sub_dir, chunk_f_name)
assert os.path.isfile(chunk_f_name)
feat_array = np.load(chunk_f_name)
assert feat_array.shape[0] == num_edges[etype] // num_chunks
features.append(feat_array)
edge_data_gold[etype + '/' + feat] = np.concatenate(features)
@pytest.mark.parametrize("num_chunks", [1, 2, 3, 4, 8])
@pytest.mark.parametrize("num_parts", [1, 2, 3, 4, 8])
def test_part_pipeline(num_chunks, num_parts):
if num_chunks < num_parts:
        # num_parts should be less than or equal to num_chunks
return
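    # Edge features are only generated and checked when num_chunks == num_parts.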
include_edge_data = num_chunks == num_parts
with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(
root_dir, num_chunks, include_edge_data=include_edge_data
)
all_ntypes = g.ntypes
all_etypes = g.etypes
num_cite_edges = g.number_of_edges('cites')
num_write_edges = g.number_of_edges('writes')
num_affiliate_edges = g.number_of_edges('affiliated_with')
num_institutions = g.number_of_nodes('institution')
num_authors = g.number_of_nodes('author')
num_papers = g.number_of_nodes('paper')
# Step1: graph partition
in_dir = os.path.join(root_dir, 'chunked-data')
output_dir = os.path.join(root_dir, 'parted_data')
os.system('python3 tools/partition_algo/random_partition.py '\
os.system(
'python3 tools/partition_algo/random_partition.py '
'--in_dir {} --out_dir {} --num_partitions {}'.format(
in_dir, output_dir, num_parts))
in_dir, output_dir, num_parts
)
)
for ntype in ['author', 'institution', 'paper']:
fname = os.path.join(output_dir, '{}.txt'.format(ntype))
with open(fname, 'r') as f:
......@@ -184,7 +139,6 @@ def test_part_pipeline(num_chunks, num_parts):
cmd += f' --partitions-dir {partition_dir}'
cmd += f' --out-dir {out_dir}'
cmd += f' --ip-config {ip_config}'
cmd += ' --ssh-port 22'
cmd += ' --process-group-timeout 60'
cmd += ' --save-orig-nids'
cmd += ' --save-orig-eids'
......@@ -195,13 +149,11 @@ def test_part_pipeline(num_chunks, num_parts):
with open(meta_fname, 'rb') as f:
meta_data = json.load(f)
all_etypes = ['affiliated_with', 'writes', 'cites', 'rev_writes']
for etype in all_etypes:
assert len(meta_data['edge_map'][etype]) == num_parts
assert meta_data['etypes'].keys() == set(all_etypes)
assert meta_data['graph_name'] == 'mag240m'
all_ntypes = ['author', 'institution', 'paper']
for ntype in all_ntypes:
assert len(meta_data['node_map'][ntype]) == num_parts
assert meta_data['ntypes'].keys() == set(all_ntypes)
......@@ -209,11 +161,60 @@ def test_part_pipeline(num_chunks, num_parts):
assert meta_data['num_nodes'] == g.num_nodes()
assert meta_data['num_parts'] == num_parts
edge_dict = {}
edge_data_gold = {}
if include_edge_data:
# Create Id Map here.
num_edges = 0
for utype, etype, vtype in g.canonical_etypes:
fname = ':'.join([utype, etype, vtype])
edge_dict[fname] = np.array(
[num_edges, num_edges + g.number_of_edges(etype)]
).reshape(1, 2)
num_edges += g.number_of_edges(etype)
assert num_edges == g.number_of_edges()
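            # IdMap maps homogeneous (global) edge ids back to (edge-type id, per-type edge id).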
id_map = dgl.distributed.id_map.IdMap(edge_dict)
orig_etype_id, orig_type_eid = id_map(np.arange(num_edges))
# check edge_data
num_edges = {
'paper:cites:paper': num_cite_edges,
'author:writes:paper': num_write_edges,
'paper:rev_writes:author': num_write_edges,
}
output_dir = os.path.join(root_dir, 'chunked-data')
output_edge_data_dir = os.path.join(output_dir, 'edge_data')
for etype, feat in [
['paper:cites:paper', 'count'],
['author:writes:paper', 'year'],
['paper:rev_writes:author', 'year'],
]:
output_edge_sub_dir = os.path.join(output_edge_data_dir, etype)
features = []
for i in range(num_chunks):
chunk_f_name = '{}-{}.npy'.format(feat, i)
chunk_f_name = os.path.join(
output_edge_sub_dir, chunk_f_name
)
assert os.path.isfile(chunk_f_name)
feat_array = np.load(chunk_f_name)
assert feat_array.shape[0] == num_edges[etype] // num_chunks
features.append(feat_array)
edge_data_gold[etype + '/' + feat] = np.concatenate(features)
for i in range(num_parts):
sub_dir = 'part-' + str(i)
assert meta_data[sub_dir]['node_feats'] == 'part{}/node_feat.dgl'.format(i)
assert meta_data[sub_dir]['edge_feats'] == 'part{}/edge_feat.dgl'.format(i)
assert meta_data[sub_dir]['part_graph'] == 'part{}/graph.dgl'.format(i)
assert meta_data[sub_dir][
'node_feats'
] == 'part{}/node_feat.dgl'.format(i)
assert meta_data[sub_dir][
'edge_feats'
] == 'part{}/edge_feat.dgl'.format(i)
assert meta_data[sub_dir][
'part_graph'
] == 'part{}/graph.dgl'.format(i)
# check data
sub_dir = os.path.join(out_dir, 'part' + str(i))
......@@ -229,17 +230,17 @@ def test_part_pipeline(num_chunks, num_parts):
fname = os.path.join(sub_dir, 'node_feat.dgl')
assert os.path.isfile(fname)
tensor_dict = load_tensors(fname)
all_tensors = ['paper/feat', 'paper/label', 'paper/year', 'paper/orig_ids']
all_tensors = [
'paper/feat',
'paper/label',
'paper/year',
'paper/orig_ids',
]
assert tensor_dict.keys() == set(all_tensors)
for key in all_tensors:
assert isinstance(tensor_dict[key], torch.Tensor)
ndata_paper_orig_ids = tensor_dict['paper/orig_ids']
# edge_feat.dgl
fname = os.path.join(sub_dir, 'edge_feat.dgl')
assert os.path.isfile(fname)
tensor_dict = load_tensors(fname)
# orig_nids.dgl
fname = os.path.join(sub_dir, 'orig_nids.dgl')
assert os.path.isfile(fname)
......@@ -253,3 +254,31 @@ def test_part_pipeline(num_chunks, num_parts):
orig_eids = load_tensors(fname)
assert len(orig_eids.keys()) == 4
if include_edge_data:
# Read edge_feat.dgl
fname = os.path.join(sub_dir, 'edge_feat.dgl')
assert os.path.isfile(fname)
tensor_dict = load_tensors(fname)
all_tensors = [
'paper:cites:paper/count',
'author:writes:paper/year',
'paper:rev_writes:author/year',
]
assert tensor_dict.keys() == set(all_tensors)
for key in all_tensors:
assert isinstance(tensor_dict[key], torch.Tensor)
# Compare the data stored as edge features in this partition with the data
# from the original graph.
for idx, etype in enumerate(all_etypes):
if etype != key:
continue
# key in canonical form
tokens = key.split(":")
assert len(tokens) == 3
gold_type_ids = orig_type_eid[orig_etype_id == idx]
gold_data = edge_data_gold[key][gold_type_ids]
assert np.all(gold_data == part_data.numpy())
......@@ -32,3 +32,6 @@ STR_EDGE_DATA = "edge_data"
STR_NUMPY = "numpy"
STR_CSV = "csv"
STR_NAME = "name"
STR_NODE_FEATURES = "node_features"
STR_EDGE_FEATURES = "edge_features"
......@@ -182,11 +182,14 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
# It's not guaranteed that the edges are sorted based on edge type.
# Let's sort edges and all attributes on the edges.
if not np.all(np.diff(etype_ids) >= 0):
sort_idx = np.argsort(etype_ids)
shuffle_global_src_id, shuffle_global_dst_id, global_src_id, global_dst_id, global_edge_id, etype_ids = \
shuffle_global_src_id[sort_idx], shuffle_global_dst_id[sort_idx], global_src_id[sort_idx], \
global_dst_id[sort_idx], global_edge_id[sort_idx], etype_ids[sort_idx]
assert np.all(np.diff(etype_ids) >= 0)
else:
        print(f'[Rank: {part_id}] Edge data is already sorted!')
# Determine the edge ID range of different edge types.
edge_id_start = edgeid_offset
......@@ -273,6 +276,7 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
part_graph.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
part_graph.edata['inner_edge'] = th.ones(part_graph.number_of_edges(), dtype=th.bool)
#compute per_type_ids and ntype for all the nodes in the graph.
global_ids = np.concatenate(
[global_src_id, global_dst_id, global_homo_nid])
......
......@@ -20,9 +20,10 @@ from globalids import (assign_shuffle_global_nids_edges,
assign_shuffle_global_nids_nodes,
lookup_shuffle_global_nids_edges)
from gloo_wrapper import allgather_sizes, alltoallv_cpu, gather_metadata_json
from utils import (augment_edge_data, get_gnid_range_map, get_idranges,
get_node_types, get_ntype_featnames, memory_snapshot,
read_json, read_ntype_partition_files, write_dgl_objects,
from utils import (augment_edge_data, get_edge_types, get_etype_featnames,
get_gnid_range_map, get_idranges, get_node_types,
get_ntype_featnames, memory_snapshot, read_json,
read_ntype_partition_files, write_dgl_objects,
write_metadata_json)
......@@ -175,7 +176,7 @@ def exchange_edge_data(rank, world_size, edge_data):
edge_data.pop(constants.OWNER_PROCESS)
return edge_data
def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map, id_lookup, node_features):
def exchange_features(rank, world_size, feature_tids, ntype_gnid_map, id_lookup, feature_data, feat_type, data):
"""
    This function is used to shuffle node or edge features so that each process will receive
    all the features whose corresponding nodes (or edges) are owned by that process.
......@@ -210,9 +211,14 @@ def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map,
id_lookup : instance of class DistLookupService
Distributed lookup service used to map global-nids to respective partition-ids and
shuffle-global-nids
    node_features : dictionary
        dictionary where node_features are stored and this information is read from the appropriate
    feature_data : dictionary
        dictionary in which node or edge features are stored and this information is read from the appropriate
        node or edge features file which belongs to the current process
feat_type : string
this is used to distinguish which features are being exchanged. Please note that
for nodes ownership is clearly defined and for edges it is always assumed that
destination end point of the edge defines the ownership of that particular
edge
Returns:
--------
......@@ -224,31 +230,31 @@ def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map,
process
"""
start = timer()
own_node_features = {}
own_features = {}
own_global_nids = {}
#To iterate over the node_types and associated node_features
for ntype_name, ntype_info in node_feature_tids.items():
for type_name, type_info in feature_tids.items():
#To iterate over the node_features, of a given node_type
#ntype_info is a list of 3 elements
#[node-feature-name, starting-idx, ending-idx]
#node-feature-name is the name given to the node-feature, read from the input metadata file
#[starting-idx, ending-idx) specifies the range of indexes associated with the node-features read from
#the associated input file. Note that the rows of node-features read from the input file should be same
#type_info is a list of 3 elements
#[feature-name, starting-idx, ending-idx]
#feature-name is the name given to the feature-data, read from the input metadata file
#[starting-idx, ending-idx) specifies the range of indexes associated with the features read from
#the associated input file. Note that the rows of features read from the input file should be same
#as specified with this range. So no. of rows = ending-idx - starting-idx.
for feat_info in ntype_info:
for feat_info in type_info:
#determine the owner process for these node features.
node_feats_per_rank = []
feats_per_rank = []
global_nid_per_rank = []
feat_name = feat_info[0]
feat_key = ntype_name+'/'+feat_name
feat_key = type_name+'/'+feat_name
            logging.info(f'[Rank: {rank}] processing feature: {feat_key}')
#compute the global_nid range for this node features
type_nid_start = int(feat_info[1])
type_nid_end = int(feat_info[2])
begin_global_nid = ntype_gnid_map[ntype_name][0]
begin_global_nid = ntype_gnid_map[type_name][0]
gnid_start = begin_global_nid + type_nid_start
gnid_end = begin_global_nid + type_nid_end
......@@ -260,38 +266,58 @@ def exchange_node_features(rank, world_size, node_feature_tids, ntype_gnid_map,
#check if node features exist for this ntype_name + feat_name
#this check should always pass, because node_feature_tids are built
#by reading the input metadata json file for existing node features.
assert(feat_key in node_features)
assert(feat_key in feature_data)
node_feats = node_features[feat_key]
key_feats = feature_data[feat_key]
for part_id in range(world_size):
# Get the partition ids for the range of global nids.
if feat_type == constants.STR_NODE_FEATURES:
partid_slice = id_lookup.get_partition_ids(np.arange(gnid_start, gnid_end, dtype=np.int64))
else:
#Edge data case.
#Ownership is determined by the destination node.
assert data is not None
global_eids = np.arange(gnid_start, gnid_end, dtype=np.int64)
#Now use `data` to extract destination nodes' global id
#and use that to get the ownership
common, idx1, idx2 = np.intersect1d(data[constants.GLOBAL_EID], global_eids, return_indices=True)
assert common.shape[0] == idx2.shape[0]
global_dst_nids = data[constants.GLOBAL_DST_ID][idx1]
assert np.all(global_eids == data[constants.GLOBAL_EID][idx1])
partid_slice = id_lookup.get_partition_ids(global_dst_nids)
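                # Select the rows whose owner is `part_id`; they will be sent to that rank
                # in the alltoallv exchange below.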
cond = (partid_slice == part_id)
gnids_per_partid = gnids_feat[cond]
tnids_per_partid = tnids_feat[cond]
local_idx_partid = local_idx[cond]
if (gnids_per_partid.shape[0] == 0):
node_feats_per_rank.append(torch.empty((0,1), dtype=torch.float))
feats_per_rank.append(torch.empty((0,1), dtype=torch.float))
global_nid_per_rank.append(np.empty((0,1), dtype=np.int64))
else:
node_feats_per_rank.append(node_feats[local_idx_partid])
feats_per_rank.append(key_feats[local_idx_partid])
global_nid_per_rank.append(torch.from_numpy(gnids_per_partid).type(torch.int64))
#features (and global nids) per rank to be sent out are ready
#for transmission, perform alltoallv here.
output_feat_list = alltoallv_cpu(rank, world_size, node_feats_per_rank)
output_feat_list = alltoallv_cpu(rank, world_size, feats_per_rank)
output_nid_list = alltoallv_cpu(rank, world_size, global_nid_per_rank)
#stitch node_features together to form one large feature tensor
own_node_features[feat_key] = torch.cat(output_feat_list)
own_features[feat_key] = torch.cat(output_feat_list)
own_global_nids[feat_key] = torch.cat(output_nid_list).numpy()
end = timer()
    logging.info(f'[Rank: {rank}] Total time for feature exchange: {timedelta(seconds = end - start)}')
return own_node_features, own_global_nids
return own_features, own_global_nids
def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_data,
id_lookup, ntypes_gnid_range_map, ntid_ntype_map, schema_map):
def exchange_graph_data(rank, world_size, node_features, edge_features,
node_feat_tids, edge_feat_tids,
edge_data, id_lookup, ntypes_ntypeid_map,
ntypes_gnid_range_map, etypes_geid_range_map,
ntid_ntype_map, schema_map):
"""
Wrapper function which is used to shuffle graph data on all the processes.
......@@ -301,16 +327,23 @@ def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_da
rank of the current process
world_size : int
total no. of participating processes.
    node_features : dictionary
        dictionary where node_features are stored and this information is read from the appropriate
node features file which belongs to the current process
edge_features : dictionary
dictionary where edge_features are stored. This information is read from the appropriate
edge feature files whose ownership is assigned to the current process
node_feat_tids: dictionary
in which keys are node-type names and values are triplets. Each triplet has node-feature name
and the starting and ending type ids of the node-feature data read from the corresponding
        node feature data file by the current process. Each node type may have several features and
hence each key may have several triplets.
edge_feat_tids : dictionary
a dictionary in which keys are edge-type names and values are triplets of the format
<feat-name, start-per-type-idx, end-per-type-idx>. This triplet is used to identify
        the chunk of feature data for which the current process is responsible
edge_data : dictionary
dictionary which is used to store edge information as read from the edges.txt file assigned
        dictionary which is used to store edge information as read from the appropriate files assigned
to each process.
id_lookup : instance of class DistLookupService
Distributed lookup service used to map global-nids to respective partition-ids and
......@@ -319,6 +352,9 @@ def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_da
mappings between node type names and node type ids
ntypes_gnid_range_map : dictionary
mapping between node type names and global_nids which belong to the keys in this dictionary
etypes_geid_range_map : dictionary
mapping between edge type names and global_eids which are assigned to the edges of this
edge_type
ntid_ntype_map : dictionary
        mapping between node type ids and node type names
schema_map : dictionary
......@@ -334,22 +370,35 @@ def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_da
process
dictionary :
list of global_nids for the nodes whose node features are received when node features shuffling was
performed in the `exchange_node_features` function call
performed in the `exchange_features` function call
dictionary :
the input argument, edge_data dictionary, is updated with the edge data received from other processes
in the world. The edge data is received by each rank in the process of data shuffling.
dictionary :
        edge features dictionary which holds the received edge features. The destination end points of these edges
are owned by the current process
dictionary :
list of global_eids for the edges whose edge features are received when edge features shuffling
was performed in the `exchange_features` function call
"""
memory_snapshot("ShuffleNodeFeaturesBegin: ", rank)
rcvd_node_features, rcvd_global_nids = exchange_node_features(rank, world_size, node_feat_tids, \
ntypes_gnid_range_map, id_lookup, node_features)
rcvd_node_features, rcvd_global_nids = exchange_features(rank, world_size, node_feat_tids,
ntypes_gnid_range_map, id_lookup, node_features,
constants.STR_NODE_FEATURES, None)
memory_snapshot("ShuffleNodeFeaturesComplete: ", rank)
logging.info(f'[Rank: {rank}] Done with node features exchange.')
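    # Repeat the exchange for edge features; per the docstring above, edge ownership
    # is determined by the destination end point of each edge.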
rcvd_edge_features, rcvd_global_eids = exchange_features(rank, world_size, edge_feat_tids,
etypes_geid_range_map, id_lookup, edge_features,
constants.STR_EDGE_FEATURES, edge_data)
logging.info(f'[Rank: {rank}] Done with edge features exchange.')
node_data = gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map)
memory_snapshot("NodeDataGenerationComplete: ", rank)
edge_data = exchange_edge_data(rank, world_size, edge_data)
memory_snapshot("ShuffleEdgeDataComplete: ", rank)
return node_data, rcvd_node_features, rcvd_global_nids, edge_data
return node_data, rcvd_node_features, rcvd_global_nids, edge_data, rcvd_edge_features, rcvd_global_eids
def read_dataset(rank, world_size, id_lookup, params, schema_map):
"""
......@@ -391,17 +440,24 @@ def read_dataset(rank, world_size, id_lookup, params, schema_map):
owner process for each edge.
dictionary
edge features which is also a dictionary, similar to node features dictionary
dictionary
a dictionary in which keys are edge-type names and values are tuples indicating the range of ids
for edges read by the current process.
dictionary
a dictionary in which keys are edge-type names and values are triplets,
(edge-feature-name, start_type_id, end_type_id). These type_ids are indices in the edge-features
read by the current process. Note that each edge-type may have several edge-features.
"""
edge_features = {}
#node_tids, node_features, edge_datadict, edge_tids
node_tids, node_features, node_feat_tids, edge_data, edge_tids = \
node_tids, node_features, node_feat_tids, edge_data, edge_tids, edge_features, edge_feat_tids = \
get_dataset(params.input_dir, params.graph_name, rank, world_size, schema_map)
    logging.info(f'[Rank: {rank}] Done reading dataset from {params.input_dir}')
edge_data = augment_edge_data(edge_data, id_lookup, edge_tids, rank, world_size)
logging.info(f'[Rank: {rank}] Done augmenting edge_data: {len(edge_data)}, {edge_data[constants.GLOBAL_SRC_ID].shape}')
return node_tids, node_features, node_feat_tids, edge_data, edge_features
return node_tids, node_features, node_feat_tids, edge_data, edge_features, edge_tids, edge_feat_tids
def gen_dist_partitions(rank, world_size, params):
"""
......@@ -543,11 +599,12 @@ def gen_dist_partitions(rank, world_size, params):
id_map, rank, world_size)
ntypes_ntypeid_map, ntypes, ntypeid_ntypes_map = get_node_types(schema_map)
etypes_etypeid_map, etypes, etypeid_etypes_map = get_edge_types(schema_map)
logging.info(f'[Rank: {rank}] Initialized metis partitions and node_types map...')
#read input graph files and augment these datastructures with
#appropriate information (global_nid and owner process) for node and edge data
node_tids, node_features, node_feat_tids, edge_data, edge_features = \
node_tids, node_features, node_feat_tids, edge_data, edge_features, edge_tids, edge_feat_tids = \
read_dataset(rank, world_size, id_lookup, params, schema_map)
    logging.info(f'[Rank: {rank}] Done augmenting file input data with auxiliary columns')
memory_snapshot("DatasetReadComplete: ", rank)
......@@ -556,9 +613,11 @@ def gen_dist_partitions(rank, world_size, params):
#this function will also stitch the data recvd from other processes
#and return the aggregated data
ntypes_gnid_range_map = get_gnid_range_map(node_tids)
node_data, rcvd_node_features, rcvd_global_nids, edge_data = \
exchange_graph_data(rank, world_size, node_features, node_feat_tids,
edge_data, id_lookup, ntypes_gnid_range_map,
etypes_geid_range_map = get_gnid_range_map(edge_tids)
node_data, rcvd_node_features, rcvd_global_nids, edge_data, rcvd_edge_features, rcvd_global_eids = \
exchange_graph_data(rank, world_size, node_features, edge_features, \
node_feat_tids, edge_feat_tids, edge_data, id_lookup, ntypes_ntypeid_map, \
ntypes_gnid_range_map, etypes_geid_range_map, \
ntypeid_ntypes_map, schema_map)
gc.collect()
logging.info(f'[Rank: {rank}] Done with data shuffling...')
......@@ -584,13 +643,15 @@ def gen_dist_partitions(rank, world_size, params):
for featname in featnames:
#if a feature name exists for a node-type, then it should also have
#feature data as well. Hence using the assert statement.
assert(ntype_name+'/'+featname in rcvd_global_nids)
global_nids = rcvd_global_nids[ntype_name+'/'+featname]
feature_key = ntype_name+'/'+featname
assert(feature_key in rcvd_global_nids)
global_nids = rcvd_global_nids[feature_key]
common, idx1, idx2 = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True)
_, idx1, _ = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True)
shuffle_global_ids = node_data[constants.SHUFFLE_GLOBAL_NID][idx1]
feature_idx = shuffle_global_ids.argsort()
rcvd_node_features[ntype_name+'/'+featname] = rcvd_node_features[ntype_name+'/'+featname][feature_idx]
rcvd_node_features[feature_key] = rcvd_node_features[feature_key][feature_idx]
memory_snapshot("ReorderNodeFeaturesComplete: ", rank)
#sort edge_data by etype
......@@ -604,6 +665,23 @@ def gen_dist_partitions(rank, world_size, params):
logging.info(f'[Rank: {rank}] Done assigning global_ids to edges ...')
memory_snapshot("ShuffleGlobalID_Edges_Complete: ", rank)
#Shuffle edge features according to the edge order on each rank.
for etype_name in etypes:
featnames = get_etype_featnames(etype_name, schema_map)
for featname in featnames:
feature_key = etype_name+'/'+featname
assert feature_key in rcvd_global_eids
global_eids = rcvd_global_eids[feature_key]
_, idx1, _ = np.intersect1d(edge_data[constants.GLOBAL_EID], global_eids, return_indices=True)
shuffle_global_ids = edge_data[constants.SHUFFLE_GLOBAL_EID][idx1]
feature_idx = shuffle_global_ids.argsort()
rcvd_edge_features[feature_key] = rcvd_edge_features[feature_key][feature_idx]
for k, v in rcvd_edge_features.items():
logging.info(f'[Rank: {rank}] key: {k} v: {v.shape}')
#determine global-ids for edge end-points
edge_data = lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, node_data)
logging.info(f'[Rank: {rank}] Done resolving orig_node_id for local node_ids...')
......@@ -619,7 +697,7 @@ def gen_dist_partitions(rank, world_size, params):
orig_nids, orig_eids = create_dgl_object(schema_map, rank, node_data, \
edge_data, num_edges, params.save_orig_nids, params.save_orig_eids)
memory_snapshot("CreateDGLObjectsComplete: ", rank)
write_dgl_objects(graph_obj, rcvd_node_features, edge_features, params.output, \
write_dgl_objects(graph_obj, rcvd_node_features, rcvd_edge_features, params.output, \
rank, orig_nids, orig_eids)
memory_snapshot("DiskWriteDGLObjectsComplete: ", rank)
......
......@@ -50,7 +50,14 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
in which keys are edge-type names and values are triplets. This triplet has edge-feature name,
        and range of tids for the edge feature data read from the files by the current process. Each
edge-type may have several edge features and associated tensor data.
dictionary
Data read from numpy files for all the edge features in this dataset. This dictionary's keys
        are feature names and values are tensors representing the edge feature data.
dictionary
This dictionary is used for identifying the global-id range for the associated edge features
present in the previous return value. The keys are edge-type names and values are triplets.
Each triplet consists of edge-feature name and starting and ending points of the range of
        tids representing the corresponding edge features.
"""
#node features dictionary
......@@ -160,13 +167,95 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
item[1] = node_tids[ntype_name][rank][0]
item[2] = node_tids[ntype_name][rank][1]
#done build node_features locally.
#done building node_features locally.
if len(node_features) <= 0:
logging.info(f'[Rank: {rank}] This dataset does not have any node features')
else:
for k, v in node_features.items():
logging.info(f'[Rank: {rank}] node feature name: {k}, feature data shape: {v.size()}')
'''
Reading edge features now.
    The structure of the "edge_data" entry, which is present in the input metadata json file, is as follows.
"edge_data" : {
"etype0-name" : {
"feat0-name" : {
"format" : {"name": "numpy"},
"data" : [ #list
"<path>/feat-0.npy",
"<path>/feat-1.npy",
....
"<path>/feat-<p-1>.npy"
]
},
"feat1-name" : {
"format" : {"name": "numpy"},
"data" : [ #list
"<path>/feat-0.npy",
"<path>/feat-1.npy",
....
"<path>/feat-<p-1>.npy"
]
}
}
}
    As shown above, the value for the key "edge_data" is a dictionary object which is
    used to describe the feature data for each of the edge-type names. Keys in this top-level
    dictionary are edge-type names and each value is a dictionary which captures all the features
    for the current edge-type. Feature data is captured with keys being the feature-names and
    each value is a dictionary object which has 2 keys, namely `format` and `data`. The format
    entry describes the storage format used by the edge features themselves and "data" lists
    all the files present for the given edge feature.
    Data read from each edge feature file is a multi-dimensional tensor and is read
    in numpy format, which is also the storage format of edge features on permanent storage.
'''
edge_features = {}
edge_feature_tids = {}
# Iterate over the "edge_data" dictionary in the schema_map.
    # Read the edge features if they exist.
# Also keep track of the type_eids for which the edge_features are read.
dataset_features = schema_map[constants.STR_EDGE_DATA]
if dataset_features and (len(dataset_features) > 0):
for etype_name, etype_feature_data in dataset_features.items():
#etype_feature_data is a dictionary
#where key: feature_name, value: dictionary in which keys are "format", "data"
edge_feature_tids[etype_name] = []
for feat_name, feat_data in etype_feature_data.items():
assert len(feat_data[constants.STR_DATA]) == world_size
assert feat_data[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_NUMPY
feature_fname = feat_data[constants.STR_DATA][rank] #this will be just the file name
if (os.path.isabs(feature_fname)):
logging.info(f'Loading numpy from {feature_fname}')
edge_features[etype_name+'/'+feat_name] = \
torch.from_numpy(np.load(feature_fname))
else:
numpy_path = os.path.join(input_dir, feature_fname)
logging.info(f'Loading numpy from {numpy_path}')
edge_features[etype_name+'/'+feat_name] = \
torch.from_numpy(np.load(numpy_path))
edge_feature_tids[etype_name].append([feat_name, -1, -1])
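                # The [-1, -1] placeholders are filled in below once the per-rank
                # edge id ranges (edge_tids) are known.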
    # Read the edge id ranges for each edge type that is processed by the current process.
edge_tids, _ = get_idranges(schema_map[constants.STR_EDGE_TYPE],
schema_map[constants.STR_NUM_EDGES_PER_CHUNK])
for etype_name in schema_map[constants.STR_EDGE_TYPE]:
if etype_name in edge_feature_tids:
for item in edge_feature_tids[etype_name]:
item[1] = edge_tids[etype_name][rank][0]
item[2] = edge_tids[etype_name][rank][1]
    # Done with building edge_features locally.
if len(edge_features) <= 0:
logging.info(f'[Rank: {rank}] This dataset does not have any edge features')
else:
for k, v in edge_features.items():
logging.info(f'[Rank: {rank}] edge feature name: {k}, feature data shape: {v.size()}')
'''
Code below is used to read edges from the input dataset with the help of the metadata json file
for the input graph dataset.
......@@ -270,5 +359,5 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
assert edge_datadict[constants.GLOBAL_TYPE_EID].shape == edge_datadict[constants.ETYPE_ID].shape
logging.info(f'[Rank: {rank}] Done reading edge_file: {len(edge_datadict)}, {edge_datadict[constants.GLOBAL_SRC_ID].shape}')
return node_tids, node_features, node_feature_tids, edge_datadict, edge_tids
return node_tids, node_features, node_feature_tids, edge_datadict, edge_tids, edge_features, edge_feature_tids
import logging
import os
import numpy as np
import pyarrow
import torch
from pyarrow import csv
from gloo_wrapper import alltoallv_cpu
......@@ -58,8 +57,9 @@ class DistLookupService:
# Iterate over the node types and extract the partition id mappings.
for ntype in ntype_names:
fname = f'{ntype}.txt'
logging.info(f'[Rank: {rank}] Reading file: {os.path.join(input_dir, fname)}')
filename = f'{ntype}.txt'
logging.info(f'[Rank: {rank}] Reading file: {os.path.join(input_dir, filename)}')
df = csv.read_csv(os.path.join(input_dir, '{}.txt'.format(ntype)), \
read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True), \
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
......@@ -87,6 +87,7 @@ class DistLookupService:
self.rank = rank
self.world_size = world_size
def get_partition_ids(self, global_nids):
'''
This function is used to get the partition-ids for a given set of global node ids
......
......@@ -67,6 +67,27 @@ def read_json(json_file):
return val
def get_etype_featnames(etype_name, schema_map):
"""Retrieves edge feature names for a given edge_type
Parameters:
-----------
    etype_name : string
        a string specifying an edge_type name
    schema_map : dictionary
metadata json object as a dictionary, which is read from the input
metadata file from the input dataset
Returns:
--------
list :
a list of feature names for a given edge_type
"""
edge_data = schema_map[constants.STR_EDGE_DATA]
feats = edge_data.get(etype_name, {})
return [feat for feat in feats]
def get_ntype_featnames(ntype_name, schema_map):
"""
Retrieves node feature names for a given node_type
......@@ -89,6 +110,30 @@ def get_ntype_featnames(ntype_name, schema_map):
feats = node_data.get(ntype_name, {})
return [feat for feat in feats]
def get_edge_types(schema_map):
"""Utility method to extract edge_typename -> edge_type mappings
as defined by the input schema
Parameters:
-----------
schema_map : dictionary
Input schema from which the edge_typename -> edge_typeid
dictionary is created.
Returns:
--------
dictionary
with keys as edge type names and values as ids (integers)
list
list of etype name strings
dictionary
with keys as etype ids (integers) and values as edge type names
"""
etypes = schema_map[constants.STR_EDGE_TYPE]
etype_etypeid_map = {e : i for i, e in enumerate(etypes)}
etypeid_etype_map = {i : e for i, e in enumerate(etypes)}
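    # For illustration: etypes == ['cites', 'writes'] would yield
    # ({'cites': 0, 'writes': 1}, ['cites', 'writes'], {0: 'cites', 1: 'writes'}).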
return etype_etypeid_map, etypes, etypeid_etype_map
def get_node_types(schema_map):
"""
Utility method to extract node_typename -> node_type mappings
......