"src/runtime/git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "e234fcfa8f89a902b15e4a5f2d3e41cd9f20b95a"
Unverified Commit c42fa8a5 authored by kylasa, committed by GitHub

New script for customers to validate partitioned graph objects (#5340)

* A new script to validate the graph partitioning pipeline

* Addressing CI review comments.

* lintrunner patch.
parent bbc538d9
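For context, the new standalone validator added below is driven entirely by command-line flags (defined in tools/verify_partitions.py at the end of this change). A minimal invocation sketch, mirroring how the updated test drives it, might look like the following; the three directory paths are placeholders, not values from this commit:

    cmd = "python3 tools/verify_partitions.py"
    cmd += " --orig-dataset-dir /path/to/chunked_dataset"
    cmd += " --part-graph-dir /path/to/partitioned_graph"
    cmd += " --partitions-dir /path/to/partition_assignments"
    os.system(cmd)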
@@ -19,69 +19,13 @@ from dgl.distributed.partition import (
 from distpartitioning import array_readwriter
 from distpartitioning.utils import generate_read_list
-from utils import create_chunked_dataset
-
-
-def _verify_partition_data_types(part_g):
-    for k, dtype in RESERVED_FIELD_DTYPE.items():
-        if k in part_g.ndata:
-            assert part_g.ndata[k].dtype == dtype
-        if k in part_g.edata:
-            assert part_g.edata[k].dtype == dtype
-
-
-def _verify_partition_formats(part_g, formats):
-    # Verify saved graph formats
-    if formats is None:
-        assert "coo" in part_g.formats()["created"]
-    else:
-        formats = formats.split(",")
-        for format in formats:
-            assert format in part_g.formats()["created"]
-
-
-def _verify_graph_feats(
-    g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids
-):
-    for ntype in g.ntypes:
-        ntype_id = g.get_ntype_id(ntype)
-        inner_node_mask = _get_inner_node_mask(part, ntype_id)
-        inner_nids = part.ndata[dgl.NID][inner_node_mask]
-        ntype_ids, inner_type_nids = gpb.map_to_per_ntype(inner_nids)
-        partid = gpb.nid2partid(inner_type_nids, ntype)
-        assert np.all(ntype_ids.numpy() == ntype_id)
-        assert np.all(partid.numpy() == gpb.partid)
-
-        orig_id = orig_nids[ntype][inner_type_nids]
-        local_nids = gpb.nid2localnid(inner_type_nids, gpb.partid, ntype)
-        for name in g.nodes[ntype].data:
-            if name in [dgl.NID, "inner_node"]:
-                continue
-            true_feats = g.nodes[ntype].data[name][orig_id]
-            ndata = node_feats[ntype + "/" + name][local_nids]
-            assert np.array_equal(ndata.numpy(), true_feats.numpy())
-
-    for etype in g.canonical_etypes:
-        etype_id = g.get_etype_id(etype)
-        inner_edge_mask = _get_inner_edge_mask(part, etype_id)
-        inner_eids = part.edata[dgl.EID][inner_edge_mask]
-        etype_ids, inner_type_eids = gpb.map_to_per_etype(inner_eids)
-        partid = gpb.eid2partid(inner_type_eids, etype)
-        assert np.all(etype_ids.numpy() == etype_id)
-        assert np.all(partid.numpy() == gpb.partid)
-
-        orig_id = orig_eids[_etype_tuple_to_str(etype)][inner_type_eids]
-        local_eids = gpb.eid2localeid(inner_type_eids, gpb.partid, etype)
-        for name in g.edges[etype].data:
-            if name in [dgl.EID, "inner_edge"]:
-                continue
-            true_feats = g.edges[etype].data[name][orig_id]
-            edata = edge_feats[_etype_tuple_to_str(etype) + "/" + name][
-                local_eids
-            ]
-            assert np.array_equal(edata.numpy(), true_feats.numpy())
+from tools.verification_utils import (
+    verify_graph_feats,
+    verify_partition_data_types,
+    verify_partition_formats,
+)
+from utils import create_chunked_dataset


 def _test_chunk_graph(
@@ -268,6 +212,7 @@ def _test_pipeline(
     num_chunks_edges=None,
     num_chunks_node_data=None,
     num_chunks_edge_data=None,
+    use_verify_partitions=False,
 ):
     if num_chunks < num_parts:
         # num_parts should less/equal than num_chunks
@@ -323,6 +268,15 @@ def _test_pipeline(
     cmd += f" --graph-formats {graph_formats}" if graph_formats else ""
     os.system(cmd)

+    # check if verify_partitions.py is used for validation.
+    if use_verify_partitions:
+        cmd = "python3 tools/verify_partitions.py "
+        cmd += f" --orig-dataset-dir {in_dir}"
+        cmd += f" --part-graph {out_dir}"
+        cmd += f" --partitions-dir {output_dir}"
+        os.system(cmd)
+        return
+
     # read original node/edge IDs
     def read_orig_ids(fname):
         orig_ids = {}
@@ -345,9 +299,9 @@ def _test_pipeline(
         part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition(
             part_config, i
         )
-        _verify_partition_data_types(part_g)
-        _verify_partition_formats(part_g, graph_formats)
-        _verify_graph_feats(
+        verify_partition_data_types(part_g)
+        verify_partition_formats(part_g, graph_formats)
+        verify_graph_feats(
             g, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
         )
@@ -358,6 +312,9 @@ def _test_pipeline(
 )
 def test_pipeline_basics(num_chunks, num_parts, world_size):
     _test_pipeline(num_chunks, num_parts, world_size)
+    _test_pipeline(
+        num_chunks, num_parts, world_size, use_verify_partitions=True
+    )


 @pytest.mark.parametrize(
......
tools/verification_utils.py

import json
import os
import constants
import dgl
import numpy as np
import pyarrow
import pyarrow.parquet as pq
import pytest
import torch
from dgl.data.utils import load_tensors
from dgl.distributed.partition import (
_etype_str_to_tuple,
_etype_tuple_to_str,
_get_inner_edge_mask,
_get_inner_node_mask,
RESERVED_FIELD_DTYPE,
)
from distpartitioning import array_readwriter
from distpartitioning.utils import get_idranges
def read_file(fname, ftype):
    """Read a file from disk

    Parameters:
    -----------
    fname : string
        absolute path of the file to read
    ftype : string
        file format; supported formats are `numpy`, `parquet` and `csv`

    Returns:
    --------
    numpy ndarray :
        file contents returned as a numpy array
    """
    reader_fmt_meta = {"name": ftype}
    data = array_readwriter.get_array_parser(**reader_fmt_meta).read(fname)
    return data
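# Illustrative usage only (the path below is a placeholder, not part of this
# change): read a parquet edge file into a numpy array and split its columns.
#
#     edges = read_file("/tmp/dataset/edges/edges_part0.parquet", "parquet")
#     src_ids, dst_ids = edges[:, 0], edges[:, 1]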
def verify_partition_data_types(part_g):
    """Validate that the reserved fields of a partitioned graph have the
    expected dtypes.

    Parameters:
    -----------
    part_g : DGL Graph object
        created for the partitioned graph
    """
    for k, dtype in RESERVED_FIELD_DTYPE.items():
        if k in part_g.ndata:
            assert part_g.ndata[k].dtype == dtype
        if k in part_g.edata:
            assert part_g.edata[k].dtype == dtype
def verify_partition_formats(part_g, formats):
    """Validate that the partitioned graph was created with the requested
    sparse formats.

    Parameters:
    -----------
    part_g : DGL Graph object
        created for the partitioned graph
    formats : string
        requested graph formats (csc, coo, csr); multiple values can be
        separated by commas
    """
    # Verify saved graph formats
    if formats is None:
        assert "coo" in part_g.formats()["created"]
    else:
        formats = formats.split(",")
        for format in formats:
            assert format in part_g.formats()["created"]
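# Illustrative usage only: `formats` mirrors the --graph-formats value passed
# to the partitioning pipeline, i.e. a comma-separated string, for example:
#
#     verify_partition_formats(part_g, "csc,coo")
#     verify_partition_formats(part_g, None)  # None falls back to checking "coo"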
def verify_graph_feats(
    g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids
):
    """Verify the node/edge features of the partitioned graph against
    the original graph

    Parameters:
    -----------
    g : DGL Graph Object
        of the original graph
    gpb : global partition book
        created for the partitioned graph object
    part : DGL Graph object
        of a single graph partition
    node_feats : dictionary
        with key, value pairs as node-types and features as numpy arrays
    edge_feats : dictionary
        with key, value pairs as edge-types and features as numpy arrays
    orig_nids : dictionary
        with key, value pairs as node-types and (global) nids from the
        original graph
    orig_eids : dictionary
        with key, value pairs as edge-types and (global) eids from the
        original graph
    """
for ntype in g.ntypes:
ntype_id = g.get_ntype_id(ntype)
inner_node_mask = _get_inner_node_mask(part, ntype_id)
inner_nids = part.ndata[dgl.NID][inner_node_mask]
ntype_ids, inner_type_nids = gpb.map_to_per_ntype(inner_nids)
partid = gpb.nid2partid(inner_type_nids, ntype)
assert np.all(ntype_ids.numpy() == ntype_id)
assert np.all(partid.numpy() == gpb.partid)
orig_id = orig_nids[ntype][inner_type_nids]
local_nids = gpb.nid2localnid(inner_type_nids, gpb.partid, ntype)
for name in g.nodes[ntype].data:
if name in [dgl.NID, "inner_node"]:
continue
true_feats = g.nodes[ntype].data[name][orig_id]
ndata = node_feats[ntype + "/" + name][local_nids]
assert np.array_equal(ndata.numpy(), true_feats.numpy())
for etype in g.canonical_etypes:
etype_id = g.get_etype_id(etype)
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
inner_eids = part.edata[dgl.EID][inner_edge_mask]
etype_ids, inner_type_eids = gpb.map_to_per_etype(inner_eids)
partid = gpb.eid2partid(inner_type_eids, etype)
assert np.all(etype_ids.numpy() == etype_id)
assert np.all(partid.numpy() == gpb.partid)
orig_id = orig_eids[_etype_tuple_to_str(etype)][inner_type_eids]
local_eids = gpb.eid2localeid(inner_type_eids, gpb.partid, etype)
for name in g.edges[etype].data:
if name in [dgl.EID, "inner_edge"]:
continue
true_feats = g.edges[etype].data[name][orig_id]
edata = edge_feats[_etype_tuple_to_str(etype) + "/" + name][
local_eids
]
assert np.array_equal(edata.numpy(), true_feats.numpy())
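# Illustrative usage only: the partition-side arguments are typically the
# outputs of dgl.distributed.load_partition(), combined with the orig_nids /
# orig_eids dictionaries returned by read_orig_ids() below.
#
#     part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition(
#         part_config, part_id
#     )
#     verify_graph_feats(
#         g, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
#     )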
def verify_metadata_counts(part_schema, part_g, graph_schema, g, partid):
"""Verify the partitioned graph objects with the metadata
Parameters:
-----------
part_schema : json object
which is created by reading the metadata.json file for the
partitioned graph
part_g : DGL graph object
of a graph partition
graph_schema : json object
which is created by reading the metadata.json file for the
original graph
g : DGL Graph object
created by reading the original graph from the disk.
partid : integer
specifying the partition id of the graph object, part_g
"""
for ntype in part_schema[constants.STR_NTYPES]:
ntype_data = part_schema[constants.STR_NODE_MAP][ntype]
meta_ntype_count = ntype_data[partid][1] - ntype_data[partid][0]
inner_node_mask = _get_inner_node_mask(part_g, g.get_ntype_id(ntype))
graph_ntype_count = len(part_g.ndata[dgl.NID][inner_node_mask])
assert (
meta_ntype_count == graph_ntype_count
), f"Metadata ntypecount = {meta_ntype_count} and graph_ntype_count = {graph_ntype_count}"
for etype in part_schema[constants.STR_ETYPES]:
etype_data = part_schema[constants.STR_EDGE_MAP][etype]
meta_etype_count = etype_data[partid][1] - etype_data[partid][0]
mask = _get_inner_edge_mask(
part_g, g.get_etype_id(_etype_str_to_tuple(etype))
)
graph_etype_count = len(part_g.edata[dgl.EID][mask])
assert (
meta_etype_count == graph_etype_count
), f"Metadata etypecount = {meta_etype_count} does not match part graph etypecount = {graph_etype_count}"
def get_node_partids(partitions_dir, graph_schema):
"""load the node partition ids from the disk
Parameters:
----------
partitions_dir : string
directory path where metis/random partitions are located
graph_schema : json object
which is created by reading the metadata.json file for the
original graph
Returns:
--------
dictionary :
where keys are node-types and value is a list of partition-ids for all the
nodes of that particular node-type.
"""
assert os.path.isdir(
partitions_dir
), f"Please provide a valid directory to read nodes to partition-id mappings."
_, gid_dict = get_idranges(
graph_schema[constants.STR_NODE_TYPE],
dict(
zip(
graph_schema[constants.STR_NODE_TYPE],
graph_schema[constants.STR_NODE_TYPE_COUNTS],
)
),
)
node_partids = {}
for ntype_id, ntype in enumerate(graph_schema[constants.STR_NODE_TYPE]):
node_partids[ntype] = read_csv_file(
os.path.join(partitions_dir, f"{ntype}.txt"), True
)
        assert (
            len(node_partids[ntype])
            == graph_schema[constants.STR_NODE_TYPE_COUNTS][ntype_id]
        ), f"Node count for {ntype} = {len(node_partids[ntype])} in the partitions_dir while it should be {graph_schema[constants.STR_NODE_TYPE_COUNTS][ntype_id]} (from graph schema)."
return node_partids
def verify_node_partitionids(
    node_partids, part_g, g, gpb, graph_schema, orig_nids, partition_id
):
    """Verify that the nodes in a graph partition were assigned to that
    partition by the partitioning algorithm

    Parameters:
    -----------
    node_partids : dictionary
        with key, value pairs as node-types and partition-ids for all the
        nodes of that node-type
    part_g : DGL Graph object
        of a single graph partition
    g : DGL Graph object
        created by reading the original graph from disk
    gpb : global partition book
        created for the partitioned graph object
    graph_schema : json object
        created by reading the metadata.json file for the original graph
    orig_nids : dictionary
        which contains the original (global) node-ids
    partition_id : integer
        partition id of the partitioned graph, part_g
    """
    # Read the part graphs and verify the counts: the inner node masks give
    # the node counts in each part_g, and the corresponding orig-ids map them
    # back to the original graph node-ids.
    for ntype_id, ntype in enumerate(graph_schema[constants.STR_NODE_TYPE]):
        mask = _get_inner_node_mask(part_g, g.get_ntype_id(ntype))
        # Map these to orig-nids.
        inner_nids = part_g.ndata[dgl.NID][mask]
        ntype_ids, inner_type_nids = gpb.map_to_per_ntype(inner_nids)
        partid = gpb.nid2partid(inner_type_nids, ntype)
        assert np.all(ntype_ids.numpy() == ntype_id)
        assert np.all(partid.numpy() == gpb.partid)

        idxes = orig_nids[ntype][inner_type_nids]
        assert np.all(idxes >= 0)

        # Check the partition-ids assigned to these nodes.
        assert np.all(
            node_partids[ntype][idxes] == partition_id
        ), f"Nodes of type {ntype} in partition {partition_id} do not match the node-id to partition-id mapping produced by the partitioning algorithm."
def read_orig_ids(out_dir, fname, num_parts):
"""Read original id files for the partitioned graph objects
Parameters:
-----------
out_dir : string
specifying the directory where the files are located
fname : string
file name to read from
num_parts : integer
no. of partitions
Returns:
--------
dictionary :
where keys are node/edge types and values are original node
or edge ids from the original graph
"""
orig_ids = {}
for i in range(num_parts):
ids_path = os.path.join(out_dir, f"part{i}", fname)
part_ids = load_tensors(ids_path)
for type, data in part_ids.items():
if type not in orig_ids:
orig_ids[type] = data.numpy()
else:
orig_ids[type] = np.concatenate((orig_ids[type], data))
return orig_ids
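Taken together, these helpers assume the on-disk layout produced by the distributed partitioning pipeline. A rough sketch of that layout (directory and file names inferred from the readers above; actual contents may vary):

    <part-graph-dir>/
        metadata.json        # partition schema, used by verify_metadata_counts
        part0/
            orig_nids.dgl    # original node ids, read by read_orig_ids
            orig_eids.dgl    # original edge ids, read by read_orig_ids
        part1/
            ...
    <partitions-dir>/
        <ntype>.txt          # node-id to partition-id mapping, read by get_node_partids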
tools/verify_partitions.py

import argparse
import logging
import os
import platform
import constants
import dgl
import numpy as np
import pyarrow
import pyarrow.parquet as pq
import torch as th
from dgl.data.utils import load_graphs, load_tensors
from dgl.distributed.partition import (
_etype_str_to_tuple,
_etype_tuple_to_str,
_get_inner_edge_mask,
_get_inner_node_mask,
load_partition,
RESERVED_FIELD_DTYPE,
)
from utils import get_idranges, read_json
from verification_utils import (
    get_node_partids,
    read_csv_file,
    read_file,
    read_npy_file,
    read_orig_ids,
    read_pq_file,
    verify_graph_feats,
    verify_metadata_counts,
    verify_node_partitionids,
    verify_partition_data_types,
    verify_partition_formats,
)
def _read_graph(schema):
"""Read a DGL Graph object from storage using metadata schema, which is
a json object describing the DGL graph on disk.
Parameters:
-----------
schema : json object
json object describing the input graph to read from the disk
Returns:
--------
DGL Graph Object :
DGL Graph object is created which is read from the disk storage.
"""
    edges = {}
    edge_types = schema[constants.STR_EDGE_TYPE]
    for etype in edge_types:
        efiles = schema[constants.STR_EDGES][etype][constants.STR_DATA]
        efmt = schema[constants.STR_EDGES][etype][constants.STR_FORMAT][
            constants.STR_NAME
        ]
        src = []
        dst = []
        for fname in efiles:
            if efmt == constants.STR_CSV:
                data = read_file(fname, constants.STR_CSV)
            elif efmt == constants.STR_PARQUET:
                data = read_file(fname, constants.STR_PARQUET)
            else:
                raise ValueError(
                    f"Unknown edge format for {etype} - "
                    f"{schema[constants.STR_EDGES][etype][constants.STR_FORMAT]}"
                )
            src.append(data[:, 0])
            dst.append(data[:, 1])
        src = np.concatenate(src)
        dst = np.concatenate(dst)
        edges[_etype_str_to_tuple(etype)] = (src, dst)

    g = dgl.heterograph(edges)
# g = dgl.to_homogeneous(g)
g.ndata["orig_id"] = g.ndata[dgl.NID]
g.edata["orig_id"] = g.edata[dgl.EID]
    # Read node features here.
    for ntype in schema[constants.STR_NODE_TYPE]:
        if ntype in schema[constants.STR_NODE_DATA]:
            for featname, fdata in schema[constants.STR_NODE_DATA][
                ntype
            ].items():
                files = fdata[constants.STR_DATA]
                feats = []
                for fname in files:
                    feats.append(read_file(fname, constants.STR_NUMPY))
                if len(feats) > 0:
                    g.nodes[ntype].data[featname] = th.from_numpy(
                        np.concatenate(feats)
                    )
    # Read edge features here. Edge feature files are assumed to be numpy
    # arrays, mirroring the node features above.
    for etype in schema[constants.STR_EDGE_TYPE]:
        if etype in schema[constants.STR_EDGE_DATA]:
            for featname, fdata in schema[constants.STR_EDGE_DATA][
                etype
            ].items():
                files = fdata[constants.STR_DATA]
                feats = []
                for fname in files:
                    feats.append(read_file(fname, constants.STR_NUMPY))
                if len(feats) > 0:
                    g.edges[_etype_str_to_tuple(etype)].data[
                        featname
                    ] = th.from_numpy(np.concatenate(feats))
    # Log a summary of the graph that was read.
    logging.info(f"|V|= {g.number_of_nodes()}")
    logging.info(f"|E|= {g.number_of_edges()}")
    for ntype in g.ntypes:
        for name, data in g.nodes[ntype].data.items():
            if isinstance(data, th.Tensor):
                logging.info(
                    f"Input Graph: nfeat - {ntype}/{name} - data - {data.size()}"
                )
    for c_etype in g.canonical_etypes:
        for name, data in g.edges[c_etype].data.items():
            if isinstance(data, th.Tensor):
                logging.info(
                    f"Input Graph: efeat - {c_etype}/{name} - data - {data.size()}"
                )

    return g
def _read_part_graphs(part_config, part_metafile):
"""Read partitioned graph objects from disk storage.
Parameters:
----------
part_config : json object
json object created using the metadata file for the partitioned graph.
part_metafile : string
absolute path of the metadata.json file for the partitioned graph.
Returns:
--------
    list of tuples :
        where each tuple contains 4 objects in the following order:
        partitioned graph object, node features, edge features,
        global partition book
"""
part_graph_data = []
for i in range(part_config["num_parts"]):
part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition(
part_metafile, i
)
part_graph_data.append((part_g, node_feats, edge_feats, gpb))
return part_graph_data
def _validate_results(params):
"""Main function to verify the graph partitions
Parameters:
-----------
params : argparser object
to access the command line arguments
"""
logging.info(f"loading config files...")
part_config = os.path.join(params.part_graph_dir, "metadata.json")
part_schema = read_json(part_config)
num_parts = part_schema["num_parts"]
logging.info(f"loading config files of the original dataset...")
graph_config = os.path.join(params.orig_dataset_dir, "metadata.json")
graph_schema = read_json(graph_config)
logging.info(f"loading original ids from the dgl files...")
orig_nids = read_orig_ids(params.part_graph_dir, "orig_nids.dgl", num_parts)
orig_eids = read_orig_ids(params.part_graph_dir, "orig_eids.dgl", num_parts)
logging.info(f"loading node to partition-ids from files... ")
node_partids = get_node_partids(params.partitions_dir, graph_schema)
logging.info(f"loading the original dataset...")
g = _read_graph(graph_schema)
logging.info(f"Beginning the verification process...")
for i in range(num_parts):
part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition(
part_config, i
)
verify_partition_data_types(part_g)
verify_partition_formats(part_g, None)
verify_graph_feats(
g, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
)
verify_metadata_counts(part_schema, part_g, graph_schema, g, i)
verify_node_partitionids(
node_partids, part_g, g, gpb, graph_schema, orig_nids, i
)
logging.info(f"Verification of partitioned graph - {i}... SUCCESS !!!")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Construct graph partitions")
parser.add_argument(
"--orig-dataset-dir",
required=True,
type=str,
help="The directory path that contains the original graph input files.",
)
parser.add_argument(
"--part-graph-dir",
required=True,
type=str,
help="The directory path that contains the partitioned graph files.",
)
parser.add_argument(
"--partitions-dir",
required=True,
type=str,
help="The directory path that contains metis/random partitions results.",
)
parser.add_argument(
"--log-level",
type=str,
default="info",
help="To enable log level for debugging purposes. Available options: \
(Critical, Error, Warning, Info, Debug, Notset), default value \
is: Info",
)
params = parser.parse_args()
numeric_level = getattr(logging, params.log_level.upper(), None)
logging.basicConfig(
level=numeric_level,
format=f"[{platform.node()} %(levelname)s %(asctime)s PID:%(process)d] %(message)s",
)
_validate_results(params)