"...git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "130fd8df54f24ffb006d84787b598d8adc899f23"
Unverified Commit 562871e7 authored by Da Zheng, committed by GitHub

[Distributed] add distributed evaluation. (#1810)



* add eval.

* extend DistTensor.

* fix.

* add barrier.

* add more print.

* add more checks in kvstore.

* fix lint.

* get all neighbors for eval.

* reorganize.

* fix.

* fix.

* fix.

* fix test.

* add reuse_if_exist.

* add test for reuse_if_exist.

* fix lint.

* fix bugs.

* fix.

* print errors of tcp socket.

* support delete tensors.

* fix lint.

* fix

* fix example
Co-authored-by: Ubuntu <ubuntu@ip-172-31-19-1.us-west-2.compute.internal>
parent 9d853977
@@ -43,6 +43,63 @@ class NeighborSampler(object):
            blocks.insert(0, block)
        return blocks

+class DistSAGE(SAGE):
+    def __init__(self, in_feats, n_hidden, n_classes, n_layers,
+                 activation, dropout):
+        super(DistSAGE, self).__init__(in_feats, n_hidden, n_classes, n_layers,
+                                       activation, dropout)
+
+    def inference(self, g, x, batch_size, device):
+        """
+        Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling).
+        g : the entire graph.
+        x : the input of entire node set.
+        The inference code is written in a fashion that it could handle any number of nodes and
+        layers.
+        """
+        # During inference with sampling, multi-layer blocks are very inefficient because
+        # lots of computations in the first few layers are repeated.
+        # Therefore, we compute the representation of all nodes layer by layer. The nodes
+        # on each layer are of course split in batches.
+        # TODO: can we standardize this?
+        nodes = dgl.distributed.node_split(np.arange(g.number_of_nodes()),
+                                           g.get_partition_book(), force_even=True)
+        y = dgl.distributed.DistTensor(g, (g.number_of_nodes(), self.n_hidden), th.float32, 'h',
+                                       persistent=True)
+        for l, layer in enumerate(self.layers):
+            if l == len(self.layers) - 1:
+                y = dgl.distributed.DistTensor(g, (g.number_of_nodes(), self.n_classes),
+                                               th.float32, 'h_last', persistent=True)
+
+            sampler = NeighborSampler(g, [-1], dgl.distributed.sample_neighbors)
+            print('|V|={}, eval batch size: {}'.format(g.number_of_nodes(), batch_size))
+            # Create PyTorch DataLoader for constructing blocks
+            dataloader = DataLoader(
+                dataset=nodes,
+                batch_size=batch_size,
+                collate_fn=sampler.sample_blocks,
+                shuffle=False,
+                drop_last=False,
+                num_workers=args.num_workers)
+
+            for blocks in tqdm.tqdm(dataloader):
+                block = blocks[0]
+                input_nodes = block.srcdata[dgl.NID]
+                output_nodes = block.dstdata[dgl.NID]
+                h = x[input_nodes].to(device)
+                h_dst = h[:block.number_of_dst_nodes()]
+                h = layer(block, (h, h_dst))
+                if l != len(self.layers) - 1:
+                    h = self.activation(h)
+                    h = self.dropout(h)
+
+                y[output_nodes] = h.cpu()
+
+            x = y
+            g.barrier()
+        return y
+
def run(args, device, data):
    # Unpack data
    train_nid, val_nid, in_feats, n_classes, g = data
@@ -60,7 +117,7 @@ def run(args, device, data):
        num_workers=args.num_workers)

    # Define model and optimizer
-    model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
+    model = DistSAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
    model = model.to(device)
    if not args.standalone:
        model = th.nn.parallel.DistributedDataParallel(model)
@@ -101,6 +158,8 @@ def run(args, device, data):
            # Load the input features as well as output labels
            start = time.time()
            batch_inputs, batch_labels = load_subtensor(g, seeds, input_nodes, device)
+            assert th.all(th.logical_not(th.isnan(batch_labels)))
+            batch_labels = batch_labels.long()
            copy_time += time.time() - start

            num_seeds += len(blocks[-1].dstdata[dgl.NID])
@@ -122,7 +181,7 @@ def run(args, device, data):
                if param.requires_grad and param.grad is not None:
                    th.distributed.all_reduce(param.grad.data,
                                              op=th.distributed.ReduceOp.SUM)
-                    param.grad.data /= args.num_client
+                    param.grad.data /= dgl.distributed.get_num_client()
            optimizer.step()
            update_time += time.time() - compute_end
@@ -133,21 +192,21 @@ def run(args, device, data):
            if step % args.log_every == 0:
                acc = compute_acc(batch_pred, batch_labels)
                gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0
-                print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MiB | time {:.3f} s'.format(
-                    epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), gpu_mem_alloc, np.sum(step_time[-args.log_every:])))
+                print('Part {} | Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MiB | time {:.3f} s'.format(
+                    g.rank(), epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), gpu_mem_alloc, np.sum(step_time[-args.log_every:])))
                start = time.time()

        toc = time.time()
-        print('Epoch Time(s): {:.4f}, sample: {:.4f}, data copy: {:.4f}, forward: {:.4f}, backward: {:.4f}, update: {:.4f}, #seeds: {}, #inputs: {}'.format(
-            toc - tic, sample_time, copy_time, forward_time, backward_time, update_time, num_seeds, num_inputs))
+        print('Part {}, Epoch Time(s): {:.4f}, sample: {:.4f}, data copy: {:.4f}, forward: {:.4f}, backward: {:.4f}, update: {:.4f}, #seeds: {}, #inputs: {}'.format(
+            g.rank(), toc - tic, sample_time, copy_time, forward_time, backward_time, update_time, num_seeds, num_inputs))
        epoch += 1

-        toc = time.time()
-        print('Epoch Time(s): {:.4f}'.format(toc - tic))
-        #if epoch % args.eval_every == 0 and epoch != 0:
-        #    eval_acc = evaluate(model, g, g.ndata['features'], g.ndata['labels'], val_nid, args.batch_size, device)
-        #    print('Eval Acc {:.4f}'.format(eval_acc))
+        if epoch % args.eval_every == 0 and epoch != 0:
+            start = time.time()
+            eval_acc = evaluate(model.module, g, g.ndata['features'],
+                                g.ndata['labels'], val_nid, args.batch_size_eval, device)
+            print('Part {}, Eval Acc {:.4f}, time: {:.4f}'.format(g.rank(), eval_acc, time.time() - start))

    profiler.stop()
    print(profiler.output_text(unicode=True, color=True))
@@ -161,13 +220,19 @@ def main(args):
    g = dgl.distributed.DistGraph(args.ip_config, args.graph_name, conf_file=args.conf_path)
    print('rank:', g.rank())
-    train_nid = dgl.distributed.node_split(g.ndata['train_mask'], g.get_partition_book(), force_even=True)
-    val_nid = dgl.distributed.node_split(g.ndata['val_mask'], g.get_partition_book(), force_even=True)
-    test_nid = dgl.distributed.node_split(g.ndata['test_mask'], g.get_partition_book(), force_even=True)
-    print('part {}, train: {}, val: {}, test: {}'.format(g.rank(), len(train_nid),
-                                                         len(val_nid), len(test_nid)))
+    pb = g.get_partition_book()
+    train_nid = dgl.distributed.node_split(g.ndata['train_mask'], pb, force_even=True)
+    val_nid = dgl.distributed.node_split(g.ndata['val_mask'], pb, force_even=True)
+    test_nid = dgl.distributed.node_split(g.ndata['test_mask'], pb, force_even=True)
+    local_nid = pb.partid2nids(pb.partid).detach().numpy()
+    print('part {}, train: {} (local: {}), val: {} (local: {}), test: {} (local: {})'.format(
+        g.rank(), len(train_nid), len(np.intersect1d(train_nid.numpy(), local_nid)),
+        len(val_nid), len(np.intersect1d(val_nid.numpy(), local_nid)),
+        len(test_nid), len(np.intersect1d(test_nid.numpy(), local_nid))))
    device = th.device('cpu')
-    n_classes = len(th.unique(g.ndata['labels'][np.arange(g.number_of_nodes())]))
+    labels = g.ndata['labels'][np.arange(g.number_of_nodes())]
+    n_classes = len(th.unique(labels[th.logical_not(th.isnan(labels))]))
+    print('#labels:', n_classes)

    # Pack data
    in_feats = g.ndata['features'].shape[1]
@@ -191,6 +256,7 @@ if __name__ == '__main__':
    parser.add_argument('--num-layers', type=int, default=2)
    parser.add_argument('--fan-out', type=str, default='10,25')
    parser.add_argument('--batch-size', type=int, default=1000)
+    parser.add_argument('--batch-size-eval', type=int, default=100000)
    parser.add_argument('--log-every', type=int, default=20)
    parser.add_argument('--eval-every', type=int, default=5)
    parser.add_argument('--lr', type=float, default=0.003)
...
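Note: the eval branch added above calls an evaluate() helper that is not part of the hunks shown here. A minimal sketch of how such a helper could tie the pieces together, assuming it simply wraps the full-neighbor DistSAGE.inference pass shown earlier with a compute_acc-style check (the body below is illustrative, not the commit's code):

    import torch as th

    def evaluate(model, g, inputs, labels, val_nid, batch_size, device):
        """Full-graph inference, then accuracy on the validation node IDs."""
        model.eval()
        with th.no_grad():
            # DistSAGE.inference computes representations for all nodes layer
            # by layer and returns them as a DistTensor indexed by global ID.
            pred = model.inference(g, inputs, batch_size, device)
        model.train()
        val_labels = labels[val_nid].long()
        return (th.argmax(pred[val_nid], dim=1) == val_labels).float().sum() / len(val_nid)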
@@ -68,7 +68,6 @@ class SAGE(nn.Module):
        # Therefore, we compute the representation of all nodes layer by layer. The nodes
        # on each layer are of course splitted in batches.
        # TODO: can we standardize this?
-        nodes = th.arange(g.number_of_nodes())
        for l, layer in enumerate(self.layers):
            y = th.zeros(g.number_of_nodes(), self.n_hidden if l != len(self.layers) - 1 else self.n_classes)

@@ -113,6 +112,7 @@ def compute_acc(pred, labels):
    """
    Compute the accuracy of prediction given the labels.
    """
+    labels = labels.long()
    return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)

def evaluate(model, g, inputs, labels, val_nid, batch_size, device):
...
@@ -14,12 +14,13 @@ from .._ffi.ndarray import empty_shared_mem
from ..frame import infer_scheme
from .partition import load_partition
from .graph_partition_book import PartitionPolicy, get_shared_mem_partition_book
-from .. import utils
+from .graph_partition_book import NODE_PART_POLICY, EDGE_PART_POLICY
from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT
from . import rpc
from .rpc_client import connect_to_server
from .server_state import ServerState
from .rpc_server import start_server
+from .dist_tensor import DistTensor, _get_data_name
from ..transform import as_heterograph

def _get_graph_path(graph_name):
@@ -43,27 +44,13 @@ FIELD_DICT = {'inner_node': F.int64,
              NID: F.int64,
              EID: F.int64}

-def _get_ndata_name(name):
-    ''' This is to get the name of node data in the kvstore.
-    KVStore doesn't understand node data or edge data. We'll use a prefix to distinguish them.
-    '''
-    return 'node:' + name
-
-def _get_edata_name(name):
-    ''' This is to get the name of edge data in the kvstore.
-    KVStore doesn't understand node data or edge data. We'll use a prefix to distinguish them.
-    '''
-    return 'edge:' + name
-
def _is_ndata_name(name):
    ''' Is this node data in the kvstore '''
-    return name[:5] == 'node:'
+    return name[:5] == NODE_PART_POLICY + ':'

def _is_edata_name(name):
    ''' Is this edge data in the kvstore '''
-    return name[:5] == 'edge:'
+    return name[:5] == EDGE_PART_POLICY + ':'

def _get_shared_mem_ndata(g, graph_name, name):
    ''' Get shared-memory node data from DistGraph server.
@@ -109,64 +96,6 @@ def _get_graph_from_shared_mem(graph_name):
    g.edata[EID] = _get_shared_mem_edata(g, graph_name, EID)
    return g

-class DistTensor:
-    ''' Distributed tensor.
-
-    This is a wrapper to access a tensor stored in multiple machines.
-    This wrapper provides an interface similar to the local tensor.
-
-    Parameters
-    ----------
-    g : DistGraph
-        The distributed graph object.
-    name : string
-        The name of the tensor.
-    part_policy : PartitionPolicy
-        The partition policy of the tensor
-    '''
-    def __init__(self, g, name, part_policy):
-        self.kvstore = g._client
-        self._name = name
-        dtype, shape, _ = g._client.get_data_meta(name)
-        self._shape = shape
-        self._dtype = dtype
-        self._part_policy = part_policy
-
-    def __getitem__(self, idx):
-        idx = utils.toindex(idx)
-        idx = idx.tousertensor()
-        return self.kvstore.pull(name=self._name, id_tensor=idx)
-
-    def __setitem__(self, idx, val):
-        idx = utils.toindex(idx)
-        idx = idx.tousertensor()
-        # TODO(zhengda) how do we want to support broadcast (e.g., G.ndata['h'][idx] = 1).
-        self.kvstore.push(name=self._name, id_tensor=idx, data_tensor=val)
-
-    def __len__(self):
-        return self._shape[0]
-
-    @property
-    def part_policy(self):
-        ''' Return the partition policy '''
-        return self._part_policy
-
-    @property
-    def shape(self):
-        ''' Return the shape of the distributed tensor. '''
-        return self._shape
-
-    @property
-    def dtype(self):
-        ''' Return the data type of the distributed tensor. '''
-        return self._dtype
-
-    @property
-    def name(self):
-        ''' Return the name of the distributed tensor '''
-        return self._name
-
class NodeDataView(MutableMapping):
    """The data view class when dist_graph.ndata[...].data is called.
    """
@@ -177,26 +106,25 @@ class NodeDataView(MutableMapping):
        # When this is created, the server may already load node data. We need to
        # initialize the node data in advance.
        names = g._get_all_ndata_names()
-        policy = PartitionPolicy("node", g.get_partition_book())
-        self._data = {name: DistTensor(g, _get_ndata_name(name), policy) for name in names}
+        policy = PartitionPolicy(NODE_PART_POLICY, g.get_partition_book())
+        self._data = {}
+        for name in names:
+            name1 = _get_data_name(name, policy.policy_str)
+            dtype, shape, _ = g._client.get_data_meta(name1)
+            # We create a wrapper on the existing tensor in the kvstore.
+            self._data[name] = DistTensor(g, shape, dtype, name, part_policy=policy)

    def _get_names(self):
        return list(self._data.keys())

-    def _add(self, name):
-        policy = PartitionPolicy("node", self._graph.get_partition_book())
-        self._data[name] = DistTensor(self._graph, _get_ndata_name(name), policy)
-
    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, val):
-        raise DGLError("DGL doesn't support assignment. "
-                       + "Please call init_ndata to initialize new node data.")
+        self._data[key] = val

    def __delitem__(self, key):
-        #TODO(zhengda) how to delete data in the kvstore.
-        raise NotImplementedError("delete node data isn't supported yet")
+        del self._data[key]

    def __len__(self):
        # The number of node data may change. Let's count it every time we need them.
@@ -224,26 +152,25 @@ class EdgeDataView(MutableMapping):
        # When this is created, the server may already load edge data. We need to
        # initialize the edge data in advance.
        names = g._get_all_edata_names()
-        policy = PartitionPolicy("edge", g.get_partition_book())
-        self._data = {name: DistTensor(g, _get_edata_name(name), policy) for name in names}
+        policy = PartitionPolicy(EDGE_PART_POLICY, g.get_partition_book())
+        self._data = {}
+        for name in names:
+            name1 = _get_data_name(name, policy.policy_str)
+            dtype, shape, _ = g._client.get_data_meta(name1)
+            # We create a wrapper on the existing tensor in the kvstore.
+            self._data[name] = DistTensor(g, shape, dtype, name, part_policy=policy)

    def _get_names(self):
        return list(self._data.keys())

-    def _add(self, name):
-        policy = PartitionPolicy("edge", self._graph.get_partition_book())
-        self._data[name] = DistTensor(self._graph, _get_edata_name(name), policy)
-
    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, val):
-        raise DGLError("DGL doesn't support assignment. "
-                       + "Please call init_edata to initialize new edge data.")
+        self._data[key] = val

    def __delitem__(self, key):
-        #TODO(zhengda) how to delete data in the kvstore.
-        raise NotImplementedError("delete edge data isn't supported yet")
+        del self._data[key]

    def __len__(self):
        # The number of edge data may change. Let's count it every time we need them.
@@ -306,21 +233,25 @@ class DistGraphServer(KVServer):
        if not disable_shared_mem:
            self.gpb.shared_memory(graph_name)
        assert self.gpb.partid == server_id
-        self.add_part_policy(PartitionPolicy('node', self.gpb))
-        self.add_part_policy(PartitionPolicy('edge', self.gpb))
+        self.add_part_policy(PartitionPolicy(NODE_PART_POLICY, self.gpb))
+        self.add_part_policy(PartitionPolicy(EDGE_PART_POLICY, self.gpb))

        if not self.is_backup_server():
            for name in node_feats:
-                self.init_data(name=_get_ndata_name(name), policy_str='node',
+                self.init_data(name=_get_data_name(name, NODE_PART_POLICY),
+                               policy_str=NODE_PART_POLICY,
                               data_tensor=node_feats[name])
            for name in edge_feats:
-                self.init_data(name=_get_edata_name(name), policy_str='edge',
+                self.init_data(name=_get_data_name(name, EDGE_PART_POLICY),
+                               policy_str=EDGE_PART_POLICY,
                               data_tensor=edge_feats[name])
        else:
            for name in node_feats:
-                self.init_data(name=_get_ndata_name(name), policy_str='node')
+                self.init_data(name=_get_data_name(name, NODE_PART_POLICY),
+                               policy_str=NODE_PART_POLICY)
            for name in edge_feats:
-                self.init_data(name=_get_edata_name(name), policy_str='edge')
+                self.init_data(name=_get_data_name(name, EDGE_PART_POLICY),
+                               policy_str=EDGE_PART_POLICY)

    def start(self):
        """ Start graph store server.
@@ -331,9 +262,6 @@ class DistGraphServer(KVServer):
        start_server(server_id=self.server_id, ip_config=self.ip_config,
                     num_clients=self.num_clients, server_state=server_state)

-def _default_init_data(shape, dtype):
-    return F.zeros(shape, dtype, F.cpu())
-
class DistGraph:
    ''' The DistGraph client.
@@ -380,9 +308,9 @@ class DistGraph:
            self._gpb = gpb
            self._g = as_heterograph(g)
            for name in node_feats:
-                self._client.add_data(_get_ndata_name(name), node_feats[name])
+                self._client.add_data(_get_data_name(name, NODE_PART_POLICY), node_feats[name])
            for name in edge_feats:
-                self._client.add_data(_get_edata_name(name), edge_feats[name])
+                self._client.add_data(_get_data_name(name, EDGE_PART_POLICY), edge_feats[name])
            rpc.set_num_client(1)
        else:
            connect_to_server(ip_config=ip_config)
@@ -407,68 +335,6 @@ class DistGraph:
            self._num_nodes += int(part_md['num_nodes'])
            self._num_edges += int(part_md['num_edges'])

-    def init_ndata(self, name, shape, dtype, init_func=None):
-        '''Initialize node data
-
-        This initializes the node data in the distributed graph storage.
-        Users can provide a init function to initialize data. The signature of
-        the init function is
-
-        ```
-        def init_func(shape, dtype)
-        ```
-        The inputs are the shape and data type and the output is a tensor with
-        the initialized values.
-
-        Parameters
-        ----------
-        name : string
-            The name of the node data.
-        shape : tuple
-            The shape of the node data.
-        dtype : dtype
-            The data type of the node data.
-        init_func : callable
-            The function to initialize the data
-        '''
-        assert shape[0] == self.number_of_nodes()
-        if init_func is None:
-            init_func = _default_init_data
-        policy = PartitionPolicy('node', self._gpb)
-        self._client.init_data(_get_ndata_name(name), shape, dtype, policy, init_func)
-        self._ndata._add(name)
-
-    def init_edata(self, name, shape, dtype, init_func=None):
-        '''Initialize edge data
-
-        This initializes the edge data in the distributed graph storage.
-        Users can provide a init function to initialize data. The signature of
-        the init function is
-
-        ```
-        def init_func(shape, dtype)
-        ```
-        The inputs are the shape and data type and the output is a tensor with
-        the initialized values.
-
-        Parameters
-        ----------
-        name : string
-            The name of the edge data.
-        shape : tuple
-            The shape of the edge data.
-        dtype : dtype
-            The data type of the edge data.
-        init_func : callable
-            The function to initialize the data
-        '''
-        assert shape[0] == self.number_of_edges()
-        if init_func is None:
-            init_func = _default_init_data
-        policy = PartitionPolicy('edge', self._gpb)
-        self._client.init_data(_get_edata_name(name), shape, dtype, policy, init_func)
-        self._edata._add(name)
-
    @property
    def local_partition(self):
        ''' Return the local partition on the client
@@ -561,6 +427,13 @@ class DistGraph:
        """
        return self._gpb

+    def barrier(self):
+        '''Barrier for all client nodes.
+
+        This API blocks until all the clients invoke it.
+        '''
+        self._client.barrier()
+
    def _get_all_ndata_names(self):
        ''' Get the names of all node data.
        '''
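Note: with init_ndata/init_edata removed and NodeDataView/EdgeDataView now accepting assignment, new graph data is attached by assigning a DistTensor directly. A minimal sketch of the new pattern (g is assumed to be an already-connected DistGraph; the name 'h' and the hidden size are arbitrary):

    import dgl
    import torch as th

    def attach_hidden_state(g, hidden_size=16):
        # Old API: g.init_ndata('h', (g.number_of_nodes(), hidden_size), th.float32)
        # New API: assign a DistTensor through the now-mutable NodeDataView.
        g.ndata['h'] = dgl.distributed.DistTensor(
            g, (g.number_of_nodes(), hidden_size), th.float32, 'h')

        # Deleting the entry drops the wrapper; a non-persistent tensor the
        # wrapper owns is removed from the kvstore as well.
        del g.ndata['h']

        # The new barrier() lets trainers synchronize, e.g. after each trainer
        # has written its share of a shared DistTensor.
        g.barrier()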
...

"""Define distributed tensor."""
import os
import uuid

from .graph_partition_book import PartitionPolicy, NODE_PART_POLICY, EDGE_PART_POLICY
from .rpc_client import is_initialized
from ..base import DGLError
from .. import utils
from .. import backend as F

def _get_data_name(name, part_policy):
    ''' This is to get the name of data in the kvstore.

    KVStore doesn't understand node data or edge data. We'll use a prefix to distinguish them.
    '''
    return part_policy + ':' + name
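For illustration, the resulting kvstore keys are just the policy string plus the data name (the import paths below are inferred from the module layout in this diff):

    from dgl.distributed.dist_tensor import _get_data_name
    from dgl.distributed.graph_partition_book import NODE_PART_POLICY, EDGE_PART_POLICY

    assert _get_data_name('feat', NODE_PART_POLICY) == 'node:feat'
    assert _get_data_name('weight', EDGE_PART_POLICY) == 'edge:weight'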
def _default_init_data(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())

class DistTensor:
    ''' Distributed tensor.

    DistTensor references a tensor stored in the distributed KVStore.
    When a DistTensor is created, it may reference an existing tensor in the KVStore or
    create a new one. The tensor is identified by the name passed to the constructor
    of DistTensor. If the name exists, DistTensor will reference the existing one.
    In this case, the shape and the data type should match the existing tensor.
    If the name doesn't exist, a new tensor will be created in the kvstore.

    If persistent=True when creating DistTensor, the tensor in the KVStore will
    be persistent. Even if the DistTensor is destroyed in the local trainer process,
    the tensor will still exist in the KVStore. However, we do not allow an anonymous
    tensor to be persistent.

    Parameters
    ----------
    g : DistGraph
        The distributed graph object.
    shape : tuple
        The shape of the tensor
    dtype : dtype
        The dtype of the tensor
    name : string
        The name of the tensor.
    part_policy : PartitionPolicy
        The partition policy of the tensor
    init_func : callable
        The function to initialize data in the tensor.
    persistent : bool
        Whether the created tensor is persistent.
    '''
    def __init__(self, g, shape, dtype, name=None, part_policy=None, init_func=None,
                 persistent=False):
        self.kvstore = g._client
        self._shape = shape
        self._dtype = dtype

        if part_policy is None:
            assert shape[0] != g.number_of_nodes() or shape[0] != g.number_of_edges(), \
                    'Cannot determine the partition policy. Please provide it.'
            if shape[0] == g.number_of_nodes():
                part_policy = PartitionPolicy(NODE_PART_POLICY, g.get_partition_book())
            elif shape[0] == g.number_of_edges():
                part_policy = PartitionPolicy(EDGE_PART_POLICY, g.get_partition_book())
            else:
                raise DGLError('Cannot determine the partition policy. Please provide it.')
        self._part_policy = part_policy

        if init_func is None:
            init_func = _default_init_data

        # If a user doesn't provide a name, we generate a name ourselves.
        if name is None:
            assert not persistent, 'We cannot generate anonymous persistent distributed tensors'
            name = uuid.uuid4().hex[:10]
        self._name = _get_data_name(name, part_policy.policy_str)
        self._persistent = persistent
        if self._name not in g._client.data_name_list():
            g._client.init_data(self._name, shape, dtype, part_policy, init_func)
            self._owner = True
        else:
            self._owner = False
            dtype1, shape1, _ = g._client.get_data_meta(self._name)
            assert dtype == dtype1, 'The dtype does not match with the existing tensor'
            assert shape == shape1, 'The shape does not match with the existing tensor'

    def __del__(self):
        initialized = os.environ.get('DGL_DIST_MODE', 'standalone') == 'standalone' \
                or is_initialized()
        if not self._persistent and self._owner and initialized:
            self.kvstore.delete_data(self._name)

    def __getitem__(self, idx):
        idx = utils.toindex(idx)
        idx = idx.tousertensor()
        return self.kvstore.pull(name=self._name, id_tensor=idx)

    def __setitem__(self, idx, val):
        idx = utils.toindex(idx)
        idx = idx.tousertensor()
        # TODO(zhengda) how do we want to support broadcast (e.g., G.ndata['h'][idx] = 1).
        self.kvstore.push(name=self._name, id_tensor=idx, data_tensor=val)

    def __len__(self):
        return self._shape[0]

    @property
    def part_policy(self):
        ''' Return the partition policy '''
        return self._part_policy

    @property
    def shape(self):
        ''' Return the shape of the distributed tensor. '''
        return self._shape

    @property
    def dtype(self):
        ''' Return the data type of the distributed tensor. '''
        return self._dtype

    @property
    def name(self):
        ''' Return the name of the distributed tensor '''
        return self._name
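A short usage sketch of the semantics described in the docstring above, i.e. create-or-attach by name, anonymous tensors, and the persistent flag (g is assumed to be an already-connected DistGraph):

    import torch as th
    import dgl

    def dist_tensor_demo(g):
        shape = (g.number_of_nodes(), 16)

        # Creates 'node:h' in the kvstore, zero-initialized by _default_init_data.
        h = dgl.distributed.DistTensor(g, shape, th.float32, 'h')

        # The same name attaches to the existing tensor; shape and dtype must match.
        h_again = dgl.distributed.DistTensor(g, shape, th.float32, 'h')

        # Anonymous tensors get a generated name and are deleted from the kvstore
        # when the owning wrapper is garbage-collected; they cannot be persistent.
        tmp = dgl.distributed.DistTensor(g, shape, th.float32)

        # A persistent tensor outlives the local wrapper.
        hp = dgl.distributed.DistTensor(g, shape, th.float32, 'h_persist', persistent=True)
        del hp  # 'node:h_persist' stays in the kvstore

        # Reads and writes use global node IDs and go through kvstore pull/push.
        nids = th.arange(10)
        h[nids] = th.ones(10, 16)
        return h[nids].sum()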
@@ -623,6 +623,9 @@ class RangePartitionBook:
        """
        return self._partid

+NODE_PART_POLICY = 'node'
+EDGE_PART_POLICY = 'edge'
+
class PartitionPolicy(object):
    """Wrapper for GraphPartitionBook and RangePartitionBook.

@@ -637,7 +640,8 @@ class PartitionPolicy(object):
    """
    def __init__(self, policy_str, partition_book):
        # TODO(chao): support more policies for HeteroGraph
-        assert policy_str in ('edge', 'node'), 'policy_str must be \'edge\' or \'node\'.'
+        assert policy_str in (EDGE_PART_POLICY, NODE_PART_POLICY), \
+            'policy_str must be \'edge\' or \'node\'.'
        self._policy_str = policy_str
        self._part_id = partition_book.partid
        self._partition_book = partition_book

@@ -670,9 +674,9 @@ class PartitionPolicy(object):
        tensor
            local ID tensor
        """
-        if self._policy_str == 'edge':
+        if self._policy_str == EDGE_PART_POLICY:
            return self._partition_book.eid2localeid(id_tensor, self._part_id)
-        elif self._policy_str == 'node':
+        elif self._policy_str == NODE_PART_POLICY:
            return self._partition_book.nid2localnid(id_tensor, self._part_id)
        else:
            raise RuntimeError('Cannot support policy: %s ' % self._policy_str)

@@ -690,9 +694,9 @@ class PartitionPolicy(object):
        tensor
            partition ID
        """
-        if self._policy_str == 'edge':
+        if self._policy_str == EDGE_PART_POLICY:
            return self._partition_book.eid2partid(id_tensor)
-        elif self._policy_str == 'node':
+        elif self._policy_str == NODE_PART_POLICY:
            return self._partition_book.nid2partid(id_tensor)
        else:
            raise RuntimeError('Cannot support policy: %s ' % self._policy_str)

@@ -705,9 +709,9 @@ class PartitionPolicy(object):
        int
            data size
        """
-        if self._policy_str == 'edge':
+        if self._policy_str == EDGE_PART_POLICY:
            return len(self._partition_book.partid2eids(self._part_id))
-        elif self._policy_str == 'node':
+        elif self._policy_str == NODE_PART_POLICY:
            return len(self._partition_book.partid2nids(self._part_id))
        else:
            raise RuntimeError('Cannot support policy: %s ' % self._policy_str)
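For illustration, the new constants are used to build one policy per ID space; the policy string selects which partition-book mapping (nid2partid/nid2localnid vs. eid2partid/eid2localeid) the kvstore uses. A sketch, assuming the partition book comes from a connected DistGraph:

    from dgl.distributed.graph_partition_book import (
        PartitionPolicy, NODE_PART_POLICY, EDGE_PART_POLICY)

    def make_policies(partition_book):
        node_policy = PartitionPolicy(NODE_PART_POLICY, partition_book)
        edge_policy = PartitionPolicy(EDGE_PART_POLICY, partition_book)
        # get_data_size() reports how many node/edge rows this partition owns.
        return node_policy, edge_policy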
@@ -612,6 +612,9 @@ class KVServer(object):
        # Basic information
        self._server_id = server_id
        self._server_namebook = rpc.read_ip_config(ip_config)
+        assert server_id in self._server_namebook, \
+                'Trying to start server {}, but there are {} servers in the config file'.format(
+                    server_id, len(self._server_namebook))
        self._machine_id = self._server_namebook[server_id][0]
        self._group_count = self._server_namebook[server_id][3]
        # We assume partition_id is equal to machine_id

@@ -700,13 +703,19 @@ class KVServer(object):
        assert len(name) > 0, 'name cannot be empty.'
        if name in self._data_store:
            raise RuntimeError("Data %s has already exists!" % name)
+        self._part_policy[name] = self.find_policy(policy_str)
        if data_tensor is not None: # Create shared-tensor
            data_type = F.reverse_data_type_dict[F.dtype(data_tensor)]
            shared_data = empty_shared_mem(name+'-kvdata-', True, data_tensor.shape, data_type)
            dlpack = shared_data.to_dlpack()
            self._data_store[name] = F.zerocopy_from_dlpack(dlpack)
            self._data_store[name][:] = data_tensor[:]
-        self._part_policy[name] = self.find_policy(policy_str)
+            assert self._part_policy[name].get_data_size() == data_tensor.shape[0], \
+                'kvserver expect partition {} for {} has {} rows, but gets {} rows'.format(
+                    self._part_policy[name].part_id,
+                    policy_str,
+                    self._part_policy[name].get_data_size(),
+                    data_tensor.shape[0])
        self._pull_handlers[name] = default_pull_handler
        self._push_handlers[name] = default_push_handler
...
@@ -96,6 +96,8 @@ def get_local_usable_addr():
    return ip_addr + ':' + str(port)

+INITIALIZED = False
+
def connect_to_server(ip_config, max_queue_size=MAX_QUEUE_SIZE, net_type='socket'):
    """Connect this client to server.

@@ -171,11 +173,15 @@ def connect_to_server(ip_config, max_queue_size=MAX_QUEUE_SIZE, net_type='socket
    res = rpc.recv_response()
    rpc.set_num_client(res.num_client)
    atexit.register(exit_client)
+    global INITIALIZED
+    INITIALIZED = True

def finalize_client():
    """Release resources of this client."""
    rpc.finalize_sender()
    rpc.finalize_receiver()
+    global INITIALIZED
+    INITIALIZED = False

def shutdown_servers():
    """Issue commands to remote servers to shut them down.

@@ -196,3 +202,8 @@ def exit_client():
    shutdown_servers()
    finalize_client()
    atexit.unregister(exit_client)
+
+def is_initialized():
+    """Is RPC initialized?
+    """
+    return INITIALIZED
...

"""Define sparse embedding and optimizer."""
from .. import backend as F
+from .dist_tensor import DistTensor
+from .graph_partition_book import PartitionPolicy, NODE_PART_POLICY

class SparseNodeEmbedding:
    ''' Sparse embeddings in the distributed KVStore.

@@ -32,7 +34,8 @@ class SparseNodeEmbedding:
    '''
    def __init__(self, g, name, shape, initializer):
        assert shape[0] == g.number_of_nodes()
-        g.init_ndata(name, shape, F.float32, initializer)
+        part_policy = PartitionPolicy(NODE_PART_POLICY, g.get_partition_book())
+        g.ndata[name] = DistTensor(g, shape, F.float32, name, part_policy, initializer)
        self._tensor = g.ndata[name]
        self._trace = []
...
@@ -35,6 +35,10 @@ class KVClient(object):
        '''add new data to the client'''
        self._data[name] = init_func(shape, dtype)

+    def delete_data(self, name):
+        '''delete the data'''
+        del self._data[name]
+
    def data_name_list(self):
        '''get the names of all data'''
        return list(self._data.keys())
...
@@ -15,6 +15,8 @@
#include <sys/socket.h>
#include <unistd.h>
#endif // !_WIN32
+#include <string.h>
+#include <errno.h>

namespace dgl {
namespace network {

@@ -182,6 +184,9 @@ int64_t TCPSocket::Send(const char * data, int64_t len_data) {
  do {  // retry if EINTR failure appears
    number_send = send(socket_, data, len_data, 0);
  } while (number_send == -1 && errno == EINTR);
+  if (number_send == -1) {
+    LOG(ERROR) << "send error: " << strerror(errno);
+  }
  return number_send;
}

@@ -192,6 +197,9 @@ int64_t TCPSocket::Receive(char * buffer, int64_t size_buffer) {
  do {  // retry if EINTR failure appears
    number_recv = recv(socket_, buffer, size_buffer, 0);
  } while (number_recv == -1 && errno == EINTR);
+  if (number_recv == -1) {
+    LOG(ERROR) << "recv error: " << strerror(errno);
+  }
  return number_recv;
}
...
@@ -64,6 +64,9 @@ def run_server(graph_name, server_id, num_clients, shared_mem):
def emb_init(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())

+def rand_init(shape, dtype):
+    return F.tensor(np.random.normal(size=shape))
+
def run_client(graph_name, part_id, num_nodes, num_edges):
    time.sleep(5)
    gpb, graph_name = load_partition_book('/tmp/dist_graph/{}.json'.format(graph_name),

@@ -90,15 +93,30 @@ def check_dist_graph(g, num_nodes, num_edges):
    # Test init node data
    new_shape = (g.number_of_nodes(), 2)
-    g.init_ndata('test1', new_shape, F.int32)
+    g.ndata['test1'] = dgl.distributed.DistTensor(g, new_shape, F.int32)
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 0)

-    # Test init edge data
-    new_shape = (g.number_of_edges(), 2)
-    g.init_edata('test1', new_shape, F.int32)
-    feats = g.edata['test1'][eids]
-    assert np.all(F.asnumpy(feats) == 0)
+    # reference one that already exists
+    test2 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test2', init_func=rand_init)
+    test3 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test2')
+    assert np.all(F.asnumpy(test2[nids]) == F.asnumpy(test3[nids]))
+
+    # create a tensor, destroy it and create it again.
+    test3 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test3', init_func=rand_init)
+    del test3
+    test3 = dgl.distributed.DistTensor(g, (g.number_of_nodes(), 3), F.float32, 'test3')
+    del test3
+
+    # test a persistent tensor
+    test4 = dgl.distributed.DistTensor(g, new_shape, F.float32, 'test4', init_func=rand_init,
+                                       persistent=True)
+    del test4
+    try:
+        test4 = dgl.distributed.DistTensor(g, (g.number_of_nodes(), 3), F.float32, 'test4')
+        raise Exception('')
+    except:
+        pass

    # Test sparse emb
    try:

@@ -119,7 +137,8 @@ def check_dist_graph(g, num_nodes, num_edges):
        assert np.all(F.asnumpy(feats1) == np.zeros((len(rest), 1)))

        policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
-        grad_sum = dgl.distributed.DistTensor(g, 'node:emb1_sum', policy)
+        grad_sum = dgl.distributed.DistTensor(g, (g.number_of_nodes(),), F.float32,
+                                              'emb1_sum', policy)
        assert np.all(F.asnumpy(grad_sum[nids]) == np.ones((len(nids), 1)))
        assert np.all(F.asnumpy(grad_sum[rest]) == np.zeros((len(rest), 1)))
...