"src/git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "3f0c10058a4a327672998b3fa4b51d385018ec03"
Unverified Commit 7176f5f2 authored by Jinjing Zhou, committed by GitHub

[Regression] Multigpu regression test (#2923)

* add multigpu sage

* Fix #2808

* multigpu test

* try

* fix remote url

* fix

* Revert "Fix #2808"

This reverts commit 19741f7d546df34b33759d8d0c9a85992ebc6118.

* add back thread_wrapped_func for backward compatibility

* fix

* fix multigpu

* print nvidia-smi

* use new docker image

* fix

* fix

* add ogb mag
parent cba5b188
"""
Modeling Relational Data with Graph Convolutional Networks
Paper: https://arxiv.org/abs/1703.06103
Code: https://github.com/tkipf/relational-gcn
Differences compared to tkipf/relational-gcn
* l2norm applied to all weights
* remove nodes that won't be touched
"""
import argparse
import gc
import logging
from pathlib import Path
from types import SimpleNamespace
import numpy as np
import time
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.multiprocessing as mp
from torch.multiprocessing import Queue
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader
import dgl
from dgl.nn import RelGraphConv
# import sys
# import os
# dir_path = Path(os.path.dirname(__file__))
# sys.path.insert(0, dir_path.parent)
from .. import utils
# import utils
class EntityClassify(nn.Module):
def __init__(self,
device,
num_nodes,
h_dim,
out_dim,
num_rels,
num_bases=None,
num_hidden_layers=1,
dropout=0,
use_self_loop=False,
low_mem=True,
layer_norm=False):
super(EntityClassify, self).__init__()
self.device = th.device(device if device >= 0 else 'cpu')
self.num_nodes = num_nodes
self.h_dim = h_dim
self.out_dim = out_dim
self.num_rels = num_rels
self.num_bases = None if num_bases < 0 else num_bases
self.num_hidden_layers = num_hidden_layers
self.dropout = dropout
self.use_self_loop = use_self_loop
self.low_mem = low_mem
self.layer_norm = layer_norm
self.layers = nn.ModuleList()
# i2h
self.layers.append(RelGraphConv(
self.h_dim, self.h_dim, self.num_rels, "basis",
self.num_bases, activation=F.relu, self_loop=self.use_self_loop,
low_mem=self.low_mem, dropout=self.dropout, layer_norm=layer_norm))
# h2h
for idx in range(self.num_hidden_layers):
self.layers.append(RelGraphConv(
self.h_dim, self.h_dim, self.num_rels, "basis",
self.num_bases, activation=F.relu, self_loop=self.use_self_loop,
low_mem=self.low_mem, dropout=self.dropout, layer_norm=layer_norm))
# h2o
self.layers.append(RelGraphConv(
self.h_dim, self.out_dim, self.num_rels, "basis",
self.num_bases, activation=None,
self_loop=self.use_self_loop,
low_mem=self.low_mem, layer_norm=layer_norm))
def forward(self, blocks, feats, norm=None):
if blocks is None:
# full graph training
blocks = [self.g] * len(self.layers)
h = feats
for layer, block in zip(self.layers, blocks):
block = block.to(self.device)
h = layer(block, h, block.edata['etype'], block.edata['norm'])
return h
def gen_norm(g):
_, v, eid = g.all_edges(form='all')
_, inverse_index, count = th.unique(
v, return_inverse=True, return_counts=True)
degrees = count[inverse_index]
norm = th.ones(eid.shape[0], device=eid.device) / degrees
norm = norm.unsqueeze(1)
g.edata['norm'] = norm
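# Illustrative sketch of what gen_norm computes (assumed example, kept as a
# comment so it is not executed when this benchmark module is imported): each
# edge gets 1 / in-degree of its destination node, stored as a column vector.
#
#   g = dgl.graph(([0, 1, 0], [2, 2, 3]))  # node 2 has in-degree 2, node 3 has 1
#   gen_norm(g)
#   # g.edata['norm'] -> tensor([[0.5000], [0.5000], [1.0000]])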
class NeighborSampler:
def __init__(self, g, target_idx, fanouts):
self.g = g
self.target_idx = target_idx
self.fanouts = fanouts
def sample_blocks(self, seeds):
blocks = []
etypes = []
norms = []
ntypes = []
seeds = th.tensor(seeds).long()
cur = self.target_idx[seeds]
for fanout in self.fanouts:
if fanout is None or fanout == -1:
frontier = dgl.in_subgraph(self.g, cur)
else:
frontier = dgl.sampling.sample_neighbors(self.g, cur, fanout)
block = dgl.to_block(frontier, cur)
gen_norm(block)
cur = block.srcdata[dgl.NID]
blocks.insert(0, block)
return seeds, blocks
@utils.thread_wrapped_func
def run(proc_id, n_gpus, n_cpus, args, devices, dataset, split, queue=None):
from .rgcn_model import RelGraphEmbedLayer
dev_id = devices[proc_id]
g, node_feats, num_of_ntype, num_classes, num_rels, target_idx, \
train_idx, val_idx, test_idx, labels = dataset
labels = labels.cuda(dev_id)
if split is not None:
train_seed, val_seed, test_seed = split
train_idx = train_idx[train_seed]
# val_idx = val_idx[val_seed]
# test_idx = test_idx[test_seed]
fanouts = args.fanout
node_tids = g.ndata[dgl.NTYPE]
sampler = NeighborSampler(g, target_idx, fanouts)
loader = DataLoader(dataset=train_idx.numpy(),
batch_size=args.batch_size,
collate_fn=sampler.sample_blocks,
shuffle=True,
num_workers=args.num_workers)
world_size = n_gpus
dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
master_ip='127.0.0.1', master_port='12345')
backend = 'nccl'
    # using sparse embedding or using the mix_cpu_gpu model (the embedding model cannot be stored on GPU)
if args.dgl_sparse is False:
backend = 'gloo'
print("backend using {}".format(backend))
th.distributed.init_process_group(backend=backend,
init_method=dist_init_method,
world_size=world_size,
rank=dev_id)
# node features
    # None for a one-hot feature; if not None, it should be the feature tensor.
#
embed_layer = RelGraphEmbedLayer(dev_id,
g.number_of_nodes(),
node_tids,
num_of_ntype,
node_feats,
args.n_hidden,
dgl_sparse=args.dgl_sparse)
# create model
# all model params are in device.
model = EntityClassify(dev_id,
g.number_of_nodes(),
args.n_hidden,
num_classes,
num_rels,
num_bases=args.n_bases,
num_hidden_layers=args.n_layers - 2,
dropout=args.dropout,
use_self_loop=args.use_self_loop,
low_mem=args.low_mem,
layer_norm=args.layer_norm)
model.cuda(dev_id)
model = DistributedDataParallel(
model, device_ids=[dev_id], output_device=dev_id)
if args.dgl_sparse:
embed_layer.cuda(dev_id)
if len(list(embed_layer.parameters())) > 0:
embed_layer = DistributedDataParallel(
embed_layer, device_ids=[dev_id], output_device=dev_id)
else:
if len(list(embed_layer.parameters())) > 0:
embed_layer = DistributedDataParallel(
embed_layer, device_ids=None, output_device=None)
# optimizer
dense_params = list(model.parameters())
if args.node_feats:
if n_gpus > 1:
dense_params += list(embed_layer.module.embeds.parameters())
else:
dense_params += list(embed_layer.embeds.parameters())
optimizer = th.optim.Adam(dense_params, lr=args.lr,
weight_decay=args.l2norm)
if args.dgl_sparse:
all_params = list(model.parameters()) + list(embed_layer.parameters())
optimizer = th.optim.Adam(
all_params, lr=args.lr, weight_decay=args.l2norm)
if n_gpus > 1 and isinstance(embed_layer, DistributedDataParallel):
dgl_emb = embed_layer.module.dgl_emb
else:
dgl_emb = embed_layer.dgl_emb
emb_optimizer = dgl.optim.SparseAdam(
params=dgl_emb, lr=args.sparse_lr, eps=1e-8) if len(dgl_emb) > 0 else None
else:
if n_gpus > 1:
embs = list(embed_layer.module.node_embeds.parameters())
else:
embs = list(embed_layer.node_embeds.parameters())
emb_optimizer = th.optim.SparseAdam(
embs, lr=args.sparse_lr) if len(embs) > 0 else None
# training loop
print("start training...")
forward_time = []
backward_time = []
train_time = 0
validation_time = 0
test_time = 0
last_val_acc = 0.0
do_test = False
if n_gpus > 1 and n_cpus - args.num_workers > 0:
th.set_num_threads(n_cpus-args.num_workers)
steps = 0
time_records = []
model.train()
embed_layer.train()
# Warm up
for i, sample_data in enumerate(loader):
seeds, blocks = sample_data
t0 = time.time()
feats = embed_layer(blocks[0].srcdata[dgl.NID],
blocks[0].srcdata['ntype'],
blocks[0].srcdata['type_id'],
node_feats)
logits = model(blocks, feats)
loss = F.cross_entropy(logits, labels[seeds])
t1 = time.time()
optimizer.zero_grad()
if emb_optimizer is not None:
emb_optimizer.zero_grad()
loss.backward()
if emb_optimizer is not None:
emb_optimizer.step()
optimizer.step()
gc.collect()
if i >= 3:
break
    # timed run (after warm-up)
for i, sample_data in enumerate(loader):
seeds, blocks = sample_data
t0 = time.time()
feats = embed_layer(blocks[0].srcdata[dgl.NID],
blocks[0].srcdata['ntype'],
blocks[0].srcdata['type_id'],
node_feats)
logits = model(blocks, feats)
loss = F.cross_entropy(logits, labels[seeds])
t1 = time.time()
optimizer.zero_grad()
if emb_optimizer is not None:
emb_optimizer.zero_grad()
loss.backward()
if emb_optimizer is not None:
emb_optimizer.step()
optimizer.step()
th.distributed.barrier()
t2 = time.time()
forward_time.append(t1 - t0)
backward_time.append(t2 - t1)
time_records.append(t2 - t0)
gc.collect()
if i >= 10:
break
if proc_id == 0:
queue.put(np.array(time_records))
@utils.skip_if_not_4gpu()
@utils.benchmark('time', timeout=600)
@utils.parametrize('data', ['am', 'ogbn-mag'])
@utils.parametrize('low_mem', [True, False])
@utils.parametrize('dgl_sparse', [True, False])
def track_time(data, low_mem, dgl_sparse):
# load graph data
dataset = utils.process_data(data)
args = config()
devices = [0, 1, 2, 3]
args.low_mem = low_mem
args.dgl_sparse = dgl_sparse
args.dataset = dataset
ogb_dataset = False
if data == 'am':
args.n_bases = 40
args.l2norm = 5e-4
    elif data == 'ogbn-mag':
        args.n_bases = 2
        args.l2norm = 0
        ogb_dataset = True
else:
raise ValueError()
if ogb_dataset is True:
split_idx = dataset.get_idx_split()
train_idx = split_idx["train"]['paper']
val_idx = split_idx["valid"]['paper']
test_idx = split_idx["test"]['paper']
hg_orig, labels = dataset[0]
subgs = {}
for etype in hg_orig.canonical_etypes:
u, v = hg_orig.all_edges(etype=etype)
subgs[etype] = (u, v)
subgs[(etype[2], 'rev-'+etype[1], etype[0])] = (v, u)
hg = dgl.heterograph(subgs)
hg.nodes['paper'].data['feat'] = hg_orig.nodes['paper'].data['feat']
labels = labels['paper'].squeeze()
num_rels = len(hg.canonical_etypes)
num_of_ntype = len(hg.ntypes)
num_classes = dataset.num_classes
if args.dataset == 'ogbn-mag':
category = 'paper'
print('Number of relations: {}'.format(num_rels))
        print('Number of classes: {}'.format(num_classes))
print('Number of train: {}'.format(len(train_idx)))
print('Number of valid: {}'.format(len(val_idx)))
print('Number of test: {}'.format(len(test_idx)))
else:
# Load from hetero-graph
hg = dataset[0]
num_rels = len(hg.canonical_etypes)
num_of_ntype = len(hg.ntypes)
category = dataset.predict_category
num_classes = dataset.num_classes
train_mask = hg.nodes[category].data.pop('train_mask')
test_mask = hg.nodes[category].data.pop('test_mask')
labels = hg.nodes[category].data.pop('labels')
train_idx = th.nonzero(train_mask, as_tuple=False).squeeze()
test_idx = th.nonzero(test_mask, as_tuple=False).squeeze()
# AIFB, MUTAG, BGS and AM datasets do not provide validation set split.
# Split train set into train and validation if args.validation is set
# otherwise use train set as the validation set.
if args.validation:
val_idx = train_idx[:len(train_idx) // 5]
train_idx = train_idx[len(train_idx) // 5:]
else:
val_idx = train_idx
node_feats = []
for ntype in hg.ntypes:
if len(hg.nodes[ntype].data) == 0 or args.node_feats is False:
node_feats.append(hg.number_of_nodes(ntype))
else:
assert len(hg.nodes[ntype].data) == 1
feat = hg.nodes[ntype].data.pop('feat')
node_feats.append(feat.share_memory_())
# get target category id
category_id = len(hg.ntypes)
for i, ntype in enumerate(hg.ntypes):
if ntype == category:
category_id = i
print('{}:{}'.format(i, ntype))
g = dgl.to_homogeneous(hg)
g.ndata['ntype'] = g.ndata[dgl.NTYPE]
g.ndata['ntype'].share_memory_()
g.edata['etype'] = g.edata[dgl.ETYPE]
g.edata['etype'].share_memory_()
g.ndata['type_id'] = g.ndata[dgl.NID]
g.ndata['type_id'].share_memory_()
node_ids = th.arange(g.number_of_nodes())
# find out the target node ids
node_tids = g.ndata[dgl.NTYPE]
loc = (node_tids == category_id)
target_idx = node_ids[loc]
target_idx.share_memory_()
train_idx.share_memory_()
val_idx.share_memory_()
test_idx.share_memory_()
# Create csr/coo/csc formats before launching training processes with multi-gpu.
    # This avoids creating certain formats in each sub-process, which saves memory and CPU.
g.create_formats_()
n_gpus = len(devices)
n_cpus = mp.cpu_count()
ctx = mp.get_context('fork')
queue = ctx.Queue()
procs = []
num_train_seeds = train_idx.shape[0]
num_valid_seeds = val_idx.shape[0]
num_test_seeds = test_idx.shape[0]
train_seeds = th.randperm(num_train_seeds)
valid_seeds = th.randperm(num_valid_seeds)
test_seeds = th.randperm(num_test_seeds)
tseeds_per_proc = num_train_seeds // n_gpus
vseeds_per_proc = num_valid_seeds // n_gpus
tstseeds_per_proc = num_test_seeds // n_gpus
for proc_id in range(n_gpus):
        # We have multiple GPUs for training, evaluation and testing,
        # so split the train, valid and test sets into num-of-gpu parts.
proc_train_seeds = train_seeds[proc_id * tseeds_per_proc:
(proc_id + 1) * tseeds_per_proc
if (proc_id + 1) * tseeds_per_proc < num_train_seeds
else num_train_seeds]
proc_valid_seeds = valid_seeds[proc_id * vseeds_per_proc:
(proc_id + 1) * vseeds_per_proc
if (proc_id + 1) * vseeds_per_proc < num_valid_seeds
else num_valid_seeds]
proc_test_seeds = test_seeds[proc_id * tstseeds_per_proc:
(proc_id + 1) * tstseeds_per_proc
if (proc_id + 1) * tstseeds_per_proc < num_test_seeds
else num_test_seeds]
p = ctx.Process(target=run, args=(proc_id, n_gpus, n_cpus // n_gpus, args, devices,
(g, node_feats, num_of_ntype, num_classes, num_rels, target_idx,
train_idx, val_idx, test_idx, labels),
(proc_train_seeds,
proc_valid_seeds, proc_test_seeds),
queue))
p.start()
procs.append(p)
for p in procs:
p.join()
time_records = queue.get(block=False)
num_exclude = 10 # exclude first 10 iterations
if len(time_records) < 15:
# exclude less if less records
num_exclude = int(len(time_records)*0.3)
return np.mean(time_records[num_exclude:])
def config():
# parser = argparse.ArgumentParser(description='RGCN')
args = SimpleNamespace(
dropout=0,
n_hidden=16,
gpu="0,1,2,3",
lr=1e-2,
sparse_lr=2e-2,
n_bases=-1,
n_layers=2,
dataset=None,
l2norm=0,
fanout=[10, 25],
use_self_loop=True,
batch_size=100,
layer_norm=False,
validation=False,
node_feats=False,
num_workers=0,
dgl_sparse=False,
low_mem=False,
)
# parser.add_argument("--dropout", type=float, default=0,
# help="dropout probability")
# parser.add_argument("--n-hidden", type=int, default=16,
# help="number of hidden units")
# parser.add_argument("--gpu", type=str, default='0',
# help="gpu")
# parser.add_argument("--lr", type=float, default=1e-2,
# help="learning rate")
# parser.add_argument("--sparse-lr", type=float, default=2e-2,
# help="sparse embedding learning rate")
# parser.add_argument("--n-bases", type=int, default=-1,
# help="number of filter weight matrices, default: -1 [use all]")
# parser.add_argument("--n-layers", type=int, default=2,
# help="number of propagation rounds")
# parser.add_argument("-e", "--n-epochs", type=int, default=50,
# help="number of training epochs")
# parser.add_argument("-d", "--dataset", type=str, required=True,
# help="dataset to use")
# parser.add_argument("--l2norm", type=float, default=0,
# help="l2 norm coef")
# parser.add_argument("--fanout", type=str, default="4, 4",
# help="Fan-out of neighbor sampling.")
# parser.add_argument("--use-self-loop", default=False, action='store_true',
# help="include self feature as a special relation")
# fp = parser.add_mutually_exclusive_group(required=False)
# parser.add_argument("--batch-size", type=int, default=100,
# help="Mini-batch size. ")
# parser.add_argument("--eval-batch-size", type=int, default=32,
# help="Mini-batch size. ")
# parser.add_argument("--num-workers", type=int, default=0,
# help="Number of workers for dataloader.")
# parser.add_argument("--low-mem", default=False, action='store_true',
# help="Whether use low mem RelGraphCov")
# parser.add_argument("--dgl-sparse", default=False, action='store_true',
# help='Use sparse embedding for node embeddings.')
# parser.add_argument('--node-feats', default=False, action='store_true',
# help='Whether use node features')
# parser.add_argument('--layer-norm', default=False, action='store_true',
# help='Use layer norm')
# parser.set_defaults(validation=True)
# args = parser.parse_args()
return args
if __name__ == '__main__':
track_time('am')
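# The next file in this commit is the multi-GPU GraphSAGE benchmark
# ("add multigpu sage" in the commit message above): a SAGE model trained with
# neighbor sampling and DistributedDataParallel across four GPUs.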
from types import SimpleNamespace
from typing import NamedTuple
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
import dgl.nn.pytorch as dglnn
import time
import math
import argparse
from torch.nn.parallel import DistributedDataParallel
import dgl.nn.pytorch as dglnn
from .. import utils
class SAGE(nn.Module):
def __init__(self,
in_feats,
n_hidden,
n_classes,
n_layers,
activation,
dropout):
super().__init__()
self.n_layers = n_layers
self.n_hidden = n_hidden
self.n_classes = n_classes
self.layers = nn.ModuleList()
self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
for i in range(1, n_layers - 1):
self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
self.dropout = nn.Dropout(dropout)
self.activation = activation
def forward(self, blocks, x):
h = x
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
h = layer(block, h)
if l != len(self.layers) - 1:
h = self.activation(h)
h = self.dropout(h)
return h
def load_subtensor(nfeat, labels, seeds, input_nodes, dev_id):
"""
Extracts features and labels for a subset of nodes.
"""
batch_inputs = nfeat[input_nodes].to(dev_id)
batch_labels = labels[seeds].to(dev_id)
return batch_inputs, batch_labels
# Entry point
def run(result_queue, proc_id, n_gpus, args, devices, data):
dev_id = devices[proc_id]
timing_records = []
if n_gpus > 1:
dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
master_ip='127.0.0.1', master_port='12345')
world_size = n_gpus
th.distributed.init_process_group(backend="nccl",
init_method=dist_init_method,
world_size=world_size,
rank=proc_id)
th.cuda.set_device(dev_id)
n_classes, train_g, _, _ = data
train_nfeat = train_g.ndata.pop('feat')
train_labels = train_g.ndata.pop('label')
train_nfeat = train_nfeat.to(dev_id)
train_labels = train_labels.to(dev_id)
in_feats = train_nfeat.shape[1]
train_mask = train_g.ndata['train_mask']
train_nid = train_mask.nonzero().squeeze()
# Split train_nid
train_nid = th.split(train_nid, math.ceil(
len(train_nid) / n_gpus))[proc_id]
# Create PyTorch DataLoader for constructing blocks
sampler = dgl.dataloading.MultiLayerNeighborSampler(
[int(fanout) for fanout in args.fan_out.split(',')])
dataloader = dgl.dataloading.NodeDataLoader(
train_g,
train_nid,
sampler,
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
num_workers=args.num_workers)
# Define model and optimizer
model = SAGE(in_feats, args.num_hidden, n_classes,
args.num_layers, F.relu, args.dropout)
model = model.to(dev_id)
if n_gpus > 1:
model = DistributedDataParallel(
model, device_ids=[dev_id], output_device=dev_id)
loss_fcn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=args.lr)
# Training loop
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
if proc_id == 0:
tic_step = time.time()
batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels,
seeds, input_nodes, dev_id)
blocks = [block.int().to(dev_id) for block in blocks]
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if proc_id == 0:
timing_records.append(time.time() - tic_step)
if step >= 50:
break
if n_gpus > 1:
th.distributed.barrier()
if proc_id == 0:
result_queue.put(np.array(timing_records))
@utils.benchmark('time', timeout=600)
@utils.skip_if_not_4gpu()
@utils.parametrize('data', ['reddit', 'ogbn-products'])
def track_time(data):
args = SimpleNamespace(
num_hidden=16,
fan_out = "10,25",
batch_size = 1000,
lr = 0.003,
dropout = 0.5,
num_layers = 2,
num_workers = 4,
)
devices = [0, 1, 2, 3]
n_gpus = len(devices)
data = utils.process_data(data)
g = data[0]
n_classes = data.num_classes
train_g = val_g = test_g = g
# Create csr/coo/csc formats before launching training processes with multi-gpu.
    # This avoids creating certain formats in each sub-process, which saves memory and CPU.
train_g.create_formats_()
val_g.create_formats_()
test_g.create_formats_()
# Pack data
data = n_classes, train_g, val_g, test_g
result_queue = mp.Queue()
procs = []
for proc_id in range(n_gpus):
p = mp.Process(target=utils.thread_wrapped_func(run),
args=(result_queue, proc_id, n_gpus, args, devices, data))
p.start()
procs.append(p)
for p in procs:
p.join()
time_records = result_queue.get(block=False)
num_exclude = 10 # exclude first 10 iterations
if len(time_records) < 15:
# exclude less if less records
num_exclude = int(len(time_records)*0.3)
return np.mean(time_records[num_exclude:])
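# What follows is the shared RGCN model module (imported by the multi-GPU RGCN
# benchmark above as `.rgcn_model`); it provides the BaseRGCN template and the
# RelGraphEmbedLayer used for featureless node types.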
import torch as th
import torch.nn as nn
import dgl
class BaseRGCN(nn.Module):
def __init__(self, num_nodes, h_dim, out_dim, num_rels, num_bases,
num_hidden_layers=1, dropout=0,
use_self_loop=False, use_cuda=False):
super(BaseRGCN, self).__init__()
self.num_nodes = num_nodes
self.h_dim = h_dim
self.out_dim = out_dim
self.num_rels = num_rels
self.num_bases = None if num_bases < 0 else num_bases
self.num_hidden_layers = num_hidden_layers
self.dropout = dropout
self.use_self_loop = use_self_loop
self.use_cuda = use_cuda
# create rgcn layers
self.build_model()
def build_model(self):
self.layers = nn.ModuleList()
# i2h
i2h = self.build_input_layer()
if i2h is not None:
self.layers.append(i2h)
# h2h
for idx in range(self.num_hidden_layers):
h2h = self.build_hidden_layer(idx)
self.layers.append(h2h)
# h2o
h2o = self.build_output_layer()
if h2o is not None:
self.layers.append(h2o)
def build_input_layer(self):
return None
def build_hidden_layer(self, idx):
raise NotImplementedError
def build_output_layer(self):
return None
def forward(self, g, h, r, norm):
for layer in self.layers:
h = layer(g, h, r, norm)
return h
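# BaseRGCN is a template: build_input_layer/build_output_layer are optional
# hooks and build_hidden_layer must be overridden by a subclass. A minimal
# subclass sketch (illustrative only, not part of the benchmark):
#
#   class TwoLayerRGCN(BaseRGCN):
#       def build_hidden_layer(self, idx):
#           from dgl.nn import RelGraphConv
#           return RelGraphConv(self.h_dim, self.h_dim, self.num_rels,
#                               "basis", self.num_bases, activation=th.relu)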
def initializer(emb):
emb.uniform_(-1.0, 1.0)
return emb
class RelGraphEmbedLayer(nn.Module):
r"""Embedding layer for featureless heterograph.
Parameters
----------
dev_id : int
Device to run the layer.
num_nodes : int
Number of nodes.
    node_tids : tensor
        Storing the node type id for each node, starting from 0.
num_of_ntype : int
Number of node types
    input_size : list of int or tensor
        A list of input feature sizes for each node type. If the entry for a
        node type is an integer rather than a feature tensor, that node type's
        input feature is treated as a one-hot encoding feature.
embed_size : int
Output embed size
dgl_sparse : bool, optional
        If true, use dgl.nn.NodeEmbedding; otherwise use torch.nn.Embedding.
"""
def __init__(self,
dev_id,
num_nodes,
node_tids,
num_of_ntype,
input_size,
embed_size,
dgl_sparse=False):
super(RelGraphEmbedLayer, self).__init__()
self.dev_id = th.device(dev_id if dev_id >= 0 else 'cpu')
self.embed_size = embed_size
self.num_nodes = num_nodes
self.dgl_sparse = dgl_sparse
# create weight embeddings for each node for each relation
self.embeds = nn.ParameterDict()
self.node_embeds = {} if dgl_sparse else nn.ModuleDict()
self.num_of_ntype = num_of_ntype
for ntype in range(num_of_ntype):
if isinstance(input_size[ntype], int):
if dgl_sparse:
self.node_embeds[str(ntype)] = dgl.nn.NodeEmbedding(input_size[ntype], embed_size, name=str(ntype),
init_func=initializer)
else:
sparse_emb = th.nn.Embedding(input_size[ntype], embed_size, sparse=True)
nn.init.uniform_(sparse_emb.weight, -1.0, 1.0)
self.node_embeds[str(ntype)] = sparse_emb
else:
input_emb_size = input_size[ntype].shape[1]
embed = nn.Parameter(th.Tensor(input_emb_size, self.embed_size))
nn.init.xavier_uniform_(embed)
self.embeds[str(ntype)] = embed
@property
def dgl_emb(self):
"""
"""
if self.dgl_sparse:
embs = [emb for emb in self.node_embeds.values()]
return embs
else:
return []
def forward(self, node_ids, node_tids, type_ids, features):
"""Forward computation
Parameters
----------
node_ids : tensor
node ids to generate embedding for.
        node_tids : tensor
            node type ids
        type_ids : tensor
            per-node-type local ids of the nodes
        features : list of features
            list of initial features for nodes belonging to different node types.
            If the entry is an integer, the corresponding feature is a one-hot
            encoding feature; otherwise the features are used directly as the
            input feature and multiplied by a projection matrix.
Returns
-------
tensor
embeddings as the input of the next layer
"""
tsd_ids = node_ids.to(self.dev_id)
embeds = th.empty(node_ids.shape[0], self.embed_size, device=self.dev_id)
for ntype in range(self.num_of_ntype):
loc = node_tids == ntype
if isinstance(features[ntype], int):
if self.dgl_sparse:
embeds[loc] = self.node_embeds[str(ntype)](type_ids[loc], self.dev_id)
else:
embeds[loc] = self.node_embeds[str(ntype)](type_ids[loc]).to(self.dev_id)
else:
embeds[loc] = features[ntype][type_ids[loc]].to(self.dev_id) @ self.embeds[str(ntype)].to(self.dev_id)
return embeds
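# Usage sketch (mirrors the call in the multi-GPU RGCN training loop above):
# the layer receives the global node ids, node type ids and per-type local ids
# of a sampled block, plus the per-type feature list, and returns one embedding
# row per input node.
#
#   feats = embed_layer(blocks[0].srcdata[dgl.NID],
#                       blocks[0].srcdata['ntype'],
#                       blocks[0].srcdata['type_id'],
#                       node_feats)          # shape: (num_src_nodes, embed_size)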
@@ -32,7 +32,37 @@ def _download(url, path, filename):
# GRAPH_CACHE = {}
import torch.multiprocessing as mp
from _thread import start_new_thread
import traceback
def thread_wrapped_func(func):
"""
Wraps a process entry point to make it work with OpenMP.
"""
@wraps(func)
def decorated_function(*args, **kwargs):
queue = mp.Queue()
def _queue_result():
exception, trace, res = None, None, None
try:
res = func(*args, **kwargs)
except Exception as e:
exception = e
trace = traceback.format_exc()
queue.put((res, exception, trace))
start_new_thread(_queue_result, ())
result, exception, trace = queue.get()
if exception is None:
return result
else:
assert isinstance(exception, Exception)
raise exception.__class__(trace)
return decorated_function
def get_graph(name, format):
    # global GRAPH_CACHE
@@ -455,6 +485,32 @@ def skip_if_gpu():
        return func
    return _wrapper
def _cuda_device_count(q):
import torch
q.put(torch.cuda.device_count())
def get_num_gpu():
import multiprocessing as mp
q = mp.Queue()
p = mp.Process(target=_cuda_device_count, args=(q, ))
p.start()
p.join()
return q.get(block=False)
GPU_COUNT = get_num_gpu()
def skip_if_not_4gpu():
"""skip if DGL_BENCH_DEVICE is gpu
"""
def _wrapper(func):
if GPU_COUNT != 4:
            # skip when the machine does not have exactly 4 GPUs
print("Skip {}".format(func.benchmark_name))
func.benchmark_name = "skip_" + func.__name__
return func
return _wrapper
def benchmark(track_type, timeout=60):
    """Decorator for indicating the benchmark type.
...
@@ -12,11 +12,13 @@ pip install --upgrade pip
pip install asv
pip uninstall -y dgl
+nvidia-smi
export DGL_BENCH_DEVICE=$DEVICE
echo "DGL_BENCH_DEVICE=$DGL_BENCH_DEVICE"
pushd $ROOT/benchmarks
cat asv.conf.json
asv machine --yes
-asv run -e -v
+asv run --launch-method=spawn -e -v
asv publish
popd
@@ -2,11 +2,14 @@
set -e
-. /opt/conda/etc/profile.d/conda.sh
+# . /opt/conda/etc/profile.d/conda.sh
-conda activate pytorch-ci
+# conda activate pytorch-ci
# Default building only with cpu
DEVICE=${DGL_BENCH_DEVICE:-cpu}
+pip install -r /asv/torch_gpu_pip.txt
+pip install pandas rdflib ogb
# build
if [[ $DEVICE == "cpu" ]]; then
    CMAKE_VARS=""
@@ -19,4 +22,4 @@ cmake -DCUDA_TOOLKIT_ROOT_DIR=/usr/local/cuda -DBUILD_TORCH=ON $CMAKE_VARS ..
make -j
popd
-conda deactivate
+# conda deactivate
@@ -2,11 +2,7 @@
set -e
-. /opt/conda/etc/profile.d/conda.sh
+# . /opt/conda/etc/profile.d/conda.sh
pip install -r /asv/torch_gpu_pip.txt
pip install pandas rdflib ogb
# install
pushd python
...
@@ -26,7 +26,7 @@ else
fi
WS_ROOT=/asv/dgl
-docker pull dgllib/dgl-ci-gpu:conda
+docker pull public.ecr.aws/s1o7b3d9/benchmakrk_pyg_dgl:cu111_torch181_pyg170
if [ -z "$DGL_REG_CONF"]; then
    DOCKER_ENV_OPT="$DOCKER_ENV_OPT"
else
@@ -56,14 +56,14 @@ if [[ $DEVICE == "cpu" ]]; then
        $DOCKER_MOUNT_OPT \
        $DOCKER_ENV_OPT \
        --shm-size="4g" \
-        --hostname=$MACHINE -dit dgllib/dgl-ci-gpu:conda /bin/bash
+        --hostname=$MACHINE -dit public.ecr.aws/s1o7b3d9/benchmakrk_pyg_dgl:cu111_torch181_pyg170 /bin/bash
else
    docker run --name dgl-reg \
-        --rm --runtime=nvidia \
+        --rm --gpus all \
        $DOCKER_MOUNT_OPT \
        $DOCKER_ENV_OPT \
        --shm-size="4g" \
-        --hostname=$MACHINE -dit dgllib/dgl-ci-gpu:conda /bin/bash
+        --hostname=$MACHINE -dit public.ecr.aws/s1o7b3d9/benchmakrk_pyg_dgl:cu111_torch181_pyg170 /bin/bash
fi
docker exec dgl-reg mkdir -p $WS_ROOT
...
---find-links https://download.pytorch.org/whl/torch_stable.html
+--find-links https://download.pytorch.org/whl/lts/1.8/torch_lts.html
-torch==1.5.1+cu101
+torch==1.8.1+cu111
-torchvision==0.6.1+cu101
+torchvision==0.9.1+cu111
pytest
nose
numpy
cython
scipy
-networkx
+networkx==2.5.1
matplotlib
nltk
requests[security]
tqdm
awscli
-# 0.6.0 is for pytorch 1.5
-torchtext==0.6.0
+torchtext==0.9.1
+pandas
+rdflib
+ogb==1.3.1
\ No newline at end of file
{
    "r5.16xlarge": {
        "tests": [
-            ""
+            "api.", "kernel.", "model_acc.", "model_speed."
        ],
        "env": {
            "DEVICE": "cpu"
@@ -9,10 +9,18 @@
    },
    "g4dn.2xlarge": {
        "tests": [
-            ""
+            "api.", "kernel.", "model_acc.", "model_speed."
        ],
        "env": {
            "DEVICE": "gpu"
        }
-    }
+    },
+    "g4dn.12xlarge": {
+        "tests": [
+            "multigpu."
+        ],
+        "env": {
+            "DEVICE": "gpu"
+        }
+    },
}
\ No newline at end of file