"vscode:/vscode.git/clone" did not exist on "c9f5ba10209293317b5b95a32d58d430da3f8f70"
Unverified commit 408eba24, authored by Rhett Ying, committed by GitHub

Multiple dist dl sampler (#4704)

* [Dist] enable iterating multiple dist dataloaders simultaneously

* format file

* add support for any number of dataloaders

* fix lint

* refine code
parent ea48ce7a
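A minimal usage sketch of what this change enables in a trainer process. It is illustrative only: the graph name, config paths, node ID ranges, fanouts, and batch size below are hypothetical, and it assumes a partitioned graph plus running DGL distributed servers.

# Hypothetical sketch: interleave two distributed dataloaders in one trainer.
import dgl
import torch as th

dgl.distributed.initialize("ip_config.txt")
g = dgl.distributed.DistGraph("my_graph", part_config="my_graph.json")
sampler = dgl.dataloading.NeighborSampler([10, 25])

dl_a = dgl.dataloading.DistNodeDataLoader(
    g, th.arange(1000), sampler, batch_size=32, shuffle=True
)
dl_b = dgl.dataloading.DistNodeDataLoader(
    g, th.arange(1000, 2000), sampler, batch_size=32, shuffle=True
)

# The old CustomPool.get_result asserted that the next item popped from the
# shared result queue belonged to the calling dataloader, so alternating
# between iterators like this was not reliably supported; after this patch
# each dataloader consumes only its own results.
for (in_a, out_a, blocks_a), (in_b, out_b, blocks_b) in zip(dl_a, dl_b):
    pass  # train on one batch from each dataloader per step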
@@ -12,6 +12,7 @@ import traceback
from enum import Enum
from .. import utils
from ..base import DGLError
from . import rpc
from .constants import MAX_QUEUE_SIZE
from .kvstore import close_kvstore, init_kvstore
@@ -122,8 +123,11 @@ class CustomPool:
        """
        ctx = mp.get_context("spawn")
        self.num_workers = num_workers
        # As the pool can be used by any number of dataloaders, the queues
        # should be able to hold an unbounded number of elements to avoid deadlock.
        self.queue_size = 0
        self.result_queue = ctx.Queue(self.queue_size)
        self.results = {}  # key is dataloader name, value is a list of fetched batches.
        self.task_queues = []
        self.process_list = []
        self.current_proc_id = 0
@@ -149,6 +153,7 @@ class CustomPool:
            self.task_queues[i].put(
                (MpCommand.SET_COLLATE_FN, (dataloader_name, func))
            )
        self.results[dataloader_name] = []

    def submit_task(self, dataloader_name, args):
        """Submit task to workers"""
@@ -167,9 +172,14 @@ class CustomPool:
    def get_result(self, dataloader_name, timeout=1800):
        """Get result from result queue"""
        if dataloader_name not in self.results:
            raise DGLError(
                f"Got result from an unknown dataloader {dataloader_name}."
            )
        while len(self.results[dataloader_name]) == 0:
            dl_name, data = self.result_queue.get(timeout=timeout)
            self.results[dl_name].append(data)
        return self.results[dataloader_name].pop(0)

    def delete_collate_fn(self, dataloader_name):
        """Delete collate function"""
@@ -177,6 +187,7 @@ class CustomPool:
            self.task_queues[i].put(
                (MpCommand.DELETE_COLLATE_FN, (dataloader_name,))
            )
        del self.results[dataloader_name]

    def call_barrier(self):
        """Call barrier at all workers"""
...
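The new get_result above is a small demultiplexing pattern: worker processes push (dataloader_name, batch) pairs into one shared, unbounded result queue, and each caller buffers items that belong to other dataloaders instead of asserting they never appear. A standalone sketch of the same idea in plain Python (illustrative only, not DGL's actual classes; all names are made up):

import queue


class ResultDemux:
    # Demultiplex (name, result) pairs from one shared queue into per-consumer
    # buffers, mirroring what CustomPool.get_result does above.
    def __init__(self):
        self.result_queue = queue.Queue()  # unbounded, like ctx.Queue(0)
        self.results = {}  # consumer name -> list of buffered results

    def register(self, name):
        self.results[name] = []

    def get_result(self, name, timeout=1800):
        if name not in self.results:
            raise KeyError(f"unknown consumer {name}")
        # Drain the shared queue until a result for `name` shows up, buffering
        # anything that belongs to other consumers along the way.
        while len(self.results[name]) == 0:
            other, data = self.result_queue.get(timeout=timeout)
            self.results[other].append(data)
        return self.results[name].pop(0)


# Two producers share one queue, yet each consumer sees only its own data.
demux = ResultDemux()
demux.register("dl_a")
demux.register("dl_b")
demux.result_queue.put(("dl_b", "batch_b0"))
demux.result_queue.put(("dl_a", "batch_a0"))
assert demux.get_result("dl_a") == "batch_a0"
assert demux.get_result("dl_b") == "batch_b0"

The unbounded result queue (queue_size = 0, i.e. ctx.Queue(0)) goes with this: results for a dataloader that is not currently being consumed can pile up in the shared queue, and a bounded queue could then block the worker processes, which is the deadlock the in-code comment refers to.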
import multiprocessing as mp
import os
import tempfile
import time

import backend as F
import dgl
import numpy as np
import pytest
import torch as th
from dgl.data import CitationGraphDataset
from dgl.distributed import (
    DistDataLoader,
    DistGraph,
    DistGraphServer,
    load_partition,
    partition_graph,
)
from scipy import sparse as spsp
from utils import generate_ip_config, reset_envs

class NeighborSampler(object):
    def __init__(self, g, fanouts, sample_neighbors):
@@ -23,13 +28,16 @@ class NeighborSampler(object):
    def sample_blocks(self, seeds):
        import torch as th

        seeds = th.LongTensor(np.asarray(seeds))
        blocks = []
        for fanout in self.fanouts:
            # For each seed node, sample ``fanout`` neighbors.
            frontier = self.sample_neighbors(
                self.g, seeds, fanout, replace=True
            )
            # Then we compact the frontier into a bipartite graph for
            # message passing.
            block = dgl.to_block(frontier, seeds)
            # Obtain the seed nodes for next layer.
            seeds = block.srcdata[dgl.NID]
@@ -38,35 +46,59 @@ class NeighborSampler(object):
        return blocks

def start_server(
    rank,
    ip_config,
    part_config,
    disable_shared_mem,
    num_clients,
    keep_alive=False,
):
    print("server: #clients=" + str(num_clients))
    g = DistGraphServer(
        rank,
        ip_config,
        1,
        num_clients,
        part_config,
        disable_shared_mem=disable_shared_mem,
        graph_format=["csc", "coo"],
        keep_alive=keep_alive,
    )
    g.start()

def start_dist_dataloader(
    rank,
    ip_config,
    part_config,
    num_server,
    drop_last,
    orig_nid,
    orig_eid,
    group_id=0,
):
    import dgl
    import torch as th

    os.environ["DGL_GROUP_ID"] = str(group_id)
    dgl.distributed.initialize(ip_config)
    gpb = None
    disable_shared_mem = num_server > 0
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(part_config, rank)
    num_nodes_to_sample = 202
    batch_size = 32
    train_nid = th.arange(num_nodes_to_sample)
    dist_graph = DistGraph("test_mp", gpb=gpb, part_config=part_config)

    for i in range(num_server):
        part, _, _, _, _, _, _ = load_partition(part_config, i)

    # Create sampler
    sampler = NeighborSampler(
        dist_graph, [5, 10], dgl.distributed.sample_neighbors
    )

    # We need to test creating DistDataLoader multiple times.
    for i in range(2):
@@ -76,13 +108,16 @@ def start_dist_dataloader(rank, tmpdir, num_server, drop_last, orig_nid, orig_ei
            batch_size=batch_size,
            collate_fn=sampler.sample_blocks,
            shuffle=False,
            drop_last=drop_last,
        )

        groundtruth_g = CitationGraphDataset("cora")[0]
        max_nid = []

        for epoch in range(2):
            for idx, blocks in zip(
                range(0, num_nodes_to_sample, batch_size), dataloader
            ):
                block = blocks[-1]
                o_src, o_dst = block.edges()
                src_nodes_id = block.srcdata[dgl.NID][o_src]
@@ -91,48 +126,75 @@ def start_dist_dataloader(rank, tmpdir, num_server, drop_last, orig_nid, orig_ei
                src_nodes_id = orig_nid[src_nodes_id]
                dst_nodes_id = orig_nid[dst_nodes_id]
                has_edges = groundtruth_g.has_edges_between(
                    src_nodes_id, dst_nodes_id
                )
                assert np.all(F.asnumpy(has_edges))
            if drop_last:
                assert (
                    np.max(max_nid)
                    == num_nodes_to_sample
                    - 1
                    - num_nodes_to_sample % batch_size
                )
            else:
                assert np.max(max_nid) == num_nodes_to_sample - 1
        del dataloader
    # this is needed since there are two tests in one process
    dgl.distributed.exit_client()

def test_standalone():
    reset_envs()
    with tempfile.TemporaryDirectory() as test_dir:
        ip_config = os.path.join(test_dir, "ip_config.txt")
        generate_ip_config(ip_config, 1, 1)

        g = CitationGraphDataset("cora")[0]
        print(g.idtype)
        num_parts = 1
        num_hops = 1

        orig_nid, orig_eid = partition_graph(
            g,
            "test_sampling",
            num_parts,
            test_dir,
            num_hops=num_hops,
            part_method="metis",
            reshuffle=True,
            return_mapping=True,
        )
        part_config = os.path.join(test_dir, "test_sampling.json")
        os.environ["DGL_DIST_MODE"] = "standalone"
        try:
            start_dist_dataloader(
                0, ip_config, part_config, 1, True, orig_nid, orig_eid
            )
        except Exception as e:
            print(e)

def start_dist_neg_dataloader(
    rank,
    ip_config,
    part_config,
    num_server,
    num_workers,
    orig_nid,
    groundtruth_g,
):
    import dgl
    import torch as th

    dgl.distributed.initialize(ip_config)
    gpb = None
    disable_shared_mem = num_server > 1
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(part_config, rank)
    num_edges_to_sample = 202
    batch_size = 32
    dist_graph = DistGraph("test_mp", gpb=gpb, part_config=part_config)
    assert len(dist_graph.ntypes) == len(groundtruth_g.ntypes)
    assert len(dist_graph.etypes) == len(groundtruth_g.etypes)
    if len(dist_graph.etypes) == 1:
@@ -141,21 +203,25 @@ def start_dist_neg_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, g
        train_eid = {dist_graph.etypes[0]: th.arange(num_edges_to_sample)}

    for i in range(num_server):
        part, _, _, _, _, _, _ = load_partition(part_config, i)

    num_negs = 5
    sampler = dgl.dataloading.MultiLayerNeighborSampler([5, 10])
    negative_sampler = dgl.dataloading.negative_sampler.Uniform(num_negs)
    dataloader = dgl.dataloading.DistEdgeDataLoader(
        dist_graph,
        train_eid,
        sampler,
        batch_size=batch_size,
        negative_sampler=negative_sampler,
        shuffle=True,
        drop_last=False,
        num_workers=num_workers,
    )
    for _ in range(2):
        for _, (_, pos_graph, neg_graph, blocks) in zip(
            range(0, num_edges_to_sample, batch_size), dataloader
        ):
            block = blocks[-1]
            for src_type, etype, dst_type in block.canonical_etypes:
                o_src, o_dst = block.edges(etype=etype)
@@ -163,42 +229,80 @@ def start_dist_neg_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, g
                dst_nodes_id = block.dstnodes[dst_type].data[dgl.NID][o_dst]
                src_nodes_id = orig_nid[src_type][src_nodes_id]
                dst_nodes_id = orig_nid[dst_type][dst_nodes_id]
                has_edges = groundtruth_g.has_edges_between(
                    src_nodes_id, dst_nodes_id, etype=etype
                )
                assert np.all(F.asnumpy(has_edges))
                assert np.all(
                    F.asnumpy(block.dstnodes[dst_type].data[dgl.NID])
                    == F.asnumpy(pos_graph.nodes[dst_type].data[dgl.NID])
                )
                assert np.all(
                    F.asnumpy(block.dstnodes[dst_type].data[dgl.NID])
                    == F.asnumpy(neg_graph.nodes[dst_type].data[dgl.NID])
                )
            assert pos_graph.num_edges() * num_negs == neg_graph.num_edges()
    del dataloader
    # this is needed since there are two tests in one process
    dgl.distributed.exit_client()

def check_neg_dataloader(g, num_server, num_workers):
    with tempfile.TemporaryDirectory() as test_dir:
        ip_config = "ip_config.txt"
        generate_ip_config(ip_config, num_server, num_server)

        num_parts = num_server
        num_hops = 1

        orig_nid, orig_eid = partition_graph(
            g,
            "test_sampling",
            num_parts,
            test_dir,
            num_hops=num_hops,
            part_method="metis",
            reshuffle=True,
            return_mapping=True,
        )
        part_config = os.path.join(test_dir, "test_sampling.json")
        if not isinstance(orig_nid, dict):
            orig_nid = {g.ntypes[0]: orig_nid}
        if not isinstance(orig_eid, dict):
            orig_eid = {g.etypes[0]: orig_eid}

        pserver_list = []
        ctx = mp.get_context("spawn")
        for i in range(num_server):
            p = ctx.Process(
                target=start_server,
                args=(
                    i,
                    ip_config,
                    part_config,
                    num_server > 1,
                    num_workers + 1,
                ),
            )
            p.start()
            time.sleep(1)
            pserver_list.append(p)

        os.environ["DGL_DIST_MODE"] = "distributed"
        os.environ["DGL_NUM_SAMPLER"] = str(num_workers)
        ptrainer_list = []
        p = ctx.Process(
            target=start_dist_neg_dataloader,
            args=(
                0,
                ip_config,
                part_config,
                num_server,
                num_workers,
                orig_nid,
                g,
            ),
        )
        p.start()
        ptrainer_list.append(p)
@@ -207,50 +311,83 @@ def check_neg_dataloader(g, tmpdir, num_server, num_workers):
        for p in ptrainer_list:
            p.join()

@pytest.mark.parametrize("num_server", [3])
@pytest.mark.parametrize("num_workers", [0, 4])
@pytest.mark.parametrize("drop_last", [True, False])
@pytest.mark.parametrize("reshuffle", [True, False])
@pytest.mark.parametrize("num_groups", [1])
def test_dist_dataloader(
    num_server, num_workers, drop_last, reshuffle, num_groups
):
    reset_envs()
    # No multiple partitions on single machine for
    # multiple client groups in case of race condition.
    if num_groups > 1:
        num_server = 1
    with tempfile.TemporaryDirectory() as test_dir:
        ip_config = "ip_config.txt"
        generate_ip_config(ip_config, num_server, num_server)

        g = CitationGraphDataset("cora")[0]
        print(g.idtype)
        num_parts = num_server
        num_hops = 1

        orig_nid, orig_eid = partition_graph(
            g,
            "test_sampling",
            num_parts,
            test_dir,
            num_hops=num_hops,
            part_method="metis",
            reshuffle=reshuffle,
            return_mapping=True,
        )
        part_config = os.path.join(test_dir, "test_sampling.json")

        pserver_list = []
        ctx = mp.get_context("spawn")
        keep_alive = num_groups > 1
        for i in range(num_server):
            p = ctx.Process(
                target=start_server,
                args=(
                    i,
                    ip_config,
                    part_config,
                    num_server > 1,
                    num_workers + 1,
                    keep_alive,
                ),
            )
            p.start()
            time.sleep(1)
            pserver_list.append(p)

        os.environ["DGL_DIST_MODE"] = "distributed"
        os.environ["DGL_NUM_SAMPLER"] = str(num_workers)
        ptrainer_list = []
        num_trainers = 1
        for trainer_id in range(num_trainers):
            for group_id in range(num_groups):
                p = ctx.Process(
                    target=start_dist_dataloader,
                    args=(
                        trainer_id,
                        ip_config,
                        part_config,
                        num_server,
                        drop_last,
                        orig_nid,
                        orig_eid,
                        group_id,
                    ),
                )
                p.start()
                time.sleep(
                    1
                )  # avoid race condition when instantiating DistGraph
                ptrainer_list.append(p)

        for p in ptrainer_list:
@@ -263,32 +400,45 @@ def test_dist_dataloader(tmpdir, num_server, num_workers, drop_last, reshuffle,
        for p in pserver_list:
            p.join()

def start_node_dataloader(
    rank,
    ip_config,
    part_config,
    num_server,
    num_workers,
    orig_nid,
    orig_eid,
    groundtruth_g,
):
    dgl.distributed.initialize(ip_config)
    gpb = None
    disable_shared_mem = num_server > 1
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(part_config, rank)
    num_nodes_to_sample = 202
    batch_size = 32
    dist_graph = DistGraph("test_mp", gpb=gpb, part_config=part_config)
    assert len(dist_graph.ntypes) == len(groundtruth_g.ntypes)
    assert len(dist_graph.etypes) == len(groundtruth_g.etypes)
    if len(dist_graph.etypes) == 1:
        train_nid = th.arange(num_nodes_to_sample)
    else:
        train_nid = {"n3": th.arange(num_nodes_to_sample)}

    for i in range(num_server):
        part, _, _, _, _, _, _ = load_partition(part_config, i)

    # Create sampler
    sampler = dgl.dataloading.MultiLayerNeighborSampler(
        [
            # test dict for hetero
            {etype: 5 for etype in dist_graph.etypes}
            if len(dist_graph.etypes) > 1
            else 5,
            10,
        ]
    )  # test int for hetero

    # We need to test creating DistDataLoader multiple times.
    for i in range(2):
@@ -300,10 +450,13 @@ def start_node_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, orig_
            batch_size=batch_size,
            shuffle=True,
            drop_last=False,
            num_workers=num_workers,
        )

        for epoch in range(2):
            for idx, (_, _, blocks) in zip(
                range(0, num_nodes_to_sample, batch_size), dataloader
            ):
                block = blocks[-1]
                for src_type, etype, dst_type in block.canonical_etypes:
                    o_src, o_dst = block.edges(etype=etype)
@@ -311,23 +464,33 @@ def start_node_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, orig_
                    dst_nodes_id = block.dstnodes[dst_type].data[dgl.NID][o_dst]
                    src_nodes_id = orig_nid[src_type][src_nodes_id]
                    dst_nodes_id = orig_nid[dst_type][dst_nodes_id]
                    has_edges = groundtruth_g.has_edges_between(
                        src_nodes_id, dst_nodes_id, etype=etype
                    )
                    assert np.all(F.asnumpy(has_edges))
        del dataloader
    # this is needed since there are two tests in one process
    dgl.distributed.exit_client()

def start_edge_dataloader(
    rank,
    ip_config,
    part_config,
    num_server,
    num_workers,
    orig_nid,
    orig_eid,
    groundtruth_g,
):
    dgl.distributed.initialize(ip_config)
    gpb = None
    disable_shared_mem = num_server > 1
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(part_config, rank)
    num_edges_to_sample = 202
    batch_size = 32
    dist_graph = DistGraph("test_mp", gpb=gpb, part_config=part_config)
    assert len(dist_graph.ntypes) == len(groundtruth_g.ntypes)
    assert len(dist_graph.etypes) == len(groundtruth_g.etypes)
    if len(dist_graph.etypes) == 1:
@@ -336,7 +499,7 @@ def start_edge_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, orig_
        train_eid = {dist_graph.etypes[0]: th.arange(num_edges_to_sample)}

    for i in range(num_server):
        part, _, _, _, _, _, _ = load_partition(part_config, i)

    # Create sampler
    sampler = dgl.dataloading.MultiLayerNeighborSampler([5, 10])
@@ -351,10 +514,13 @@ def start_edge_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, orig_
            batch_size=batch_size,
            shuffle=True,
            drop_last=False,
            num_workers=num_workers,
        )

        for epoch in range(2):
            for idx, (input_nodes, pos_pair_graph, blocks) in zip(
                range(0, num_edges_to_sample, batch_size), dataloader
            ):
                block = blocks[-1]
                for src_type, etype, dst_type in block.canonical_etypes:
                    o_src, o_dst = block.edges(etype=etype)
@@ -362,46 +528,93 @@ def start_edge_dataloader(rank, tmpdir, num_server, num_workers, orig_nid, orig_
                    dst_nodes_id = block.dstnodes[dst_type].data[dgl.NID][o_dst]
                    src_nodes_id = orig_nid[src_type][src_nodes_id]
                    dst_nodes_id = orig_nid[dst_type][dst_nodes_id]
                    has_edges = groundtruth_g.has_edges_between(
                        src_nodes_id, dst_nodes_id, etype=etype
                    )
                    assert np.all(F.asnumpy(has_edges))
                    assert np.all(
                        F.asnumpy(block.dstnodes[dst_type].data[dgl.NID])
                        == F.asnumpy(
                            pos_pair_graph.nodes[dst_type].data[dgl.NID]
                        )
                    )
        del dataloader
    dgl.distributed.exit_client()

def check_dataloader(g, num_server, num_workers, dataloader_type):
    with tempfile.TemporaryDirectory() as test_dir:
        ip_config = "ip_config.txt"
        generate_ip_config(ip_config, num_server, num_server)

        num_parts = num_server
        num_hops = 1
        orig_nid, orig_eid = partition_graph(
            g,
            "test_sampling",
            num_parts,
            test_dir,
            num_hops=num_hops,
            part_method="metis",
            reshuffle=True,
            return_mapping=True,
        )
        part_config = os.path.join(test_dir, "test_sampling.json")
        if not isinstance(orig_nid, dict):
            orig_nid = {g.ntypes[0]: orig_nid}
        if not isinstance(orig_eid, dict):
            orig_eid = {g.etypes[0]: orig_eid}

        pserver_list = []
        ctx = mp.get_context("spawn")
        for i in range(num_server):
            p = ctx.Process(
                target=start_server,
                args=(
                    i,
                    ip_config,
                    part_config,
                    num_server > 1,
                    num_workers + 1,
                ),
            )
            p.start()
            time.sleep(1)
            pserver_list.append(p)

        os.environ["DGL_DIST_MODE"] = "distributed"
        os.environ["DGL_NUM_SAMPLER"] = str(num_workers)
        ptrainer_list = []
        if dataloader_type == "node":
            p = ctx.Process(
                target=start_node_dataloader,
                args=(
                    0,
                    ip_config,
                    part_config,
                    num_server,
                    num_workers,
                    orig_nid,
                    orig_eid,
                    g,
                ),
            )
            p.start()
            ptrainer_list.append(p)
        elif dataloader_type == "edge":
            p = ctx.Process(
                target=start_edge_dataloader,
                args=(
                    0,
                    ip_config,
                    part_config,
                    num_server,
                    num_workers,
                    orig_nid,
                    orig_eid,
                    g,
                ),
            )
            p.start()
            ptrainer_list.append(p)

        for p in pserver_list:
@@ -409,55 +622,142 @@ def check_dataloader(g, tmpdir, num_server, num_workers, dataloader_type):
        for p in ptrainer_list:
            p.join()

def create_random_hetero():
    num_nodes = {"n1": 10000, "n2": 10010, "n3": 10020}
    etypes = [("n1", "r1", "n2"), ("n1", "r2", "n3"), ("n2", "r3", "n3")]
    edges = {}
    for etype in etypes:
        src_ntype, _, dst_ntype = etype
        arr = spsp.random(
            num_nodes[src_ntype],
            num_nodes[dst_ntype],
            density=0.001,
            format="coo",
            random_state=100,
        )
        edges[etype] = (arr.row, arr.col)
    g = dgl.heterograph(edges, num_nodes)
    g.nodes["n1"].data["feat"] = F.unsqueeze(
        F.arange(0, g.number_of_nodes("n1")), 1
    )
    g.edges["r1"].data["feat"] = F.unsqueeze(
        F.arange(0, g.number_of_edges("r1")), 1
    )
    return g

@pytest.mark.parametrize("num_server", [3])
@pytest.mark.parametrize("num_workers", [0, 4])
@pytest.mark.parametrize("dataloader_type", ["node", "edge"])
def test_dataloader(num_server, num_workers, dataloader_type):
    reset_envs()
    g = CitationGraphDataset("cora")[0]
    check_dataloader(g, num_server, num_workers, dataloader_type)
    g = create_random_hetero()
    check_dataloader(g, num_server, num_workers, dataloader_type)

@pytest.mark.parametrize("num_server", [3])
@pytest.mark.parametrize("num_workers", [0, 4])
def test_neg_dataloader(num_server, num_workers):
    reset_envs()
    g = CitationGraphDataset("cora")[0]
    check_neg_dataloader(g, num_server, num_workers)
    g = create_random_hetero()
    check_neg_dataloader(g, num_server, num_workers)

if __name__ == "__main__":
import tempfile def start_multiple_dataloaders(
with tempfile.TemporaryDirectory() as tmpdirname: ip_config, part_config, graph_name, orig_g, num_dataloaders, dataloader_type
test_standalone(Path(tmpdirname)) ):
test_dataloader(Path(tmpdirname), 3, 4, 'node') dgl.distributed.initialize(ip_config)
test_dataloader(Path(tmpdirname), 3, 4, 'edge') dist_g = dgl.distributed.DistGraph(graph_name, part_config=part_config)
test_neg_dataloader(Path(tmpdirname), 3, 4) if dataloader_type == "node":
for num_groups in [1]: train_ids = th.arange(orig_g.num_nodes())
test_dist_dataloader(Path(tmpdirname), 3, 0, True, True, num_groups) batch_size = orig_g.num_nodes() // 100
test_dist_dataloader(Path(tmpdirname), 3, 4, True, True, num_groups) else:
test_dist_dataloader(Path(tmpdirname), 3, 0, True, False, num_groups) train_ids = th.arange(orig_g.num_edges())
test_dist_dataloader(Path(tmpdirname), 3, 4, True, False, num_groups) batch_size = orig_g.num_edges() // 100
sampler = dgl.dataloading.NeighborSampler([-1])
dataloaders = []
dl_iters = []
for _ in range(num_dataloaders):
if dataloader_type == "node":
dataloader = dgl.dataloading.DistNodeDataLoader(
dist_g, train_ids, sampler, batch_size=batch_size
)
else:
dataloader = dgl.dataloading.DistEdgeDataLoader(
dist_g, train_ids, sampler, batch_size=batch_size
)
dataloaders.append(dataloader)
dl_iters.append(iter(dataloader))
# iterate on multiple dataloaders randomly
while len(dl_iters) > 0:
next_dl = np.random.choice(len(dl_iters), 1)[0]
try:
_ = next(dl_iters[next_dl])
except StopIteration:
dl_iters.pop(next_dl)
del dataloaders[next_dl]
dgl.distributed.exit_client()
@pytest.mark.parametrize("num_dataloaders", [1, 4])
@pytest.mark.parametrize("num_workers", [0, 1, 4])
@pytest.mark.parametrize("dataloader_type", ["node", "edge"])
def test_multiple_dist_dataloaders(
num_dataloaders, num_workers, dataloader_type
):
reset_envs()
os.environ["DGL_DIST_MODE"] = "distributed"
os.environ["DGL_NUM_SAMPLER"] = str(num_workers)
num_parts = 1
num_servers = 1
with tempfile.TemporaryDirectory() as test_dir:
ip_config = os.path.join(test_dir, "ip_config.txt")
generate_ip_config(ip_config, num_parts, num_servers)
orig_g = dgl.rand_graph(1000, 10000)
graph_name = "test"
partition_graph(orig_g, graph_name, num_parts, test_dir)
part_config = os.path.join(test_dir, f"{graph_name}.json")
p_servers = []
ctx = mp.get_context("spawn")
for i in range(num_servers):
p = ctx.Process(
target=start_server,
args=(
i,
ip_config,
part_config,
num_servers > 1,
num_workers + 1,
),
)
p.start()
time.sleep(1)
p_servers.append(p)
p_client = ctx.Process(
target=start_multiple_dataloaders,
args=(
ip_config,
part_config,
graph_name,
orig_g,
num_dataloaders,
dataloader_type,
),
)
p_client.start()
p_client.join()
for p in p_servers:
p.join()
reset_envs()