Unverified Commit f4ee96db authored by Serge Panev, committed by GitHub

[Dist][Test] Add tests for multi-node DistDataLoader, DistNodeDataLoader, DistEdgeDataLoader (#4846)
Signed-off-by: Serge Panev <spanev@nvidia.com>
parent a8f9d5ef
import os
from itertools import product
import numpy as np
@@ -392,6 +393,278 @@ def dist_optimizer_check_store(g):
def test_dist_optimizer(g):
    dist_optimizer_check_store(g)
##########################################
############# DistDataLoader #############
##########################################
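# A minimal multi-layer neighbor sampler for the raw DistDataLoader: for each
# fanout it samples a frontier around the seeds, compacts it into a
# message-passing block, and keeps the original edge IDs so the test can
# check the sampled edges against the full graph.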
class NeighborSampler(object):
def __init__(self, g, fanouts, sample_neighbors):
self.g = g
self.fanouts = fanouts
self.sample_neighbors = sample_neighbors
def sample_blocks(self, seeds):
import torch as th
seeds = th.LongTensor(np.asarray(seeds))
blocks = []
for fanout in self.fanouts:
# For each seed node, sample ``fanout`` neighbors.
frontier = self.sample_neighbors(
self.g, seeds, fanout, replace=True
)
# Then we compact the frontier into a bipartite graph for
# message passing.
block = dgl.to_block(frontier, seeds)
# Obtain the seed nodes for next layer.
seeds = block.srcdata[dgl.NID]
block.edata["original_eids"] = frontier.edata[dgl.EID]
blocks.insert(0, block)
return blocks
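# Exercise the raw DistDataLoader with the custom NeighborSampler above.
# Only used for homogeneous graphs (see test_dist_dataloader below): sampled
# node IDs are mapped back to the IDs of the original, unpartitioned graph
# and every block edge is checked against the stored (u, v) arrays.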
def distdataloader_test(g, batch_size, drop_last, shuffle):
# We sample only a subset to minimize the test runtime
    num_nodes_to_sample = int(g.num_nodes() * 0.05)
    # Avoid an exact multiple of batch_size so that drop_last is exercised
    if num_nodes_to_sample % batch_size == 0:
        num_nodes_to_sample -= 1
orig_nid_map = dict()
dtype = g.edges[g.etypes[0]].data['edge_u'].dtype
for ntype in g.ntypes:
orig_nid = F.tensor(np.load(graph_path + f'/orig_nid_array_{ntype}.npy'), dtype)
orig_nid_map[ntype] = orig_nid
orig_uv_map = dict()
for etype in g.etypes:
orig_uv_map[etype] = (g.edges[etype].data['edge_u'], g.edges[etype].data['edge_v'])
if len(g.ntypes) == 1:
train_nid = F.arange(0, num_nodes_to_sample)
else:
train_nid = {g.ntypes[0]: F.arange(0, num_nodes_to_sample)}
sampler = NeighborSampler(
g,
[5, 10],
dgl.distributed.sample_neighbors
)
dataloader = dgl.dataloading.DistDataLoader(
dataset=train_nid.numpy(),
batch_size=batch_size,
collate_fn=sampler.sample_blocks,
shuffle=shuffle,
drop_last=drop_last,
)
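    # Iterate twice to verify the dataloader can be re-used for a second epoch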
for _ in range(2):
max_nid = []
for idx, blocks in zip(range(0, num_nodes_to_sample, batch_size), dataloader):
block = blocks[-1]
for src_type, etype, dst_type in block.canonical_etypes:
orig_u, orig_v = orig_uv_map[etype]
o_src, o_dst = block.edges(etype=etype)
src_nodes_id = block.srcnodes[src_type].data[dgl.NID][o_src]
dst_nodes_id = block.dstnodes[dst_type].data[dgl.NID][o_dst]
max_nid.append(np.max(F.asnumpy(dst_nodes_id)))
src_nodes_id = orig_nid_map[src_type][src_nodes_id]
dst_nodes_id = orig_nid_map[dst_type][dst_nodes_id]
eids = block.edata["original_eids"]
                assert F.allclose(src_nodes_id, orig_u[eids])
                assert F.allclose(dst_nodes_id, orig_v[eids])
if not shuffle and len(max_nid) > 0:
if drop_last:
assert (
np.max(max_nid)
== num_nodes_to_sample
- 1
- num_nodes_to_sample % batch_size
)
else:
assert np.max(max_nid) == num_nodes_to_sample - 1
del dataloader
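# Same edge-level check as above, but through the high-level
# DistNodeDataLoader with a MultiLayerNeighborSampler, including the
# multi-process (num_workers > 0) path.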
def distnodedataloader_test(g, batch_size, drop_last, shuffle,
num_workers, orig_nid_map, orig_uv_map):
# We sample only a subset to minimize the test runtime
    num_nodes_to_sample = int(g.num_nodes(g.ntypes[-1]) * 0.05)
    # Avoid an exact multiple of batch_size so that drop_last is exercised
    if num_nodes_to_sample % batch_size == 0:
        num_nodes_to_sample -= 1
if len(g.ntypes) == 1:
train_nid = F.arange(0, num_nodes_to_sample)
else:
train_nid = {g.ntypes[-1]: F.arange(0, num_nodes_to_sample)}
if len(g.etypes) > 1:
sampler = dgl.dataloading.MultiLayerNeighborSampler(
[
{etype: 5 for etype in g.etypes},
10,
]
)
else:
sampler = dgl.dataloading.MultiLayerNeighborSampler(
[
5,
10,
]
)
dataloader = dgl.dataloading.DistNodeDataLoader(
g,
train_nid,
sampler,
batch_size=batch_size,
shuffle=shuffle,
drop_last=drop_last,
num_workers=num_workers,
)
for _ in range(2):
for _, (_, _, blocks) in zip(range(0, num_nodes_to_sample, batch_size), dataloader):
block = blocks[-1]
for src_type, etype, dst_type in block.canonical_etypes:
orig_u, orig_v = orig_uv_map[etype]
o_src, o_dst = block.edges(etype=etype)
src_nodes_id = block.srcnodes[src_type].data[dgl.NID][o_src]
dst_nodes_id = block.dstnodes[dst_type].data[dgl.NID][o_dst]
src_nodes_id = orig_nid_map[src_type][src_nodes_id]
dst_nodes_id = orig_nid_map[dst_type][dst_nodes_id]
eids = block.edges[etype].data[dgl.EID]
                assert F.allclose(src_nodes_id, orig_u[eids])
                assert F.allclose(dst_nodes_id, orig_v[eids])
del dataloader
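# Check DistEdgeDataLoader with and without uniform negative sampling:
# besides the per-edge ID checks, the seed pair graphs must cover exactly
# the output nodes of the last block.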
def distedgedataloader_test(g, batch_size, drop_last, shuffle,
num_workers, orig_nid_map, orig_uv_map, num_negs):
# We sample only a subset to minimize the test runtime
    num_edges_to_sample = int(g.num_edges(g.etypes[-1]) * 0.05)
    # Avoid an exact multiple of batch_size so that drop_last is exercised
    if num_edges_to_sample % batch_size == 0:
        num_edges_to_sample -= 1
if len(g.etypes) == 1:
train_eid = F.arange(0, num_edges_to_sample)
else:
train_eid = {g.etypes[-1]: F.arange(0, num_edges_to_sample)}
sampler = dgl.dataloading.MultiLayerNeighborSampler([5, 10])
dataloader = dgl.dataloading.DistEdgeDataLoader(
g,
train_eid,
sampler,
batch_size=batch_size,
negative_sampler=dgl.dataloading.negative_sampler.Uniform(num_negs) if num_negs > 0 else None,
shuffle=shuffle,
drop_last=drop_last,
num_workers=num_workers,
)
for _ in range(2):
for _, sampled_data in zip(
range(0, num_edges_to_sample, batch_size), dataloader
):
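            # With a negative sampler the batch is
            # (input_nodes, pos_pair_graph, neg_pair_graph, blocks);
            # without one it is (input_nodes, pos_pair_graph, blocks).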
blocks = sampled_data[3 if num_negs > 0 else 2]
block = blocks[-1]
for src_type, etype, dst_type in block.canonical_etypes:
orig_u, orig_v = orig_uv_map[etype]
o_src, o_dst = block.edges(etype=etype)
src_nodes_id = block.srcnodes[src_type].data[dgl.NID][o_src]
dst_nodes_id = block.dstnodes[dst_type].data[dgl.NID][o_dst]
src_nodes_id = orig_nid_map[src_type][src_nodes_id]
dst_nodes_id = orig_nid_map[dst_type][dst_nodes_id]
eids = block.edges[etype].data[dgl.EID]
                assert F.allclose(src_nodes_id, orig_u[eids])
                assert F.allclose(dst_nodes_id, orig_v[eids])
if num_negs == 0:
pos_pair_graph = sampled_data[1]
assert np.all(
F.asnumpy(block.dstnodes[dst_type].data[dgl.NID])
== F.asnumpy(pos_pair_graph.nodes[dst_type].data[dgl.NID])
)
else:
pos_graph, neg_graph = sampled_data[1:3]
assert np.all(
F.asnumpy(block.dstnodes[dst_type].data[dgl.NID])
== F.asnumpy(pos_graph.nodes[dst_type].data[dgl.NID])
)
assert np.all(
F.asnumpy(block.dstnodes[dst_type].data[dgl.NID])
== F.asnumpy(neg_graph.nodes[dst_type].data[dgl.NID])
)
assert pos_graph.num_edges() * num_negs == neg_graph.num_edges()
del dataloader
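# Create several dataloaders of the same class and consume them in a random
# interleaved order, to check that concurrent DistDataLoaders do not
# interfere with each other.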
def multi_distdataloader_test(g, dataloader_class):
    num_dataloaders = 4
    batch_size = 32
    if "Node" in dataloader_class.__name__:
        total_num_items = g.num_nodes(g.ntypes[-1])
    else:
        total_num_items = g.num_edges(g.etypes[-1])
    # We sample only a subset to minimize the test runtime
    num_items_to_sample = int(total_num_items * 0.05)
    # Avoid an exact multiple of batch_size so the last batch is partial
    if num_items_to_sample % batch_size == 0:
        num_items_to_sample -= 1
sampler = dgl.dataloading.NeighborSampler([-1])
dataloaders = []
dl_iters = []
if len(g.ntypes) == 1:
train_ids = F.arange(0, num_items_to_sample)
    else:
        id_key = g.ntypes[-1] if "Node" in dataloader_class.__name__ else g.etypes[-1]
        train_ids = {id_key: F.arange(0, num_items_to_sample)}
for _ in range(num_dataloaders):
dataloader = dataloader_class(
g, train_ids, sampler, batch_size=batch_size
)
dataloaders.append(dataloader)
dl_iters.append(iter(dataloader))
# iterate on multiple dataloaders randomly
while len(dl_iters) > 0:
current_dl = np.random.choice(len(dl_iters), 1)[0]
try:
_ = next(dl_iters[current_dl])
except StopIteration:
dl_iters.pop(current_dl)
del dataloaders[current_dl]
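# Entry point registered in the dispatch table below; sweeps the
# batch_size / drop_last / shuffle / num_workers combinations over all
# three dataloader flavors.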
def test_dist_dataloader(g):
orig_nid_map = dict()
dtype = g.edges[g.etypes[0]].data['edge_u'].dtype
for ntype in g.ntypes:
orig_nid = F.tensor(np.load(graph_path + f'/orig_nid_array_{ntype}.npy'), dtype)
orig_nid_map[ntype] = orig_nid
orig_uv_map = dict()
for etype in g.etypes:
orig_uv_map[etype] = (g.edges[etype].data['edge_u'], g.edges[etype].data['edge_v'])
batch_size_l = [64]
drop_last_l = [False, True]
num_workers_l = [0, 4]
shuffle_l = [False, True]
for batch_size, drop_last, shuffle, num_workers \
in product(batch_size_l, drop_last_l, shuffle_l, num_workers_l):
if len(g.ntypes) == 1 and num_workers == 0:
distdataloader_test(g, batch_size, drop_last, shuffle)
distnodedataloader_test(g, batch_size, drop_last, shuffle, num_workers, orig_nid_map, orig_uv_map)
        # Without negative sampling
        distedgedataloader_test(g, batch_size, drop_last, shuffle,
                                num_workers, orig_nid_map, orig_uv_map, num_negs=0)
        # With negative sampling: 15 negatives per positive edge
        distedgedataloader_test(g, batch_size, drop_last, shuffle,
                                num_workers, orig_nid_map, orig_uv_map, num_negs=15)
multi_distdataloader_test(g, dgl.dataloading.DistNodeDataLoader)
multi_distdataloader_test(g, dgl.dataloading.DistEdgeDataLoader)
if mode == "server": if mode == "server":
shared_mem = bool(int(os.environ.get("DIST_DGL_TEST_SHARED_MEM"))) shared_mem = bool(int(os.environ.get("DIST_DGL_TEST_SHARED_MEM")))
...@@ -419,6 +692,7 @@ elif mode == "client": ...@@ -419,6 +692,7 @@ elif mode == "client":
"DistTensor": test_dist_tensor, "DistTensor": test_dist_tensor,
"DistEmbedding": test_dist_embedding, "DistEmbedding": test_dist_embedding,
"DistOptimizer": test_dist_optimizer, "DistOptimizer": test_dist_optimizer,
"DistDataLoader": test_dist_dataloader,
} }
target = os.environ.get("DIST_DGL_TEST_OBJECT_TYPE", "") target = os.environ.get("DIST_DGL_TEST_OBJECT_TYPE", "")
......