"...git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "a72bc0c4bb817d382a59b38bba8d9a71661f56cb"
Unverified commit f0759a96, authored by Hongzhi (Steve), Chen and committed by GitHub

[Misc] Auto-reformat test/. (#5324)



* auto-format-test

* more

* remove

---------
Co-authored-by: Ubuntu <ubuntu@ip-172-31-28-63.ap-northeast-1.compute.internal>
parent 9fe5092c
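The hunks below are formatting-only changes across test/. The commit does not name its toolchain; the "# usort: skip" directive in the first hunk and the 80-column, double-quoted style suggest usort for import ordering plus black for layout, but that is an inference. As a hedged illustration only, a single snippet can be pushed through black's Python API to preview the same style; the sample input string and the line length of 80 are assumptions for illustration, not taken from the commit.

# Minimal sketch, assuming black produced the style seen in this diff
# (double quotes, ~80-column wrapping); usort would handle import order.
import black

# A hypothetical one-liner written in the pre-commit style.
SRC = "x = {ntype : g.num_nodes(ntype) for ntype in g.ntypes}\n"

# black.format_str and black.Mode are black's public Python API;
# line_length=80 is an assumption inferred from the wrapped lines here.
formatted = black.format_str(SRC, mode=black.Mode(line_length=80))
print(formatted)  # x = {ntype: g.num_nodes(ntype) for ntype in g.ntypes}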
# NOTE(vibwu): Currently cugraph must be imported before torch to avoid a resource cleanup issue.
# See https://github.com/rapidsai/cugraph/issues/2718
import cugraph  # usort: skip
import backend as F
import dgl
...
import os

import backend as F
import dgl
from numpy.testing import assert_array_equal

INTEGER = 2
STR = "hello world!"
...
import json
import os
from itertools import product

import dgl
import dgl.backend as F
import numpy as np
from dgl.distributed import edge_split, load_partition_book, node_split

mode = os.environ.get("DIST_DGL_TEST_MODE", "")
graph_name = os.environ.get("DIST_DGL_TEST_GRAPH_NAME", "random_test_graph")
@@ -21,20 +21,24 @@ ip_config = os.environ.get("DIST_DGL_TEST_IP_CONFIG", "ip_config.txt")
os.environ["DGL_DIST_MODE"] = "distributed"
def batched_assert_zero(tensor, size):
    BATCH_SIZE = 2**16
    curr_pos = 0
    while curr_pos < size:
        end = min(curr_pos + BATCH_SIZE, size)
        assert F.sum(tensor[F.arange(curr_pos, end)], 0) == 0
        curr_pos = end


def zeros_init(shape, dtype):
    return F.zeros(shape, dtype=dtype, ctx=F.cpu())


def rand_init(shape, dtype):
    return F.tensor((np.random.randint(0, 100, size=shape) > 30), dtype=dtype)


def run_server(
    graph_name,
    server_id,
@@ -59,79 +63,99 @@ def run_server(
    g.start()
##########################################
############### DistGraph ###############
##########################################


def node_split_test(g, force_even, ntype="_N"):
    gpb = g.get_partition_book()
    selected_nodes_dist_tensor = dgl.distributed.DistTensor(
        [g.number_of_nodes(ntype)], F.uint8, init_func=rand_init
    )
    nodes = node_split(
        selected_nodes_dist_tensor, gpb, ntype=ntype, force_even=force_even
    )
    g.barrier()
    selected_nodes_dist_tensor[nodes] = F.astype(
        F.zeros_like(nodes), selected_nodes_dist_tensor.dtype
    )
    g.barrier()
    if g.rank() == 0:
        batched_assert_zero(
            selected_nodes_dist_tensor, g.number_of_nodes(ntype)
        )
    g.barrier()
def edge_split_test(g, force_even, etype="_E"):
    gpb = g.get_partition_book()
    selected_edges_dist_tensor = dgl.distributed.DistTensor(
        [g.number_of_edges(etype)], F.uint8, init_func=rand_init
    )
    edges = edge_split(
        selected_edges_dist_tensor, gpb, etype=etype, force_even=force_even
    )
    g.barrier()
    selected_edges_dist_tensor[edges] = F.astype(
        F.zeros_like(edges), selected_edges_dist_tensor.dtype
    )
    g.barrier()
    if g.rank() == 0:
        batched_assert_zero(
            selected_edges_dist_tensor, g.number_of_edges(etype)
        )
    g.barrier()
def test_dist_graph(g):
    gpb_path = graph_path + "/{}.json".format(graph_name)
    with open(gpb_path) as conf_f:
        part_metadata = json.load(conf_f)
        assert "num_nodes" in part_metadata
        assert "num_edges" in part_metadata
        num_nodes = part_metadata["num_nodes"]
        num_edges = part_metadata["num_edges"]
        assert g.number_of_nodes() == num_nodes
        assert g.number_of_edges() == num_edges

    num_nodes = {ntype: g.num_nodes(ntype) for ntype in g.ntypes}
    num_edges = {etype: g.num_edges(etype) for etype in g.etypes}
    for key, n_nodes in num_nodes.items():
        assert g.number_of_nodes(key) == n_nodes
        node_split_test(g, force_even=False, ntype=key)
        node_split_test(g, force_even=True, ntype=key)
    for key, n_edges in num_edges.items():
        assert g.number_of_edges(key) == n_edges
        edge_split_test(g, force_even=False, etype=key)
        edge_split_test(g, force_even=True, etype=key)
##########################################
########### DistGraphServices ###########
##########################################


def find_edges_test(g, orig_nid_map):
    etypes = g.canonical_etypes
    etype_eids_uv_map = dict()
    for u_type, etype, v_type in etypes:
        orig_u = g.edges[etype].data["edge_u"]
        orig_v = g.edges[etype].data["edge_v"]
        eids = F.tensor(np.random.randint(g.number_of_edges(etype), size=100))
        u, v = g.find_edges(eids, etype=etype)
        assert F.allclose(orig_nid_map[u_type][u], orig_u[eids])
@@ -139,6 +163,7 @@ def find_edges_test(g, orig_nid_map):
        etype_eids_uv_map[etype] = (eids, F.cat([u, v], dim=0))
    return etype_eids_uv_map
def edge_subgraph_test(g, etype_eids_uv_map):
    etypes = g.canonical_etypes
    all_eids = dict()
@@ -158,11 +183,18 @@ def edge_subgraph_test(g, etype_eids_uv_map):
            for node_id in uv:
                assert node_id in sg_uv
def sample_neighbors_with_args(g, size, fanout):
    num_nodes = {ntype: g.num_nodes(ntype) for ntype in g.ntypes}
    etypes = g.canonical_etypes
    sampled_graph = g.sample_neighbors(
        {
            ntype: np.random.randint(0, n, size=size)
            for ntype, n in num_nodes.items()
        },
        fanout,
    )
    for ntype, n in num_nodes.items():
        assert sampled_graph.number_of_nodes(ntype) == n
@@ -173,6 +205,7 @@ def sample_neighbors_with_args(g, size, fanout):
        assert F.allclose(dist_u, src)
        assert F.allclose(dist_v, dst)
def sample_neighbors_test(g):
    sample_neighbors_with_args(g, size=1024, fanout=3)
    sample_neighbors_with_args(g, size=1, fanout=10)
@@ -181,33 +214,37 @@ def sample_neighbors_test(g):
    sample_neighbors_with_args(g, size=2**10, fanout=1)
    sample_neighbors_with_args(g, size=2**12, fanout=1)
def test_dist_graph_services(g):
    # in_degrees and out_degrees does not support heterograph
    if len(g.etypes) == 1:
        nids = F.arange(0, 128)
        # Test in_degrees
        orig_in_degrees = g.ndata["in_degrees"]
        local_in_degrees = g.in_degrees(nids)
        F.allclose(local_in_degrees, orig_in_degrees[nids])
        # Test out_degrees
        orig_out_degrees = g.ndata["out_degrees"]
        local_out_degrees = g.out_degrees(nids)
        F.allclose(local_out_degrees, orig_out_degrees[nids])

    num_nodes = {ntype: g.num_nodes(ntype) for ntype in g.ntypes}
    orig_nid_map = dict()
    dtype = g.edges[g.etypes[0]].data["edge_u"].dtype
    for ntype, _ in num_nodes.items():
        orig_nid = F.tensor(
            np.load(graph_path + f"/orig_nid_array_{ntype}.npy"), dtype
        )
        orig_nid_map[ntype] = orig_nid
    etype_eids_uv_map = find_edges_test(g, orig_nid_map)
    edge_subgraph_test(g, etype_eids_uv_map)
    sample_neighbors_test(g)
##########################################
############### DistTensor ###############
##########################################
@@ -222,13 +259,13 @@ def dist_tensor_test_sanity(data_shape, name=None):
    stride = 3
    pos = (part_id // 2) * num_client_per_machine + local_rank
    if part_id % 2 == 0:
        dist_ten[pos * stride : (pos + 1) * stride] = F.ones(
            (stride, 2), dtype=F.int32, ctx=F.cpu()
        ) * (pos + 1)
    dgl.distributed.client_barrier()
    assert F.allclose(
        dist_ten[pos * stride : (pos + 1) * stride],
        F.ones((stride, 2), dtype=F.int32, ctx=F.cpu()) * (pos + 1),
    )
@@ -378,24 +415,27 @@ def dist_optimizer_check_store(g):
                new_state = new_state[
                    F.arange(0, num_nodes, F.int64, F.cpu())
                ]
                assert F.allclose(state, new_state, 0.0, 0.0)
        assert new_emb_optimizer._lr == emb_optimizer._lr
        assert new_emb_optimizer._eps == emb_optimizer._eps
        assert new_emb_optimizer._beta1 == emb_optimizer._beta1
        assert new_emb_optimizer._beta2 == emb_optimizer._beta2
        g.barrier()
    finally:
        file = f"emb.pt_{rank}"
        if os.path.exists(file):
            os.remove(file)


def test_dist_optimizer(g):
    dist_optimizer_check_store(g)
##########################################
############# DistDataLoader #############
##########################################


class NeighborSampler(object):
    def __init__(self, g, fanouts, sample_neighbors):
        self.g = g
@@ -422,6 +462,7 @@ class NeighborSampler(object):
            blocks.insert(0, block)
        return blocks
def distdataloader_test(g, batch_size, drop_last, shuffle):
    # We sample only a subset to minimize the test runtime
    num_nodes_to_sample = int(g.num_nodes() * 0.05)
@@ -430,25 +471,26 @@ def distdataloader_test(g, batch_size, drop_last, shuffle):
        num_nodes_to_sample -= 1

    orig_nid_map = dict()
    dtype = g.edges[g.etypes[0]].data["edge_u"].dtype
    for ntype in g.ntypes:
        orig_nid = F.tensor(
            np.load(graph_path + f"/orig_nid_array_{ntype}.npy"), dtype
        )
        orig_nid_map[ntype] = orig_nid

    orig_uv_map = dict()
    for etype in g.etypes:
        orig_uv_map[etype] = (
            g.edges[etype].data["edge_u"],
            g.edges[etype].data["edge_v"],
        )

    if len(g.ntypes) == 1:
        train_nid = F.arange(0, num_nodes_to_sample)
    else:
        train_nid = {g.ntypes[0]: F.arange(0, num_nodes_to_sample)}

    sampler = NeighborSampler(g, [5, 10], dgl.distributed.sample_neighbors)

    dataloader = dgl.dataloading.DistDataLoader(
        dataset=train_nid.numpy(),
@@ -460,7 +502,9 @@ def distdataloader_test(g, batch_size, drop_last, shuffle):
    for _ in range(2):
        max_nid = []
        for idx, blocks in zip(
            range(0, num_nodes_to_sample, batch_size), dataloader
        ):
            block = blocks[-1]
            for src_type, etype, dst_type in block.canonical_etypes:
                orig_u, orig_v = orig_uv_map[etype]
@@ -486,8 +530,10 @@ def distdataloader_test(g, batch_size, drop_last, shuffle):
        assert np.max(max_nid) == num_nodes_to_sample - 1
    del dataloader
def distnodedataloader_test(
    g, batch_size, drop_last, shuffle, num_workers, orig_nid_map, orig_uv_map
):
    # We sample only a subset to minimize the test runtime
    num_nodes_to_sample = int(g.num_nodes(g.ntypes[-1]) * 0.05)
    # To make sure that drop_last is tested
@@ -525,7 +571,9 @@ def distnodedataloader_test(g, batch_size, drop_last, shuffle,
    )
    for _ in range(2):
        for _, (_, _, blocks) in zip(
            range(0, num_nodes_to_sample, batch_size), dataloader
        ):
            block = blocks[-1]
            for src_type, etype, dst_type in block.canonical_etypes:
                orig_u, orig_v = orig_uv_map[etype]
@@ -540,8 +588,16 @@ def distnodedataloader_test(g, batch_size, drop_last, shuffle,
    del dataloader
def distedgedataloader_test(
    g,
    batch_size,
    drop_last,
    shuffle,
    num_workers,
    orig_nid_map,
    orig_uv_map,
    num_negs,
):
    # We sample only a subset to minimize the test runtime
    num_edges_to_sample = int(g.num_edges(g.etypes[-1]) * 0.05)
    # To make sure that drop_last is tested
@@ -556,14 +612,16 @@ def distedgedataloader_test(g, batch_size, drop_last, shuffle,
    sampler = dgl.dataloading.MultiLayerNeighborSampler([5, 10])
    dataloader = dgl.dataloading.DistEdgeDataLoader(
        g,
        train_eid,
        sampler,
        batch_size=batch_size,
        negative_sampler=dgl.dataloading.negative_sampler.Uniform(num_negs)
        if num_negs > 0
        else None,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers,
    )
    for _ in range(2):
        for _, sampled_data in zip(
@@ -585,7 +643,9 @@ def distedgedataloader_test(g, batch_size, drop_last, shuffle,
                pos_pair_graph = sampled_data[1]
                assert np.all(
                    F.asnumpy(block.dstnodes[dst_type].data[dgl.NID])
                    == F.asnumpy(
                        pos_pair_graph.nodes[dst_type].data[dgl.NID]
                    )
                )
            else:
                pos_graph, neg_graph = sampled_data[1:3]
@@ -597,14 +657,22 @@ def distedgedataloader_test(g, batch_size, drop_last, shuffle,
                    F.asnumpy(block.dstnodes[dst_type].data[dgl.NID])
                    == F.asnumpy(neg_graph.nodes[dst_type].data[dgl.NID])
                )
                assert (
                    pos_graph.num_edges() * num_negs
                    == neg_graph.num_edges()
                )
    del dataloader
def multi_distdataloader_test(g, dataloader_class):
    total_num_items = (
        g.num_nodes(g.ntypes[-1])
        if "Node" in dataloader_class.__name__
        else g.num_edges(g.etypes[-1])
    )
    num_dataloaders = 4
    batch_size = 32
    sampler = dgl.dataloading.NeighborSampler([-1])
    dataloaders = []
    dl_iters = []
@@ -618,7 +686,11 @@ def multi_distdataloader_test(g, dataloader_class):
    if len(g.ntypes) == 1:
        train_ids = F.arange(0, num_items_to_sample)
    else:
        train_ids = {
            g.ntypes[-1]
            if "Node" in dataloader_class.__name__
            else g.etypes[-1]: F.arange(0, num_items_to_sample)
        }
    for _ in range(num_dataloaders):
        dataloader = dataloader_class(
@@ -636,31 +708,64 @@ def multi_distdataloader_test(g, dataloader_class):
            dl_iters.pop(current_dl)
            del dataloaders[current_dl]
def test_dist_dataloader(g):
    orig_nid_map = dict()
    dtype = g.edges[g.etypes[0]].data["edge_u"].dtype
    for ntype in g.ntypes:
        orig_nid = F.tensor(
            np.load(graph_path + f"/orig_nid_array_{ntype}.npy"), dtype
        )
        orig_nid_map[ntype] = orig_nid

    orig_uv_map = dict()
    for etype in g.etypes:
        orig_uv_map[etype] = (
            g.edges[etype].data["edge_u"],
            g.edges[etype].data["edge_v"],
        )

    batch_size_l = [64]
    drop_last_l = [False, True]
    num_workers_l = [0, 4]
    shuffle_l = [False, True]
    for batch_size, drop_last, shuffle, num_workers in product(
        batch_size_l, drop_last_l, shuffle_l, num_workers_l
    ):
        if len(g.ntypes) == 1 and num_workers == 0:
            distdataloader_test(g, batch_size, drop_last, shuffle)
        distnodedataloader_test(
            g,
            batch_size,
            drop_last,
            shuffle,
            num_workers,
            orig_nid_map,
            orig_uv_map,
        )
        # No negssampling
        distedgedataloader_test(
            g,
            batch_size,
            drop_last,
            shuffle,
            num_workers,
            orig_nid_map,
            orig_uv_map,
            num_negs=0,
        )
        # negsampling 15
        distedgedataloader_test(
            g,
            batch_size,
            drop_last,
            shuffle,
            num_workers,
            orig_nid_map,
            orig_uv_map,
            num_negs=15,
        )
    multi_distdataloader_test(g, dgl.dataloading.DistNodeDataLoader)
    multi_distdataloader_test(g, dgl.dataloading.DistEdgeDataLoader)
@@ -696,9 +801,9 @@ elif mode == "client":
    }
    targets = os.environ.get("DIST_DGL_TEST_OBJECT_TYPE", "")
    targets = targets.replace(" ", "").split(",") if targets else []
    blacklist = os.environ.get("DIST_DGL_TEST_OBJECT_TYPE_BLACKLIST", "")
    blacklist = blacklist.replace(" ", "").split(",") if blacklist else []
    for to_bl in blacklist:
        target_func_map.pop(to_bl, None)
...
import multiprocessing as mp
import os
import shutil
import subprocess
import unittest

import dgl
import dgl.backend as F
import numpy as np
import pytest
import utils
from dgl.distributed import partition_graph

graph_name = os.environ.get("DIST_DGL_TEST_GRAPH_NAME", "random_test_graph")
target = os.environ.get("DIST_DGL_TEST_OBJECT_TYPE", "")
blacklist = os.environ.get("DIST_DGL_TEST_OBJECT_TYPE_BLACKLIST", "")
shared_workspace = os.environ.get(
    "DIST_DGL_TEST_WORKSPACE", "/shared_workspace/dgl_dist_tensor_test/"
)
def create_graph(num_part, dist_graph_path, hetero):
@@ -32,14 +34,19 @@ def create_graph(num_part, dist_graph_path, hetero):
        g.edges[etype].data["edge_u"] = edge_u
        g.edges[etype].data["edge_v"] = edge_v

        orig_nid, orig_eid = partition_graph(
            g, graph_name, num_part, dist_graph_path, return_mapping=True
        )
        orig_nid_f = os.path.join(
            dist_graph_path, f"orig_nid_array_{ntype}.npy"
        )
        np.save(orig_nid_f, orig_nid.numpy())
        orig_eid_f = os.path.join(
            dist_graph_path, f"orig_eid_array_{etype}.npy"
        )
        np.save(orig_eid_f, orig_eid.numpy())
    else:
        from scipy import sparse as spsp
@@ -65,21 +72,29 @@ def create_graph(num_part, dist_graph_path, hetero):
            F.arange(0, g.number_of_edges("r1")), 1
        )
        for _, etype, _ in etypes:
            edge_u, edge_v = g.find_edges(
                F.arange(0, g.number_of_edges(etype)), etype=etype
            )
            g.edges[etype].data["edge_u"] = edge_u
            g.edges[etype].data["edge_v"] = edge_v

        orig_nid, orig_eid = partition_graph(
            g, graph_name, num_part, dist_graph_path, return_mapping=True
        )
        for n_type, tensor in orig_nid.items():
            orig_nid_f = os.path.join(
                dist_graph_path, f"orig_nid_array_{n_type}.npy"
            )
            np.save(orig_nid_f, tensor.numpy())
        for e_type, tensor in orig_eid.items():
            orig_eid_f = os.path.join(
                dist_graph_path, f"orig_eid_array_{e_type}.npy"
            )
            np.save(orig_eid_f, tensor.numpy())
@unittest.skipIf(os.name == "nt", reason="Do not support windows yet")
@pytest.mark.parametrize("net_type", ["tensorpipe", "socket"])
@pytest.mark.parametrize("num_servers", [1, 4])
@@ -148,8 +163,9 @@ def test_dist_objects(net_type, num_servers, num_clients, hetero, shared_mem):
        p.join()
        assert p.exitcode == 0


def teardown():
    for name in ["dist_graph", "hetero_dist_graph"]:
        path = os.path.join(shared_workspace, name)
        if os.path.exists(path):
            print(f"Removing {path}...")
...
@@ -11,14 +11,11 @@ import unittest
from multiprocessing import Condition, Manager, Process, Value

import backend as F
import dgl
import numpy as np
import pytest
import torch as th
from dgl.data.utils import load_graphs, save_graphs
from dgl.distributed import (
    DistEmbedding,
@@ -32,6 +29,9 @@ from dgl.distributed import (
)
from dgl.distributed.optim import SparseAdagrad
from dgl.heterograph_index import create_unitgraph_from_coo
from numpy.testing import assert_almost_equal, assert_array_equal
from scipy import sparse as spsp
from utils import create_random_graph, generate_ip_config, reset_envs

if os.name != "nt":
    import fcntl
@@ -712,13 +712,13 @@ def create_random_hetero():
    # data with same name as ntype/etype is assigned on purpose to verify
    # such same names can be correctly handled in DistGraph. See more details
    # in issue #4887 and #4463 on github.
    ntype = "n1"
    for name in ["feat", ntype]:
        g.nodes[ntype].data[name] = F.unsqueeze(
            F.arange(0, g.num_nodes(ntype)), 1
        )
    etype = "r1"
    for name in ["feat", etype]:
        g.edges[etype].data[name] = F.unsqueeze(
            F.arange(0, g.num_edges(etype)), 1
        )
@@ -742,24 +742,24 @@ def check_dist_graph_hetero(g, num_clients, num_nodes, num_edges):
    assert g.number_of_edges() == sum([num_edges[etype] for etype in num_edges])

    # Test reading node data
    ntype = "n1"
    nids = F.arange(0, g.num_nodes(ntype) // 2)
    for name in ["feat", ntype]:
        data = g.nodes[ntype].data[name][nids]
        data = F.squeeze(data, 1)
        assert np.all(F.asnumpy(data == nids))
    assert len(g.nodes["n2"].data) == 0
    expect_except = False
    try:
        g.nodes["xxx"].data["x"]
    except dgl.DGLError:
        expect_except = True
    assert expect_except

    # Test reading edge data
    etype = "r1"
    eids = F.arange(0, g.num_edges(etype) // 2)
    for name in ["feat", etype]:
        # access via etype
        data = g.edges[etype].data[name][eids]
        data = F.squeeze(data, 1)
@@ -769,10 +769,10 @@ def check_dist_graph_hetero(g, num_clients, num_nodes, num_edges):
        data = g.edges[c_etype].data[name][eids]
        data = F.squeeze(data, 1)
        assert np.all(F.asnumpy(data == eids))
    assert len(g.edges["r2"].data) == 0
    expect_except = False
    try:
        g.edges["xxx"].data["x"]
    except dgl.DGLError:
        expect_except = True
    assert expect_except
...
@@ -3,10 +3,10 @@ import os
import unittest

import backend as F
import dgl
import pytest
from utils import create_random_graph, generate_ip_config, reset_envs

dist_g = None
...
@@ -316,9 +316,7 @@ def check_neg_dataloader(g, num_server, num_workers):
@pytest.mark.parametrize("num_workers", [0, 4])
@pytest.mark.parametrize("drop_last", [True, False])
@pytest.mark.parametrize("num_groups", [1])
def test_dist_dataloader(num_server, num_workers, drop_last, num_groups):
    reset_envs()
    # No multiple partitions on single machine for
    # multiple client groups in case of race condition.
@@ -763,4 +761,3 @@ def test_multiple_dist_dataloaders(
        p.join()
        assert p.exitcode == 0
    reset_envs()
@@ -5,21 +5,21 @@ import time
import unittest

import backend as F
import dgl
import numpy as np
from dgl.graph_index import create_graph_index
from numpy.testing import assert_array_equal
from scipy import sparse as spsp
from utils import generate_ip_config, reset_envs

if os.name != "nt":
    import fcntl
    import struct

# Create an one-part Graph
node_map = {"_N": F.tensor([[0, 6]], F.int64)}
edge_map = {("_N", "_E", "_N"): F.tensor([[0, 7]], F.int64)}
global_nid = F.tensor([0, 1, 2, 3, 4, 5], F.int64)
global_eid = F.tensor([0, 1, 2, 3, 4, 5, 6], F.int64)
@@ -37,9 +37,12 @@ g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid

gpb = dgl.distributed.graph_partition_book.RangePartitionBook(
    part_id=0,
    num_parts=1,
    node_map=node_map,
    edge_map=edge_map,
    ntypes={ntype: i for i, ntype in enumerate(g.ntypes)},
    etypes={etype: i for i, etype in enumerate(g.canonical_etypes)},
)

node_policy = dgl.distributed.PartitionPolicy(
...
import json
import os
import tempfile

import backend as F
import dgl
import numpy as np
import pytest
import torch as th
from dgl import function as fn
from dgl.distributed import (
    load_partition,
@@ -15,18 +15,18 @@ from dgl.distributed import (
    partition_graph,
)
from dgl.distributed.graph_partition_book import (
    _etype_tuple_to_str,
    DEFAULT_ETYPE,
    DEFAULT_NTYPE,
    EdgePartitionPolicy,
    HeteroDataName,
    NodePartitionPolicy,
    RangePartitionBook,
)
from dgl.distributed.partition import (
    _get_inner_edge_mask,
    _get_inner_node_mask,
    RESERVED_FIELD_DTYPE,
)
from scipy import sparse as spsp
from utils import reset_envs
@@ -446,12 +446,8 @@ def check_partition(
    local_orig_nids = orig_nids[part_g.ndata[dgl.NID]]
    local_orig_eids = orig_eids[part_g.edata[dgl.EID]]
    part_g.ndata["feats"] = F.gather_row(g.ndata["feats"], local_orig_nids)
    part_g.edata["feats"] = F.gather_row(g.edata["feats"], local_orig_eids)
    local_nodes = orig_nids[local_nodes]
    local_edges = orig_eids[local_edges]
@@ -487,9 +483,7 @@ def check_partition(
    # Verify that we can reconstruct node/edge data for original IDs.
    shuffled_labels = F.asnumpy(F.cat(shuffled_labels, 0))
    shuffled_edata = F.asnumpy(F.cat(shuffled_edata, 0))
    orig_labels = np.zeros(shuffled_labels.shape, dtype=shuffled_labels.dtype)
    orig_edata = np.zeros(shuffled_edata.shape, dtype=shuffled_edata.dtype)
    orig_labels[F.asnumpy(orig_nids)] = shuffled_labels
    orig_edata[F.asnumpy(orig_eids)] = shuffled_edata
@@ -548,6 +542,7 @@ def test_partition(
    )
    reset_envs()


def test_RangePartitionBook():
    part_id = 1
    num_parts = 2
@@ -629,7 +624,7 @@ def test_RangePartitionBook():
    assert node_policy.policy_str == "node~node1"
    assert node_policy.part_id == part_id
    assert node_policy.is_node
    assert node_policy.get_data_name("x").is_node()
    local_ids = th.arange(0, 1000)
    global_ids = local_ids + 1000
    assert th.equal(node_policy.to_local(global_ids), local_ids)
@@ -643,7 +638,7 @@ def test_RangePartitionBook():
    assert edge_policy.policy_str == "edge~node1:edge1:node2"
    assert edge_policy.part_id == part_id
    assert not edge_policy.is_node
    assert not edge_policy.get_data_name("x").is_node()
    local_ids = th.arange(0, 5000)
    global_ids = local_ids + 5000
    assert th.equal(edge_policy.to_local(global_ids), local_ids)
@@ -662,8 +657,8 @@ def test_RangePartitionBook():
def test_UnknownPartitionBook():
    node_map = {"_N": {0: 0, 1: 1, 2: 2}}
    edge_map = {"_N:_E:_N": {0: 0, 1: 1, 2: 2}}
    part_metadata = {
        "num_parts": 1,
@@ -671,13 +666,13 @@ def test_UnknownPartitionBook():
        "num_edges": len(edge_map),
        "node_map": node_map,
        "edge_map": edge_map,
        "graph_name": "test_graph",
    }
    with tempfile.TemporaryDirectory() as test_dir:
        part_config = os.path.join(test_dir, "test_graph.json")
        with open(part_config, "w") as file:
            json.dump(part_metadata, file, indent=4)
        try:
            load_partition_book(part_config, 0)
        except Exception as e:
...
@@ -5,12 +5,12 @@ import time
import unittest

import backend as F
import dgl
import pytest
from numpy.testing import assert_array_equal
from utils import generate_ip_config, reset_envs

if os.name != "nt":
    import fcntl
    import struct
@@ -305,8 +305,8 @@ def test_rpc_msg():
    reset_envs()
    os.environ["DGL_DIST_MODE"] = "distributed"
    from dgl.distributed.rpc import (
        deserialize_from_payload,
        RPCMessage,
        serialize_to_payload,
    )
...
@@ -2,11 +2,11 @@ import os
import random
import socket

import dgl
import numpy as np
import scipy.sparse as spsp


def generate_ip_config(file_name, num_machines, num_servers):
    """Get local IP and available ports, writes to file."""
...
import dgl
import pytest
import torch
from test_utils.graph_cases import get_cases

from dglgo.model import *
...
@@ -7,17 +7,20 @@ Copyright by Contributors.
Borrowed from dmlc-core/scripts/lint.py@939c052
"""
from __future__ import print_function

import argparse
import codecs
import os
import re
import sys

import cpplint
from cpplint import _cpplint_state
from pylint import epylint

CXX_SUFFIX = set(["cc", "c", "cpp", "h", "cu", "hpp", "cuh"])
PYTHON_SUFFIX = set(["py"])
def filepath_enumerate(paths):
    """Enumerate the file paths of all subfiles of the list of paths"""
@@ -31,6 +34,7 @@ def filepath_enumerate(paths):
            out.append(os.path.normpath(os.path.join(root, name)))
    return out


# pylint: disable=useless-object-inheritance
class LintHelper(object):
    """Class to help runing the lint and records summary"""
@@ -41,12 +45,15 @@ class LintHelper(object):
        if len(result_map) == 0:
            return 0
        npass = sum(1 for x in result_map.values() if len(x) == 0)
        strm.write(
            f"====={npass}/{len(result_map)} {ftype} files passed check=====\n"
        )
        for fname, emap in result_map.items():
            if len(emap) == 0:
                continue
            strm.write(
                f"{fname}: {sum(emap.values())} Errors of {len(emap)} Categories map={str(emap)}\n"
            )
        return len(result_map) - npass
    def __init__(self):
@@ -54,23 +61,37 @@ class LintHelper(object):
        self.cpp_header_map = {}
        self.cpp_src_map = {}
        self.python_map = {}
        pylint_disable = [
            "superfluous-parens",
            "too-many-instance-attributes",
            "too-few-public-methods",
        ]
        # setup pylint
        self.pylint_opts = [
            "--extension-pkg-whitelist=numpy",
            "--disable=" + ",".join(pylint_disable),
        ]
        self.pylint_cats = set(["error", "warning", "convention", "refactor"])
        # setup cpp lint
        cpplint_args = [
            "--quiet",
            "--extensions=" + (",".join(CXX_SUFFIX)),
            ".",
        ]
        _ = cpplint.ParseArguments(cpplint_args)
        cpplint._SetFilters(
            ",".join(
                [
                    "-build/c++11",
                    "-build/namespaces",
                    "-build/include,",
                    "+build/include_what_you_use",
                    "+build/include_order",
                ]
            )
        )
        cpplint._SetCountingStyle("toplevel")
        cpplint._line_length = 80
    def process_cpp(self, path, suffix):
@@ -80,7 +101,7 @@ class LintHelper(object):
        _cpplint_state.PrintErrorCounts()
        errors = _cpplint_state.errors_by_category.copy()
        if suffix == "h":
            self.cpp_header_map[str(path)] = errors
        else:
            self.cpp_src_map[str(path)] = errors
@@ -88,14 +109,15 @@ class LintHelper(object):
    def process_python(self, path):
        """Process a python file."""
        (pylint_stdout, pylint_stderr) = epylint.py_run(
            " ".join([str(path)] + self.pylint_opts), return_std=True
        )
        emap = {}
        err = pylint_stderr.read()
        if len(err):
            print(err)
        for line in pylint_stdout:
            sys.stderr.write(line)
            key = line.split(":")[-1].split("(")[0].strip()
            if key not in self.pylint_cats:
                continue
            if key not in emap:
@@ -107,18 +129,24 @@ class LintHelper(object):
    def print_summary(self, strm):
        """Print summary of lint."""
        nerr = 0
        nerr += LintHelper._print_summary_map(
            strm, self.cpp_header_map, "cpp-header"
        )
        nerr += LintHelper._print_summary_map(
            strm, self.cpp_src_map, "cpp-source"
        )
        nerr += LintHelper._print_summary_map(strm, self.python_map, "python")
        if nerr == 0:
            strm.write("All passed!\n")
        else:
            strm.write(f"{nerr} files failed lint\n")
        return nerr


# singleton helper for lint check
_HELPER = LintHelper()


def get_header_guard_dmlc(filename):
    """Get Header Guard Convention for DMLC Projects.
@@ -131,66 +159,86 @@ def get_header_guard_dmlc(filename):
    """
    fileinfo = cpplint.FileInfo(filename)
    file_path_from_root = fileinfo.RepositoryName()
    inc_list = ["include", "api", "wrapper", "contrib"]
    if os.name == "nt":
        inc_list.append("mshadow")
    if (
        file_path_from_root.find("src/") != -1
        and _HELPER.project_name is not None
    ):
        idx = file_path_from_root.find("src/")
        file_path_from_root = (
            _HELPER.project_name + file_path_from_root[idx + 3 :]
        )
    else:
        idx = file_path_from_root.find("include/")
        if idx != -1:
            file_path_from_root = file_path_from_root[idx + 8 :]
        for spath in inc_list:
            prefix = spath + "/"
            if file_path_from_root.startswith(prefix):
                file_path_from_root = re.sub(
                    "^" + prefix, "", file_path_from_root
                )
                break
    return re.sub(r"[-./\s]", "_", file_path_from_root).upper() + "_"


cpplint.GetHeaderGuardCPPVariable = get_header_guard_dmlc
def process(fname, allow_type):
    """Process a file."""
    fname = str(fname)
    arr = fname.rsplit(".", 1)
    if fname.find("#") != -1 or arr[-1] not in allow_type:
        return
    if arr[-1] in CXX_SUFFIX:
        _HELPER.process_cpp(fname, arr[-1])
    if arr[-1] in PYTHON_SUFFIX:
        _HELPER.process_python(fname)
def main():
    """Main entry function."""
    parser = argparse.ArgumentParser(description="lint source codes")
    parser.add_argument("project", help="project name")
    parser.add_argument(
        "filetype", choices=["python", "cpp", "all"], help="source code type"
    )
    parser.add_argument("path", nargs="+", help="path to traverse")
    parser.add_argument(
        "--exclude_path",
        nargs="+",
        default=[],
        help="exclude this path, and all subfolders if path is a folder",
    )
    parser.add_argument(
        "--quiet", action="store_true", help="run cpplint in quiet mode"
    )
    parser.add_argument("--pylint-rc", default=None, help="pylint rc file")
    args = parser.parse_args()
    _HELPER.project_name = args.project
    if args.pylint_rc is not None:
        _HELPER.pylint_opts = [
            "--rcfile=" + args.pylint_rc,
        ]
    file_type = args.filetype
    allow_type = []
    if file_type in ("python", "all"):
        allow_type += PYTHON_SUFFIX
    if file_type in ("cpp", "all"):
        allow_type += CXX_SUFFIX
    allow_type = set(allow_type)
    if sys.version_info.major == 2 and os.name != "nt":
        sys.stderr = codecs.StreamReaderWriter(
            sys.stderr,
            codecs.getreader("utf8"),
            codecs.getwriter("utf8"),
            "replace",
        )
    # get excluded files
    excluded_paths = filepath_enumerate(args.exclude_path)
    for path in args.path:
@@ -207,5 +255,6 @@ def main():
    nerr = _HELPER.print_summary(sys.stderr)
    sys.exit(nerr > 0)


if __name__ == "__main__":
    main()
from collections import defaultdict from collections import defaultdict
import backend as F import backend as F
import dgl import dgl
import numpy as np
import networkx as nx import networkx as nx
import numpy as np import numpy as np
import scipy.sparse as ssp import scipy.sparse as ssp
import backend as F
case_registry = defaultdict(list) case_registry = defaultdict(list)
def register_case(labels): def register_case(labels):
def wrapper(fn): def wrapper(fn):
for lbl in labels: for lbl in labels:
case_registry[lbl].append(fn) case_registry[lbl].append(fn)
fn.__labels__ = labels fn.__labels__ = labels
return fn return fn
return wrapper return wrapper
def get_cases(labels=None, exclude=[]): def get_cases(labels=None, exclude=[]):
"""Get all graph instances of the given labels.""" """Get all graph instances of the given labels."""
cases = set() cases = set()
...@@ -29,131 +31,212 @@ def get_cases(labels=None, exclude=[]): ...@@ -29,131 +31,212 @@ def get_cases(labels=None, exclude=[]):
cases.add(case) cases.add(case)
return [fn() for fn in cases] return [fn() for fn in cases]
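For reference, a minimal sketch of how the registry above is typically consumed in tests (the test name and label selection here are illustrative assumptions, not part of this commit): get_cases() instantiates every registered factory matching the requested labels, so a test can be parametrized over the resulting graphs.

import pytest

# Hypothetical consumer of the case registry defined above; "homo" and
# "zero-degree" are labels attached by the register_case decorators below.
@pytest.mark.parametrize("g", get_cases(["homo"], exclude=["zero-degree"]))
def test_registered_homo_graphs_are_nonempty(g):
    assert g.num_nodes() > 0
    assert g.num_edges() > 0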
@register_case(['bipartite', 'zero-degree'])
@register_case(["bipartite", "zero-degree"])
def bipartite1(): def bipartite1():
return dgl.heterograph({('_U', '_E', '_V'): ([0, 0, 0, 2, 2, 3], return dgl.heterograph(
[0, 1, 4, 1, 4, 3])}) {("_U", "_E", "_V"): ([0, 0, 0, 2, 2, 3], [0, 1, 4, 1, 4, 3])}
)
@register_case(['bipartite']) @register_case(["bipartite"])
def bipartite_full(): def bipartite_full():
return dgl.heterograph({('_U', '_E', '_V'): ([0, 0, 0, 0, 1, 1, 1, 1], return dgl.heterograph(
[0, 1, 2, 3, 0, 1, 2, 3])}) {
("_U", "_E", "_V"): (
[0, 0, 0, 0, 1, 1, 1, 1],
[0, 1, 2, 3, 0, 1, 2, 3],
)
}
)
@register_case(['homo']) @register_case(["homo"])
def graph0(): def graph0():
return dgl.graph(([0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3, 4, 6, 6, 7, 8, 9], return dgl.graph(
[4, 5, 1, 2, 4, 7, 9, 8 ,6, 4, 1, 0, 1, 0, 2, 3, 5])) (
[0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3, 4, 6, 6, 7, 8, 9],
[4, 5, 1, 2, 4, 7, 9, 8, 6, 4, 1, 0, 1, 0, 2, 3, 5],
)
)
@register_case(['homo', 'zero-degree', 'homo-zero-degree'])
@register_case(["homo", "zero-degree", "homo-zero-degree"])
def bipartite1(): def bipartite1():
return dgl.graph(([0, 0, 0, 2, 2, 3], [0, 1, 4, 1, 4, 3])) return dgl.graph(([0, 0, 0, 2, 2, 3], [0, 1, 4, 1, 4, 3]))
@register_case(['homo', 'has_feature'])
@register_case(["homo", "has_feature"])
def graph1(): def graph1():
g = dgl.graph(([0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3, 4, 6, 6, 7, 8, 9], g = dgl.graph(
[4, 5, 1, 2, 4, 7, 9, 8 ,6, 4, 1, 0, 1, 0, 2, 3, 5]), device=F.cpu()) (
g.ndata['h'] = F.copy_to(F.randn((g.number_of_nodes(), 2)), F.cpu()) [0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3, 4, 6, 6, 7, 8, 9],
g.edata['w'] = F.copy_to(F.randn((g.number_of_edges(), 3)), F.cpu()) [4, 5, 1, 2, 4, 7, 9, 8, 6, 4, 1, 0, 1, 0, 2, 3, 5],
),
device=F.cpu(),
)
g.ndata["h"] = F.copy_to(F.randn((g.number_of_nodes(), 2)), F.cpu())
g.edata["w"] = F.copy_to(F.randn((g.number_of_edges(), 3)), F.cpu())
return g return g
@register_case(['homo', 'has_scalar_e_feature'])
@register_case(["homo", "has_scalar_e_feature"])
def graph1(): def graph1():
g = dgl.graph(([0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3, 4, 6, 6, 7, 8, 9], g = dgl.graph(
[4, 5, 1, 2, 4, 7, 9, 8 ,6, 4, 1, 0, 1, 0, 2, 3, 5]), device=F.cpu()) (
g.ndata['h'] = F.copy_to(F.randn((g.number_of_nodes(), 2)), F.cpu()) [0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3, 4, 6, 6, 7, 8, 9],
g.edata['scalar_w'] = F.copy_to(F.abs(F.randn((g.number_of_edges(),))), F.cpu()) [4, 5, 1, 2, 4, 7, 9, 8, 6, 4, 1, 0, 1, 0, 2, 3, 5],
),
device=F.cpu(),
)
g.ndata["h"] = F.copy_to(F.randn((g.number_of_nodes(), 2)), F.cpu())
g.edata["scalar_w"] = F.copy_to(
F.abs(F.randn((g.number_of_edges(),))), F.cpu()
)
return g return g
@register_case(['homo', 'row_sorted'])
@register_case(["homo", "row_sorted"])
def graph2(): def graph2():
return dgl.graph(([0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3, 4, 6, 6, 7, 8, 9], return dgl.graph(
[4, 5, 1, 2, 4, 7, 9, 8 ,6, 4, 1, 0, 1, 0, 2, 3, 5]), (
row_sorted=True) [0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3, 4, 6, 6, 7, 8, 9],
[4, 5, 1, 2, 4, 7, 9, 8, 6, 4, 1, 0, 1, 0, 2, 3, 5],
),
row_sorted=True,
)
@register_case(['homo', 'row_sorted', 'col_sorted']) @register_case(["homo", "row_sorted", "col_sorted"])
def graph3(): def graph3():
return dgl.graph(([0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3, 4, 6, 6, 7, 8, 9], return dgl.graph(
[1, 4, 5, 2, 4, 7, 8, 9 ,1, 4, 6, 0, 0, 1, 2, 3, 5]), (
row_sorted=True, col_sorted=True) [0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3, 4, 6, 6, 7, 8, 9],
[1, 4, 5, 2, 4, 7, 8, 9, 1, 4, 6, 0, 0, 1, 2, 3, 5],
),
row_sorted=True,
col_sorted=True,
)
@register_case(['hetero', 'has_feature']) @register_case(["hetero", "has_feature"])
def heterograph0(): def heterograph0():
g = dgl.heterograph({ g = dgl.heterograph(
('user', 'plays', 'game'): ([0, 1, 1, 2], [0, 0, 1, 1]), {
('developer', 'develops', 'game'): ([0, 1], [0, 1])}, device=F.cpu()) ("user", "plays", "game"): ([0, 1, 1, 2], [0, 0, 1, 1]),
g.nodes['user'].data['h'] = F.copy_to(F.randn((g.number_of_nodes('user'), 3)), F.cpu()) ("developer", "develops", "game"): ([0, 1], [0, 1]),
g.nodes['game'].data['h'] = F.copy_to(F.randn((g.number_of_nodes('game'), 2)), F.cpu()) },
g.nodes['developer'].data['h'] = F.copy_to(F.randn((g.number_of_nodes('developer'), 3)), F.cpu()) device=F.cpu(),
g.edges['plays'].data['h'] = F.copy_to(F.randn((g.number_of_edges('plays'), 1)), F.cpu()) )
g.edges['develops'].data['h'] = F.copy_to(F.randn((g.number_of_edges('develops'), 5)), F.cpu()) g.nodes["user"].data["h"] = F.copy_to(
F.randn((g.number_of_nodes("user"), 3)), F.cpu()
)
g.nodes["game"].data["h"] = F.copy_to(
F.randn((g.number_of_nodes("game"), 2)), F.cpu()
)
g.nodes["developer"].data["h"] = F.copy_to(
F.randn((g.number_of_nodes("developer"), 3)), F.cpu()
)
g.edges["plays"].data["h"] = F.copy_to(
F.randn((g.number_of_edges("plays"), 1)), F.cpu()
)
g.edges["develops"].data["h"] = F.copy_to(
F.randn((g.number_of_edges("develops"), 5)), F.cpu()
)
return g return g
@register_case(['batched', 'homo']) @register_case(["batched", "homo"])
def batched_graph0(): def batched_graph0():
g1 = dgl.add_self_loop(dgl.graph(([0, 1, 2], [1, 2, 3]))) g1 = dgl.add_self_loop(dgl.graph(([0, 1, 2], [1, 2, 3])))
g2 = dgl.add_self_loop(dgl.graph(([1, 1], [2, 0]))) g2 = dgl.add_self_loop(dgl.graph(([1, 1], [2, 0])))
g3 = dgl.add_self_loop(dgl.graph(([0], [1]))) g3 = dgl.add_self_loop(dgl.graph(([0], [1])))
return dgl.batch([g1, g2, g3]) return dgl.batch([g1, g2, g3])
@register_case(['block', 'bipartite', 'block-bipartite'])
@register_case(["block", "bipartite", "block-bipartite"])
def block_graph0(): def block_graph0():
g = dgl.graph(([2, 3, 4], [5, 6, 7]), num_nodes=100) g = dgl.graph(([2, 3, 4], [5, 6, 7]), num_nodes=100)
g = g.to(F.cpu()) g = g.to(F.cpu())
return dgl.to_block(g) return dgl.to_block(g)
@register_case(['block'])
@register_case(["block"])
def block_graph1(): def block_graph1():
g = dgl.heterograph({ g = dgl.heterograph(
('user', 'plays', 'game') : ([0, 1, 2], [1, 1, 0]), {
('user', 'likes', 'game') : ([1, 2, 3], [0, 0, 2]), ("user", "plays", "game"): ([0, 1, 2], [1, 1, 0]),
('store', 'sells', 'game') : ([0, 1, 1], [0, 1, 2]), ("user", "likes", "game"): ([1, 2, 3], [0, 0, 2]),
}, device=F.cpu()) ("store", "sells", "game"): ([0, 1, 1], [0, 1, 2]),
},
device=F.cpu(),
)
return dgl.to_block(g) return dgl.to_block(g)
@register_case(['clique'])
@register_case(["clique"])
def clique(): def clique():
g = dgl.graph(([0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2])) g = dgl.graph(([0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]))
return g return g
def random_dglgraph(size): def random_dglgraph(size):
return dgl.DGLGraph(nx.erdos_renyi_graph(size, 0.3)) return dgl.DGLGraph(nx.erdos_renyi_graph(size, 0.3))
def random_graph(size): def random_graph(size):
return dgl.from_networkx(nx.erdos_renyi_graph(size, 0.3)) return dgl.from_networkx(nx.erdos_renyi_graph(size, 0.3))
def random_bipartite(size_src, size_dst): def random_bipartite(size_src, size_dst):
return dgl.bipartite_from_scipy(ssp.random(size_src, size_dst, 0.1), return dgl.bipartite_from_scipy(
utype='_U', etype='_E', vtype='V', ) ssp.random(size_src, size_dst, 0.1),
utype="_U",
etype="_E",
vtype="V",
)
def random_block(size): def random_block(size):
g = dgl.from_networkx(nx.erdos_renyi_graph(size, 0.1)) g = dgl.from_networkx(nx.erdos_renyi_graph(size, 0.1))
return dgl.to_block(g, np.unique(F.zerocopy_to_numpy(g.edges()[1]))) return dgl.to_block(g, np.unique(F.zerocopy_to_numpy(g.edges()[1])))
@register_case(['two_hetero_batch'])
@register_case(["two_hetero_batch"])
def two_hetero_batch(): def two_hetero_batch():
g1 = dgl.heterograph({ g1 = dgl.heterograph(
('user', 'follows', 'user'): ([0, 1], [1, 2]), {
('user', 'follows', 'developer'): ([0, 1], [1, 2]), ("user", "follows", "user"): ([0, 1], [1, 2]),
('user', 'plays', 'game'): ([0, 1, 2, 3], [0, 0, 1, 1]) ("user", "follows", "developer"): ([0, 1], [1, 2]),
}) ("user", "plays", "game"): ([0, 1, 2, 3], [0, 0, 1, 1]),
g2 = dgl.heterograph({ }
('user', 'follows', 'user'): ([0, 1], [1, 2]), )
('user', 'follows', 'developer'): ([0, 1], [1, 2]), g2 = dgl.heterograph(
('user', 'plays', 'game'): ([0, 1, 2], [0, 0, 1]) {
}) ("user", "follows", "user"): ([0, 1], [1, 2]),
("user", "follows", "developer"): ([0, 1], [1, 2]),
("user", "plays", "game"): ([0, 1, 2], [0, 0, 1]),
}
)
return [g1, g2] return [g1, g2]
@register_case(['two_hetero_batch'])
@register_case(["two_hetero_batch"])
def two_hetero_batch_with_isolated_ntypes(): def two_hetero_batch_with_isolated_ntypes():
g1 = dgl.heterograph({ g1 = dgl.heterograph(
('user', 'follows', 'user'): ([0, 1], [1, 2]), {
('user', 'follows', 'developer'): ([0, 1], [1, 2]), ("user", "follows", "user"): ([0, 1], [1, 2]),
('user', 'plays', 'game'): ([0, 1, 2, 3], [0, 0, 1, 1]) ("user", "follows", "developer"): ([0, 1], [1, 2]),
}, num_nodes_dict={'user': 4, 'game': 2, 'developer': 3, 'platform': 2}) ("user", "plays", "game"): ([0, 1, 2, 3], [0, 0, 1, 1]),
g2 = dgl.heterograph({ },
('user', 'follows', 'user'): ([0, 1], [1, 2]), num_nodes_dict={"user": 4, "game": 2, "developer": 3, "platform": 2},
('user', 'follows', 'developer'): ([0, 1], [1, 2]), )
('user', 'plays', 'game'): ([0, 1, 2], [0, 0, 1]) g2 = dgl.heterograph(
}, num_nodes_dict={'user': 3, 'game': 2, 'developer': 3, 'platform': 3}) {
("user", "follows", "user"): ([0, 1], [1, 2]),
("user", "follows", "developer"): ([0, 1], [1, 2]),
("user", "plays", "game"): ([0, 1, 2], [0, 0, 1]),
},
num_nodes_dict={"user": 3, "game": 2, "developer": 3, "platform": 3},
)
return [g1, g2] return [g1, g2]
...@@ -3,12 +3,12 @@ import os ...@@ -3,12 +3,12 @@ import os
import tempfile import tempfile
from collections import Counter from collections import Counter
import dgl
import pytest import pytest
from change_etype_to_canonical_etype import convert_conf, is_old_version from change_etype_to_canonical_etype import convert_conf, is_old_version
from scipy import sparse as spsp
import dgl
from dgl.distributed import partition_graph from dgl.distributed import partition_graph
from scipy import sparse as spsp
def create_random_hetero(type_n, node_n): def create_random_hetero(type_n, node_n):
......
...@@ -2,21 +2,24 @@ import json ...@@ -2,21 +2,24 @@ import json
import os import os
import tempfile import tempfile
import dgl
import numpy as np import numpy as np
import pyarrow.parquet as pq
import pytest import pytest
import torch import torch
import pyarrow.parquet as pq from dgl.data.utils import load_graphs, load_tensors
from utils import create_chunked_dataset from dgl.distributed.partition import (
_etype_tuple_to_str,
_get_inner_edge_mask,
_get_inner_node_mask,
load_partition,
RESERVED_FIELD_DTYPE,
)
from distpartitioning import array_readwriter from distpartitioning import array_readwriter
from distpartitioning.utils import generate_read_list from distpartitioning.utils import generate_read_list
from utils import create_chunked_dataset
import dgl
from dgl.data.utils import load_graphs, load_tensors
from dgl.distributed.partition import (RESERVED_FIELD_DTYPE,
_etype_tuple_to_str,
_get_inner_edge_mask,
_get_inner_node_mask, load_partition)
def _verify_partition_data_types(part_g): def _verify_partition_data_types(part_g):
...@@ -26,12 +29,13 @@ def _verify_partition_data_types(part_g): ...@@ -26,12 +29,13 @@ def _verify_partition_data_types(part_g):
if k in part_g.edata: if k in part_g.edata:
assert part_g.edata[k].dtype == dtype assert part_g.edata[k].dtype == dtype
def _verify_partition_formats(part_g, formats): def _verify_partition_formats(part_g, formats):
# Verify saved graph formats # Verify saved graph formats
if formats is None: if formats is None:
assert "coo" in part_g.formats()["created"] assert "coo" in part_g.formats()["created"]
else: else:
formats = formats.split(',') formats = formats.split(",")
for format in formats: for format in formats:
assert format in part_g.formats()["created"] assert format in part_g.formats()["created"]
...@@ -74,30 +78,34 @@ def _verify_graph_feats( ...@@ -74,30 +78,34 @@ def _verify_graph_feats(
if name in [dgl.EID, "inner_edge"]: if name in [dgl.EID, "inner_edge"]:
continue continue
true_feats = g.edges[etype].data[name][orig_id] true_feats = g.edges[etype].data[name][orig_id]
edata = edge_feats[_etype_tuple_to_str(etype) + "/" + name][local_eids] edata = edge_feats[_etype_tuple_to_str(etype) + "/" + name][
local_eids
]
assert np.array_equal(edata.numpy(), true_feats.numpy()) assert np.array_equal(edata.numpy(), true_feats.numpy())
def _test_chunk_graph( def _test_chunk_graph(
num_chunks, num_chunks,
data_fmt = 'numpy', data_fmt="numpy",
edges_fmt = 'csv', edges_fmt="csv",
vector_rows = False, vector_rows=False,
num_chunks_nodes = None, num_chunks_nodes=None,
num_chunks_edges = None, num_chunks_edges=None,
num_chunks_node_data = None, num_chunks_node_data=None,
num_chunks_edge_data = None num_chunks_edge_data=None,
): ):
with tempfile.TemporaryDirectory() as root_dir: with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(
g = create_chunked_dataset(root_dir, num_chunks, root_dir,
data_fmt=data_fmt, edges_fmt=edges_fmt, num_chunks,
vector_rows=vector_rows, data_fmt=data_fmt,
num_chunks_nodes=num_chunks_nodes, edges_fmt=edges_fmt,
num_chunks_edges=num_chunks_edges, vector_rows=vector_rows,
num_chunks_node_data=num_chunks_node_data, num_chunks_nodes=num_chunks_nodes,
num_chunks_edge_data=num_chunks_edge_data num_chunks_edges=num_chunks_edges,
) num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data,
)
# check metadata.json # check metadata.json
output_dir = os.path.join(root_dir, "chunked-data") output_dir = os.path.join(root_dir, "chunked-data")
...@@ -118,34 +126,33 @@ def _test_chunk_graph( ...@@ -118,34 +126,33 @@ def _test_chunk_graph(
n_chunks = num_chunks_edges n_chunks = num_chunks_edges
for i in range(n_chunks): for i in range(n_chunks):
fname = os.path.join( fname = os.path.join(
output_edge_index_dir, f'{c_etype_str}{i}.txt' output_edge_index_dir, f"{c_etype_str}{i}.txt"
) )
assert os.path.isfile(fname) assert os.path.isfile(fname)
if edges_fmt == 'csv': if edges_fmt == "csv":
with open(fname, "r") as f: with open(fname, "r") as f:
header = f.readline() header = f.readline()
num1, num2 = header.rstrip().split(" ") num1, num2 = header.rstrip().split(" ")
assert isinstance(int(num1), int) assert isinstance(int(num1), int)
assert isinstance(int(num2), int) assert isinstance(int(num2), int)
elif edges_fmt == 'parquet': elif edges_fmt == "parquet":
metadata = pq.read_metadata(fname) metadata = pq.read_metadata(fname)
assert metadata.num_columns == 2 assert metadata.num_columns == 2
else: else:
assert False, f"Invalid edges_fmt: {edges_fmt}" assert False, f"Invalid edges_fmt: {edges_fmt}"
# check node/edge_data # check node/edge_data
suffix = 'npy' if data_fmt=='numpy' else 'parquet' suffix = "npy" if data_fmt == "numpy" else "parquet"
reader_fmt_meta = {"name": data_fmt} reader_fmt_meta = {"name": data_fmt}
def test_data(
sub_dir, feat, expected_data, expected_shape, num_chunks def test_data(sub_dir, feat, expected_data, expected_shape, num_chunks):
):
data = [] data = []
for i in range(num_chunks): for i in range(num_chunks):
fname = os.path.join(sub_dir, f'{feat}-{i}.{suffix}') fname = os.path.join(sub_dir, f"{feat}-{i}.{suffix}")
assert os.path.isfile(fname), f'{fname} cannot be found.' assert os.path.isfile(fname), f"{fname} cannot be found."
feat_array = array_readwriter.get_array_parser( feat_array = array_readwriter.get_array_parser(
**reader_fmt_meta **reader_fmt_meta
).read(fname) ).read(fname)
assert feat_array.shape[0] == expected_shape assert feat_array.shape[0] == expected_shape
data.append(feat_array) data.append(feat_array)
data = np.concatenate(data, 0) data = np.concatenate(data, 0)
...@@ -165,8 +172,13 @@ def _test_chunk_graph( ...@@ -165,8 +172,13 @@ def _test_chunk_graph(
n_chunks = chunks_data.get(feat, num_chunks) n_chunks = chunks_data.get(feat, num_chunks)
else: else:
n_chunks = chunks_data n_chunks = chunks_data
test_data(sub_dir, feat, data, g.num_nodes(ntype) // n_chunks, test_data(
n_chunks) sub_dir,
feat,
data,
g.num_nodes(ntype) // n_chunks,
n_chunks,
)
output_edge_data_dir = os.path.join(output_dir, "edge_data") output_edge_data_dir = os.path.join(output_dir, "edge_data")
for c_etype in g.canonical_etypes: for c_etype in g.canonical_etypes:
...@@ -183,20 +195,31 @@ def _test_chunk_graph( ...@@ -183,20 +195,31 @@ def _test_chunk_graph(
n_chunks = chunks_data.get(feat, num_chunks) n_chunks = chunks_data.get(feat, num_chunks)
else: else:
n_chunks = chunks_data n_chunks = chunks_data
test_data(sub_dir, feat, data, g.num_edges(c_etype) // n_chunks, test_data(
n_chunks) sub_dir,
feat,
data,
g.num_edges(c_etype) // n_chunks,
n_chunks,
)
@pytest.mark.parametrize("num_chunks", [1, 8]) @pytest.mark.parametrize("num_chunks", [1, 8])
@pytest.mark.parametrize("data_fmt", ['numpy', 'parquet']) @pytest.mark.parametrize("data_fmt", ["numpy", "parquet"])
@pytest.mark.parametrize("edges_fmt", ['csv', 'parquet']) @pytest.mark.parametrize("edges_fmt", ["csv", "parquet"])
def test_chunk_graph_basics(num_chunks, data_fmt, edges_fmt): def test_chunk_graph_basics(num_chunks, data_fmt, edges_fmt):
_test_chunk_graph(num_chunks, data_fmt=data_fmt, edges_fmt=edges_fmt) _test_chunk_graph(num_chunks, data_fmt=data_fmt, edges_fmt=edges_fmt)
@pytest.mark.parametrize("num_chunks", [1, 8]) @pytest.mark.parametrize("num_chunks", [1, 8])
@pytest.mark.parametrize("vector_rows", [True, False]) @pytest.mark.parametrize("vector_rows", [True, False])
def test_chunk_graph_vector_rows(num_chunks, vector_rows): def test_chunk_graph_vector_rows(num_chunks, vector_rows):
_test_chunk_graph(num_chunks, data_fmt='parquet', edges_fmt='parquet', vector_rows=vector_rows) _test_chunk_graph(
num_chunks,
data_fmt="parquet",
edges_fmt="parquet",
vector_rows=vector_rows,
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
...@@ -209,24 +232,29 @@ def test_chunk_graph_vector_rows(num_chunks, vector_rows): ...@@ -209,24 +232,29 @@ def test_chunk_graph_vector_rows(num_chunks, vector_rows):
[1, None, None, None, None], [1, None, None, None, None],
[8, None, None, None, None], [8, None, None, None, None],
[4, 4, 4, 8, 12], [4, 4, 4, 8, 12],
[4, 4, 4, {'paper': 10}, {('author', 'writes', 'paper'): 24}], [4, 4, 4, {"paper": 10}, {("author", "writes", "paper"): 24}],
[4, 4, 4, {'paper': {'feat': 10}}, [
{('author', 'writes', 'paper'): {'year': 24}}], 4,
] 4,
4,
{"paper": {"feat": 10}},
{("author", "writes", "paper"): {"year": 24}},
],
],
) )
def test_chunk_graph_arbitray_chunks( def test_chunk_graph_arbitray_chunks(
num_chunks, num_chunks,
num_chunks_nodes, num_chunks_nodes,
num_chunks_edges, num_chunks_edges,
num_chunks_node_data, num_chunks_node_data,
num_chunks_edge_data num_chunks_edge_data,
): ):
_test_chunk_graph( _test_chunk_graph(
num_chunks, num_chunks,
num_chunks_nodes=num_chunks_nodes, num_chunks_nodes=num_chunks_nodes,
num_chunks_edges=num_chunks_edges, num_chunks_edges=num_chunks_edges,
num_chunks_node_data=num_chunks_node_data, num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data num_chunks_edge_data=num_chunks_edge_data,
) )
...@@ -235,11 +263,11 @@ def _test_pipeline( ...@@ -235,11 +263,11 @@ def _test_pipeline(
num_parts, num_parts,
world_size, world_size,
graph_formats=None, graph_formats=None,
data_fmt='numpy', data_fmt="numpy",
num_chunks_nodes=None, num_chunks_nodes=None,
num_chunks_edges=None, num_chunks_edges=None,
num_chunks_node_data=None, num_chunks_node_data=None,
num_chunks_edge_data=None num_chunks_edge_data=None,
): ):
if num_chunks < num_parts: if num_chunks < num_parts:
# num_parts should be less than or equal to num_chunks # num_parts should be less than or equal to num_chunks
...@@ -250,14 +278,15 @@ def _test_pipeline( ...@@ -250,14 +278,15 @@ def _test_pipeline(
return return
with tempfile.TemporaryDirectory() as root_dir: with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(
g = create_chunked_dataset(root_dir, num_chunks, root_dir,
data_fmt=data_fmt, num_chunks,
num_chunks_nodes=num_chunks_nodes, data_fmt=data_fmt,
num_chunks_edges=num_chunks_edges, num_chunks_nodes=num_chunks_nodes,
num_chunks_node_data=num_chunks_node_data, num_chunks_edges=num_chunks_edges,
num_chunks_edge_data=num_chunks_edge_data num_chunks_node_data=num_chunks_node_data,
) num_chunks_edge_data=num_chunks_edge_data,
)
# Step1: graph partition # Step1: graph partition
in_dir = os.path.join(root_dir, "chunked-data") in_dir = os.path.join(root_dir, "chunked-data")
...@@ -275,12 +304,12 @@ def _test_pipeline( ...@@ -275,12 +304,12 @@ def _test_pipeline(
assert isinstance(int(header), int) assert isinstance(int(header), int)
# Step2: data dispatch # Step2: data dispatch
partition_dir = os.path.join(root_dir, 'parted_data') partition_dir = os.path.join(root_dir, "parted_data")
out_dir = os.path.join(root_dir, 'partitioned') out_dir = os.path.join(root_dir, "partitioned")
ip_config = os.path.join(root_dir, 'ip_config.txt') ip_config = os.path.join(root_dir, "ip_config.txt")
with open(ip_config, 'w') as f: with open(ip_config, "w") as f:
for i in range(world_size): for i in range(world_size):
f.write(f'127.0.0.{i + 1}\n') f.write(f"127.0.0.{i + 1}\n")
cmd = "python3 tools/dispatch_data.py" cmd = "python3 tools/dispatch_data.py"
cmd += f" --in-dir {in_dir}" cmd += f" --in-dir {in_dir}"
...@@ -323,8 +352,9 @@ def _test_pipeline( ...@@ -323,8 +352,9 @@ def _test_pipeline(
) )
@pytest.mark.parametrize("num_chunks, num_parts, world_size", @pytest.mark.parametrize(
[[4, 4, 4], [8, 4, 2], [8, 4, 4], [9, 6, 3], [11, 11, 1], [11, 4, 1]] "num_chunks, num_parts, world_size",
[[4, 4, 4], [8, 4, 2], [8, 4, 4], [9, 6, 3], [11, 11, 1], [11, 4, 1]],
) )
def test_pipeline_basics(num_chunks, num_parts, world_size): def test_pipeline_basics(num_chunks, num_parts, world_size):
_test_pipeline(num_chunks, num_parts, world_size) _test_pipeline(num_chunks, num_parts, world_size)
...@@ -347,9 +377,14 @@ def test_pipeline_formats(graph_formats): ...@@ -347,9 +377,14 @@ def test_pipeline_formats(graph_formats):
[8, 4, 2, 20, 25], [8, 4, 2, 20, 25],
[9, 7, 5, 3, 11], [9, 7, 5, 3, 11],
[8, 8, 4, 3, 5], [8, 8, 4, 3, 5],
[8, 4, 2, {'paper': {'feat': 11, 'year': 1}}, [
{('author', 'writes', 'paper'): {'year': 24}}], 8,
] 4,
2,
{"paper": {"feat": 11, "year": 1}},
{("author", "writes", "paper"): {"year": 24}},
],
],
) )
def test_pipeline_arbitray_chunks( def test_pipeline_arbitray_chunks(
num_chunks, num_chunks,
...@@ -374,9 +409,7 @@ def test_pipeline_formats(graph_formats): ...@@ -374,9 +409,7 @@ def test_pipeline_formats(graph_formats):
_test_pipeline(4, 4, 4, graph_formats) _test_pipeline(4, 4, 4, graph_formats)
@pytest.mark.parametrize( @pytest.mark.parametrize("data_fmt", ["numpy", "parquet"])
"data_fmt", ["numpy", "parquet"]
)
def test_pipeline_feature_format(data_fmt): def test_pipeline_feature_format(data_fmt):
_test_pipeline(4, 4, 4, data_fmt=data_fmt) _test_pipeline(4, 4, 4, data_fmt=data_fmt)
......
...@@ -9,9 +9,9 @@ import dgl ...@@ -9,9 +9,9 @@ import dgl
import numpy as np import numpy as np
import torch import torch
from dgl.data.utils import load_graphs, load_tensors from dgl.data.utils import load_graphs, load_tensors
from partition_algo.base import load_partition_meta
from utils import create_chunked_dataset from utils import create_chunked_dataset
from partition_algo.base import load_partition_meta
""" """
TODO: skipping this test case since the dependency, mpirun, is TODO: skipping this test case since the dependency, mpirun, is
...@@ -26,25 +26,25 @@ def test_parmetis_preprocessing(): ...@@ -26,25 +26,25 @@ def test_parmetis_preprocessing():
g = create_chunked_dataset(root_dir, num_chunks) g = create_chunked_dataset(root_dir, num_chunks)
# Trigger ParMETIS pre-processing here. # Trigger ParMETIS pre-processing here.
schema_path = os.path.join(root_dir, 'chunked-data/metadata.json') schema_path = os.path.join(root_dir, "chunked-data/metadata.json")
results_dir = os.path.join(root_dir, 'parmetis-data') results_dir = os.path.join(root_dir, "parmetis-data")
os.system( os.system(
f'mpirun -np 2 python3 tools/distpartitioning/parmetis_preprocess.py ' f"mpirun -np 2 python3 tools/distpartitioning/parmetis_preprocess.py "
f'--schema {schema_path} --output {results_dir}' f"--schema {schema_path} --output {results_dir}"
) )
# Now add all the tests and check whether the test has passed or failed. # Now add all the tests and check whether the test has passed or failed.
# Read parmetis_nfiles and ensure all files are present. # Read parmetis_nfiles and ensure all files are present.
parmetis_data_dir = os.path.join(root_dir, 'parmetis-data') parmetis_data_dir = os.path.join(root_dir, "parmetis-data")
assert os.path.isdir(parmetis_data_dir) assert os.path.isdir(parmetis_data_dir)
parmetis_nodes_file = os.path.join( parmetis_nodes_file = os.path.join(
parmetis_data_dir, 'parmetis_nfiles.txt' parmetis_data_dir, "parmetis_nfiles.txt"
) )
assert os.path.isfile(parmetis_nodes_file) assert os.path.isfile(parmetis_nodes_file)
# `parmetis_nfiles.txt` should have each line in the following format. # `parmetis_nfiles.txt` should have each line in the following format.
# <filename> <global_id_start> <global_id_end> # <filename> <global_id_start> <global_id_end>
with open(parmetis_nodes_file, 'r') as nodes_metafile: with open(parmetis_nodes_file, "r") as nodes_metafile:
lines = nodes_metafile.readlines() lines = nodes_metafile.readlines()
total_node_count = 0 total_node_count = 0
for line in lines: for line in lines:
...@@ -54,7 +54,7 @@ def test_parmetis_preprocessing(): ...@@ -54,7 +54,7 @@ def test_parmetis_preprocessing():
assert int(tokens[1]) == total_node_count assert int(tokens[1]) == total_node_count
# check contents of each of the nodes files here # check contents of each of the nodes files here
with open(tokens[0], 'r') as nodes_file: with open(tokens[0], "r") as nodes_file:
node_lines = nodes_file.readlines() node_lines = nodes_file.readlines()
for line in node_lines: for line in node_lines:
val = line.split(" ") val = line.split(" ")
...@@ -65,15 +65,15 @@ def test_parmetis_preprocessing(): ...@@ -65,15 +65,15 @@ def test_parmetis_preprocessing():
assert int(tokens[2]) == total_node_count assert int(tokens[2]) == total_node_count
# Meta_data object. # Meta_data object.
output_dir = os.path.join(root_dir, 'chunked-data') output_dir = os.path.join(root_dir, "chunked-data")
json_file = os.path.join(output_dir, 'metadata.json') json_file = os.path.join(output_dir, "metadata.json")
assert os.path.isfile(json_file) assert os.path.isfile(json_file)
with open(json_file, 'rb') as f: with open(json_file, "rb") as f:
meta_data = json.load(f) meta_data = json.load(f)
# Count the total no. of nodes. # Count the total no. of nodes.
true_node_count = 0 true_node_count = 0
num_nodes_per_chunk = meta_data['num_nodes_per_chunk'] num_nodes_per_chunk = meta_data["num_nodes_per_chunk"]
for i in range(len(num_nodes_per_chunk)): for i in range(len(num_nodes_per_chunk)):
node_per_part = num_nodes_per_chunk[i] node_per_part = num_nodes_per_chunk[i]
for j in range(len(node_per_part)): for j in range(len(node_per_part)):
...@@ -83,18 +83,18 @@ def test_parmetis_preprocessing(): ...@@ -83,18 +83,18 @@ def test_parmetis_preprocessing():
# Read parmetis_efiles and ensure all files are present. # Read parmetis_efiles and ensure all files are present.
# This file contains a list of filenames. # This file contains a list of filenames.
parmetis_edges_file = os.path.join( parmetis_edges_file = os.path.join(
parmetis_data_dir, 'parmetis_efiles.txt' parmetis_data_dir, "parmetis_efiles.txt"
) )
assert os.path.isfile(parmetis_edges_file) assert os.path.isfile(parmetis_edges_file)
with open(parmetis_edges_file, 'r') as edges_metafile: with open(parmetis_edges_file, "r") as edges_metafile:
lines = edges_metafile.readlines() lines = edges_metafile.readlines()
total_edge_count = 0 total_edge_count = 0
for line in lines: for line in lines:
edges_filename = line.strip() edges_filename = line.strip()
assert os.path.isfile(edges_filename) assert os.path.isfile(edges_filename)
with open(edges_filename, 'r') as edges_file: with open(edges_filename, "r") as edges_file:
edge_lines = edges_file.readlines() edge_lines = edges_file.readlines()
total_edge_count += len(edge_lines) total_edge_count += len(edge_lines)
for line in edge_lines: for line in edge_lines:
...@@ -103,7 +103,7 @@ def test_parmetis_preprocessing(): ...@@ -103,7 +103,7 @@ def test_parmetis_preprocessing():
# Count the total no. of edges # Count the total no. of edges
true_edge_count = 0 true_edge_count = 0
num_edges_per_chunk = meta_data['num_edges_per_chunk'] num_edges_per_chunk = meta_data["num_edges_per_chunk"]
for i in range(len(num_edges_per_chunk)): for i in range(len(num_edges_per_chunk)):
edges_per_part = num_edges_per_chunk[i] edges_per_part = num_edges_per_chunk[i]
for j in range(len(edges_per_part)): for j in range(len(edges_per_part)):
...@@ -117,42 +117,42 @@ def test_parmetis_postprocessing(): ...@@ -117,42 +117,42 @@ def test_parmetis_postprocessing():
g = create_chunked_dataset(root_dir, num_chunks) g = create_chunked_dataset(root_dir, num_chunks)
num_nodes = g.number_of_nodes() num_nodes = g.number_of_nodes()
num_institutions = g.number_of_nodes('institution') num_institutions = g.number_of_nodes("institution")
num_authors = g.number_of_nodes('author') num_authors = g.number_of_nodes("author")
num_papers = g.number_of_nodes('paper') num_papers = g.number_of_nodes("paper")
# Generate random parmetis partition ids for the nodes in the graph. # Generate random parmetis partition ids for the nodes in the graph.
# Replace this code with actual ParMETIS executable when it is ready # Replace this code with actual ParMETIS executable when it is ready
output_dir = os.path.join(root_dir, 'chunked-data') output_dir = os.path.join(root_dir, "chunked-data")
parmetis_file = os.path.join(output_dir, 'parmetis_output.txt') parmetis_file = os.path.join(output_dir, "parmetis_output.txt")
node_ids = np.arange(num_nodes) node_ids = np.arange(num_nodes)
partition_ids = np.random.randint(0, 2, (num_nodes,)) partition_ids = np.random.randint(0, 2, (num_nodes,))
parmetis_output = np.column_stack([node_ids, partition_ids]) parmetis_output = np.column_stack([node_ids, partition_ids])
# Create parmetis output, this is mimicking running actual parmetis. # Create parmetis output, this is mimicking running actual parmetis.
with open(parmetis_file, 'w') as f: with open(parmetis_file, "w") as f:
np.savetxt(f, parmetis_output) np.savetxt(f, parmetis_output)
# Check the post processing script here. # Check the post processing script here.
results_dir = os.path.join(output_dir, 'partitions_dir') results_dir = os.path.join(output_dir, "partitions_dir")
json_file = os.path.join(output_dir, 'metadata.json') json_file = os.path.join(output_dir, "metadata.json")
print(json_file) print(json_file)
print(results_dir) print(results_dir)
print(parmetis_file) print(parmetis_file)
os.system( os.system(
f'python3 tools/distpartitioning/parmetis_postprocess.py ' f"python3 tools/distpartitioning/parmetis_postprocess.py "
f'--schema_file {json_file} ' f"--schema_file {json_file} "
f'--parmetis_output_file {parmetis_file} ' f"--parmetis_output_file {parmetis_file} "
f'--partitions_dir {results_dir}' f"--partitions_dir {results_dir}"
) )
ntype_count = { ntype_count = {
'author': num_authors, "author": num_authors,
'paper': num_papers, "paper": num_papers,
'institution': num_institutions, "institution": num_institutions,
} }
for ntype_name in ['author', 'paper', 'institution']: for ntype_name in ["author", "paper", "institution"]:
fname = os.path.join(results_dir, f'{ntype_name}.txt') fname = os.path.join(results_dir, f"{ntype_name}.txt")
print(fname) print(fname)
assert os.path.isfile(fname) assert os.path.isfile(fname)
...@@ -185,49 +185,49 @@ def test_parmetis_wrapper(): ...@@ -185,49 +185,49 @@ def test_parmetis_wrapper():
all_ntypes = g.ntypes all_ntypes = g.ntypes
all_etypes = g.etypes all_etypes = g.etypes
num_constraints = len(all_ntypes) + 3 num_constraints = len(all_ntypes) + 3
num_institutions = g.number_of_nodes('institution') num_institutions = g.number_of_nodes("institution")
num_authors = g.number_of_nodes('author') num_authors = g.number_of_nodes("author")
num_papers = g.number_of_nodes('paper') num_papers = g.number_of_nodes("paper")
# Trigger ParMETIS. # Trigger ParMETIS.
schema_file = os.path.join(root_dir, 'chunked-data/metadata.json') schema_file = os.path.join(root_dir, "chunked-data/metadata.json")
preproc_output_dir = os.path.join( preproc_output_dir = os.path.join(
root_dir, 'chunked-data/preproc_output_dir' root_dir, "chunked-data/preproc_output_dir"
) )
parmetis_output_file = os.path.join( parmetis_output_file = os.path.join(
os.getcwd(), f'{graph_name}_part.{num_chunks}' os.getcwd(), f"{graph_name}_part.{num_chunks}"
) )
partitions_dir = os.path.join(root_dir, 'chunked-data/partitions_dir') partitions_dir = os.path.join(root_dir, "chunked-data/partitions_dir")
hostfile = os.path.join(root_dir, 'ip_config.txt') hostfile = os.path.join(root_dir, "ip_config.txt")
with open(hostfile, 'w') as f: with open(hostfile, "w") as f:
f.write('127.0.0.1\n') f.write("127.0.0.1\n")
f.write('127.0.0.1\n') f.write("127.0.0.1\n")
num_nodes = g.number_of_nodes() num_nodes = g.number_of_nodes()
num_edges = g.number_of_edges() num_edges = g.number_of_edges()
stats_file = f'{graph_name}_stats.txt' stats_file = f"{graph_name}_stats.txt"
with open(stats_file, 'w') as f: with open(stats_file, "w") as f:
f.write(f'{num_nodes} {num_edges} {num_constraints}') f.write(f"{num_nodes} {num_edges} {num_constraints}")
parmetis_cmd = ( parmetis_cmd = (
f'python3 tools/distpartitioning/parmetis_wrapper.py ' f"python3 tools/distpartitioning/parmetis_wrapper.py "
f'--schema_file {schema_file} ' f"--schema_file {schema_file} "
f'--preproc_output_dir {preproc_output_dir} ' f"--preproc_output_dir {preproc_output_dir} "
f'--hostfile {hostfile} ' f"--hostfile {hostfile} "
f'--parmetis_output_file {parmetis_output_file} ' f"--parmetis_output_file {parmetis_output_file} "
f'--partitions_dir {partitions_dir} ' f"--partitions_dir {partitions_dir} "
) )
print(f'Executing the following cmd: {parmetis_cmd}') print(f"Executing the following cmd: {parmetis_cmd}")
print(parmetis_cmd) print(parmetis_cmd)
os.system(parmetis_cmd) os.system(parmetis_cmd)
ntype_count = { ntype_count = {
'author': num_authors, "author": num_authors,
'paper': num_papers, "paper": num_papers,
'institution': num_institutions, "institution": num_institutions,
} }
for ntype_name in ['author', 'paper', 'institution']: for ntype_name in ["author", "paper", "institution"]:
fname = os.path.join(partitions_dir, f'{ntype_name}.txt') fname = os.path.join(partitions_dir, f"{ntype_name}.txt")
print(fname) print(fname)
assert os.path.isfile(fname) assert os.path.isfile(fname)
......