Unverified Commit 6a460725 authored by Rhett Ying, committed by GitHub

[Dist] decouple num_chunks and num_parts for graphs with edge feature (#4729)

* [Dist] decouple num_chunks and num_parts for graphs with edge feature

* fix test failure
parent b63fc41a
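In short, the number of data chunks no longer has to equal the number of graph partitions: each rank is assigned a contiguous slice of the chunk list and concatenates the files it reads. A minimal sketch of that assignment, with hypothetical chunk and partition counts for illustration:

import numpy as np

num_chunks, world_size = 8, 4  # e.g. 8 data chunks partitioned across 4 ranks
read_list = np.array_split(np.arange(num_chunks), world_size)
# read_list == [array([0, 1]), array([2, 3]), array([4, 5]), array([6, 7])]
# rank r loads the chunk files listed in read_list[r] and concatenates them,
# so the per-rank feature tensor stays aligned with its contiguous edge-ID range.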
@@ -15,7 +15,7 @@ from dgl.data.utils import load_graphs, load_tensors
def create_chunked_dataset(
root_dir, num_chunks, include_masks=False, include_edge_data=False
root_dir, num_chunks, include_masks=False
):
"""
This function creates a sample dataset based on the MAG240 dataset.
@@ -81,9 +81,8 @@ def create_chunked_dataset(
inst_val_mask = np.random.randint(0, 2, num_institutions)
# Edge features.
if include_edge_data:
cite_count = np.random.choice(10, num_cite_edges)
write_year = np.random.choice(2022, num_write_edges)
cite_count = np.random.choice(10, num_cite_edges)
write_year = np.random.choice(2022, num_write_edges)
# Save features.
input_dir = os.path.join(root_dir, 'data_test')
@@ -107,14 +106,13 @@ def create_chunked_dataset(
with open(paper_orig_ids_path, 'wb') as f:
np.save(f, paper_orig_ids)
if include_edge_data:
cite_count_path = os.path.join(input_dir, 'cites/count.npy')
with open(cite_count_path, 'wb') as f:
np.save(f, cite_count)
cite_count_path = os.path.join(input_dir, 'cites/count.npy')
with open(cite_count_path, 'wb') as f:
np.save(f, cite_count)
write_year_path = os.path.join(input_dir, 'writes/year.npy')
with open(write_year_path, 'wb') as f:
np.save(f, write_year)
write_year_path = os.path.join(input_dir, 'writes/year.npy')
with open(write_year_path, 'wb') as f:
np.save(f, write_year)
node_data = None
if include_masks:
@@ -193,13 +191,11 @@ def create_chunked_dataset(
}
}
edge_data = {}
if include_edge_data:
edge_data = {
'cites': {'count': cite_count_path},
'writes': {'year': write_year_path},
'rev_writes': {'year': write_year_path},
}
edge_data = {
'cites': {'count': cite_count_path},
'writes': {'year': write_year_path},
'rev_writes': {'year': write_year_path},
}
output_dir = os.path.join(root_dir, 'chunked-data')
chunk_graph(
......
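Since the include_edge_data flag is gone, the helper always writes per-chunk edge feature files under chunked-data/edge_data. A rough usage sketch of the fixture as the tests below consume it (8 chunks assumed for illustration):

import tempfile
from create_chunked_dataset import create_chunked_dataset

with tempfile.TemporaryDirectory() as root_dir:
    g = create_chunked_dataset(root_dir, num_chunks=8)
    # Expected layout, mirroring the assertions in test_chunk_graph:
    #   root_dir/chunked-data/edge_data/paper:cites:paper/count-0.npy ... count-7.npy
    #   root_dir/chunked-data/edge_data/author:writes:paper/year-0.npy ... year-7.npy
    print(g.number_of_edges('cites'), g.number_of_edges('writes'))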
@@ -11,7 +11,12 @@ from create_chunked_dataset import create_chunked_dataset
import dgl
from dgl.data.utils import load_graphs, load_tensors
from dgl.distributed.partition import RESERVED_FIELD_DTYPE
from dgl.distributed.partition import (
RESERVED_FIELD_DTYPE,
load_partition,
_get_inner_node_mask,
_get_inner_edge_mask,
)
def _verify_partition_data_types(part_g):
@@ -22,12 +27,54 @@ def _verify_partition_data_types(part_g):
assert part_g.edata[k].dtype == dtype
def _verify_graph_feats(
g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids
):
"""Check that node/edge features stored with a partition match the original graph."""
for ntype in g.ntypes:
ntype_id = g.get_ntype_id(ntype)
inner_node_mask = _get_inner_node_mask(part, ntype_id)
inner_nids = part.ndata[dgl.NID][inner_node_mask]
ntype_ids, inner_type_nids = gpb.map_to_per_ntype(inner_nids)
partid = gpb.nid2partid(inner_type_nids, ntype)
assert np.all(ntype_ids.numpy() == ntype_id)
assert np.all(partid.numpy() == gpb.partid)
orig_id = orig_nids[ntype][inner_type_nids]
local_nids = gpb.nid2localnid(inner_type_nids, gpb.partid, ntype)
for name in g.nodes[ntype].data:
if name in [dgl.NID, "inner_node"]:
continue
true_feats = g.nodes[ntype].data[name][orig_id]
ndata = node_feats[ntype + "/" + name][local_nids]
assert torch.equal(ndata, true_feats)
for etype in g.etypes:
etype_id = g.get_etype_id(etype)
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
inner_eids = part.edata[dgl.EID][inner_edge_mask]
etype_ids, inner_type_eids = gpb.map_to_per_etype(inner_eids)
partid = gpb.eid2partid(inner_type_eids, etype)
assert np.all(etype_ids.numpy() == etype_id)
assert np.all(partid.numpy() == gpb.partid)
orig_id = orig_eids[etype][inner_type_eids]
local_eids = gpb.eid2localeid(inner_type_eids, gpb.partid, etype)
for name in g.edges[etype].data:
if name in [dgl.EID, "inner_edge"]:
continue
true_feats = g.edges[etype].data[name][orig_id]
edata = edge_feats[etype + "/" + name][local_eids]
assert torch.equal(edata, true_feats)
@pytest.mark.parametrize("num_chunks", [1, 8])
def test_chunk_graph(num_chunks):
with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(root_dir, num_chunks, include_edge_data=True)
g = create_chunked_dataset(root_dir, num_chunks)
num_cite_edges = g.number_of_edges("cites")
num_write_edges = g.number_of_edges("writes")
@@ -99,24 +146,9 @@ def test_part_pipeline(num_chunks, num_parts):
# num_parts should be less than or equal to num_chunks
return
include_edge_data = num_chunks == num_parts
with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(
root_dir, num_chunks, include_edge_data=include_edge_data
)
all_ntypes = g.ntypes
all_etypes = g.etypes
num_cite_edges = g.number_of_edges("cites")
num_write_edges = g.number_of_edges("writes")
num_affiliate_edges = g.number_of_edges("affiliated_with")
num_institutions = g.number_of_nodes("institution")
num_authors = g.number_of_nodes("author")
num_papers = g.number_of_nodes("paper")
g = create_chunked_dataset(root_dir, num_chunks)
# Step1: graph partition
in_dir = os.path.join(root_dir, "chunked-data")
@@ -152,142 +184,29 @@ def test_part_pipeline(num_chunks, num_parts):
cmd += " --save-orig-eids"
os.system(cmd)
# check metadata.json
meta_fname = os.path.join(out_dir, "metadata.json")
with open(meta_fname, "rb") as f:
meta_data = json.load(f)
for etype in all_etypes:
assert len(meta_data["edge_map"][etype]) == num_parts
assert meta_data["etypes"].keys() == set(all_etypes)
assert meta_data["graph_name"] == "mag240m"
for ntype in all_ntypes:
assert len(meta_data["node_map"][ntype]) == num_parts
assert meta_data["ntypes"].keys() == set(all_ntypes)
assert meta_data["num_edges"] == g.num_edges()
assert meta_data["num_nodes"] == g.num_nodes()
assert meta_data["num_parts"] == num_parts
edge_dict = {}
edge_data_gold = {}
if include_edge_data:
# Create Id Map here.
num_edges = 0
for utype, etype, vtype in g.canonical_etypes:
fname = ":".join([utype, etype, vtype])
edge_dict[fname] = np.array(
[num_edges, num_edges + g.number_of_edges(etype)]
).reshape(1, 2)
num_edges += g.number_of_edges(etype)
assert num_edges == g.number_of_edges()
id_map = dgl.distributed.id_map.IdMap(edge_dict)
orig_etype_id, orig_type_eid = id_map(np.arange(num_edges))
# check edge_data
num_edges = {
"paper:cites:paper": num_cite_edges,
"author:writes:paper": num_write_edges,
"paper:rev_writes:author": num_write_edges,
}
output_dir = os.path.join(root_dir, "chunked-data")
output_edge_data_dir = os.path.join(output_dir, "edge_data")
for etype, feat in [
["paper:cites:paper", "count"],
["author:writes:paper", "year"],
["paper:rev_writes:author", "year"],
]:
output_edge_sub_dir = os.path.join(output_edge_data_dir, etype)
features = []
for i in range(num_chunks):
chunk_f_name = "{}-{}.npy".format(feat, i)
chunk_f_name = os.path.join(
output_edge_sub_dir, chunk_f_name
)
assert os.path.isfile(chunk_f_name)
feat_array = np.load(chunk_f_name)
assert feat_array.shape[0] == num_edges[etype] // num_chunks
features.append(feat_array)
edge_data_gold[etype + "/" + feat] = np.concatenate(features)
# read original node/edge IDs
def read_orig_ids(fname):
orig_ids = {}
for i in range(num_parts):
ids_path = os.path.join(out_dir, f"part{i}", fname)
part_ids = load_tensors(ids_path)
for type, data in part_ids.items():
if type not in orig_ids:
orig_ids[type] = data
else:
orig_ids[type] = torch.cat((orig_ids[type], data))
return orig_ids
orig_nids = read_orig_ids("orig_nids.dgl")
orig_eids = read_orig_ids("orig_eids.dgl")
# load partitions and verify
part_config = os.path.join(out_dir, "metadata.json")
for i in range(num_parts):
sub_dir = "part-" + str(i)
assert meta_data[sub_dir][
"node_feats"
] == "part{}/node_feat.dgl".format(i)
assert meta_data[sub_dir][
"edge_feats"
] == "part{}/edge_feat.dgl".format(i)
assert meta_data[sub_dir][
"part_graph"
] == "part{}/graph.dgl".format(i)
# check data
sub_dir = os.path.join(out_dir, "part" + str(i))
# graph.dgl
fname = os.path.join(sub_dir, "graph.dgl")
assert os.path.isfile(fname)
g_list, data_dict = load_graphs(fname)
part_g = g_list[0]
assert isinstance(part_g, dgl.DGLGraph)
part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition(
part_config, i
)
_verify_partition_data_types(part_g)
# node_feat.dgl
fname = os.path.join(sub_dir, "node_feat.dgl")
assert os.path.isfile(fname)
tensor_dict = load_tensors(fname)
all_tensors = [
"paper/feat",
"paper/label",
"paper/year",
"paper/orig_ids",
]
assert tensor_dict.keys() == set(all_tensors)
for key in all_tensors:
assert isinstance(tensor_dict[key], torch.Tensor)
ndata_paper_orig_ids = tensor_dict["paper/orig_ids"]
# orig_nids.dgl
fname = os.path.join(sub_dir, "orig_nids.dgl")
assert os.path.isfile(fname)
orig_nids = load_tensors(fname)
assert len(orig_nids.keys()) == 3
assert torch.equal(ndata_paper_orig_ids, orig_nids["paper"])
# orig_eids.dgl
fname = os.path.join(sub_dir, "orig_eids.dgl")
assert os.path.isfile(fname)
orig_eids = load_tensors(fname)
assert len(orig_eids.keys()) == 4
if include_edge_data:
# Read edge_feat.dgl
fname = os.path.join(sub_dir, "edge_feat.dgl")
assert os.path.isfile(fname)
tensor_dict = load_tensors(fname)
all_tensors = [
"paper:cites:paper/count",
"author:writes:paper/year",
"paper:rev_writes:author/year",
]
assert tensor_dict.keys() == set(all_tensors)
for key in all_tensors:
assert isinstance(tensor_dict[key], torch.Tensor)
# Compare the data stored as edge features in this partition with the data
# from the original graph.
for idx, etype in enumerate(all_etypes):
if etype != key:
continue
# key in canonical form
tokens = key.split(":")
assert len(tokens) == 3
gold_type_ids = orig_type_eid[orig_etype_id == idx]
gold_data = edge_data_gold[key][gold_type_ids]
assert np.all(gold_data == part_data.numpy())
_verify_graph_feats(
g, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
)
@@ -107,7 +107,6 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
memory_snapshot("CreateDGLObj_Begin", part_id)
_, global_nid_ranges = get_idranges(schema[constants.STR_NODE_TYPE],
schema[constants.STR_NUM_NODES_PER_CHUNK])
memory_snapshot("CreateDGLObj_Begin", part_id)
_, global_eid_ranges = get_idranges(schema[constants.STR_EDGE_TYPE],
schema[constants.STR_NUM_EDGES_PER_CHUNK])
......
@@ -224,24 +224,27 @@ def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
# where key is the feature name and value is a dictionary with keys "format" and "data"
edge_feature_tids[etype_name] = []
for feat_name, feat_data in etype_feature_data.items():
assert len(feat_data[constants.STR_DATA]) == world_size
assert feat_data[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_NUMPY
feature_fname = feat_data[constants.STR_DATA][rank] #this will be just the file name
if (os.path.isabs(feature_fname)):
logging.info(f'Loading numpy from {feature_fname}')
edge_features[etype_name+'/'+feat_name] = \
torch.from_numpy(np.load(feature_fname))
else:
numpy_path = os.path.join(input_dir, feature_fname)
logging.info(f'Loading numpy from {numpy_path}')
edge_features[etype_name+'/'+feat_name] = \
torch.from_numpy(np.load(numpy_path))
num_chunks = len(feat_data[constants.STR_DATA])
read_list = np.array_split(np.arange(num_chunks), world_size)
efeat = []
for idx in read_list[rank]:
efeat_file = feat_data[constants.STR_DATA][idx]
if not os.path.isabs(efeat_file):
efeat_file = os.path.join(input_dir, efeat_file)
logging.info(
f'Loading edge feature[{feat_name}] of etype[{etype_name}] from {efeat_file}'
)
efeat.append(np.load(efeat_file))
efeat = np.concatenate(efeat)
edge_features[etype_name + '/' + feat_name] = torch.from_numpy(efeat)
edge_feature_tids[etype_name].append([feat_name, -1, -1])
# Read edges for each edge type that is processed by the current process.
edge_tids, _ = get_idranges(schema_map[constants.STR_EDGE_TYPE],
schema_map[constants.STR_NUM_EDGES_PER_CHUNK])
schema_map[constants.STR_NUM_EDGES_PER_CHUNK],
num_chunks=world_size)
for etype_name in schema_map[constants.STR_EDGE_TYPE]:
if etype_name in edge_feature_tids:
for item in edge_feature_tids[etype_name]:
......
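For context, each feat_data entry in the chunked metadata is a per-feature record listing one file per chunk; the loop above now concatenates the subset of those files assigned to this rank instead of assuming exactly one file per rank. An illustrative (not authoritative) example of such an entry:

feat_data = {
    'format': {'name': 'numpy'},
    'data': [
        'edge_data/paper:cites:paper/count-0.npy',
        'edge_data/paper:cites:paper/count-1.npy',
        # ... one relative path per chunk; num_chunks may now exceed world_size
    ],
}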