Unverified Commit a7e941c3 authored by xiang song(charlie.song), committed by GitHub

[Feature] Add support for sparse embedding (#2451)



* Add sparse embedding for dgl and update rgcn example

* upd

* Fix

* Revert "Fix"

This reverts commit 4da87cdfb8b8c3506b7fc7376cd2385ba8045c2a.

* Fix

* upd

* upd

* Fix

* Add unit test and update impl

* fix

* Clean up rgcn example code

* upd

* upd

* update

* Fix

* update score

* sparse for sage

* remove model sparse

* upd

* upd

* remove global norm

* revert delete model_sparse.py

* update according to comments

* Fix doc

* upd

* Fix test

* upd

* lint

* lint

* lint

* upd

* upd

* clean up
Co-authored-by: Ubuntu <ubuntu@ip-172-31-56-220.ec2.internal>
parent 362f72cb
.. _apioptim:

dgl.optim
=========

.. automodule:: dgl.optim

Node embedding optimizer
------------------------

.. currentmodule:: dgl.optim.pytorch

.. autoclass:: SparseAdagrad

.. autoclass:: SparseAdam
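A minimal end-to-end sketch of the intended usage, mirroring the class docstrings added in this commit (`g`, `nids`, `device`, and the dataloader are assumed to already exist):

```
import torch as th
import dgl

def initializer(emb):
    th.nn.init.xavier_uniform_(emb)
    return emb

# one learnable row per node, updated sparsely
emb = dgl.nn.NodeEmbedding(g.number_of_nodes(), 10, 'emb', init_func=initializer)
optimizer = dgl.optim.SparseAdam([emb], lr=0.001)

for blocks in dataloader:
    feats = emb(nids, device)   # gather only the rows used by this mini-batch
    loss = feats.sum()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()            # writes back only the touched rows
```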
@@ -268,3 +268,9 @@ SegmentedKNNGraph
    :members:
    :show-inheritance:

NodeEmbedding
~~~~~~~~~~~~~

.. autoclass:: dgl.nn.pytorch.sparse_emb.NodeEmbedding
    :members:
    :show-inheritance:
@@ -43,32 +43,32 @@ AIFB: accuracy avg(5 runs) 90.56%, best 94.44% (DGL)
```
python3 entity_classify_mp.py -d aifb --testing --gpu 0 --fanout='20,20' --batch-size 128
```

MUTAG: accuracy avg(5 runs) 70.00%, best 73.53% (DGL)
```
python3 entity_classify_mp.py -d mutag --l2norm 5e-4 --n-bases 30 --testing --gpu 0 --batch-size 64 --fanout '50,40' --use-self-loop --dgl-sparse --n-epochs 30 --sparse-lr 0.01 --dropout 0.7
```

BGS: accuracy avg(5 runs) 84.83%, best 89.66% (DGL)
```
python3 entity_classify_mp.py -d bgs --l2norm 5e-4 --n-bases 40 --testing --gpu 0 --fanout '50,40' --n-epochs=20 --batch-size=32 --dgl-sparse --lr 0.01 --sparse-lr 0.01 --dropout 0.3
```

AM: accuracy avg(5 runs) 88.59%, best 88.89% (DGL)
```
python3 entity_classify_mp.py -d am --l2norm 5e-4 --n-bases 40 --testing --gpu 0 --fanout '35,35' --batch-size 64 --n-hidden 16 --use-self-loop --n-epochs=20 --dgl-sparse --lr 0.01 --sparse-lr 0.02 --dropout 0.7
```

### Entity Classification on OGBN-MAG
Test-bd: P3-8xlarge

OGBN-MAG accuracy 45.5 (3 runs)
```
python3 entity_classify_mp.py -d ogbn-mag --testing --fanout='30,30' --batch-size 1024 --n-hidden 128 --lr 0.01 --num-worker 4 --eval-batch-size 8 --low-mem --gpu 0,1,2,3 --dropout 0.7 --use-self-loop --n-bases 2 --n-epochs 3 --node-feats --dgl-sparse --sparse-lr 0.08
```

OGBN-MAG without node-feats 42.79
```
python3 entity_classify_mp.py -d ogbn-mag --testing --fanout='30,30' --batch-size 1024 --n-hidden 128 --lr 0.01 --num-worker 4 --eval-batch-size 8 --low-mem --gpu 0,1,2,3 --dropout 0.7 --use-self-loop --n-bases 2 --n-epochs 3 --dgl-sparse --sparse-lr 0.0
```

Test-bd: P2-8xlarge
...
@@ -6,11 +6,9 @@ Difference compared to tkipf/relation-gcn
* l2norm applied to all weights
* remove nodes that won't be touched
"""
import argparse, gc
import numpy as np
import time
import torch as th
import torch.nn as nn
import torch.nn.functional as F
@@ -44,17 +42,25 @@ class EntityClassify(nn.Module):
        Output dim size.
    num_rels : int
        Number of relation types.
    num_bases : int, optional
        Number of bases. If None, use the number of relations.
        Default None
    num_hidden_layers : int, optional
        Number of hidden RelGraphConv layers.
        Default 1
    dropout : float, optional
        Dropout.
        Default 0
    use_self_loop : bool, optional
        Use self loop if True.
        Default False
    low_mem : bool, optional
        True to use a low-memory implementation of the relation message passing function,
        trading speed for memory consumption.
        Default True
    layer_norm : bool, optional
        True to use layer norm.
        Default False
    """
    def __init__(self,
                 device,
@@ -66,7 +72,7 @@ class EntityClassify(nn.Module):
                 num_hidden_layers=1,
                 dropout=0,
                 use_self_loop=False,
                 low_mem=True,
                 layer_norm=False):
        super(EntityClassify, self).__init__()
        self.device = th.device(device if device >= 0 else 'cpu')
@@ -110,6 +116,14 @@ class EntityClassify(nn.Module):
            h = layer(block, h, block.edata['etype'], block.edata['norm'])
        return h
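# gen_norm computes, for every edge (u, v) of g, the normalizer 1 / in-degree(v)
# and stores it in g.edata['norm']; it is applied to each sampled block on the
# fly, replacing the per-edge-type norms the old version precomputed on the full graph.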
def gen_norm(g):
    _, v, eid = g.all_edges(form='all')
    _, inverse_index, count = th.unique(v, return_inverse=True, return_counts=True)
    degrees = count[inverse_index]
    norm = th.ones(eid.shape[0], device=eid.device) / degrees
    norm = norm.unsqueeze(1)
    g.edata['norm'] = norm
class NeighborSampler:
    """Neighbor sampler

    Parameters
@@ -151,11 +165,8 @@ class NeighborSampler:
                frontier = dgl.in_subgraph(self.g, cur)
            else:
                frontier = dgl.sampling.sample_neighbors(self.g, cur, fanout)
            block = dgl.to_block(frontier, cur)
            gen_norm(block)
            cur = block.srcdata[dgl.NID]
            blocks.insert(0, block)
        return seeds, blocks
@@ -167,24 +178,25 @@ def evaluate(model, embed_layer, eval_loader, node_feats):
    eval_seeds = []

    with th.no_grad():
        th.cuda.empty_cache()
        for sample_data in tqdm.tqdm(eval_loader):
            seeds, blocks = sample_data
            feats = embed_layer(blocks[0].srcdata[dgl.NID],
                                blocks[0].srcdata['ntype'],
                                blocks[0].srcdata['type_id'],
                                node_feats)
            logits = model(blocks, feats)
            eval_logits.append(logits.cpu().detach())
            eval_seeds.append(seeds.cpu().detach())
    eval_logits = th.cat(eval_logits)
    eval_seeds = th.cat(eval_seeds)

    return eval_logits, eval_seeds

@thread_wrapped_func
def run(proc_id, n_gpus, n_cpus, args, devices, dataset, split, queue=None):
    dev_id = devices[proc_id] if devices[proc_id] != 'cpu' else -1
    g, node_feats, num_of_ntype, num_classes, num_rels, target_idx, \
        train_idx, val_idx, test_idx, labels = dataset
@@ -204,14 +216,14 @@ def run(proc_id, n_gpus, args, devices, dataset, split, queue=None):
                              num_workers=args.num_workers)

    # validation sampler
    val_sampler = NeighborSampler(g, target_idx, fanouts)
    val_loader = DataLoader(dataset=val_idx.numpy(),
                            batch_size=args.batch_size,
                            collate_fn=val_sampler.sample_blocks,
                            shuffle=False,
                            num_workers=args.num_workers)

    # test sampler
    test_sampler = NeighborSampler(g, target_idx, [None] * args.n_layers)
    test_loader = DataLoader(dataset=test_idx.numpy(),
                             batch_size=args.eval_batch_size,
@@ -219,15 +231,16 @@ def run(proc_id, n_gpus, args, devices, dataset, split, queue=None):
                             shuffle=False,
                             num_workers=args.num_workers)

    world_size = n_gpus
    if n_gpus > 1:
        dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
            master_ip='127.0.0.1', master_port='12345')
        backend = 'nccl'

        # without DGL sparse embedding the embedding layer stays on CPU
        # (torch sparse embedding), which nccl cannot handle, so use gloo
        if args.dgl_sparse is False:
            backend = 'gloo'
        print("backend using {}".format(backend))
        th.distributed.init_process_group(backend=backend,
                                          init_method=dist_init_method,
                                          world_size=world_size,
@@ -242,7 +255,7 @@ def run(proc_id, n_gpus, args, devices, dataset, split, queue=None):
                                     num_of_ntype,
                                     node_feats,
                                     args.n_hidden,
                                     dgl_sparse=args.dgl_sparse)

    # create model
    # all model params are in device.
@@ -262,22 +275,23 @@ def run(proc_id, n_gpus, args, devices, dataset, split, queue=None):
        th.cuda.set_device(dev_id)
        labels = labels.to(dev_id)
        model.cuda(dev_id)
        # with dgl sparse embedding, everything except the node embedding tensor lives on the GPU
        if args.dgl_sparse:
            embed_layer.cuda(dev_id)

    if n_gpus > 1:
        labels = labels.to(dev_id)
        model.cuda(dev_id)
        model = DistributedDataParallel(model, device_ids=[dev_id], output_device=dev_id)
        if args.dgl_sparse:
            embed_layer.cuda(dev_id)
            if len(list(embed_layer.parameters())) > 0:
                embed_layer = DistributedDataParallel(embed_layer, device_ids=[dev_id], output_device=dev_id)
        else:
            if len(list(embed_layer.parameters())) > 0:
                embed_layer = DistributedDataParallel(embed_layer, device_ids=None, output_device=None)
    # optimizer
    # Two optimizers per process: a dense Adam for the model weights (and feature
    # projections), plus a sparse optimizer that only updates the embedding rows
    # touched by each mini-batch.
    dense_params = list(model.parameters())
    if args.node_feats:
        if n_gpus > 1:
@@ -285,20 +299,35 @@ def run(proc_id, n_gpus, args, devices, dataset, split, queue=None):
        else:
            dense_params += list(embed_layer.embeds.parameters())
    optimizer = th.optim.Adam(dense_params, lr=args.lr, weight_decay=args.l2norm)

    if args.dgl_sparse:
        all_params = list(model.parameters()) + list(embed_layer.parameters())
        optimizer = th.optim.Adam(all_params, lr=args.lr, weight_decay=args.l2norm)
        if n_gpus > 1 and isinstance(embed_layer, DistributedDataParallel):
            dgl_emb = embed_layer.module.dgl_emb
        else:
            dgl_emb = embed_layer.dgl_emb
        emb_optimizer = dgl.optim.SparseAdam(params=dgl_emb, lr=args.sparse_lr, eps=1e-8) if len(dgl_emb) > 0 else None
    else:
        if n_gpus > 1:
            embs = list(embed_layer.module.node_embeds.parameters())
        else:
            embs = list(embed_layer.node_embeds.parameters())
        emb_optimizer = th.optim.SparseAdam(embs, lr=args.sparse_lr) if len(embs) > 0 else None

    # training loop
    print("start training...")
    forward_time = []
    backward_time = []

    train_time = 0
    validation_time = 0
    test_time = 0
    last_val_acc = 0.0

    if n_gpus > 1 and n_cpus - args.num_workers > 0:
        th.set_num_threads(n_cpus - args.num_workers)

    for epoch in range(args.n_epochs):
        tstart = time.time()
        model.train()
        embed_layer.train()
@@ -306,20 +335,20 @@ def run(proc_id, n_gpus, args, devices, dataset, split, queue=None):
            seeds, blocks = sample_data
            t0 = time.time()
            feats = embed_layer(blocks[0].srcdata[dgl.NID],
                                blocks[0].srcdata['ntype'],
                                blocks[0].srcdata['type_id'],
                                node_feats)
            logits = model(blocks, feats)
            loss = F.cross_entropy(logits, labels[seeds])
            t1 = time.time()

            optimizer.zero_grad()
            if emb_optimizer is not None:
                emb_optimizer.zero_grad()
            loss.backward()
            if emb_optimizer is not None:
                emb_optimizer.step()
            optimizer.step()
            t2 = time.time()

            forward_time.append(t1 - t0)
@@ -328,9 +357,28 @@ def run(proc_id, n_gpus, args, devices, dataset, split, queue=None):
            if i % 100 and proc_id == 0:
                print("Train Accuracy: {:.4f} | Train Loss: {:.4f}".
                      format(train_acc, loss.item()))
        gc.collect()

        print("Epoch {:05d}:{:05d} | Train Forward Time(s) {:.4f} | Backward Time(s) {:.4f}".
              format(epoch, args.n_epochs, forward_time[-1], backward_time[-1]))
        tend = time.time()
        train_time += (tend - tstart)
        def collect_eval():
            eval_logits = []
            eval_seeds = []
            for i in range(n_gpus):
                log = queue.get()
                eval_l, eval_s = log
                eval_logits.append(eval_l)
                eval_seeds.append(eval_s)
            eval_logits = th.cat(eval_logits)
            eval_seeds = th.cat(eval_seeds)
            eval_loss = F.cross_entropy(eval_logits, labels[eval_seeds].cpu()).item()
            eval_acc = th.sum(eval_logits.argmax(dim=1) == labels[eval_seeds].cpu()).item() / len(eval_seeds)
            return eval_loss, eval_acc

        vstart = time.time()
        if (queue is not None) or (proc_id == 0):
            val_logits, val_seeds = evaluate(model, embed_layer, val_loader, node_feats)
            if queue is not None:
@@ -338,25 +386,27 @@ def run(proc_id, n_gpus, args, devices, dataset, split, queue=None):
            # gather evaluation result from multiple processes
            if proc_id == 0:
                val_loss, val_acc = collect_eval() if queue is not None else \
                    (F.cross_entropy(val_logits, labels[val_seeds].cpu()).item(), \
                     th.sum(val_logits.argmax(dim=1) == labels[val_seeds].cpu()).item() / len(val_seeds))

                do_test = val_acc > last_val_acc
                last_val_acc = val_acc
                print("Validation Accuracy: {:.4f} | Validation loss: {:.4f}".
                      format(val_acc, val_loss))
        if n_gpus > 1:
            th.distributed.barrier()
            if proc_id == 0:
                for i in range(1, n_gpus):
                    queue.put(do_test)
            else:
                do_test = queue.get()

        vend = time.time()
        validation_time += (vend - vstart)

        if (epoch + 1) > (args.n_epochs / 2) and do_test:
            tstart = time.time()
            if (queue is not None) or (proc_id == 0):
                test_logits, test_seeds = evaluate(model, embed_layer, test_loader, node_feats)
                if queue is not None:
@@ -364,20 +414,13 @@ def run(proc_id, n_gpus, args, devices, dataset, split, queue=None):
                # gather evaluation result from multiple processes
                if proc_id == 0:
                    test_loss, test_acc = collect_eval() if queue is not None else \
                        (F.cross_entropy(test_logits, labels[test_seeds].cpu()).item(), \
                         th.sum(test_logits.argmax(dim=1) == labels[test_seeds].cpu()).item() / len(test_seeds))
                    print("Test Accuracy: {:.4f} | Test loss: {:.4f}".format(test_acc, test_loss))
                    print()
            tend = time.time()
            test_time += (tend - tstart)

        # sync for test
        if n_gpus > 1:
@@ -387,6 +430,9 @@ def run(proc_id, n_gpus, args, devices, dataset, split, queue=None):
                                np.mean(forward_time[len(forward_time) // 4:])))
    print("{}/{} Mean backward time: {:4f}".format(proc_id, n_gpus,
                                np.mean(backward_time[len(backward_time) // 4:])))

    if proc_id == 0:
        print("Test Accuracy: {:.4f} | Test loss: {:.4f}".format(test_acc, test_loss))
        print("Train {}s, valid {}s, test {}s".format(train_time, validation_time, test_time))
def main(args, devices):
    # load graph data
@@ -431,17 +477,6 @@ def main(args, devices):
        print('Number of valid: {}'.format(len(val_idx)))
        print('Number of test: {}'.format(len(test_idx)))
    else:
        # Load from hetero-graph
        hg = dataset[0]
@@ -455,7 +490,6 @@ def main(args, devices):
        labels = hg.nodes[category].data.pop('labels')
        train_idx = th.nonzero(train_mask, as_tuple=False).squeeze()
        test_idx = th.nonzero(test_mask, as_tuple=False).squeeze()

    # AIFB, MUTAG, BGS and AM datasets do not provide validation set split.
    # Split train set into train and validation if args.validation is set
@@ -466,34 +500,29 @@ def main(args, devices):
    else:
        val_idx = train_idx

    node_feats = []
    for ntype in hg.ntypes:
        if len(hg.nodes[ntype].data) == 0 or args.node_feats is False:
            node_feats.append(hg.number_of_nodes(ntype))
        else:
            assert len(hg.nodes[ntype].data) == 1
            feat = hg.nodes[ntype].data.pop('feat')
            node_feats.append(feat.share_memory_())

    # get target category id
    category_id = len(hg.ntypes)
    for i, ntype in enumerate(hg.ntypes):
        if ntype == category:
            category_id = i

    g = dgl.to_homogeneous(hg)
    g.ndata['ntype'] = g.ndata[dgl.NTYPE]
    g.ndata['ntype'].share_memory_()
    g.edata['etype'] = g.edata[dgl.ETYPE]
    g.edata['etype'].share_memory_()
    g.ndata['type_id'] = g.ndata[dgl.NID]
    g.ndata['type_id'].share_memory_()

    node_ids = th.arange(g.number_of_nodes())

    # find out the target node ids
@@ -509,14 +538,15 @@ def main(args, devices):
    g.create_formats_()

    n_gpus = len(devices)
    n_cpus = mp.cpu_count()
    # cpu
    if devices[0] == -1:
        run(0, 0, n_cpus, args, ['cpu'],
            (g, node_feats, num_of_ntype, num_classes, num_rels, target_idx,
             train_idx, val_idx, test_idx, labels), None, None)
    # gpu
    elif n_gpus == 1:
        run(0, n_gpus, n_cpus, args, devices,
            (g, node_feats, num_of_ntype, num_classes, num_rels, target_idx,
             train_idx, val_idx, test_idx, labels), None, None)
    # multi gpu
@@ -547,7 +577,7 @@ def main(args, devices):
                (proc_id + 1) * tstseeds_per_proc \
                if (proc_id + 1) * tstseeds_per_proc < num_test_seeds \
                else num_test_seeds]
            p = mp.Process(target=run, args=(proc_id, n_gpus, n_cpus // n_gpus, args, devices,
                                             (g, node_feats, num_of_ntype, num_classes, num_rels, target_idx,
                                              train_idx, val_idx, test_idx, labels),
                                             (proc_train_seeds, proc_valid_seeds, proc_test_seeds),
@@ -568,6 +598,8 @@ def config():
                        help="gpu")
    parser.add_argument("--lr", type=float, default=1e-2,
                        help="learning rate")
    parser.add_argument("--sparse-lr", type=float, default=2e-2,
                        help="sparse embedding learning rate")
    parser.add_argument("--n-bases", type=int, default=-1,
                        help="number of filter weight matrices, default: -1 [use all]")
    parser.add_argument("--n-layers", type=int, default=2,
@@ -578,8 +610,6 @@ def config():
                        help="dataset to use")
    parser.add_argument("--l2norm", type=float, default=0,
                        help="l2 norm coef")
    parser.add_argument("--fanout", type=str, default="4, 4",
                        help="Fan-out of neighbor sampling.")
    parser.add_argument("--use-self-loop", default=False, action='store_true',
@@ -589,20 +619,16 @@ def config():
    fp.add_argument('--testing', dest='validation', action='store_false')
    parser.add_argument("--batch-size", type=int, default=100,
                        help="Mini-batch size.")
    parser.add_argument("--eval-batch-size", type=int, default=32,
                        help="Evaluation mini-batch size.")
    parser.add_argument("--num-workers", type=int, default=0,
                        help="Number of workers for dataloader.")
    parser.add_argument("--low-mem", default=False, action='store_true',
                        help="Whether to use the low-mem RelGraphConv")
    parser.add_argument("--dgl-sparse", default=False, action='store_true',
                        help='Use sparse embedding for node embeddings.')
    parser.add_argument('--node-feats', default=False, action='store_true',
                        help='Whether to use node features')
    parser.add_argument('--layer-norm', default=False, action='store_true',
                        help='Use layer norm')
    parser.set_defaults(validation=True)
...
import torch as th
import torch.nn as nn
import dgl

class BaseRGCN(nn.Module):
    def __init__(self, num_nodes, h_dim, out_dim, num_rels, num_bases,
                 num_hidden_layers=1, dropout=0,
@@ -48,6 +50,10 @@ class BaseRGCN(nn.Module):
            h = layer(g, h, r, norm)
        return h

def initializer(emb):
    # uniform initializer passed as init_func to dgl.nn.NodeEmbedding below
    emb.uniform_(-1.0, 1.0)
    return emb
class RelGraphEmbedLayer(nn.Module):
    r"""Embedding layer for featureless heterograph.

    Parameters
@@ -65,8 +71,8 @@ class RelGraphEmbedLayer(nn.Module):
        treat certain input feature as a one-hot encoding feature.
    embed_size : int
        Output embed size
    dgl_sparse : bool, optional
        If True, use dgl.nn.NodeEmbedding; otherwise use torch.nn.Embedding.
    """
    def __init__(self,
                 dev_id,
@@ -75,29 +81,42 @@ class RelGraphEmbedLayer(nn.Module):
                 num_of_ntype,
                 input_size,
                 embed_size,
                 dgl_sparse=False):
        super(RelGraphEmbedLayer, self).__init__()
        self.dev_id = th.device(dev_id if dev_id >= 0 else 'cpu')
        self.embed_size = embed_size
        self.num_nodes = num_nodes
        self.dgl_sparse = dgl_sparse

        # create weight embeddings for each node for each relation
        self.embeds = nn.ParameterDict()
        self.node_embeds = {} if dgl_sparse else nn.ModuleDict()
        self.num_of_ntype = num_of_ntype
        self.idmap = th.empty(num_nodes).long()
        for ntype in range(num_of_ntype):
            if isinstance(input_size[ntype], int):
                if dgl_sparse:
                    self.node_embeds[str(ntype)] = dgl.nn.NodeEmbedding(input_size[ntype], embed_size, name=str(ntype),
                                                                        init_func=initializer)
                else:
                    sparse_emb = th.nn.Embedding(input_size[ntype], embed_size, sparse=True)
                    nn.init.uniform_(sparse_emb.weight, -1.0, 1.0)
                    self.node_embeds[str(ntype)] = sparse_emb
            else:
                input_emb_size = input_size[ntype].shape[1]
                embed = nn.Parameter(th.Tensor(input_emb_size, self.embed_size))
                nn.init.xavier_uniform_(embed)
                self.embeds[str(ntype)] = embed
    @property
    def dgl_emb(self):
        """Return the list of dgl.nn.NodeEmbedding objects used by this layer
        (empty when ``dgl_sparse`` is False).
        """
        if self.dgl_sparse:
            embs = [emb for emb in self.node_embeds.values()]
            return embs
        else:
            return []
    def forward(self, node_ids, node_tids, type_ids, features):
        """Forward computation
@@ -117,14 +136,16 @@ class RelGraphEmbedLayer(nn.Module):
        tensor
            embeddings as the input of the next layer
        """
        tsd_ids = node_ids.to(self.dev_id)
        embeds = th.empty(node_ids.shape[0], self.embed_size, device=self.dev_id)
        for ntype in range(self.num_of_ntype):
            loc = node_tids == ntype
            if isinstance(features[ntype], int):
                # featureless ntype: look up the learnable node embedding table
                if self.dgl_sparse:
                    embeds[loc] = self.node_embeds[str(ntype)](type_ids[loc], self.dev_id)
                else:
                    embeds[loc] = self.node_embeds[str(ntype)](type_ids[loc]).to(self.dev_id)
            else:
                # ntype with input features: project them with the dense weight
                embeds[loc] = features[ntype][type_ids[loc]].to(self.dev_id) @ self.embeds[str(ntype)].to(self.dev_id)

        return embeds
@@ -38,6 +38,7 @@ from .transform import *
from .propagate import *
from .random import *
from .data.utils import save_graphs, load_graphs
from . import optim
from ._deprecate.graph import DGLGraph as DGLGraphStale
from ._deprecate.nodeflow import *
@@ -73,7 +73,6 @@ def load_backend(mod_name):
    else:
        setattr(thismod, api, _gen_missing_api(api, mod_name))

def get_preferred_backend():
    config_path = os.path.join(os.path.expanduser('~'), '.dgl', 'config.json')
    backend_name = None
...
@@ -1615,3 +1615,14 @@ class no_grad(object):
    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

class NodeEmbedding(object):
    """Sparse node embeddings"""
    def __init__(self):
        pass

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass
"""Sparse optimizer is not supported for mxnet"""
"""Sparse optimizer is not supported for tensorflow"""
@@ -5,3 +5,4 @@ from .softmax import *
from .factory import *
from .hetero import *
from .utils import Sequential, WeightBasis
from .sparse_emb import NodeEmbedding
"""Torch NodeEmbedding."""
from datetime import timedelta
import torch as th
from ...backend import pytorch as F
from ...utils import get_shared_mem_array, create_shared_mem_array
_STORE = None
class NodeEmbedding:
'''Class for storing node embeddings.
The class is optimized for training large-scale node embeddings. It updates the embedding in
a sparse way and can scale to graphs with millions of nodes. It also supports partitioning
to multiple GPUs (on a single machine) for more acceleration. It does not support partitioning
across machines.
Currently, DGL provides two optimizers that work with this NodeEmbedding
class: ``SparseAdagrad`` and ``SparseAdam``.
The implementation is based on the torch.distributed package. It depends on the pytorch
default distributed process group to collect multi-process information and uses
``torch.distributed.TCPStore`` to share meta-data information across multiple gpu processes.
It uses the local address '127.0.0.1:12346' to initialize the TCPStore.
Parameters
----------
num_embeddings : int
The number of embeddings. Currently, the number of embeddings has to be the same as
the number of nodes.
embedding_dim : int
The dimension size of embeddings.
name : str
The name of the embeddings. The name should uniquely identify the embeddings in the system.
init_func : callable, optional
The function to create the initial data. If the init function is not provided,
the values of the embeddings are initialized to zero.
Examples
--------
Before launching multiple gpu processes
>>> def initializer(emb):
...     th.nn.init.xavier_uniform_(emb)
...     return emb
In each training process
>>> emb = dgl.nn.NodeEmbedding(g.number_of_nodes(), 10, 'emb', init_func=initializer)
>>> optimizer = dgl.optim.SparseAdam([emb], lr=0.001)
>>> for blocks in dataloader:
...     ...
...     feats = emb(nids, gpu_0)
...     loss = F.sum(feats + 1, 0)
...     loss.backward()
...     optimizer.step()
'''
def __init__(self, num_embeddings, embedding_dim, name,
init_func=None):
global _STORE
# Check whether it is multi-gpu training or not.
if th.distributed.is_initialized():
rank = th.distributed.get_rank()
world_size = th.distributed.get_world_size()
else:
rank = -1
world_size = 0
self._rank = rank
self._world_size = world_size
host_name = '127.0.0.1'
port = 12346
if rank <= 0:
emb = create_shared_mem_array(name, (num_embeddings, embedding_dim), th.float32)
if init_func is not None:
emb = init_func(emb)
if rank == 0:
if world_size > 1:
# for multi-gpu training, setup a TCPStore for
# embedding status synchronization across GPU processes
if _STORE is None:
_STORE = th.distributed.TCPStore(
host_name, port, world_size, True, timedelta(seconds=30))
for _ in range(1, world_size):
# send embs
_STORE.set(name, name)
elif rank > 0:
# receive
if _STORE is None:
_STORE = th.distributed.TCPStore(
host_name, port, world_size, False, timedelta(seconds=30))
_STORE.wait([name])
emb = get_shared_mem_array(name, (num_embeddings, embedding_dim), th.float32)
self._store = _STORE
self._tensor = emb
self._num_embeddings = num_embeddings
self._embedding_dim = embedding_dim
self._name = name
self._optm_state = None # track optimizer state
self._trace = [] # track minibatch
def __call__(self, node_ids, device=th.device('cpu')):
"""Return the embeddings of the given node IDs.
Parameters
----------
node_ids : th.tensor
Index of the embeddings to collect.
device : th.device
Target device to put the collected embeddings.
Returns
-------
torch.Tensor
The collected embeddings on the target device.
"""
emb = self._tensor[node_ids].to(device)
if F.is_recording():
emb = F.attach_grad(emb)
self._trace.append((node_ids.to(device, non_blocking=True), emb))
return emb
@property
def store(self):
"""Return torch.distributed.TCPStore for
meta data sharing across processes.
Returns
-------
torch.distributed.TCPStore
KVStore used for meta data sharing.
"""
return self._store
@property
def rank(self):
"""Return rank of current process.
Returns
-------
int
The rank of current process.
"""
return self._rank
@property
def world_size(self):
"""Return world size of the pytorch distributed training env.
Returns
-------
int
The world size of the pytorch distributed training env.
"""
return self._world_size
@property
def name(self):
"""Return the name of NodeEmbedding.
Returns
-------
str
The name of NodeEmbedding.
"""
return self._name
@property
def num_embeddings(self):
"""Return the number of embeddings.
Returns
-------
int
The number of embeddings.
"""
return self._num_embeddings
def set_optm_state(self, state):
"""Store the optimizer related state tensor.
Parameters
----------
state : tuple of torch.Tensor
Optimizer related state.
"""
self._optm_state = state
@property
def optm_state(self):
"""Return the optimizer related state tensor.
Returns
-------
tuple of torch.Tensor
The optimizer related state.
"""
return self._optm_state
@property
def trace(self):
"""Return a trace of the indices of embeddings
used in the training step(s).
Returns
-------
[torch.Tensor]
The indices of embeddings used in the training step(s).
"""
return self._trace
def reset_trace(self):
"""Clean up the trace of the indices of embeddings
used in the training step(s).
"""
self._trace = []
@property
def emb_tensor(self):
"""Return the tensor storing the node embeddings
Returns
-------
torch.Tensor
The tensor storing the node embeddings
"""
return self._tensor
"""dgl optims."""
import importlib
import sys
import os
from ..backend import backend_name
from ..utils import expand_as_pair
def _load_backend(mod_name):
mod = importlib.import_module('.%s' % mod_name, __name__)
thismod = sys.modules[__name__]
for api, obj in mod.__dict__.items():
setattr(thismod, api, obj)
_load_backend(backend_name)
"""dgl optims for pytorch."""
from .sparse_optim import SparseAdagrad, SparseAdam
"""Node embedding optimizers"""
import abc
from abc import abstractmethod
import torch as th
from ...utils import get_shared_mem_array, create_shared_mem_array
from ...nn.pytorch import NodeEmbedding
class SparseGradOptimizer(abc.ABC):
r''' The abstract sparse optimizer.
Note: dgl sparse optimizers only work with dgl.nn.NodeEmbedding.
Parameters
----------
params : list of NodeEmbedding
The list of NodeEmbeddings.
lr : float
The learning rate.
'''
def __init__(self, params, lr):
self._params = params
self._lr = lr
self._rank = None
self._world_size = None
self._shared_cache = {}
self._clean_grad = False
def step(self):
''' The step function.
The step function is invoked at the end of every batch to update embeddings
'''
with th.no_grad():
# Frequently allocating and freeing shared memory to hold intermediate tensors is expensive;
# we cache the shared-memory buffers and reuse them across steps.
shared_emb = {emb.name: ([], []) for emb in self._params}
# Go through all sparse embeddings
for emb in self._params: # pylint: disable=too-many-nested-blocks
num_embeddings = emb.num_embeddings
emb_name = emb.name
# Each GPU process is responsible for updating a disjoint range of each sparse embedding,
# which lets the gradient updates proceed in parallel.
range_size = (num_embeddings + self._world_size - 1) // self._world_size \
if self._world_size > 0 else 0
for idx, data in emb._trace:
grad = data.grad.data
device = grad.device
idx_dtype = idx.dtype
grad_dtype = grad.dtype
grad_dim = grad.shape[1]
if self._world_size > 0:
if emb_name not in self._shared_cache:
self._shared_cache[emb_name] = {}
for i in range(self._world_size):
start = i * range_size
end = (i + 1) * range_size \
if (i + 1) * range_size < num_embeddings \
else num_embeddings
if i == 0:
mask = idx < end
elif i + 1 == self._world_size:
mask = idx >= start
else:
mask = th.logical_and((idx >= start), (idx < end))
idx_i = idx[mask]
grad_i = grad[mask]
if i == self._rank:
shared_emb[emb_name][0].append(idx_i)
shared_emb[emb_name][1].append(grad_i)
else:
# currently nccl does not support Alltoallv operation
# we need to use CPU shared memory to share gradient
# across processes
idx_i = idx_i.to(th.device('cpu'))
grad_i = grad_i.to(th.device('cpu'))
idx_shmem_name = 'idx_{}_{}_{}'.format(emb_name, self._rank, i)
grad_shmem_name = 'grad_{}_{}_{}'.format(emb_name, self._rank, i)
if idx_shmem_name not in self._shared_cache[emb_name] or \
self._shared_cache[emb_name][idx_shmem_name].shape[0] \
< idx_i.shape[0]:
# in case idx_i.shape[0] is 0
idx_shmem = create_shared_mem_array(idx_shmem_name, \
(idx_i.shape[0] * 2 + 2,), idx_dtype)
grad_shmem = create_shared_mem_array(grad_shmem_name, \
(idx_i.shape[0] * 2 + 2, grad_dim), grad_dtype)
self._shared_cache[emb_name][idx_shmem_name] = idx_shmem
self._shared_cache[emb_name][grad_shmem_name] = grad_shmem
self._shared_cache[emb_name][idx_shmem_name][:idx_i.shape[0]] \
= idx_i
self._shared_cache[emb_name][grad_shmem_name][:idx_i.shape[0]] \
= grad_i
emb.store.set(idx_shmem_name, str(idx_i.shape[0]))
# gather gradients from all other processes
for i in range(self._world_size):
if i != self._rank:
idx_shmem_name = 'idx_{}_{}_{}'.format(emb_name, i, self._rank)
grad_shmem_name = 'grad_{}_{}_{}'.format(emb_name, i, self._rank)
size = int(emb.store.get(idx_shmem_name))
if idx_shmem_name not in self._shared_cache[emb_name] or \
self._shared_cache[emb_name][idx_shmem_name].shape[0] < size:
idx_shmem = get_shared_mem_array(idx_shmem_name, \
(size * 2 + 2,), idx_dtype)
grad_shmem = get_shared_mem_array(grad_shmem_name, \
(size * 2 + 2, grad_dim), grad_dtype)
self._shared_cache[emb_name][idx_shmem_name] = idx_shmem
self._shared_cache[emb_name][grad_shmem_name] = grad_shmem
idx_i = self._shared_cache[emb_name][idx_shmem_name][:size]
grad_i = self._shared_cache[emb_name][grad_shmem_name][:size]
shared_emb[emb_name][0].append(idx_i.to(device,
non_blocking=True))
shared_emb[emb_name][1].append(grad_i.to(device,
non_blocking=True))
else:
shared_emb[emb_name][0].append(idx)
shared_emb[emb_name][1].append(grad)
if self._clean_grad:
# clean gradient track
for emb in self._params:
emb.reset_trace()
self._clean_grad = False
for emb in self._params:
emb_name = emb.name
idx = th.cat(shared_emb[emb_name][0], dim=0)
grad = th.cat(shared_emb[emb_name][1], dim=0)
self.update(idx, grad, emb)
# synchronized gradient update
if self._world_size > 1:
th.distributed.barrier()
@abstractmethod
def update(self, idx, grad, emb):
""" Update embeddings in a sparse manner
Sparse embeddings are updated in mini-batches. We maintain gradient states for
each embedding so they can be updated separately.
Parameters
----------
idx : tensor
Index of the embeddings to be updated.
grad : tensor
Gradient of each embedding.
emb : dgl.nn.NodeEmbedding
Sparse node embedding to update.
"""
def zero_grad(self):
"""clean grad cache
"""
self._clean_grad = True
class SparseAdagrad(SparseGradOptimizer):
r''' Node embedding optimizer using the Adagrad algorithm.
This optimizer implements a sparse version of the Adagrad algorithm for
optimizing :class:`dgl.nn.NodeEmbedding`. Being sparse means it only updates
the embeddings whose gradients have updates, which are usually a very
small portion of the total embeddings.
Adagrad maintains a :math:`G_{t,i,j}` for every parameter in the embeddings, where
:math:`G_{t,i,j} = G_{t-1,i,j} + g_{t,i,j}^2` and :math:`g_{t,i,j}` is the gradient of
dimension :math:`j` of embedding :math:`i` at step :math:`t`.
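The resulting parameter update, as implemented in ``update`` below, is
:math:`p_{t,i,j} = p_{t-1,i,j} - lr \cdot g_{t,i,j} / \sqrt{G_{t,i,j} + \epsilon}`.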
Parameters
----------
params : list[dgl.nn.NodeEmbedding]
The list of dgl.nn.NodeEmbedding.
lr : float
The learning rate.
eps : float, Optional
The term added to the denominator to improve numerical stability
Default: 1e-10
Examples
--------
>>> def initializer(emb):
...     th.nn.init.xavier_uniform_(emb)
...     return emb
>>> emb = dgl.nn.NodeEmbedding(g.number_of_nodes(), 10, 'emb', init_func=initializer)
>>> optimizer = dgl.optim.SparseAdagrad([emb], lr=0.001)
>>> for blocks in dataloader:
... ...
... feats = emb(nids, gpu_0)
... loss = F.sum(feats + 1, 0)
... loss.backward()
... optimizer.step()
'''
def __init__(self, params, lr, eps=1e-10):
super(SparseAdagrad, self).__init__(params, lr)
self._eps = eps
# We need to register a state sum for each embedding in the kvstore.
for emb in params:
assert isinstance(emb, NodeEmbedding), \
'SparseAdagrad only supports dgl.nn.NodeEmbedding'
if self._rank is None:
self._rank = emb.rank
self._world_size = emb.world_size
else:
assert self._rank == emb.rank, \
'MultiGPU rank for each embedding should be same.'
assert self._world_size == emb.world_size, \
'MultiGPU world_size for each embedding should be same.'
if self._rank <= 0:
emb_name = emb.name
state = create_shared_mem_array(emb_name+'_state', \
emb.emb_tensor.shape, th.float32).zero_()
if self._rank == 0:
for _ in range(1, self._world_size):
# send embs
emb.store.set(emb_name+'_opt', emb_name)
elif self._rank > 0:
# receive
emb_name = emb.name
emb.store.wait([emb_name+'_opt'])
state = get_shared_mem_array(emb_name+'_state', \
emb.emb_tensor.shape, th.float32)
emb.set_optm_state(state)
def update(self, idx, grad, emb):
""" Update embeddings in a sparse manner
Sparse embeddings are updated in mini-batches. We maintain gradient states for
each embedding so they can be updated separately.
Parameters
----------
idx : tensor
Index of the embeddings to be updated.
grad : tensor
Gradient of each embedding.
emb : dgl.nn.NodeEmbedding
Sparse embedding to update.
"""
eps = self._eps
clr = self._lr
# the update is non-linear so indices must be unique
grad_indices, inverse, cnt = th.unique(idx, return_inverse=True, return_counts=True)
grad_values = th.zeros((grad_indices.shape[0], grad.shape[1]), device=grad.device)
grad_values.index_add_(0, inverse, grad)
grad_values = grad_values / cnt.unsqueeze(1)
grad_sum = (grad_values * grad_values)
state = emb.optm_state
state_dev = state.device
state_idx = grad_indices.to(state_dev)
grad_state = state[state_idx].to(grad.device)
grad_state += grad_sum
state[state_idx] = grad_state.to(state_dev)
std_values = grad_state.add_(eps).sqrt_()
tmp = clr * grad_values / std_values
emb.emb_tensor[state_idx] -= tmp.to(state_dev)
class SparseAdam(SparseGradOptimizer):
r''' Node embedding optimizer using the Adam algorithm.
This optimizer implements a sparse version of the Adam algorithm for
optimizing :class:`dgl.nn.NodeEmbedding`. Being sparse means it only
updates the embeddings whose gradients have updates, which are usually
a very small portion of the total embeddings.
Adam maintains a :math:`Gm_{t,i,j}` and a :math:`Gp_{t,i,j}` for every parameter
in the embeddings, where
:math:`Gm_{t,i,j} = \beta_1 Gm_{t-1,i,j} + (1-\beta_1) g_{t,i,j}`,
:math:`Gp_{t,i,j} = \beta_2 Gp_{t-1,i,j} + (1-\beta_2) g_{t,i,j}^2`, and
:math:`g_{t,i,j}` is the gradient of dimension :math:`j` of embedding :math:`i`
at step :math:`t`. The bias-corrected update applied to embedding entry
:math:`p_{i,j}`, as implemented in ``update`` below, is
:math:`p_{t,i,j} = p_{t-1,i,j} - lr \cdot \frac{Gm_{t,i,j}/(1-\beta_1^t)}{\sqrt{Gp_{t,i,j}/(1-\beta_2^t)} + \epsilon}`.
Parameters
----------
params : list[dgl.nn.NodeEmbedding]
The list of dgl.nn.NodeEmbeddings.
lr : float
The learning rate.
betas : tuple[float, float], Optional
Coefficients used for computing running averages of gradient and its square.
Default: (0.9, 0.999)
eps : float, Optional
The term added to the denominator to improve numerical stability
Default: 1e-8
Examples
--------
>>> def initializer(emb):
...     th.nn.init.xavier_uniform_(emb)
...     return emb
>>> emb = dgl.nn.NodeEmbedding(g.number_of_nodes(), 10, 'emb', init_func=initializer)
>>> optimizer = dgl.optim.SparseAdam([emb], lr=0.001)
>>> for blocks in dataloader:
... ...
... feats = emb(nids, gpu_0)
... loss = F.sum(feats + 1, 0)
... loss.backward()
... optimizer.step()
'''
def __init__(self, params, lr, betas=(0.9, 0.999), eps=1e-08):
super(SparseAdam, self).__init__(params, lr)
self._lr = lr
self._beta1 = betas[0]
self._beta2 = betas[1]
self._eps = eps
# We need to register a state sum for each embedding in the kvstore.
for emb in params:
assert isinstance(emb, NodeEmbedding), \
'SparseAdam only supports dgl.nn.NodeEmbedding'
if self._rank is None:
self._rank = emb.rank
self._world_size = emb.world_size
else:
assert self._rank == emb.rank, \
'MultiGPU rank for each embedding should be same.'
assert self._world_size == emb.world_size, \
'MultiGPU world_size for each embedding should be same.'
if self._rank <= 0:
emb_name = emb.name
state_step = create_shared_mem_array(emb_name+'_step', \
(emb.emb_tensor.shape[0],), th.float32).zero_()
state_mem = create_shared_mem_array(emb_name+'_mem', \
emb.emb_tensor.shape, th.float32).zero_()
state_power = create_shared_mem_array(emb_name+'_power', \
emb.emb_tensor.shape, th.float32).zero_()
if self._rank == 0:
state = (state_step, state_mem, state_power)
emb_name = emb.name
for _ in range(1, self._world_size):
# send embs
emb.store.set(emb_name+'_opt', emb_name)
elif self._rank > 0:
# receive
emb_name = emb.name
emb.store.wait([emb_name+'_opt'])
state_step = get_shared_mem_array(emb_name+'_step', \
(emb.emb_tensor.shape[0],), th.float32)
state_mem = get_shared_mem_array(emb_name+'_mem', \
emb.emb_tensor.shape, th.float32)
state_power = get_shared_mem_array(emb_name+'_power', \
emb.emb_tensor.shape, th.float32)
state = (state_step, state_mem, state_power)
emb.set_optm_state(state)
def update(self, idx, grad, emb):
""" Update embeddings in a sparse manner
Sparse embeddings are updated in mini-batches. We maintain gradient states for
each embedding so they can be updated separately.
Parameters
----------
idx : tensor
Index of the embeddings to be updated.
grad : tensor
Gradient of each embedding.
emb : dgl.nn.NodeEmbedding
Sparse embedding to update.
"""
with th.no_grad():
beta1 = self._beta1
beta2 = self._beta2
eps = self._eps
clr = self._lr
state_step, state_mem, state_power = emb.optm_state
exec_dev = grad.device
state_dev = state_step.device
# There can be duplicated indices due to sampling.
# Thus unique them here and average the gradient here.
grad_indices, inverse, cnt = th.unique(idx,
return_inverse=True,
return_counts=True)
state_idx = grad_indices.to(state_dev)
state_step[state_idx] += 1
state_step = state_step[state_idx].to(exec_dev, non_blocking=True)
orig_mem = state_mem[state_idx].to(exec_dev, non_blocking=True)
orig_power = state_power[state_idx].to(exec_dev, non_blocking=True)
grad_values = th.zeros((grad_indices.shape[0], grad.shape[1]), device=exec_dev)
grad_values.index_add_(0, inverse, grad)
grad_values = grad_values / cnt.unsqueeze(1)
grad_mem = grad_values
grad_power = grad_values * grad_values
update_mem = beta1 * orig_mem + (1.-beta1) * grad_mem
update_power = beta2 * orig_power + (1.-beta2) * grad_power
state_mem[state_idx] = update_mem.to(state_dev, non_blocking=True)
state_power[state_idx] = update_power.to(state_dev, non_blocking=True)
update_mem_corr = update_mem / (1. - th.pow(th.tensor(beta1, device=exec_dev),
state_step)).unsqueeze(1)
update_power_corr = update_power / (1. - th.pow(th.tensor(beta2, device=exec_dev),
state_step)).unsqueeze(1)
std_values = clr * update_mem_corr / (th.sqrt(update_power_corr) + eps)
emb.emb_tensor[state_idx] -= std_values.to(state_dev)
@@ -2,3 +2,4 @@
from .internal import *
from .data import *
from .checks import *
from .shared_mem import *
"""Shared memory utilities."""
from .. import backend as F
from .._ffi.ndarray import empty_shared_mem
def get_shared_mem_array(name, shape, dtype):
""" Get a tensor from shared memory with specific name
Parameters
----------
name : str
The unique name of the shared memory
shape : tuple of int
The shape of the returned tensor
dtype : F.dtype
The dtype of the returned tensor
Returns
-------
F.tensor
The tensor read from shared memory.
"""
name = 'DGL_'+name
new_arr = empty_shared_mem(name, False, shape, F.reverse_data_type_dict[dtype])
dlpack = new_arr.to_dlpack()
return F.zerocopy_from_dlpack(dlpack)
def create_shared_mem_array(name, shape, dtype):
""" Create a tensor from shared memory with the specific name
Parameters
----------
name : str
The unique name of the shared memory
shape : tuple of int
The shape of the returned tensor
dtype : F.dtype
The dtype of the returned tensor
Returns
-------
F.tensor
The created tensor.
"""
name = 'DGL_'+name
new_arr = empty_shared_mem(name, True, shape, F.reverse_data_type_dict[dtype])
dlpack = new_arr.to_dlpack()
return F.zerocopy_from_dlpack(dlpack)
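A hedged sketch of how the two helpers pair up across processes (the segment name 'demo' and the writer/reader split are illustrative; it assumes the PyTorch backend and that both sides agree on name, shape, and dtype):

```
import torch as th
import torch.multiprocessing as mp
from dgl.utils import create_shared_mem_array, get_shared_mem_array

def reader():
    # attach to the segment the parent created; name/shape/dtype must match
    arr = get_shared_mem_array('demo', (4,), th.float32)
    print(arr)  # sees the values written by the parent

if __name__ == '__main__':
    # the creating side allocates the segment and fills it
    shared = create_shared_mem_array('demo', (4,), th.float32)
    shared[:] = th.arange(4, dtype=th.float32)
    p = mp.Process(target=reader)
    p.start()
    p.join()
```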
@@ -32,7 +32,7 @@ class SharedMemoryResource: public Resource {
  }

  void Destroy() {
    // LOG(INFO) << "remove " << name << " for shared memory";
    shm_unlink(name.c_str());
  }
};
@@ -55,7 +55,7 @@ SharedMemory::~SharedMemory() {
  CHECK(munmap(ptr_, size_) != -1) << strerror(errno);
  close(fd_);
  if (own_) {
    // LOG(INFO) << "remove " << name << " for shared memory";
    shm_unlink(name.c_str());
    // The resource has been deleted. We don't need to keep track of it any more.
    DeleteResource(name);
...