Unverified commit 1990e797, authored by Rhett Ying, committed by GitHub

[Dist] Reduce startup overhead: sort etypes and save in specified formats (#4735)

* [Dist] reduce startup overhead: enable saving in specified formats

* [Dist] reduce startup overhead: sort partitions when generating

* sort csc/csr only when multiple etypes

* refine
parent e682fa74
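In effect, partitions can now be materialized directly in the formats the servers will later serve. A minimal sketch of the new entry point, assuming a toy homogeneous graph (the `partition_graph` signature and the `graph_formats` argument come from the diff below; the output path is a placeholder):

```python
import dgl
from dgl.distributed import partition_graph

g = dgl.rand_graph(1000, 5000)  # toy graph; partitioning requires the 'coo' format to be allowed

# Save each partition in CSC only. When the graph has more than one edge
# type, the saved csc/csr structures are also pre-sorted by edge type, so
# DistGraphServer no longer converts formats or sorts at startup.
partition_graph(g, 'demo', num_parts=2, out_path='/tmp/demo_parts',
                graph_formats=['csc'])
```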
...
@@ -10,7 +10,7 @@ import numpy as np
 from ..heterograph import DGLHeteroGraph
 from ..convert import heterograph as dgl_heterograph
 from ..convert import graph as dgl_graph
-from ..transforms import compact_graphs, sort_csr_by_tag, sort_csc_by_tag
+from ..transforms import compact_graphs
 from .. import heterograph_index
 from .. import backend as F
 from ..base import NID, EID, ETYPE, ALL, is_all
...
@@ -345,14 +345,6 @@ class DistGraphServer(KVServer):
             # Create the graph formats specified the users.
             self.client_g = self.client_g.formats(graph_format)
             self.client_g.create_formats_()
-            # Sort underlying matrix beforehand to avoid runtime overhead during sampling.
-            if len(etypes) > 1:
-                if 'csr' in graph_format:
-                    self.client_g = sort_csr_by_tag(
-                        self.client_g, tag=self.client_g.edata[ETYPE], tag_type='edge')
-                if 'csc' in graph_format:
-                    self.client_g = sort_csc_by_tag(
-                        self.client_g, tag=self.client_g.edata[ETYPE], tag_type='edge')
             if not disable_shared_mem:
                 self.client_g = _copy_graph_to_shared_mem(self.client_g, graph_name, graph_format)
...
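The sorting removed here is not dropped; it moves into `_save_graphs` below so it runs once at partition time rather than on every server start. A sketch of the operation itself, mirroring the deleted lines (`presort_by_etype` is a hypothetical name for illustration; `sort_csr_by_tag`/`sort_csc_by_tag` and the `ETYPE` edge tag are the ones used in the hunks):

```python
from dgl.base import ETYPE
from dgl.transforms import sort_csc_by_tag, sort_csr_by_tag

def presort_by_etype(g, formats):
    # Reorder each node's neighbor list so edges of the same type sit in a
    # contiguous range; per-etype samplers can then slice that range instead
    # of scanning and filtering every neighbor.
    if 'csr' in formats:
        g = sort_csr_by_tag(g, tag=g.edata[ETYPE], tag_type='edge')
    if 'csc' in formats:
        g = sort_csc_by_tag(g, tag=g.edata[ETYPE], tag_type='edge')
    return g
```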
...
@@ -9,6 +9,7 @@ from .. import backend as F
 from ..base import NID, EID, NTYPE, ETYPE, dgl_warning
 from ..convert import to_homogeneous
 from ..random import choice as random_choice
+from ..transforms import sort_csr_by_tag, sort_csc_by_tag
 from ..data.utils import load_graphs, save_graphs, load_tensors, save_tensors
 from ..partition import metis_partition_assignment, partition_graph_with_halo, get_peak_mem
 from .graph_partition_book import BasicPartitionBook, RangePartitionBook
...
@@ -23,8 +24,10 @@ RESERVED_FIELD_DTYPE = {
     ETYPE: F.int32
 }

-def _save_graphs(filename, g_list):
-    '''Format data types in graphs before saving
+def _save_graphs(filename, g_list, formats=None, sort_etypes=False):
+    '''Preprocess partitions before saving:
+        1. format data types.
+        2. sort csc/csr by tag.
     '''
     for g in g_list:
         for k, dtype in RESERVED_FIELD_DTYPE.items():
...
@@ -32,7 +35,14 @@ def _save_graphs(filename, g_list):
                 g.ndata[k] = F.astype(g.ndata[k], dtype)
             if k in g.edata:
                 g.edata[k] = F.astype(g.edata[k], dtype)
-    save_graphs(filename , g_list)
+    for g in g_list:
+        if (not sort_etypes) or (formats is None):
+            continue
+        if 'csr' in formats:
+            g = sort_csr_by_tag(g, tag=g.edata[ETYPE], tag_type='edge')
+        if 'csc' in formats:
+            g = sort_csc_by_tag(g, tag=g.edata[ETYPE], tag_type='edge')
+    save_graphs(filename , g_list, formats=formats)

 def _get_inner_node_mask(graph, ntype_id):
     if NTYPE in graph.ndata:
...
@@ -368,7 +378,8 @@ def _set_trainer_ids(g, sim_g, node_parts):

 def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method="metis",
                     reshuffle=True, balance_ntypes=None, balance_edges=False, return_mapping=False,
-                    num_trainers_per_machine=1, objtype='cut'):
+                    num_trainers_per_machine=1, objtype='cut',
+                    graph_formats=None):
     ''' Partition a graph for distributed training and store the partitions on files.

     The partitioning occurs in three steps: 1) run a partition algorithm (e.g., Metis) to
...
@@ -549,6 +560,11 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
     objtype : str, "cut" or "vol"
         Set the objective as edge-cut minimization or communication volume minimization. This
         argument is used by the Metis algorithm.
+    graph_formats : str or list[str]
+        Save partitions in specified formats. It could be any combination of ``coo``,
+        ``csc`` and ``csr``. If not specified, save one format only according to what
+        format is available. If multiple formats are available, selection priority
+        from high to low is ``coo``, ``csc``, ``csr``.

     Returns
     -------
...
@@ -573,6 +589,9 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
     ...     g, node_feats, edge_feats, gpb, graph_name, ntypes_list, etypes_list,
     ... ) = dgl.distributed.load_partition('output/test.json', 0)
     '''
+    # 'coo' is required for partition
+    assert 'coo' in np.concatenate(list(g.formats().values())), \
+        "'coo' format should be allowed for partitioning graph."
     def get_homogeneous(g, balance_ntypes):
         if g.is_homogeneous:
             sim_g = to_homogeneous(g)
...
@@ -930,7 +949,9 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
         save_tensors(node_feat_file, node_feats)
         save_tensors(edge_feat_file, edge_feats)

-        _save_graphs(part_graph_file, [part])
+        sort_etypes = len(g.etypes) > 1
+        _save_graphs(part_graph_file, [part], formats=graph_formats,
+                     sort_etypes=sort_etypes)

     print('Save partitions: {:.3f} seconds, peak memory: {:.3f} GB'.format(
         time.time() - start, get_peak_mem()))
...
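A quick way to confirm what the new save path wrote, assuming partitions were produced as in the sketch above (`/tmp/demo_parts/demo.json` is a placeholder; the `load_partition` return tuple and the `formats()['created']` check mirror the test changes below):

```python
from dgl.distributed import load_partition

part_g, node_feats, edge_feats, gpb, _, ntypes, etypes = load_partition(
    '/tmp/demo_parts/demo.json', 0)
# Only the requested formats are materialized, e.g. ['csc'] for the call above.
print(part_g.formats()['created'])
```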
...
@@ -447,7 +447,7 @@ def check_rpc_hetero_sampling_empty_shuffle(tmpdir, num_server):
         assert block.number_of_edges() == 0
         assert len(block.etypes) == len(g.etypes)

-def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server, etype_sorted=False):
+def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server, graph_formats=None):
     generate_ip_config("rpc_ip_config.txt", num_server, num_server)

     g = create_random_hetero(dense=True)
...
@@ -455,7 +455,8 @@ def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server, etype_sorted=False):
     num_hops = 1

     orig_nid_map, orig_eid_map = partition_graph(g, 'test_sampling', num_parts, tmpdir,
-        num_hops=num_hops, part_method='metis', reshuffle=True, return_mapping=True)
+        num_hops=num_hops, part_method='metis', reshuffle=True, return_mapping=True,
+        graph_formats=graph_formats)
     pserver_list = []
     ctx = mp.get_context('spawn')
...
@@ -466,6 +467,9 @@ def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server, etype_sorted=False):
         pserver_list.append(p)

     fanout = 3
+    etype_sorted = False
+    if graph_formats is not None:
+        etype_sorted = 'csc' in graph_formats or 'csr' in graph_formats
     block, gpb = start_hetero_etype_sample_client(0, tmpdir, num_server > 1, fanout,
                                                   nodes={'n3': [0, 10, 99, 66, 124, 208]},
                                                   etype_sorted=etype_sorted)
...
@@ -768,7 +772,9 @@ def test_rpc_sampling_shuffle(num_server):
         check_rpc_hetero_sampling_shuffle(Path(tmpdirname), num_server)
         check_rpc_hetero_sampling_empty_shuffle(Path(tmpdirname), num_server)
         check_rpc_hetero_etype_sampling_shuffle(Path(tmpdirname), num_server)
-        check_rpc_hetero_etype_sampling_shuffle(Path(tmpdirname), num_server, etype_sorted=True)
+        check_rpc_hetero_etype_sampling_shuffle(Path(tmpdirname), num_server, ['csc'])
+        check_rpc_hetero_etype_sampling_shuffle(Path(tmpdirname), num_server, ['csr'])
+        check_rpc_hetero_etype_sampling_shuffle(Path(tmpdirname), num_server, ['csc', 'coo'])
         check_rpc_hetero_etype_sampling_empty_shuffle(Path(tmpdirname), num_server)
         check_rpc_bipartite_sampling_empty(Path(tmpdirname), num_server)
         check_rpc_bipartite_sampling_shuffle(Path(tmpdirname), num_server)
...
-import dgl
-import sys
 import os
+
+import backend as F
+import dgl
 import numpy as np
-from scipy import sparse as spsp
-from dgl.distributed import partition_graph, load_partition, load_partition_feats
-from dgl.distributed.graph_partition_book import BasicPartitionBook, RangePartitionBook, \
-    NodePartitionPolicy, EdgePartitionPolicy, HeteroDataName
+import pytest
 from dgl import function as fn
-import backend as F
-import unittest
-import tempfile
+from dgl.distributed import (load_partition, load_partition_feats,
+                             partition_graph)
+from dgl.distributed.graph_partition_book import (BasicPartitionBook,
+                                                  EdgePartitionPolicy,
+                                                  HeteroDataName,
+                                                  NodePartitionPolicy,
+                                                  RangePartitionBook)
+from dgl.distributed.partition import (
+    RESERVED_FIELD_DTYPE,
+    _get_inner_node_mask,
+    _get_inner_edge_mask
+)
+from scipy import sparse as spsp
 from utils import reset_envs
-from dgl.distributed.partition import RESERVED_FIELD_DTYPE
 def _verify_partition_data_types(part_g):
     for k, dtype in RESERVED_FIELD_DTYPE.items():
...
@@ -20,48 +28,43 @@ def _verify_partition_data_types(part_g):
         if k in part_g.edata:
             assert part_g.edata[k].dtype == dtype

-def _get_inner_node_mask(graph, ntype_id):
-    if dgl.NTYPE in graph.ndata:
-        dtype = F.dtype(graph.ndata['inner_node'])
-        return graph.ndata['inner_node'] * F.astype(graph.ndata[dgl.NTYPE] == ntype_id, dtype) == 1
-    else:
-        return graph.ndata['inner_node'] == 1
-
-def _get_inner_edge_mask(graph, etype_id):
-    if dgl.ETYPE in graph.edata:
-        dtype = F.dtype(graph.edata['inner_edge'])
-        return graph.edata['inner_edge'] * F.astype(graph.edata[dgl.ETYPE] == etype_id, dtype) == 1
-    else:
-        return graph.edata['inner_edge'] == 1
-
-def _get_part_ranges(id_ranges):
-    if isinstance(id_ranges, dict):
-        return {key:np.concatenate([np.array(l) for l in id_ranges[key]]).reshape(-1, 2) \
-                for key in id_ranges}
-    else:
-        return np.concatenate([np.array(l) for l in id_range[key]]).reshape(-1, 2)
+
+def _verify_partition_formats(part_g, formats):
+    # verify saved graph formats
+    if formats is None:
+        assert "coo" in part_g.formats()["created"]
+    else:
+        for format in formats:
+            assert format in part_g.formats()["created"]

 def create_random_graph(n):
-    arr = (spsp.random(n, n, density=0.001, format='coo', random_state=100) != 0).astype(np.int64)
+    arr = (
+        spsp.random(n, n, density=0.001, format="coo", random_state=100) != 0
+    ).astype(np.int64)
     return dgl.from_scipy(arr)

 def create_random_hetero():
-    num_nodes = {'n1': 1000, 'n2': 1010, 'n3': 1020}
-    etypes = [('n1', 'r1', 'n2'),
-              ('n1', 'r2', 'n3'),
-              ('n2', 'r3', 'n3')]
+    num_nodes = {"n1": 1000, "n2": 1010, "n3": 1020}
+    etypes = [("n1", "r1", "n2"), ("n1", "r2", "n3"), ("n2", "r3", "n3")]
     edges = {}
     for etype in etypes:
         src_ntype, _, dst_ntype = etype
-        arr = spsp.random(num_nodes[src_ntype], num_nodes[dst_ntype], density=0.001, format='coo',
-                          random_state=100)
+        arr = spsp.random(
+            num_nodes[src_ntype],
+            num_nodes[dst_ntype],
+            density=0.001,
+            format="coo",
+            random_state=100,
+        )
         edges[etype] = (arr.row, arr.col)
     return dgl.heterograph(edges, num_nodes)

 def verify_hetero_graph(g, parts):
-    num_nodes = {ntype:0 for ntype in g.ntypes}
-    num_edges = {etype:0 for etype in g.etypes}
+    num_nodes = {ntype: 0 for ntype in g.ntypes}
+    num_edges = {etype: 0 for etype in g.etypes}
     for part in parts:
         assert len(g.ntypes) == len(F.unique(part.ndata[dgl.NTYPE]))
         assert len(g.etypes) == len(F.unique(part.edata[dgl.ETYPE]))
...
@@ -77,17 +80,25 @@ def verify_hetero_graph(g, parts):
             num_edges[etype] += num_inner_edges
     # Verify the number of nodes are correct.
     for ntype in g.ntypes:
-        print('node {}: {}, {}'.format(ntype, g.number_of_nodes(ntype), num_nodes[ntype]))
+        print(
+            "node {}: {}, {}".format(
+                ntype, g.number_of_nodes(ntype), num_nodes[ntype]
+            )
+        )
         assert g.number_of_nodes(ntype) == num_nodes[ntype]
     # Verify the number of edges are correct.
     for etype in g.etypes:
-        print('edge {}: {}, {}'.format(etype, g.number_of_edges(etype), num_edges[etype]))
+        print(
+            "edge {}: {}, {}".format(
+                etype, g.number_of_edges(etype), num_edges[etype]
+            )
+        )
         assert g.number_of_edges(etype) == num_edges[etype]

-    nids = {ntype:[] for ntype in g.ntypes}
-    eids = {etype:[] for etype in g.etypes}
+    nids = {ntype: [] for ntype in g.ntypes}
+    eids = {etype: [] for etype in g.etypes}
     for part in parts:
-        _, _, eid = part.edges(form='all')
+        _, _, eid = part.edges(form="all")
         etype_arr = F.gather_row(part.edata[dgl.ETYPE], eid)
         eid_type = F.gather_row(part.edata[dgl.EID], eid)
         for etype in g.etypes:
...
@@ -95,16 +106,27 @@ def verify_hetero_graph(g, parts):
             eids[etype].append(F.boolean_mask(eid_type, etype_arr == etype_id))
             # Make sure edge Ids fall into a range.
             inner_edge_mask = _get_inner_edge_mask(part, etype_id)
-            inner_eids = np.sort(F.asnumpy(F.boolean_mask(part.edata[dgl.EID], inner_edge_mask)))
-            assert np.all(inner_eids == np.arange(inner_eids[0], inner_eids[-1] + 1))
+            inner_eids = np.sort(
+                F.asnumpy(F.boolean_mask(part.edata[dgl.EID], inner_edge_mask))
+            )
+            assert np.all(
+                inner_eids == np.arange(inner_eids[0], inner_eids[-1] + 1)
+            )

         for ntype in g.ntypes:
             ntype_id = g.get_ntype_id(ntype)
             # Make sure inner nodes have Ids fall into a range.
             inner_node_mask = _get_inner_node_mask(part, ntype_id)
             inner_nids = F.boolean_mask(part.ndata[dgl.NID], inner_node_mask)
-            assert np.all(F.asnumpy(inner_nids == F.arange(F.as_scalar(inner_nids[0]),
-                                                           F.as_scalar(inner_nids[-1]) + 1)))
+            assert np.all(
+                F.asnumpy(
+                    inner_nids
+                    == F.arange(
+                        F.as_scalar(inner_nids[0]),
+                        F.as_scalar(inner_nids[-1]) + 1,
+                    )
+                )
+            )
             nids[ntype].append(inner_nids)

     for ntype in nids:
...
@@ -118,11 +140,14 @@ def verify_hetero_graph(g, parts):
         assert len(uniq_ids) == g.number_of_edges(etype)
     # TODO(zhengda) this doesn't check 'part_id'

-def verify_graph_feats(g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids):
+
+def verify_graph_feats(
+    g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids
+):
     for ntype in g.ntypes:
         ntype_id = g.get_ntype_id(ntype)
         inner_node_mask = _get_inner_node_mask(part, ntype_id)
-        inner_nids = F.boolean_mask(part.ndata[dgl.NID],inner_node_mask)
+        inner_nids = F.boolean_mask(part.ndata[dgl.NID], inner_node_mask)
         ntype_ids, inner_type_nids = gpb.map_to_per_ntype(inner_nids)
         partid = gpb.nid2partid(inner_type_nids, ntype)
         assert np.all(F.asnumpy(ntype_ids) == ntype_id)
...
@@ -132,16 +157,16 @@ def verify_graph_feats(g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids):
         local_nids = gpb.nid2localnid(inner_type_nids, gpb.partid, ntype)
         for name in g.nodes[ntype].data:
-            if name in [dgl.NID, 'inner_node']:
+            if name in [dgl.NID, "inner_node"]:
                 continue
             true_feats = F.gather_row(g.nodes[ntype].data[name], orig_id)
-            ndata = F.gather_row(node_feats[ntype + '/' + name], local_nids)
+            ndata = F.gather_row(node_feats[ntype + "/" + name], local_nids)
             assert np.all(F.asnumpy(ndata == true_feats))

     for etype in g.etypes:
         etype_id = g.get_etype_id(etype)
         inner_edge_mask = _get_inner_edge_mask(part, etype_id)
-        inner_eids = F.boolean_mask(part.edata[dgl.EID],inner_edge_mask)
+        inner_eids = F.boolean_mask(part.edata[dgl.EID], inner_edge_mask)
         etype_ids, inner_type_eids = gpb.map_to_per_etype(inner_eids)
         partid = gpb.eid2partid(inner_type_eids, etype)
         assert np.all(F.asnumpy(etype_ids) == etype_id)
...
@@ -151,22 +176,43 @@ def verify_graph_feats(g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids):
         local_eids = gpb.eid2localeid(inner_type_eids, gpb.partid, etype)
         for name in g.edges[etype].data:
-            if name in [dgl.EID, 'inner_edge']:
+            if name in [dgl.EID, "inner_edge"]:
                 continue
             true_feats = F.gather_row(g.edges[etype].data[name], orig_id)
-            edata = F.gather_row(edge_feats[etype + '/' + name], local_eids)
+            edata = F.gather_row(edge_feats[etype + "/" + name], local_eids)
             assert np.all(F.asnumpy(edata == true_feats))

-def check_hetero_partition(hg, part_method, num_parts=4, num_trainers_per_machine=1, load_feats=True):
-    hg.nodes['n1'].data['labels'] = F.arange(0, hg.number_of_nodes('n1'))
-    hg.nodes['n1'].data['feats'] = F.tensor(np.random.randn(hg.number_of_nodes('n1'), 10), F.float32)
-    hg.edges['r1'].data['feats'] = F.tensor(np.random.randn(hg.number_of_edges('r1'), 10), F.float32)
-    hg.edges['r1'].data['labels'] = F.arange(0, hg.number_of_edges('r1'))
+
+def check_hetero_partition(
+    hg,
+    part_method,
+    num_parts=4,
+    num_trainers_per_machine=1,
+    load_feats=True,
+    graph_formats=None,
+):
+    hg.nodes["n1"].data["labels"] = F.arange(0, hg.number_of_nodes("n1"))
+    hg.nodes["n1"].data["feats"] = F.tensor(
+        np.random.randn(hg.number_of_nodes("n1"), 10), F.float32
+    )
+    hg.edges["r1"].data["feats"] = F.tensor(
+        np.random.randn(hg.number_of_edges("r1"), 10), F.float32
+    )
+    hg.edges["r1"].data["labels"] = F.arange(0, hg.number_of_edges("r1"))
     num_hops = 1

-    orig_nids, orig_eids = partition_graph(hg, 'test', num_parts, '/tmp/partition', num_hops=num_hops,
-                                           part_method=part_method, reshuffle=True, return_mapping=True,
-                                           num_trainers_per_machine=num_trainers_per_machine)
+    orig_nids, orig_eids = partition_graph(
+        hg,
+        "test",
+        num_parts,
+        "/tmp/partition",
+        num_hops=num_hops,
+        part_method=part_method,
+        reshuffle=True,
+        return_mapping=True,
+        num_trainers_per_machine=num_trainers_per_machine,
+        graph_formats=graph_formats,
+    )
     assert len(orig_nids) == len(hg.ntypes)
     assert len(orig_eids) == len(hg.etypes)
     for ntype in hg.ntypes:
...
@@ -178,23 +224,31 @@ def check_hetero_partition(hg, part_method, num_parts=4, num_trainers_per_machine=1,
     shuffled_elabels = []
     for i in range(num_parts):
         part_g, node_feats, edge_feats, gpb, _, ntypes, etypes = load_partition(
-            '/tmp/partition/test.json', i, load_feats=load_feats)
+            "/tmp/partition/test.json", i, load_feats=load_feats
+        )
         _verify_partition_data_types(part_g)
+        _verify_partition_formats(part_g, graph_formats)
         if not load_feats:
             assert not node_feats
             assert not edge_feats
-            node_feats, edge_feats = load_partition_feats('/tmp/partition/test.json', i)
+            node_feats, edge_feats = load_partition_feats(
+                "/tmp/partition/test.json", i
+            )
         if num_trainers_per_machine > 1:
             for ntype in hg.ntypes:
-                name = ntype + '/trainer_id'
+                name = ntype + "/trainer_id"
                 assert name in node_feats
-                part_ids = F.floor_div(node_feats[name], num_trainers_per_machine)
+                part_ids = F.floor_div(
+                    node_feats[name], num_trainers_per_machine
+                )
                 assert np.all(F.asnumpy(part_ids) == i)
             for etype in hg.etypes:
-                name = etype + '/trainer_id'
+                name = etype + "/trainer_id"
                 assert name in edge_feats
-                part_ids = F.floor_div(edge_feats[name], num_trainers_per_machine)
+                part_ids = F.floor_div(
+                    edge_feats[name], num_trainers_per_machine
+                )
                 assert np.all(F.asnumpy(part_ids) == i)
         # Verify the mapping between the reshuffled IDs and the original IDs.
         # These are partition-local IDs.
...
@@ -210,9 +264,13 @@ def check_hetero_partition(hg, part_method, num_parts=4, num_trainers_per_machine=1,
         # These are original per-type IDs.
         for etype_id, etype in enumerate(hg.etypes):
             part_src_ids1 = F.boolean_mask(part_src_ids, etype_ids == etype_id)
-            src_ntype_ids1 = F.boolean_mask(src_ntype_ids, etype_ids == etype_id)
+            src_ntype_ids1 = F.boolean_mask(
+                src_ntype_ids, etype_ids == etype_id
+            )
             part_dst_ids1 = F.boolean_mask(part_dst_ids, etype_ids == etype_id)
-            dst_ntype_ids1 = F.boolean_mask(dst_ntype_ids, etype_ids == etype_id)
+            dst_ntype_ids1 = F.boolean_mask(
+                dst_ntype_ids, etype_ids == etype_id
+            )
             part_eids1 = F.boolean_mask(part_eids, etype_ids == etype_id)
             assert np.all(F.asnumpy(src_ntype_ids1 == src_ntype_ids1[0]))
             assert np.all(F.asnumpy(dst_ntype_ids1 == dst_ntype_ids1[0]))
...
@@ -225,54 +283,88 @@ def check_hetero_partition(hg, part_method, num_parts=4, num_trainers_per_machine=1,
             assert len(orig_eids1) == len(orig_eids2)
             assert np.all(F.asnumpy(orig_eids1) == F.asnumpy(orig_eids2))
         parts.append(part_g)
-        verify_graph_feats(hg, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids)
+        verify_graph_feats(
+            hg, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
+        )

-        shuffled_labels.append(node_feats['n1/labels'])
-        shuffled_elabels.append(edge_feats['r1/labels'])
+        shuffled_labels.append(node_feats["n1/labels"])
+        shuffled_elabels.append(edge_feats["r1/labels"])
     verify_hetero_graph(hg, parts)

     shuffled_labels = F.asnumpy(F.cat(shuffled_labels, 0))
     shuffled_elabels = F.asnumpy(F.cat(shuffled_elabels, 0))
     orig_labels = np.zeros(shuffled_labels.shape, dtype=shuffled_labels.dtype)
-    orig_elabels = np.zeros(shuffled_elabels.shape, dtype=shuffled_elabels.dtype)
-    orig_labels[F.asnumpy(orig_nids['n1'])] = shuffled_labels
-    orig_elabels[F.asnumpy(orig_eids['r1'])] = shuffled_elabels
-    assert np.all(orig_labels == F.asnumpy(hg.nodes['n1'].data['labels']))
-    assert np.all(orig_elabels == F.asnumpy(hg.edges['r1'].data['labels']))
+    orig_elabels = np.zeros(
+        shuffled_elabels.shape, dtype=shuffled_elabels.dtype
+    )
+    orig_labels[F.asnumpy(orig_nids["n1"])] = shuffled_labels
+    orig_elabels[F.asnumpy(orig_eids["r1"])] = shuffled_elabels
+    assert np.all(orig_labels == F.asnumpy(hg.nodes["n1"].data["labels"]))
+    assert np.all(orig_elabels == F.asnumpy(hg.edges["r1"].data["labels"]))

-def check_partition(g, part_method, reshuffle, num_parts=4, num_trainers_per_machine=1, load_feats=True):
-    g.ndata['labels'] = F.arange(0, g.number_of_nodes())
-    g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 10), F.float32)
-    g.edata['feats'] = F.tensor(np.random.randn(g.number_of_edges(), 10), F.float32)
-    g.update_all(fn.copy_src('feats', 'msg'), fn.sum('msg', 'h'))
-    g.update_all(fn.copy_edge('feats', 'msg'), fn.sum('msg', 'eh'))
+
+def check_partition(
+    g,
+    part_method,
+    reshuffle,
+    num_parts=4,
+    num_trainers_per_machine=1,
+    load_feats=True,
+    graph_formats=None,
+):
+    g.ndata["labels"] = F.arange(0, g.number_of_nodes())
+    g.ndata["feats"] = F.tensor(
+        np.random.randn(g.number_of_nodes(), 10), F.float32
+    )
+    g.edata["feats"] = F.tensor(
+        np.random.randn(g.number_of_edges(), 10), F.float32
+    )
+    g.update_all(fn.copy_src("feats", "msg"), fn.sum("msg", "h"))
+    g.update_all(fn.copy_edge("feats", "msg"), fn.sum("msg", "eh"))
     num_hops = 2

-    orig_nids, orig_eids = partition_graph(g, 'test', num_parts, '/tmp/partition', num_hops=num_hops,
-                                           part_method=part_method, reshuffle=reshuffle, return_mapping=True,
-                                           num_trainers_per_machine=num_trainers_per_machine)
+    orig_nids, orig_eids = partition_graph(
+        g,
+        "test",
+        num_parts,
+        "/tmp/partition",
+        num_hops=num_hops,
+        part_method=part_method,
+        reshuffle=reshuffle,
+        return_mapping=True,
+        num_trainers_per_machine=num_trainers_per_machine,
+        graph_formats=graph_formats,
+    )
     part_sizes = []
     shuffled_labels = []
     shuffled_edata = []
     for i in range(num_parts):
         part_g, node_feats, edge_feats, gpb, _, ntypes, etypes = load_partition(
-            '/tmp/partition/test.json', i, load_feats=load_feats)
+            "/tmp/partition/test.json", i, load_feats=load_feats
+        )
         _verify_partition_data_types(part_g)
+        _verify_partition_formats(part_g, graph_formats)
         if not load_feats:
             assert not node_feats
             assert not edge_feats
-            node_feats, edge_feats = load_partition_feats('/tmp/partition/test.json', i)
+            node_feats, edge_feats = load_partition_feats(
+                "/tmp/partition/test.json", i
+            )
         if num_trainers_per_machine > 1:
             for ntype in g.ntypes:
-                name = ntype + '/trainer_id'
+                name = ntype + "/trainer_id"
                 assert name in node_feats
-                part_ids = F.floor_div(node_feats[name], num_trainers_per_machine)
+                part_ids = F.floor_div(
+                    node_feats[name], num_trainers_per_machine
+                )
                 assert np.all(F.asnumpy(part_ids) == i)
             for etype in g.etypes:
-                name = etype + '/trainer_id'
+                name = etype + "/trainer_id"
                 assert name in edge_feats
-                part_ids = F.floor_div(edge_feats[name], num_trainers_per_machine)
+                part_ids = F.floor_div(
+                    edge_feats[name], num_trainers_per_machine
+                )
                 assert np.all(F.asnumpy(part_ids) == i)

         # Check the metadata
...
@@ -282,33 +374,41 @@ def check_partition(g, part_method, reshuffle, num_parts=4, num_trainers_per_machine=1,
         assert gpb.num_partitions() == num_parts
         gpb_meta = gpb.metadata()
         assert len(gpb_meta) == num_parts
-        assert len(gpb.partid2nids(i)) == gpb_meta[i]['num_nodes']
-        assert len(gpb.partid2eids(i)) == gpb_meta[i]['num_edges']
-        part_sizes.append((gpb_meta[i]['num_nodes'], gpb_meta[i]['num_edges']))
+        assert len(gpb.partid2nids(i)) == gpb_meta[i]["num_nodes"]
+        assert len(gpb.partid2eids(i)) == gpb_meta[i]["num_edges"]
+        part_sizes.append((gpb_meta[i]["num_nodes"], gpb_meta[i]["num_edges"]))

-        nid = F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata['inner_node'])
+        nid = F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata["inner_node"])
         local_nid = gpb.nid2localnid(nid, i)
         assert F.dtype(local_nid) in (F.int64, F.int32)
         assert np.all(F.asnumpy(local_nid) == np.arange(0, len(local_nid)))
-        eid = F.boolean_mask(part_g.edata[dgl.EID], part_g.edata['inner_edge'])
+        eid = F.boolean_mask(part_g.edata[dgl.EID], part_g.edata["inner_edge"])
         local_eid = gpb.eid2localeid(eid, i)
         assert F.dtype(local_eid) in (F.int64, F.int32)
         assert np.all(F.asnumpy(local_eid) == np.arange(0, len(local_eid)))

         # Check the node map.
-        local_nodes = F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata['inner_node'])
-        llocal_nodes = F.nonzero_1d(part_g.ndata['inner_node'])
+        local_nodes = F.boolean_mask(
+            part_g.ndata[dgl.NID], part_g.ndata["inner_node"]
+        )
+        llocal_nodes = F.nonzero_1d(part_g.ndata["inner_node"])
         local_nodes1 = gpb.partid2nids(i)
         assert F.dtype(local_nodes1) in (F.int32, F.int64)
-        assert np.all(np.sort(F.asnumpy(local_nodes)) == np.sort(F.asnumpy(local_nodes1)))
+        assert np.all(
+            np.sort(F.asnumpy(local_nodes)) == np.sort(F.asnumpy(local_nodes1))
+        )
         assert np.all(F.asnumpy(llocal_nodes) == np.arange(len(llocal_nodes)))

         # Check the edge map.
-        local_edges = F.boolean_mask(part_g.edata[dgl.EID], part_g.edata['inner_edge'])
-        llocal_edges = F.nonzero_1d(part_g.edata['inner_edge'])
+        local_edges = F.boolean_mask(
+            part_g.edata[dgl.EID], part_g.edata["inner_edge"]
+        )
+        llocal_edges = F.nonzero_1d(part_g.edata["inner_edge"])
         local_edges1 = gpb.partid2eids(i)
         assert F.dtype(local_edges1) in (F.int32, F.int64)
-        assert np.all(np.sort(F.asnumpy(local_edges)) == np.sort(F.asnumpy(local_edges1)))
+        assert np.all(
+            np.sort(F.asnumpy(local_edges)) == np.sort(F.asnumpy(local_edges1))
+        )
         assert np.all(F.asnumpy(llocal_edges) == np.arange(len(llocal_edges)))

         # Verify the mapping between the reshuffled IDs and the original IDs.
...
@@ -326,49 +426,63 @@ def check_partition(g, part_method, reshuffle, num_parts=4, num_trainers_per_machine=1,
         if reshuffle:
             local_orig_nids = orig_nids[part_g.ndata[dgl.NID]]
             local_orig_eids = orig_eids[part_g.edata[dgl.EID]]
-            part_g.ndata['feats'] = F.gather_row(g.ndata['feats'], local_orig_nids)
-            part_g.edata['feats'] = F.gather_row(g.edata['feats'], local_orig_eids)
+            part_g.ndata["feats"] = F.gather_row(
+                g.ndata["feats"], local_orig_nids
+            )
+            part_g.edata["feats"] = F.gather_row(
+                g.edata["feats"], local_orig_eids
+            )
             local_nodes = orig_nids[local_nodes]
             local_edges = orig_eids[local_edges]
         else:
-            part_g.ndata['feats'] = F.gather_row(g.ndata['feats'], part_g.ndata[dgl.NID])
-            part_g.edata['feats'] = F.gather_row(g.edata['feats'], part_g.edata[dgl.NID])
+            part_g.ndata["feats"] = F.gather_row(
+                g.ndata["feats"], part_g.ndata[dgl.NID]
+            )
+            part_g.edata["feats"] = F.gather_row(
+                g.edata["feats"], part_g.edata[dgl.NID]
+            )

-        part_g.update_all(fn.copy_src('feats', 'msg'), fn.sum('msg', 'h'))
-        part_g.update_all(fn.copy_edge('feats', 'msg'), fn.sum('msg', 'eh'))
-        assert F.allclose(F.gather_row(g.ndata['h'], local_nodes),
-                          F.gather_row(part_g.ndata['h'], llocal_nodes))
-        assert F.allclose(F.gather_row(g.ndata['eh'], local_nodes),
-                          F.gather_row(part_g.ndata['eh'], llocal_nodes))
+        part_g.update_all(fn.copy_src("feats", "msg"), fn.sum("msg", "h"))
+        part_g.update_all(fn.copy_edge("feats", "msg"), fn.sum("msg", "eh"))
+        assert F.allclose(
+            F.gather_row(g.ndata["h"], local_nodes),
+            F.gather_row(part_g.ndata["h"], llocal_nodes),
+        )
+        assert F.allclose(
+            F.gather_row(g.ndata["eh"], local_nodes),
+            F.gather_row(part_g.ndata["eh"], llocal_nodes),
+        )

-        for name in ['labels', 'feats']:
-            assert '_N/' + name in node_feats
-            assert node_feats['_N/' + name].shape[0] == len(local_nodes)
+        for name in ["labels", "feats"]:
+            assert "_N/" + name in node_feats
+            assert node_feats["_N/" + name].shape[0] == len(local_nodes)
             true_feats = F.gather_row(g.ndata[name], local_nodes)
-            ndata = F.gather_row(node_feats['_N/' + name], local_nid)
+            ndata = F.gather_row(node_feats["_N/" + name], local_nid)
             assert np.all(F.asnumpy(true_feats) == F.asnumpy(ndata))
-        for name in ['feats']:
-            assert '_E/' + name in edge_feats
-            assert edge_feats['_E/' + name].shape[0] == len(local_edges)
+        for name in ["feats"]:
+            assert "_E/" + name in edge_feats
+            assert edge_feats["_E/" + name].shape[0] == len(local_edges)
             true_feats = F.gather_row(g.edata[name], local_edges)
-            edata = F.gather_row(edge_feats['_E/' + name], local_eid)
+            edata = F.gather_row(edge_feats["_E/" + name], local_eid)
             assert np.all(F.asnumpy(true_feats) == F.asnumpy(edata))

         # This only works if node/edge IDs are shuffled.
         if reshuffle:
-            shuffled_labels.append(node_feats['_N/labels'])
-            shuffled_edata.append(edge_feats['_E/feats'])
+            shuffled_labels.append(node_feats["_N/labels"])
+            shuffled_edata.append(edge_feats["_E/feats"])

     # Verify that we can reconstruct node/edge data for original IDs.
     if reshuffle:
         shuffled_labels = F.asnumpy(F.cat(shuffled_labels, 0))
         shuffled_edata = F.asnumpy(F.cat(shuffled_edata, 0))
-        orig_labels = np.zeros(shuffled_labels.shape, dtype=shuffled_labels.dtype)
+        orig_labels = np.zeros(
+            shuffled_labels.shape, dtype=shuffled_labels.dtype
+        )
         orig_edata = np.zeros(shuffled_edata.shape, dtype=shuffled_edata.dtype)
         orig_labels[F.asnumpy(orig_nids)] = shuffled_labels
         orig_edata[F.asnumpy(orig_eids)] = shuffled_edata
-        assert np.all(orig_labels == F.asnumpy(g.ndata['labels']))
-        assert np.all(orig_edata == F.asnumpy(g.edata['feats']))
+        assert np.all(orig_labels == F.asnumpy(g.ndata["labels"]))
+        assert np.all(orig_edata == F.asnumpy(g.edata["feats"]))

     if reshuffle:
         node_map = []
...
@@ -385,52 +499,48 @@ def check_partition(g, part_method, reshuffle, num_parts=4, num_trainers_per_machine=1,
         assert F.dtype(eid2pid) in (F.int32, F.int64)
         assert np.all(F.asnumpy(eid2pid) == edge_map)

-def check_hetero_partition_single_etype(num_trainers):
-    user_ids = np.arange(1000)
-    item_ids = np.arange(2000)
-    num_edges = 3 * 1000
-    src_ids = np.random.choice(user_ids, size=num_edges)
-    dst_ids = np.random.choice(item_ids, size=num_edges)
-    hg = dgl.heterograph({('user', 'like', 'item'): (src_ids, dst_ids)})
-    with tempfile.TemporaryDirectory() as test_dir:
-        orig_nids, orig_eids = partition_graph(
-            hg, 'test', 2, test_dir, num_trainers_per_machine=num_trainers, return_mapping=True)
-        assert len(orig_nids) == len(hg.ntypes)
-        assert len(orig_eids) == len(hg.etypes)
-        for ntype in hg.ntypes:
-            assert len(orig_nids[ntype]) == hg.number_of_nodes(ntype)
-        for etype in hg.etypes:
-            assert len(orig_eids[etype]) == hg.number_of_edges(etype)
-
-@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
-def test_partition():
-    os.environ['DGL_DIST_DEBUG'] = '1'
+@pytest.mark.parametrize("part_method", ["metis", "random"])
+@pytest.mark.parametrize("reshuffle", [True, False])
+@pytest.mark.parametrize("num_parts", [1, 4])
+@pytest.mark.parametrize("num_trainers_per_machine", [1, 4])
+@pytest.mark.parametrize("load_feats", [True, False])
+@pytest.mark.parametrize(
+    "graph_formats", [None, ["csc"], ["coo", "csc"], ["coo", "csc", "csr"]]
+)
+def test_partition(
+    part_method,
+    reshuffle,
+    num_parts,
+    num_trainers_per_machine,
+    load_feats,
+    graph_formats,
+):
+    os.environ["DGL_DIST_DEBUG"] = "1"
+    if part_method == "random" and num_parts > 1:
+        num_trainers_per_machine = 1
     g = create_random_graph(1000)
-    check_partition(g, 'metis', False)
-    check_partition(g, 'metis', True)
-    check_partition(g, 'metis', True, 4, 8)
-    check_partition(g, 'metis', True, 1, 8)
-    check_partition(g, 'random', False)
-    check_partition(g, 'random', True)
-    check_partition(g, 'metis', True, 4, 8, load_feats=False)
-    reset_envs()
-
-@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
-@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
-def test_hetero_partition():
-    os.environ['DGL_DIST_DEBUG'] = '1'
-    check_hetero_partition_single_etype(1)
-    check_hetero_partition_single_etype(4)
+    check_partition(
+        g,
+        part_method,
+        reshuffle,
+        num_parts,
+        num_trainers_per_machine,
+        load_feats,
+        graph_formats,
+    )
     hg = create_random_hetero()
-    check_hetero_partition(hg, 'metis')
-    check_hetero_partition(hg, 'metis', 1, 8)
-    check_hetero_partition(hg, 'metis', 4, 8)
-    check_hetero_partition(hg, 'random')
-    check_hetero_partition(hg, 'metis', 4, 8, load_feats=False)
+    check_hetero_partition(
+        hg,
+        part_method,
+        num_parts,
+        num_trainers_per_machine,
+        load_feats,
+        graph_formats,
+    )
     reset_envs()

-@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
 def test_BasicPartitionBook():
     part_id = 0
     num_parts = 2
...
@@ -439,91 +549,93 @@ def test_BasicPartitionBook():
     graph = dgl.rand_graph(1000, 5000)
     graph = dgl.node_subgraph(graph, F.arange(0, graph.num_nodes()))
     gpb = BasicPartitionBook(part_id, num_parts, node_map, edge_map, graph)
-    c_etype = ('_N', '_E', '_N')
-    assert gpb.etypes == ['_E']
+    c_etype = ("_N", "_E", "_N")
+    assert gpb.etypes == ["_E"]
     assert gpb.canonical_etypes == [c_etype]
-    node_policy = NodePartitionPolicy(gpb, '_N')
-    assert node_policy.type_name == '_N'
-    edge_policy = EdgePartitionPolicy(gpb, '_E')
-    assert edge_policy.type_name == '_E'
+    node_policy = NodePartitionPolicy(gpb, "_N")
+    assert node_policy.type_name == "_N"
+    edge_policy = EdgePartitionPolicy(gpb, "_E")
+    assert edge_policy.type_name == "_E"

-@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
 def test_RangePartitionBook():
     part_id = 0
     num_parts = 2

     # homogeneous
-    node_map = {'_N': F.tensor([[0, 1000], [1000, 2000]])}
-    edge_map = {'_E': F.tensor([[0, 5000], [5000, 10000]])}
-    ntypes = {'_N': 0}
-    etypes = {'_E': 0}
+    node_map = {"_N": F.tensor([[0, 1000], [1000, 2000]])}
+    edge_map = {"_E": F.tensor([[0, 5000], [5000, 10000]])}
+    ntypes = {"_N": 0}
+    etypes = {"_E": 0}
     gpb = RangePartitionBook(
-        part_id, num_parts, node_map, edge_map, ntypes, etypes)
-    assert gpb.etypes == ['_E']
+        part_id, num_parts, node_map, edge_map, ntypes, etypes
+    )
+    assert gpb.etypes == ["_E"]
     assert gpb.canonical_etypes == [None]
-    assert gpb._to_canonical_etype('_E') == '_E'
-    node_policy = NodePartitionPolicy(gpb, '_N')
-    assert node_policy.type_name == '_N'
-    edge_policy = EdgePartitionPolicy(gpb, '_E')
-    assert edge_policy.type_name == '_E'
+    assert gpb._to_canonical_etype("_E") == "_E"
+    node_policy = NodePartitionPolicy(gpb, "_N")
+    assert node_policy.type_name == "_N"
+    edge_policy = EdgePartitionPolicy(gpb, "_E")
+    assert edge_policy.type_name == "_E"

     # heterogeneous, init via etype
-    node_map = {'node1': F.tensor([[0, 1000], [1000, 2000]]), 'node2': F.tensor([
-        [0, 1000], [1000, 2000]])}
-    edge_map = {'edge1': F.tensor([[0, 5000], [5000, 10000]])}
-    ntypes = {'node1': 0, 'node2': 1}
-    etypes = {'edge1': 0}
+    node_map = {
+        "node1": F.tensor([[0, 1000], [1000, 2000]]),
+        "node2": F.tensor([[0, 1000], [1000, 2000]]),
+    }
+    edge_map = {"edge1": F.tensor([[0, 5000], [5000, 10000]])}
+    ntypes = {"node1": 0, "node2": 1}
+    etypes = {"edge1": 0}
     gpb = RangePartitionBook(
-        part_id, num_parts, node_map, edge_map, ntypes, etypes)
-    assert gpb.etypes == ['edge1']
+        part_id, num_parts, node_map, edge_map, ntypes, etypes
+    )
+    assert gpb.etypes == ["edge1"]
     assert gpb.canonical_etypes == [None]
-    assert gpb._to_canonical_etype('edge1') == 'edge1'
-    node_policy = NodePartitionPolicy(gpb, 'node1')
-    assert node_policy.type_name == 'node1'
-    edge_policy = EdgePartitionPolicy(gpb, 'edge1')
-    assert edge_policy.type_name == 'edge1'
+    assert gpb._to_canonical_etype("edge1") == "edge1"
+    node_policy = NodePartitionPolicy(gpb, "node1")
+    assert node_policy.type_name == "node1"
+    edge_policy = EdgePartitionPolicy(gpb, "edge1")
+    assert edge_policy.type_name == "edge1"

     # heterogeneous, init via canonical etype
-    node_map = {'node1': F.tensor([[0, 1000], [1000, 2000]]), 'node2': F.tensor([
-        [0, 1000], [1000, 2000]])}
-    edge_map = {('node1', 'edge1', 'node2'): F.tensor([[0, 5000], [5000, 10000]])}
-    ntypes = {'node1': 0, 'node2': 1}
-    etypes = {('node1', 'edge1', 'node2'): 0}
+    node_map = {
+        "node1": F.tensor([[0, 1000], [1000, 2000]]),
+        "node2": F.tensor([[0, 1000], [1000, 2000]]),
+    }
+    edge_map = {
+        ("node1", "edge1", "node2"): F.tensor([[0, 5000], [5000, 10000]])
+    }
+    ntypes = {"node1": 0, "node2": 1}
+    etypes = {("node1", "edge1", "node2"): 0}
     c_etype = list(etypes.keys())[0]
     gpb = RangePartitionBook(
-        part_id, num_parts, node_map, edge_map, ntypes, etypes)
-    assert gpb.etypes == ['edge1']
+        part_id, num_parts, node_map, edge_map, ntypes, etypes
+    )
+    assert gpb.etypes == ["edge1"]
     assert gpb.canonical_etypes == [c_etype]
-    assert gpb._to_canonical_etype('edge1') == c_etype
+    assert gpb._to_canonical_etype("edge1") == c_etype
     assert gpb._to_canonical_etype(c_etype) == c_etype
     expect_except = False
     try:
-        gpb._to_canonical_etype(('node1', 'edge2', 'node2'))
-    except:
+        gpb._to_canonical_etype(("node1", "edge2", "node2"))
+    except dgl.DGLError:
         expect_except = True
     assert expect_except
     expect_except = False
     try:
-        gpb._to_canonical_etype('edge2')
-    except:
+        gpb._to_canonical_etype("edge2")
+    except dgl.DGLError:
         expect_except = True
     assert expect_except
-    node_policy = NodePartitionPolicy(gpb, 'node1')
-    assert node_policy.type_name == 'node1'
+    node_policy = NodePartitionPolicy(gpb, "node1")
+    assert node_policy.type_name == "node1"
     edge_policy = EdgePartitionPolicy(gpb, c_etype)
     assert edge_policy.type_name == c_etype

-    data_name = HeteroDataName(False, 'edge1', 'edge1')
-    assert data_name.get_type() == 'edge1'
-    data_name = HeteroDataName(False, c_etype, 'edge1')
+    data_name = HeteroDataName(False, "edge1", "edge1")
+    assert data_name.get_type() == "edge1"
+    data_name = HeteroDataName(False, c_etype, "edge1")
     assert data_name.get_type() == c_etype
-
-if __name__ == '__main__':
-    os.makedirs('/tmp/partition', exist_ok=True)
-    test_partition()
-    test_hetero_partition()
-    test_BasicPartitionBook()
-    test_RangePartitionBook()
...
@@ -26,6 +26,15 @@ def _verify_partition_data_types(part_g):
         if k in part_g.edata:
             assert part_g.edata[k].dtype == dtype

+def _verify_partition_formats(part_g, formats):
+    # Verify saved graph formats
+    if formats is None:
+        assert "coo" in part_g.formats()["created"]
+    else:
+        formats = formats.split(',')
+        for format in formats:
+            assert format in part_g.formats()["created"]
+
 def _verify_graph_feats(
     g, gpb, part, node_feats, edge_feats, orig_nids, orig_eids
...
@@ -139,9 +148,12 @@ def test_chunk_graph(num_chunks):
     assert feat_array.shape[0] == num_edges[etype] // num_chunks

-@pytest.mark.parametrize("num_chunks", [1, 2, 3, 4, 8])
-@pytest.mark.parametrize("num_parts", [1, 2, 3, 4, 8])
-def test_part_pipeline(num_chunks, num_parts):
+@pytest.mark.parametrize("num_chunks", [1, 3, 8])
+@pytest.mark.parametrize("num_parts", [1, 3, 8])
+@pytest.mark.parametrize(
+    "graph_formats", [None, "csc", "coo,csc", "coo,csc,csr"]
+)
+def test_part_pipeline(num_chunks, num_parts, graph_formats):
     if num_chunks < num_parts:
         # num_parts should less/equal than num_chunks
         return
...
@@ -182,6 +194,7 @@ def test_part_pipeline(num_chunks, num_parts):
         cmd += " --process-group-timeout 60"
         cmd += " --save-orig-nids"
         cmd += " --save-orig-eids"
+        cmd += f" --graph-formats {graph_formats}" if graph_formats else ""
         os.system(cmd)

         # read original node/edge IDs
...
@@ -207,6 +220,7 @@ def test_part_pipeline(num_chunks, num_parts):
                 part_config, i
             )
             _verify_partition_data_types(part_g)
+            _verify_partition_formats(part_g, graph_formats)
             _verify_graph_feats(
                 g, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
             )
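Note that this `_verify_partition_formats` deliberately differs from the one added to the DGL tests earlier: there `graph_formats` arrives as a list, while here it is the CLI's comma-joined string and must be split first. A tiny illustration with hypothetical values (each call targets the helper in its own test module):

```python
_verify_partition_formats(part_g, ["coo", "csc"])  # DGL test helper: list input
_verify_partition_formats(part_g, "coo,csc")       # tools test helper: comma-joined string
```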
...
@@ -76,6 +76,7 @@ def submit_jobs(args) -> str:
     argslist += "--log-level {} ".format(args.log_level)
     argslist += "--save-orig-nids " if args.save_orig_nids else ""
     argslist += "--save-orig-eids " if args.save_orig_eids else ""
+    argslist += f"--graph-formats {args.graph_formats} " if args.graph_formats else ""

     # (BarclayII) Is it safe to assume all the workers have the Python executable at the same path?
     pipeline_cmd = os.path.join(INSTALL_DIR, PIPELINE_SCRIPT)
...
@@ -149,6 +150,15 @@ def main():
         action="store_true",
         help="Save original edge IDs into files",
     )
+    parser.add_argument(
+        "--graph-formats",
+        type=str,
+        default=None,
+        help="Save partitions in specified formats. It could be any combination(joined with ``,``) "
+        "of ``coo``, ``csc`` and ``csr``. If not specified, save one format only according to "
+        "what format is available. If multiple formats are available, selection priority "
+        "from high to low is ``coo``, ``csc``, ``csr``.",
+    )
     args, udf_command = parser.parse_known_args()
...
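A hedged sketch of driving the dispatcher with the new flag, mirroring how `test_part_pipeline` above assembles its command (the rest of the command line is elided because its flags are not shown in this diff):

```python
import os

cmd = "..."  # dispatch command plus its required arguments, elided here
graph_formats = "coo,csc"  # comma-joined, as the help text specifies
cmd += f" --graph-formats {graph_formats}" if graph_formats else ""
os.system(cmd)
```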
...
@@ -58,6 +58,8 @@ if __name__ == "__main__":
                         help='Save original node IDs into files')
     parser.add_argument('--save-orig-eids', action='store_true',
                         help='Save original edge IDs into files')
+    parser.add_argument('--graph-formats', default=None, type=str,
+                        help='Save partitions in specified formats.')
     params = parser.parse_args()

     #invoke the pipeline function
...
...
@@ -697,8 +697,12 @@ def gen_dist_partitions(rank, world_size, params):
     orig_nids, orig_eids = create_dgl_object(schema_map, rank, node_data, \
         edge_data, num_edges, params.save_orig_nids, params.save_orig_eids)
     memory_snapshot("CreateDGLObjectsComplete: ", rank)
+    graph_formats = None
+    if params.graph_formats:
+        graph_formats = params.graph_formats.split(',')
+    sort_etypes = len(etypes_map) > 1
     write_dgl_objects(graph_obj, rcvd_node_features, rcvd_edge_features, params.output, \
-        rank, orig_nids, orig_eids)
+        rank, orig_nids, orig_eids, graph_formats, sort_etypes)
     memory_snapshot("DiskWriteDGLObjectsComplete: ", rank)

     #get the meta-data
...
...
@@ -379,7 +379,7 @@ def write_edge_features(edge_features, edge_file):
     """
     dgl.data.utils.save_tensors(edge_file, edge_features)

-def write_graph_dgl(graph_file, graph_obj):
+def write_graph_dgl(graph_file, graph_obj, formats, sort_etypes):
     """
     Utility function to serialize graph dgl objects
...
@@ -389,11 +389,16 @@ def write_graph_dgl(graph_file, graph_obj):
         graph dgl object, as created in convert_partition.py, which is to be serialized
     graph_file : string
         File name in which graph object is serialized
+    formats : str or list[str]
+        Save graph in specified formats.
+    sort_etypes : bool
+        Whether to sort etypes in csc/csr.
     """
-    dgl.distributed.partition._save_graphs(graph_file, [graph_obj])
+    dgl.distributed.partition._save_graphs(graph_file, [graph_obj],
+                                           formats, sort_etypes)

 def write_dgl_objects(graph_obj, node_features, edge_features,
-                      output_dir, part_id, orig_nids, orig_eids):
+                      output_dir, part_id, orig_nids, orig_eids, formats, sort_etypes):
     """
     Wrapper function to write graph, node/edge feature, original node/edge IDs.
...
@@ -413,11 +418,15 @@ def write_dgl_objects(graph_obj, node_features, edge_features,
         original node IDs
     orig_eids : dict
         original edge IDs
+    formats : str or list[str]
+        Save graph in formats.
+    sort_etypes : bool
+        Whether to sort etypes in csc/csr.
     """
     part_dir = output_dir + '/part' + str(part_id)
     os.makedirs(part_dir, exist_ok=True)
-    write_graph_dgl(os.path.join(part_dir ,'graph.dgl'), graph_obj)
+    write_graph_dgl(os.path.join(part_dir ,'graph.dgl'), graph_obj,
+                    formats, sort_etypes)

     if node_features != None:
         write_node_features(node_features, os.path.join(part_dir, "node_feat.dgl"))
...
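Putting the tool-side plumbing together, a condensed sketch of the flow from `gen_dist_partitions` down to `_save_graphs` (all names are from the hunks above; `params`, `etypes_map`, and the feature tensors come from the caller's scope):

```python
# gen_dist_partitions -> write_dgl_objects -> write_graph_dgl -> _save_graphs
graph_formats = params.graph_formats.split(',') if params.graph_formats else None
sort_etypes = len(etypes_map) > 1  # sorting only pays off with multiple etypes
write_dgl_objects(graph_obj, rcvd_node_features, rcvd_edge_features,
                  params.output, rank, orig_nids, orig_eids,
                  graph_formats, sort_etypes)
```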