Unverified Commit efa95d86 authored by Da Zheng's avatar Da Zheng Committed by GitHub
Browse files

[Distributed] Fix bugs (#2009)

* fix.

* add tests.

* fix.

* fix.
parent 1ad46fd0
......@@ -61,7 +61,9 @@ class MultiLayerNeighborSampler(BlockSampler):
fanout = self.fanouts[block_id]
if isinstance(g, distributed.DistGraph):
if fanout is None:
frontier = distributed.in_subgraph(g, seed_nodes)
# TODO(zhengda) There is a bug in the distributed version of in_subgraph.
# let's use sample_neighbors to replace in_subgraph for now.
frontier = distributed.sample_neighbors(g, seed_nodes, -1, replace=False)
else:
frontier = distributed.sample_neighbors(g, seed_nodes, fanout, replace=self.replace)
else:
......
......@@ -4,6 +4,7 @@ import os
from .dist_context import is_initialized
from .kvstore import get_kvstore
from .role import get_role
from .. import utils
from .. import backend as F
......@@ -17,6 +18,9 @@ def _get_data_name(name, part_policy):
def _default_init_data(shape, dtype):
    """Default initializer for a distributed tensor.

    Allocates a zero-filled tensor of the given shape and dtype on the CPU,
    using the active backend ``F``.
    """
    cpu_ctx = F.cpu()
    return F.zeros(shape, dtype, cpu_ctx)
# Monotonically increasing counter used to generate unique names for
# anonymous distributed tensors.
DIST_TENSOR_ID = 0
class DistTensor:
''' Distributed tensor.
......@@ -80,7 +84,11 @@ class DistTensor:
# We need to generate the name in a deterministic way.
if name is None:
assert not persistent, 'We cannot generate anonymous persistent distributed tensors'
name = 'anonymous-' + str(len(exist_names) + 1)
global DIST_TENSOR_ID
# All processes with the same role create DistTensors synchronously,
# so they all observe the same counter value and generate the same name.
name = 'anonymous-' + get_role() + '-' + str(DIST_TENSOR_ID)
DIST_TENSOR_ID += 1
self._name = _get_data_name(name, part_policy.policy_str)
self._persistent = persistent
if self._name not in exist_names:
......
......@@ -108,6 +108,14 @@ def check_dist_graph(g, num_clients, num_nodes, num_edges):
test3 = dgl.distributed.DistTensor((g.number_of_nodes(), 3), F.float32, 'test3')
del test3
# add tests for anonymous distributed tensor.
test3 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
data = test3[0:10]
test4 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
del test3
test5 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
assert np.sum(F.asnumpy(test5[0:10] != data)) > 0
# test a persistent tensor
test4 = dgl.distributed.DistTensor(new_shape, F.float32, 'test4', init_func=rand_init,
persistent=True)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment