Unverified Commit d7390763 authored by xiang song(charlie.song), committed by GitHub

[Distributed] Deprecate old DistEmbedding impl, use synchronized embedding impl (#3111)



* fix.

* fix.

* fix.

* fix.

* Fix test

* Deprecate old DistEmbedding impl, use synchronized embedding impl

* update doc
Co-authored-by: Ubuntu <ubuntu@ip-172-31-71-112.ec2.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-2-66.ec2.internal>
Co-authored-by: Da Zheng <zhengda1936@gmail.com>
Co-authored-by: Jinjing Zhou <VoVAllen@users.noreply.github.com>
parent ee6bc951
......@@ -27,9 +27,9 @@ Distributed Tensor
Distributed Node Embedding
---------------------
.. currentmodule:: dgl.distributed.nn.pytorch
.. currentmodule:: dgl.distributed
.. autoclass:: NodeEmbedding
.. autoclass:: DistEmbedding
Distributed embedding optimizer
......
......@@ -9,7 +9,7 @@ This section covers the distributed APIs used in the training script. DGL provid
data structures and various APIs for initialization, distributed sampling and workload split.
For distributed training/inference, DGL provides three distributed data structures:
:class:`~dgl.distributed.DistGraph` for distributed graphs, :class:`~dgl.distributed.DistTensor` for
distributed tensors and :class:`~dgl.distributed.nn.NodeEmbedding` for distributed learnable embeddings.
distributed tensors and :class:`~dgl.distributed.DistEmbedding` for distributed learnable embeddings.
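For orientation, a minimal sketch of how the three structures are typically obtained in a trainer (assuming the distributed module has already been initialized as described in the next subsection; the graph name ``ogb-product`` and the names ``'my_feat'``/``'my_emb'`` are illustrative only):

.. code:: python

    import dgl
    import torch as th

    g = dgl.distributed.DistGraph('ogb-product')                   # distributed graph
    feat = dgl.distributed.DistTensor((g.number_of_nodes(), 10),   # distributed tensor
                                      th.float32, 'my_feat')
    emb = dgl.distributed.DistEmbedding(g.number_of_nodes(), 10,   # distributed learnable embeddings
                                        name='my_emb')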
Initialization of the DGL distributed module
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......@@ -27,7 +27,7 @@ Typically, the initialization APIs should be invoked in the following order:
th.distributed.init_process_group(backend='gloo')
**Note**: If the training script contains user-defined functions (UDFs) that have to be invoked on
the servers (see the sections on DistTensor and NodeEmbedding for more details), these UDFs have to
the servers (see the sections on DistTensor and DistEmbedding for more details), these UDFs have to
be declared before :func:`~dgl.distributed.initialize`.
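Putting the pieces together, the call order looks roughly like the sketch below (a minimal sketch; ``sample_blocks`` is a hypothetical UDF placeholder, not code from this commit):

.. code:: python

    import dgl
    import torch as th

    def sample_blocks(seeds):
        # Hypothetical UDF that may run on the servers; it must be declared
        # before dgl.distributed.initialize() is invoked.
        pass

    dgl.distributed.initialize('ip_config.txt')         # initialize DGL's distributed module first
    th.distributed.init_process_group(backend='gloo')   # then initialize PyTorch's process group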
Distributed graph
......@@ -153,10 +153,10 @@ computation operators, such as sum and mean.
when a machine runs multiple servers. This may result in data corruption. One way to avoid concurrent
writes to the same row of data is to run one server process on a machine.
Distributed NodeEmbedding
Distributed DistEmbedding
~~~~~~~~~~~~~~~~~~~~~
DGL provides :class:`~dgl.distributed.nn.NodeEmbedding` to support transductive models that require
DGL provides :class:`~dgl.distributed.DistEmbedding` to support transductive models that require
node embeddings. Creating distributed embeddings is very similar to creating distributed tensors.
.. code:: python
......@@ -165,7 +165,7 @@ node embeddings. Creating distributed embeddings is very similar to creating dis
arr = th.zeros(shape, dtype=dtype)
arr.uniform_(-1, 1)
return arr
emb = dgl.distributed.nn.NodeEmbedding(g.number_of_nodes(), 10, init_func=initializer)
emb = dgl.distributed.DistEmbedding(g.number_of_nodes(), 10, init_func=initializer)
Internally, distributed embeddings are built on top of distributed tensors and thus have
very similar behaviors to distributed tensors. For example, when embeddings are created, they
......@@ -192,7 +192,7 @@ the other for dense model parameters, as shown in the code below:
optimizer.step()
sparse_optimizer.step()
**Note**: :class:`~dgl.distributed.nn.NodeEmbedding` is not a PyTorch nn module, so we cannot
**Note**: :class:`~dgl.distributed.DistEmbedding` is not a PyTorch nn module, so we cannot
access it through the parameters of a PyTorch nn module.
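A minimal sketch of this two-optimizer pattern (assuming ``emb`` is the embedding created above, ``model`` is an ordinary PyTorch nn module, and ``dataloader``/``nids`` stand in for the mini-batch loop and the node IDs it yields; the loss computation is a placeholder):

.. code:: python

    import dgl
    import torch as th

    # DGL's sparse optimizer updates the distributed embeddings ...
    sparse_optimizer = dgl.distributed.optim.SparseAdagrad([emb], lr=0.001)
    # ... while a regular PyTorch optimizer updates the dense model parameters.
    optimizer = th.optim.Adam(model.parameters(), lr=0.001)

    for blocks in dataloader:
        feats = emb(nids)             # look up the embeddings used in this mini-batch
        loss = model(blocks, feats)   # placeholder forward pass returning a scalar loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()              # update dense parameters
        sparse_optimizer.step()       # push sparse gradients and update the embeddings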
Distributed sampling
......
......@@ -85,7 +85,7 @@ Specifically, DGL's distributed training has three types of interacting processe
generate mini-batches for training.
* Trainers contain multiple classes to interact with servers. They use
:class:`~dgl.distributed.DistGraph` to access the partitioned graph data,
:class:`~dgl.distributed.nn.NodeEmbedding` and :class:`~dgl.distributed.DistTensor` to access
:class:`~dgl.distributed.DistEmbedding` and :class:`~dgl.distributed.DistTensor` to access
the node/edge features/embeddings, and
:class:`~dgl.distributed.dist_dataloader.DistDataLoader` to
interact with samplers to get mini-batches, as sketched below.
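A minimal sketch of the trainer-side wiring (``train_nid`` is the set of node IDs assigned to this trainer and ``sampler.sample_blocks`` is a placeholder collate function, modeled on the DistDataLoader usage elsewhere in this commit; the batch size is arbitrary):

.. code:: python

    from dgl.distributed import DistDataLoader

    # The sampler's collate function turns a batch of seed nodes into blocks.
    dataloader = DistDataLoader(dataset=train_nid,
                                batch_size=1024,
                                collate_fn=sampler.sample_blocks,
                                shuffle=True,
                                drop_last=False)

    for sample_data in dataloader:
        seeds, blocks = sample_data
        # forward/backward pass on the mini-batch goes here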
......
......@@ -8,7 +8,7 @@
This section covers the distributed APIs used in the training script. DGL provides three distributed data structures and various APIs for initialization, distributed sampling and data splitting.
For distributed training/inference, DGL provides three distributed data structures: :class:`~dgl.distributed.DistGraph` for distributed graphs,
:class:`~dgl.distributed.DistTensor` for distributed tensors, and
:class:`~dgl.distributed.nn.NodeEmbedding` for distributed learnable embeddings.
:class:`~dgl.distributed.DistEmbedding` for distributed learnable embeddings.
Initialization of the DGL distributed module
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......@@ -24,7 +24,7 @@ Initialization of the DGL distributed module
dgl.distributed.initialize('ip_config.txt')
th.distributed.init_process_group(backend='gloo')
**Note**: If the training script contains user-defined functions (UDFs) that have to be invoked on the servers (see the sections on DistTensor and NodeEmbedding below for details),
**Note**: If the training script contains user-defined functions (UDFs) that have to be invoked on the servers (see the sections on DistTensor and DistEmbedding below for details),
these UDFs have to be declared before :func:`~dgl.distributed.initialize`.
Distributed graph
......@@ -138,7 +138,7 @@ DGL provides an interface for distributed tensors similar to that of regular single-machine tensors to access
Distributed embeddings
~~~~~~~~~~~~~~~~~~~~~~
DGL provides :class:`~dgl.distributed.nn.NodeEmbedding` to support transductive models that require node embeddings.
DGL provides :class:`~dgl.distributed.DistEmbedding` to support transductive models that require node embeddings.
Creating distributed embeddings is very similar to creating distributed tensors.
.. code:: python
......@@ -147,7 +147,7 @@ DGL provides :class:`~dgl.distributed.nn.NodeEmbedding` to support models that require node embeddings
arr = th.zeros(shape, dtype=dtype)
arr.uniform_(-1, 1)
return arr
emb = dgl.distributed.nn.NodeEmbedding(g.number_of_nodes(), 10, init_func=initializer)
emb = dgl.distributed.DistEmbedding(g.number_of_nodes(), 10, init_func=initializer)
Internally, distributed embeddings are built on top of distributed tensors and, thus, behave very similarly to distributed tensors.
For example, when embeddings are created, DGL shards them and stores them across all the machines in the cluster. They can be uniquely identified by name.
......@@ -169,7 +169,7 @@ DGL provides a sparse Adagrad optimizer :class:`~dgl.distributed.SparseAdagr
optimizer.step()
sparse_optimizer.step()
**Note**: :class:`~dgl.distributed.nn.NodeEmbedding` is not a PyTorch nn module, so users cannot access it through the parameters of an nn module.
**Note**: :class:`~dgl.distributed.DistEmbedding` is not a PyTorch nn module, so users cannot access it through the parameters of an nn module.
Distributed sampling
~~~~~~~~~~~~~~~~~~~~
......
......@@ -74,7 +74,7 @@ DGL implements several distributed components to support distributed training. The figure below shows these
These servers work together to serve the graph data to the trainers. Note that one machine may run multiple server processes at the same time to parallelize computation and network communication.
* *Sampler processes* interact with the servers and sample nodes and edges to generate mini-batches for training.
* *Trainer processes* contain multiple classes that interact with the servers. They use :class:`~dgl.distributed.DistGraph` to access the partitioned graph data, use
:class:`~dgl.distributed.nn.NodeEmbedding` and
:class:`~dgl.distributed.DistEmbedding` and
:class:`~dgl.distributed.DistTensor` to access the node/edge features/embeddings, and use
:class:`~dgl.distributed.dist_dataloader.DistDataLoader` to interact with the samplers to get mini-batches.
......
......@@ -164,7 +164,7 @@ python3 ~/workspace/dgl/tools/launch.py --workspace ~/workspace/dgl/examples/pyt
"python3 train_dist_transductive.py --graph_name ogb-product --ip_config ip_config.txt --batch_size 1000 --num_gpu 4 --eval_every 5"
```
To run supervised training in the transductive setting using DGL's distributed NodeEmbedding:
To run supervised training in the transductive setting using DGL's distributed DistEmbedding:
```bash
python3 ~/workspace/dgl/tools/launch.py --workspace ~/workspace/dgl/examples/pytorch/graphsage/experimental/ \
--num_trainers 4 \
......@@ -188,7 +188,7 @@ python3 ~/workspace/dgl/tools/launch.py --workspace ~/workspace/dgl/examples/pyt
"python3 train_dist_unsupervised_transductive.py --graph_name ogb-product --ip_config ip_config.txt --num_epochs 3 --batch_size 1000 --num_gpus 4"
```
To run unsupervised training in the transductive setting using DGL's distributed NodeEmbedding:
To run unsupervised training in the transductive setting using DGL's distributed DistEmbedding:
```bash
python3 ~/workspace/dgl/tools/launch.py --workspace ~/workspace/dgl/examples/pytorch/graphsage/experimental/ \
--num_trainers 4 \
......
......@@ -13,7 +13,7 @@ from dgl.data.utils import load_graphs
import dgl.function as fn
import dgl.nn.pytorch as dglnn
from dgl.distributed import DistDataLoader
from dgl.distributed.nn import NodeEmbedding
from dgl.distributed import DistEmbedding
import torch as th
import torch.nn as nn
......@@ -91,7 +91,7 @@ class DistEmb(nn.Module):
self.emb_size = emb_size
self.dgl_sparse_emb = dgl_sparse_emb
if dgl_sparse_emb:
self.sparse_emb = NodeEmbedding(num_nodes, emb_size, name='sage', init_func=initializer)
self.sparse_emb = DistEmbedding(num_nodes, emb_size, name='sage', init_func=initializer)
else:
self.sparse_emb = th.nn.Embedding(num_nodes, emb_size, sparse=True)
nn.init.uniform_(self.sparse_emb.weight, -1.0, 1.0)
......
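Aside: the ``dgl_sparse_emb`` flag above picks between DGL's distributed ``DistEmbedding`` and a plain ``torch.nn.Embedding(sparse=True)``, and the two need different sparse optimizers. A hedged sketch of how the matching optimizer might be chosen (illustrative only, not code from this commit; ``emb_layer`` is an instance of the ``DistEmb`` module shown above and the learning rate is arbitrary):

```python
import dgl
import torch as th

# emb_layer: an instance of the DistEmb module from the hunk above.
if emb_layer.dgl_sparse_emb:
    # DGL's DistEmbedding must be updated by a DGL sparse optimizer.
    sparse_opt = dgl.distributed.optim.SparseAdam([emb_layer.sparse_emb], lr=0.01)
else:
    # torch.nn.Embedding(sparse=True) is updated by PyTorch's own sparse optimizer.
    sparse_opt = th.optim.SparseAdam(list(emb_layer.sparse_emb.parameters()), lr=0.01)
```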
......@@ -22,7 +22,6 @@ import torch.optim as optim
import torch.multiprocessing as mp
from dgl.distributed import DistDataLoader
from dgl.distributed.optim import SparseAdagrad
from train_dist_unsupervised import SAGE, NeighborSampler, PosNeighborSampler, CrossEntropyLoss, compute_acc
from train_dist_transductive import DistEmb, load_embs
......
......@@ -126,7 +126,7 @@ We can get the performance score at the second epoch:
Val Acc 0.4323, Test Acc 0.4255, time: 128.0379
```
The command below launches the same distributed training job using DGL's distributed NodeEmbedding:
The command below launches the same distributed training job using DGL's distributed DistEmbedding:
```bash
python3 ~/workspace/dgl/tools/launch.py \
--workspace ~/workspace/dgl/examples/pytorch/rgcn/experimental/ \
......@@ -135,7 +135,7 @@ python3 ~/workspace/dgl/tools/launch.py \
--num_samplers 4 \
--part_config data/ogbn-mag.json \
--ip_config ip_config.txt \
"python3 entity_classify_dist.py --graph-name ogbn-mag --dataset ogbn-mag --fanout='25,25' --batch-size 1024 --n-hidden 64 --lr 0.01 --eval-batch-size 1024 --low-mem --dropout 0.5 --use-self-loop --n-bases 2 --n-epochs 3 --layer-norm --ip-config ip_config.txt --sparse-embedding --sparse-lr 0.06 --num_gpus 1"
"python3 entity_classify_dist.py --graph-name ogbn-mag --dataset ogbn-mag --fanout='25,25' --batch-size 1024 --n-hidden 64 --lr 0.01 --eval-batch-size 1024 --low-mem --dropout 0.5 --use-self-loop --n-bases 2 --n-epochs 3 --layer-norm --ip-config ip_config.txt --sparse-embedding --sparse-lr 0.06 --num_gpus 1 --dgl-sparse"
```
We can get the performance score at the second epoch:
......@@ -218,5 +218,5 @@ python3 partition_graph.py --dataset ogbn-mag --num_parts 1
### Step 2: run the training script
```bash
python3 entity_classify_dist.py --graph-name ogbn-mag --dataset ogbn-mag --fanout='25,25' --batch-size 512 --n-hidden 64 --lr 0.01 --eval-batch-size 128 --low-mem --dropout 0.5 --use-self-loop --n-bases 2 --n-epochs 3 --layer-norm --ip-config ip_config.txt --conf-path 'data/ogbn-mag.json' --standalone --sparse-embedding --sparse-lr 0.06 --node-feats
DGL_DIST_MODE=standalone python3 entity_classify_dist.py --graph-name ogbn-mag --dataset ogbn-mag --fanout='25,25' --batch-size 512 --n-hidden 64 --lr 0.01 --eval-batch-size 128 --low-mem --dropout 0.5 --use-self-loop --n-bases 2 --n-epochs 3 --layer-norm --ip-config ip_config.txt --conf-path 'data/ogbn-mag.json' --standalone --sparse-embedding --sparse-lr 0.06
```
......@@ -10,7 +10,7 @@ import argparse
import itertools
import numpy as np
import time
import os
import os, gc
os.environ['DGLBACKEND']='pytorch'
import torch as th
......@@ -162,7 +162,7 @@ class DistEmbedLayer(nn.Module):
# We only create embeddings for nodes without node features.
if feat_name not in g.nodes[ntype].data:
part_policy = g.get_node_partition_policy(ntype)
self.node_embeds[ntype] = dgl.distributed.nn.NodeEmbedding(g.number_of_nodes(ntype),
self.node_embeds[ntype] = dgl.distributed.DistEmbedding(g.number_of_nodes(ntype),
self.embed_size,
embed_name + '_' + ntype,
init_emb,
......@@ -229,6 +229,7 @@ def evaluate(g, model, embed_layer, labels, eval_loader, test_loader, all_val_ni
global_results = dgl.distributed.DistTensor(labels.shape, th.long, 'results', persistent=True)
with th.no_grad():
th.cuda.empty_cache()
for sample_data in tqdm.tqdm(eval_loader):
seeds, blocks = sample_data
for block in blocks:
......@@ -245,6 +246,7 @@ def evaluate(g, model, embed_layer, labels, eval_loader, test_loader, all_val_ni
test_logits = []
test_seeds = []
with th.no_grad():
th.cuda.empty_cache()
for sample_data in tqdm.tqdm(test_loader):
seeds, blocks = sample_data
for block in blocks:
......@@ -347,7 +349,7 @@ def run(args, device, data):
# Create DataLoader for constructing blocks
test_dataloader = DistDataLoader(
dataset=test_nid,
batch_size=args.batch_size,
batch_size=args.eval_batch_size,
collate_fn=test_sampler.sample_blocks,
shuffle=False,
drop_last=False)
......@@ -486,6 +488,7 @@ def run(args, device, data):
np.sum(backward_t[-args.log_every:]), np.sum(update_t[-args.log_every:])))
start = time.time()
gc.collect()
print('[{}]Epoch Time(s): {:.4f}, sample: {:.4f}, data copy: {:.4f}, forward: {:.4f}, backward: {:.4f}, update: {:.4f}, #train: {}, #input: {}'.format(
g.rank(), np.sum(step_time), np.sum(sample_t), np.sum(feat_copy_t), np.sum(forward_t), np.sum(backward_t), np.sum(update_t), number_train, number_input))
epoch += 1
......
......@@ -18,8 +18,7 @@ from .dist_graph import DistGraphServer, DistGraph, node_split, edge_split
from .dist_tensor import DistTensor
from .partition import partition_graph, load_partition, load_partition_book
from .graph_partition_book import GraphPartitionBook, PartitionPolicy
from .sparse_emb import SparseAdagrad, DistEmbedding
from . import nn
from .nn import *
from . import optim
from .rpc import *
......
"""dgl distributed sparse optimizer for pytorch."""
from .sparse_emb import NodeEmbedding
from .sparse_emb import DistEmbedding
......@@ -5,7 +5,7 @@ from .... import backend as F
from .... import utils
from ...dist_tensor import DistTensor
class NodeEmbedding:
class DistEmbedding:
'''Distributed node embeddings.
DGL provides a distributed embedding to support models that require learnable embeddings.
......@@ -34,7 +34,7 @@ class NodeEmbedding:
The dimension size of embeddings.
name : str, optional
The name of the embeddings. The name can uniquely identify embeddings in a system
so that another NodeEmbedding object can refer to the same embeddings.
so that another DistEmbedding object can referent to the same embeddings.
init_func : callable, optional
The function to create the initial data. If the init function is not provided,
the values of the embeddings are initialized to zero.
......@@ -49,7 +49,7 @@ class NodeEmbedding:
arr = th.zeros(shape, dtype=dtype)
arr.uniform_(-1, 1)
return arr
>>> emb = dgl.distributed.nn.NodeEmbedding(g.number_of_nodes(), 10, init_func=initializer)
>>> emb = dgl.distributed.DistEmbedding(g.number_of_nodes(), 10, init_func=initializer)
>>> optimizer = dgl.distributed.optim.SparseAdagrad([emb], lr=0.001)
>>> for blocks in dataloader:
... feats = emb(nids)
......@@ -59,7 +59,7 @@ class NodeEmbedding:
Note
----
When a ``NodeEmbedding`` object is used while the deep learning framework is recording
When a ``DistEmbedding`` object is used while the deep learning framework is recording
the forward computation, users have to invoke
:py:meth:`~dgl.distributed.optim.SparseAdagrad.step` afterwards. Otherwise, there will be
a memory leak.
......
......@@ -4,18 +4,18 @@ from abc import abstractmethod
import torch as th
from ...dist_tensor import DistTensor
from ...nn.pytorch import NodeEmbedding
from ...nn.pytorch import DistEmbedding
from .utils import alltoallv_cpu, alltoall_cpu
class DistSparseGradOptimizer(abc.ABC):
r''' The abstract dist sparse optimizer.
Note: DGL's distributed sparse optimizer only works with dgl.distributed.nn.NodeEmbedding
Note: DGL's distributed sparse optimizer only works with dgl.distributed.DistEmbedding
Parameters
----------
params : list of NodeEmbedding
The list of NodeEmbedding.
params : list of DistEmbedding
The list of DistEmbedding.
lr : float
The learning rate.
'''
......@@ -146,7 +146,7 @@ class DistSparseGradOptimizer(abc.ABC):
Index of the embeddings to be updated.
grad : tensor
Gradient of each embedding.
emb : dgl.distributed.nn.NodeEmbedding
emb : dgl.distributed.DistEmbedding
Sparse node embedding to update.
"""
......@@ -172,7 +172,7 @@ class SparseAdagrad(DistSparseGradOptimizer):
r''' Distributed Node embedding optimizer using the Adagrad algorithm.
This optimizer implements a distributed sparse version of Adagrad algorithm for
optimizing :class:`dgl.distributed.nn.NodeEmbedding`. Being sparse means it only updates
optimizing :class:`dgl.distributed.DistEmbedding`. Being sparse means it only updates
the embeddings whose gradients have updates, which are usually a very
small portion of the total embeddings.
......@@ -184,8 +184,8 @@ class SparseAdagrad(DistSparseGradOptimizer):
Parameters
----------
params : list[dgl.distributed.nn.NodeEmbedding]
The list of dgl.distributed.nn.NodeEmbedding.
params : list[dgl.distributed.DistEmbedding]
The list of dgl.distributed.DistEmbedding.
lr : float
The learning rate.
eps : float, Optional
......@@ -198,8 +198,8 @@ class SparseAdagrad(DistSparseGradOptimizer):
# We need to register a state sum for each embedding in the kvstore.
self._state = {}
for emb in params:
assert isinstance(emb, NodeEmbedding), \
'SparseAdagrad only supports dgl.distributed.nn.NodeEmbedding'
assert isinstance(emb, DistEmbedding), \
'SparseAdagrad only supports dgl.distributed.DistEmbedding'
name = emb.name + "_sum"
state = DistTensor((emb.num_embeddings, emb.embedding_dim), th.float32, name,
......@@ -219,7 +219,7 @@ class SparseAdagrad(DistSparseGradOptimizer):
Index of the embeddings to be updated.
grad : tensor
Gradient of each embedding.
emb : dgl.distributed.nn.NodeEmbedding
emb : dgl.distributed.DistEmbedding
Sparse embedding to update.
"""
eps = self._eps
......@@ -247,7 +247,7 @@ class SparseAdam(DistSparseGradOptimizer):
r''' Distributed Node embedding optimizer using the Adam algorithm.
This optimizer implements a distributed sparse version of Adam algorithm for
optimizing :class:`dgl.distributed.nn.NodeEmbedding`. Being sparse means it only updates
optimizing :class:`dgl.distributed.DistEmbedding`. Being sparse means it only updates
the embeddings whose gradients have updates, which are usually a very
small portion of the total embeddings.
......@@ -263,8 +263,8 @@ class SparseAdam(DistSparseGradOptimizer):
Parameters
----------
params : list[dgl.distributed.nn.NodeEmbedding]
The list of dgl.distributed.nn.NodeEmbedding.
params : list[dgl.distributed.DistEmbedding]
The list of dgl.distributed.DistEmbedding.
lr : float
The learning rate.
betas : tuple[float, float], Optional
......@@ -282,8 +282,8 @@ class SparseAdam(DistSparseGradOptimizer):
self._beta2 = betas[1]
self._state = {}
for emb in params:
assert isinstance(emb, NodeEmbedding), \
'SparseAdam only supports dgl.distributed.nn.NodeEmbedding'
assert isinstance(emb, DistEmbedding), \
'SparseAdam only supports dgl.distributed.DistEmbedding'
state_step = DistTensor((emb.num_embeddings,),
th.float32, emb.name + "_step",
......@@ -316,7 +316,7 @@ class SparseAdam(DistSparseGradOptimizer):
Index of the embeddings to be updated.
grad : tensor
Gradient of each embedding.
emb : dgl.distributed.nn.NodeEmbedding
emb : dgl.distributed.DistEmbedding
Sparse embedding to update.
"""
beta1 = self._beta1
......
"""Define sparse embedding and optimizer."""
from .. import backend as F
from .. import utils
from .dist_tensor import DistTensor
class DistEmbedding:
'''Distributed embeddings.
DGL provides a distributed embedding to support models that require learnable embeddings.
DGL's distributed embeddings are mainly used for learning node embeddings of graph models.
Because distributed embeddings are part of a model, they are updated by mini-batches.
The distributed embeddings have to be updated by DGL's optimizers instead of
the optimizers provided by the deep learning frameworks (e.g., PyTorch and MXNet).
To support efficient training on a graph with many nodes, the embeddings support sparse
updates. That is, only the embeddings involved in a mini-batch computation are updated.
Currently, DGL provides only one optimizer: `SparseAdagrad`. DGL will provide more
optimizers in the future.
Distributed embeddings are sharded and stored in a cluster of machines in the same way as
:class:`dgl.distributed.DistTensor`, except that distributed embeddings are trainable.
Because distributed embeddings are sharded
in the same way as nodes and edges of a distributed graph, it is usually much more
efficient to access than the sparse embeddings provided by the deep learning frameworks.
DEPRECATED: Please use dgl.distributed.nn.NodeEmbedding instead.
Parameters
----------
num_embeddings : int
The number of embeddings. Currently, the number of embeddings has to be the same as
the number of nodes or the number of edges.
embedding_dim : int
The dimension size of embeddings.
name : str, optional
The name of the embeddings. The name can uniquely identify embeddings in a system
so that another DistEmbedding object can refer to the embeddings.
init_func : callable, optional
The function to create the initial data. If the init function is not provided,
the values of the embeddings are initialized to zero.
part_policy : PartitionPolicy, optional
The partition policy that assigns embeddings to different machines in the cluster.
Currently, it only supports node partition policy or edge partition policy.
The system determines the right partition policy automatically.
Examples
--------
>>> def initializer(shape, dtype):
arr = th.zeros(shape, dtype=dtype)
arr.uniform_(-1, 1)
return arr
>>> emb = dgl.distributed.DistEmbedding(g.number_of_nodes(), 10, init_func=initializer)
>>> optimizer = dgl.distributed.SparseAdagrad([emb], lr=0.001)
>>> for blocks in dataloader:
... feats = emb(nids)
... loss = F.sum(feats + 1, 0)
... loss.backward()
... optimizer.step()
Note
----
When a ``DistEmbedding`` object is used while the deep learning framework is recording
the forward computation, users have to invoke :py:meth:`~dgl.distributed.SparseAdagrad.step`
afterwards. Otherwise, there will be a memory leak.
'''
def __init__(self, num_embeddings, embedding_dim, name=None,
init_func=None, part_policy=None):
self._tensor = DistTensor((num_embeddings, embedding_dim), F.float32, name,
init_func, part_policy)
self._trace = []
def __call__(self, idx):
idx = utils.toindex(idx).tousertensor()
emb = self._tensor[idx]
if F.is_recording():
emb = F.attach_grad(emb)
self._trace.append((idx, emb))
return emb
def reset_trace(self):
'''Reset the traced data.
'''
self._trace = []
class SparseAdagradUDF:
''' The UDF to update the embeddings with sparse Adagrad.
Parameters
----------
lr : float
The learning rate.
'''
def __init__(self, lr):
self._lr = lr
def __call__(self, data_store, name, indices, data):
''' Update the embeddings with sparse Adagrad.
This function runs on the KVStore server. It updates the gradients by scaling them
according to the state sum.
Parameters
----------
data_store : dict of data
all data in the kvstore.
name : str
data name
indices : tensor
the indices in the local tensor.
data : tensor (mx.ndarray or torch.tensor)
a tensor with the same row size of id
'''
grad_indices = indices
grad_values = data
embs = data_store[name]
state_sum = data_store[name + "_sum"]
with F.no_grad():
grad_sum = F.mean(grad_values * grad_values, 1)
F.index_add_inplace(state_sum, grad_indices, grad_sum)
std = state_sum[grad_indices] # _sparse_mask
std_values = F.unsqueeze((F.sqrt(std) + 1e-10), 1)
F.index_add_inplace(embs, grad_indices, grad_values / std_values * (-self._lr))
def _init_state(shape, dtype):
return F.zeros(shape, dtype, F.cpu())
class SparseAdagrad:
r''' The sparse Adagrad optimizer.
This optimizer implements a lightweight version of Adagrad algorithm for optimizing
:class:`dgl.distributed.DistEmbedding`. In each mini-batch, it only updates the embeddings
involved in the mini-batch to support efficient training on a graph with many
nodes and edges.
Adagrad maintains a :math:`G_{t,i,j}` for every parameter in the embeddings, where
:math:`G_{t,i,j}=G_{t-1,i,j} + g_{t,i,j}^2` and :math:`g_{t,i,j}` is the gradient of
the dimension :math:`j` of embedding :math:`i` at step :math:`t`.
Instead of maintaining :math:`G_{t,i,j}`, this implementation maintains :math:`G_{t,i}`
for every embedding :math:`i`:
.. math::
G_{t,i} = G_{t-1,i} + \frac{1}{p} \sum_{0 \le j < p} g_{t,i,j}^2
where :math:`p` is the dimension size of an embedding.
The benefit of the implementation is that it consumes much smaller memory and runs
much faster if users' model requires learnable embeddings for nodes or edges.
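For instance (illustrative numbers only), with :math:`p = 2` and per-dimension gradients :math:`g_{t,i} = (0.1, 0.3)`, the state increment for embedding :math:`i` is :math:`(0.1^2 + 0.3^2)/2 = 0.05`, i.e. a single scalar is stored per embedding instead of one value per dimension.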
Parameters
----------
params : list of DistEmbeddings
The list of distributed embeddings.
lr : float
The learning rate.
'''
def __init__(self, params, lr):
self._params = params
self._lr = lr
self._clean_grad = False
# We need to register a state sum for each embedding in the kvstore.
for emb in params:
assert isinstance(emb, DistEmbedding), 'SparseAdagrad only supports DistEmbedding'
name = emb._tensor.name
kvstore = emb._tensor.kvstore
policy = emb._tensor.part_policy
kvstore.init_data(name + "_sum",
(emb._tensor.shape[0],), emb._tensor.dtype,
policy, _init_state)
kvstore.register_push_handler(name, SparseAdagradUDF(self._lr))
def step(self):
''' The step function.
The step function is invoked at the end of every batch to push the gradients
of the embeddings involved in a mini-batch to DGL's servers and update the embeddings.
'''
with F.no_grad():
for emb in self._params:
name = emb._tensor.name
kvstore = emb._tensor.kvstore
trace = emb._trace
if len(trace) == 1:
kvstore.push(name, trace[0][0], F.grad(trace[0][1]))
else:
# TODO(zhengda) we need to merge the gradients of the same embeddings first.
idxs = [t[0] for t in trace]
grads = [F.grad(t[1]) for t in trace]
idxs = F.cat(idxs, 0)
# Here let's adjust the gradients with the learning rate first.
# We'll need to scale them with the state sum on the kvstore server
# after we push them.
grads = F.cat(grads, 0)
kvstore.push(name, idxs, grads)
if self._clean_grad:
# clean gradient track
for emb in self._params:
emb.reset_trace()
self._clean_grad = False
def zero_grad(self):
"""clean grad cache
"""
self._clean_grad = True
......@@ -43,6 +43,7 @@ class KVClient(object):
def add_data(self, name, tensor, part_policy):
'''add data to the client'''
self._data[name] = tensor
self._gdata_name_list.add(name)
if part_policy.policy_str not in self._all_possible_part_policy:
self._all_possible_part_policy[part_policy.policy_str] = part_policy
......
......@@ -172,10 +172,10 @@ def run_client_hierarchy(graph_name, part_id, server_count, node_mask, edge_mask
def check_dist_emb(g, num_clients, num_nodes, num_edges):
from dgl.distributed.optim import SparseAdagrad
from dgl.distributed.nn import NodeEmbedding
from dgl.distributed import DistEmbedding
# Test sparse emb
try:
emb = NodeEmbedding(g.number_of_nodes(), 1, 'emb1', emb_init)
emb = DistEmbedding(g.number_of_nodes(), 1, 'emb1', emb_init)
nids = F.arange(0, int(g.number_of_nodes()))
lr = 0.001
optimizer = SparseAdagrad([emb], lr=lr)
......@@ -199,7 +199,7 @@ def check_dist_emb(g, num_clients, num_nodes, num_edges):
assert np.all(F.asnumpy(grad_sum[nids]) == np.ones((len(nids), 1)) * num_clients)
assert np.all(F.asnumpy(grad_sum[rest]) == np.zeros((len(rest), 1)))
emb = NodeEmbedding(g.number_of_nodes(), 1, 'emb2', emb_init)
emb = DistEmbedding(g.number_of_nodes(), 1, 'emb2', emb_init)
with F.no_grad():
feats1 = emb(nids)
assert np.all(F.asnumpy(feats1) == 0)
......@@ -587,8 +587,8 @@ def test_server_client():
check_server_client(False, 2, 2)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support distributed NodeEmbedding")
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Mxnet doesn't support distributed NodeEmbedding")
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support distributed DistEmbedding")
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Mxnet doesn't support distributed DistEmbedding")
def test_dist_emb_server_client():
os.environ['DGL_DIST_MODE'] = 'distributed'
check_dist_emb_server_client(True, 1, 1)
......@@ -615,8 +615,8 @@ def test_standalone():
print(e)
dgl.distributed.exit_client() # this is needed since there's two test here in one process
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support distributed NodeEmbedding")
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Mxnet doesn't support distributed NodeEmbedding")
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support distributed DistEmbedding")
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Mxnet doesn't support distributed DistEmbedding")
def test_standalone_node_emb():
os.environ['DGL_DIST_MODE'] = 'standalone'
......
......@@ -16,7 +16,7 @@ import backend as F
import unittest
import pickle
import random
from dgl.distributed.nn import NodeEmbedding
from dgl.distributed import DistEmbedding
from dgl.distributed.optim import SparseAdagrad, SparseAdam
def create_random_graph(n):
......@@ -78,8 +78,8 @@ def run_client(graph_name, cli_id, part_id, server_count):
policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
num_nodes = g.number_of_nodes()
emb_dim = 4
dgl_emb = NodeEmbedding(num_nodes, emb_dim, name='optim', init_func=initializer, part_policy=policy)
dgl_emb_zero = NodeEmbedding(num_nodes, emb_dim, name='optim-zero', init_func=initializer, part_policy=policy)
dgl_emb = DistEmbedding(num_nodes, emb_dim, name='optim', init_func=initializer, part_policy=policy)
dgl_emb_zero = DistEmbedding(num_nodes, emb_dim, name='optim-zero', init_func=initializer, part_policy=policy)
dgl_adam = SparseAdam(params=[dgl_emb, dgl_emb_zero], lr=0.01)
dgl_adam._world_size = 1
dgl_adam._rank = 0
......