Unverified Commit 743516f3 authored by peizhou001's avatar peizhou001 Committed by GitHub
Browse files

add standalone tools for generating canonical etypes (#4626)

* add a standalone tool for change etypes to canonical etypes in part config
parent 6a3cb548
import json
import os
import tempfile
from collections import Counter
import pytest
from change_etype_to_canonical_etype import convert_conf, is_old_version
from scipy import sparse as spsp
import dgl
from dgl.distributed import partition_graph
def create_random_hetero(type_n, node_n):
num_nodes = {}
for i in range(1, type_n + 1):
num_nodes[f"n{i}"] = node_n
c_etypes = []
count = 0
for i in range(1, type_n):
for j in range(i + 1, type_n + 1):
count += 1
c_etypes.append((f"n{i}", f"r{count}", f"n{j}"))
edges = {}
for etype in c_etypes:
src_ntype, _, dst_ntype = etype
arr = spsp.random(
num_nodes[src_ntype],
num_nodes[dst_ntype],
density=0.001,
format="coo",
random_state=100,
)
edges[etype] = (arr.row, arr.col)
return dgl.heterograph(edges, num_nodes), [
":".join(c_etype) for c_etype in c_etypes
]
@pytest.mark.parametrize(
"type_n, node_n, num_parts", [[3, 100, 2], [10, 500, 4], [10, 1000, 8]]
)
def test_hetero_graph(type_n, node_n, num_parts):
g, expected_c_etypes = create_random_hetero(type_n, node_n)
do_convert_and_check(g, "convert_conf_test", num_parts, expected_c_etypes)
@pytest.mark.parametrize("node_n, num_parts", [[100, 2], [500, 4]])
def test_homo_graph(node_n, num_parts):
g = dgl.rand_graph(node_n, node_n // 10)
do_convert_and_check(g, "convert_conf_test", num_parts, ["_N:_E:_N"])
def do_convert_and_check(g, graph_name, num_parts, expected_c_etypes):
with tempfile.TemporaryDirectory() as root_dir:
partition_graph(g, graph_name, num_parts, root_dir)
part_config = os.path.join(root_dir, graph_name + ".json")
old_config = _get_old_config(part_config)
# Call convert function
convert_conf(part_config)
with open(part_config, "r") as config_f:
config = json.load(config_f)
# Check we get all canonical etypes
assert Counter(expected_c_etypes) == Counter(
config["etypes"].keys()
)
# Check the id is match after transform from etypes -> canonical
assert old_config["etypes"] == _extract_etypes(config["etypes"])
def _get_old_config(part_config):
with open(part_config, "r+") as config_f:
config = json.load(config_f)
if not is_old_version(config):
config["etypes"] = _extract_etypes(config["etypes"])
config["edge_map"] = _extract_edge_map(config["edge_map"])
config_f.seek(0)
json.dump(config, config_f, indent=4)
config_f.truncate()
return config
def _extract_etypes(c_etypes):
etypes = {}
for c_etype, eid in c_etypes.items():
etype = c_etype.split(":")[1]
etypes[etype] = eid
return etypes
def _extract_edge_map(c_edge_map):
edge_map = {}
for c_etype, emap in c_edge_map.items():
etype = c_etype.split(":")[1]
edge_map[etype] = emap
return edge_map
......@@ -167,3 +167,70 @@ The output chunked graph metadata will go as follows (assuming the current direc
"edge_data": {}
}
```
## Change edge type to canonical edge type for partition configuration json
In the upcoming DGL v1.0, we will require the partition configuration file to contain only canonical edge type. This tool is designed to help migrating existing configuration files from old style to new one.
### Sample Usage
```
python tools/change_etype_to_canonical_etype.py --part-config "{configuration file path}"
```
### Requirement
Partition algorithms produce one configuration file and multiple data folders, and each data folder corresponds to a partition. **This tool needs to read from the partition configuration file (specified by the commandline argument) *and* the graph structure data (stored in `graph.dgl` under the data folder) of the first partition.** They can be local files or shared files among network, if you follow this [official tutorial](https://docs.dgl.ai/en/latest/tutorials/dist/1_node_classification.html#sphx-glr-tutorials-dist-1-node-classification-py) for distributed training, you don't need to care about this as all files are shared by every participant through NFS.
**For example, below is a typical data folder expected by this tool:**
```
data_root_dir/
|-- graph_name.json # specified by part_config
|-- part0/
...
|-- graph.dgl
...
```
For more information about partition algorithm, see https://docs.dgl.ai/en/latest/generated/dgl.distributed.partition.partition_graph.html.
### Input arguments
1. *part-config*: The path of partition json file. < **Required**>
### Result
This tool changes the key of ``etypes`` and ``edge_map`` from format ``str`` to ``str:str:str`` and it overwrites the original file instead of creating a new one.
E.g. **File content before running the script**
```json
{
"edge_map": {
"r1": [ [ 0, 6 ], [ 16, 20 ] ],
"r2": [ [ 6, 11 ], [ 20, 25 ] ],
"r3": [ [ 11, 16 ], [ 25, 30 ] ]
},
"etypes": {
"r1": 0,
"r2": 1,
"r3": 2
},
...
}
```
**After running**
```json
{
"edge_map": {
"n1:r1:n2": [ [ 0, 6 ], [ 16, 20 ] ],
"n1:r2:n3": [ [ 6, 11 ], [ 20, 25 ] ],
"n2:r3:n3": [ [ 11, 16 ], [ 25, 30 ] ] },
"etypes": {
"n1:r1:n2": 0,
"n1:r2:n3": 1,
"n2:r3:n3": 2
}
...
}
```
import argparse
import json
import logging
import os
import time
import torch
import dgl
from dgl._ffi.base import DGLError
from dgl.data.utils import load_graphs
from dgl.distributed import load_partition_book
etypes_key = "etypes"
edge_map_key = "edge_map"
canonical_etypes_delimiter = ":"
def convert_conf(part_config):
with open(part_config, "r+", encoding="utf-8") as f:
config = json.load(f)
logging.info("Checking if the provided json file need to be changed.")
if is_old_version(config):
logging.info("Changing the partition configuration file.")
canonical_etypes = etype2canonical_etype(part_config)
# convert edge_map key from etype -> c_etype
new_edge_map = {}
for e_type, range in config[edge_map_key].items():
eid = config[etypes_key][e_type]
c_etype = [
key
for key in canonical_etypes
if canonical_etypes[key] == eid
][0]
new_edge_map[c_etype] = range
config[edge_map_key] = new_edge_map
config[etypes_key] = canonical_etypes
logging.info("Dumping the content to disk.")
f.seek(0)
json.dump(config, f, indent=4)
f.truncate()
def etype2canonical_etype(part_config):
gpb, _, _, etypes = load_partition_book(part_config=part_config, part_id=0)
eid = []
etype_id = []
for etype in etypes:
type_eid = torch.zeros((1,), dtype=torch.int64)
eid.append(gpb.map_to_homo_eid(type_eid, etype))
etype_id.append(etypes[etype])
eid = torch.cat(eid, 0)
etype_id = torch.IntTensor(etype_id)
partition_id = gpb.eid2partid(eid)
canonical_etypes = {}
part_ids = [
part_id
for part_id in range(gpb.num_partitions())
if part_id in partition_id
]
for part_id in part_ids:
seed_edges = torch.masked_select(eid, partition_id == part_id)
seed_edge_tids = torch.masked_select(etype_id, partition_id == part_id)
c_etype = _find_c_etypes_in_partition(
seed_edges, seed_edge_tids, part_id, part_config
)
canonical_etypes.update(c_etype)
return canonical_etypes
def _find_c_etypes_in_partition(
seed_edges, seed_edge_tids, part_id, part_config
):
folder = os.path.dirname(os.path.realpath(part_config))
partition_book = {}
local_g = dgl.DGLGraph()
try:
local_g = load_graphs(f"{folder}/part{part_id}/graph.dgl")[0][0]
partition_book = load_partition_book(
part_config=part_config, part_id=part_id
)[0]
except DGLError as e:
logging.fatal(
f"Graph data of partition {part_id} is requested but not found."
)
raise e
ntypes, etypes = partition_book.ntypes, partition_book.etypes
src, dst = _find_edges(local_g, partition_book, seed_edges)
src_tids, _ = partition_book.map_to_per_ntype(src)
dst_tids, _ = partition_book.map_to_per_ntype(dst)
canonical_etypes = {}
for src_tid, etype_id, dst_tid in zip(src_tids, seed_edge_tids, dst_tids):
src_tid = src_tid.item()
etype_id = etype_id.item()
dst_tid = dst_tid.item()
c_etype = (ntypes[src_tid], etypes[etype_id], ntypes[dst_tid])
canonical_etypes[canonical_etypes_delimiter.join(c_etype)] = etype_id
return canonical_etypes
def _find_edges(local_g, partition_book, seed_edges):
local_eids = partition_book.eid2localeid(seed_edges, partition_book.partid)
local_src, local_dst = local_g.find_edges(local_eids)
global_nid_mapping = local_g.ndata[dgl.NID]
global_src = global_nid_mapping[local_src]
global_dst = global_nid_mapping[local_dst]
return global_src, global_dst
def is_old_version(config):
first_etype = list(config[etypes_key].keys())[0]
etype_tuple = first_etype.split(canonical_etypes_delimiter)
return len(etype_tuple) == 1
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Change edge type in config file from format (str)"
" to (str,str,str), the original file will be overwritten",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--part-config", type=str, help="The file of the partition config"
)
args, _ = parser.parse_known_args()
assert (
args.part_config is not None
), "A user has to specify a partition config file with --part_config."
start = time.time()
convert_conf(args.part_config)
end = time.time()
logging.info(f"elplased time in seconds: {end - start}")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment