"...git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "5ffbe14c32c38779f6365635cb405957b512eb84"
Unverified commit d876680a, authored by Minjie Wang and committed by GitHub

[Model][Perf] Improve sage sampling performance (#1364)

* improve speed
* fix bugs
* update regression test

parent f9ad1c80
@@ -19,9 +19,17 @@ Results
Run with following (available dataset: "cora", "citeseer", "pubmed")
```bash
python3 train_full.py --dataset cora --gpu 0
```
* cora: ~0.8330
* citeseer: ~0.7110
* pubmed: ~0.7830

Train w/ mini-batch sampling (on the Reddit dataset)
```bash
python3 train_sampling.py
```

Accuracy: 0.9504
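The same commit also updates a multi-GPU variant of this script (its diff appears further below). Assuming that variant is named `train_sampling_multi_gpu.py` (the diff does not show the file name), it takes a comma-separated GPU list:

```bash
# Hypothetical invocation; --gpu takes a comma-separated list of device IDs
# per the multi-GPU script's argparser shown later in this diff.
python3 train_sampling_multi_gpu.py --gpu 0,1
```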
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
import dgl.function as fn
import dgl.nn.pytorch as dglnn
import time
import argparse
from _thread import start_new_thread
from functools import wraps
from dgl.data import RedditDataset
import tqdm
import traceback
#### Neighbor sampler
class NeighborSampler(object):
    def __init__(self, g, fanouts):
        self.g = g
        self.fanouts = fanouts

    def sample_blocks(self, seeds):
        seeds = th.LongTensor(np.asarray(seeds))
        blocks = []
        for fanout in self.fanouts:
            # For each seed node, sample ``fanout`` neighbors.
            frontier = dgl.sampling.sample_neighbors(self.g, seeds, fanout, replace=True)
            # Then we compact the frontier into a bipartite graph for message passing.
            block = dgl.to_block(frontier, seeds)
            # Obtain the seed nodes for next layer.
            seeds = block.srcdata[dgl.NID]
            blocks.insert(0, block)
        return blocks
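# A minimal usage sketch (illustrative only, assuming ``g`` is the graph
# constructed in __main__ below):
#
#   sampler = NeighborSampler(g, fanouts=[10, 25])
#   blocks = sampler.sample_blocks([0, 1, 2])   # one block per fanout
#   # blocks[-1]'s dst nodes are the seeds; blocks[0] is the input layer.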
class SAGE(nn.Module):
    def __init__(self,
                 in_feats,
                 n_hidden,
                 n_classes,
                 n_layers,
                 activation,
                 dropout):
        super().__init__()
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        self.layers = nn.ModuleList()
        self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
        for i in range(1, n_layers - 1):
            self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
        self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
        self.dropout = nn.Dropout(dropout)
        self.activation = activation

    def forward(self, blocks, x):
        h = x
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            # We need to first copy the representation of nodes on the RHS from the
            # appropriate nodes on the LHS.
            # Note that the shape of h is (num_nodes_LHS, D) and the shape of h_dst
            # would be (num_nodes_RHS, D)
            h_dst = h[:block.number_of_nodes(block.dsttype)]
            # Then we compute the updated representation on the RHS.
            # The shape of h now becomes (num_nodes_RHS, D)
            h = layer(block, (h, h_dst))
            if l != len(self.layers) - 1:
                h = self.activation(h)
                h = self.dropout(h)
        return h
    def inference(self, g, x, batch_size, device):
        """
        Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling).
        g : the entire graph.
        x : the input features of the entire node set.

        The inference code is written in a fashion that it could handle any number of nodes and
        layers.
        """
        # During inference with sampling, multi-layer blocks are very inefficient because
        # lots of computations in the first few layers are repeated.
        # Therefore, we compute the representation of all nodes layer by layer. The nodes
        # on each layer are of course split in batches.
        # TODO: can we standardize this?
        nodes = th.arange(g.number_of_nodes())
        for l, layer in enumerate(self.layers):
            y = th.zeros(g.number_of_nodes(),
                         self.n_hidden if l != len(self.layers) - 1 else self.n_classes)
            for start in tqdm.trange(0, len(nodes), batch_size):
                end = start + batch_size
                batch_nodes = nodes[start:end]
                block = dgl.to_block(dgl.in_subgraph(g, batch_nodes), batch_nodes)
                input_nodes = block.srcdata[dgl.NID]
                h = x[input_nodes].to(device)
                h_dst = h[:block.number_of_nodes(block.dsttype)]
                h = layer(block, (h, h_dst))
                if l != len(self.layers) - 1:
                    h = self.activation(h)
                    h = self.dropout(h)
                y[start:end] = h.cpu()
            x = y
        return y
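    # Note: layer-wise inference computes each node exactly once per layer,
    # i.e. O(n_layers * num_nodes) aggregations, instead of re-expanding the
    # lower layers for every output batch as sampling-based inference would.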
def prepare_mp(g):
    """
    Explicitly materialize the CSR, CSC and COO representation of the given graph
    so that they could be shared via copy-on-write to sampler workers and GPU
    trainers.

    This is a workaround before full shared memory support on heterogeneous graphs.
    """
    g.in_degree(0)
    g.out_degree(0)
    g.find_edges([0])

def compute_acc(pred, labels):
    """
    Compute the accuracy of prediction given the labels.
    """
    return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)
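# For example, ``pred`` of shape (N, C) logits and ``labels`` of shape (N,)
# yield a scalar accuracy tensor in [0, 1].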
def evaluate(model, g, inputs, labels, val_mask, batch_size, device):
    """
    Evaluate the model on the validation set specified by ``val_mask``.
    g : The entire graph.
    inputs : The features of all the nodes.
    labels : The labels of all the nodes.
    val_mask : A 0-1 mask indicating which nodes we actually compute the accuracy for.
    batch_size : Number of nodes to compute at the same time.
    device : The GPU device to evaluate on.
    """
    model.eval()
    with th.no_grad():
        pred = model.inference(g, inputs, batch_size, device)
    model.train()
    return compute_acc(pred[val_mask], labels[val_mask])

def load_subtensor(g, labels, seeds, input_nodes, device):
    """
    Copies features and labels of a set of nodes onto GPU.
    """
    batch_inputs = g.ndata['features'][input_nodes].to(device)
    batch_labels = labels[seeds].to(device)
    return batch_inputs, batch_labels
#### Entry point

def run(args, device, data):
    # Unpack data
    train_mask, val_mask, in_feats, labels, n_classes, g = data
    train_nid = th.LongTensor(np.nonzero(train_mask)[0])
    val_nid = th.LongTensor(np.nonzero(val_mask)[0])
    train_mask = th.BoolTensor(train_mask)
    val_mask = th.BoolTensor(val_mask)

    # Create sampler
    sampler = NeighborSampler(g, [int(fanout) for fanout in args.fan_out.split(',')])

    # Create PyTorch DataLoader for constructing blocks
    dataloader = DataLoader(
        dataset=train_nid.numpy(),
        batch_size=args.batch_size,
        collate_fn=sampler.sample_blocks,
        shuffle=True,
        drop_last=False,
        num_workers=args.num_workers)
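    # Each batch yielded by the DataLoader is a list of seed node IDs;
    # ``sample_blocks`` (the collate_fn) turns it into a list of blocks,
    # ordered from the input layer to the output layer.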
    # Define model and optimizer
    model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
    model = model.to(device)
    loss_fcn = nn.CrossEntropyLoss()
    loss_fcn = loss_fcn.to(device)
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    # Training loop
    avg = 0
    iter_tput = []
    for epoch in range(args.num_epochs):
        tic = time.time()

        # Loop over the dataloader to sample the computation dependency graph as a list of
        # blocks.
        for step, blocks in enumerate(dataloader):
            tic_step = time.time()

            # The input nodes lie on the LHS side of the first block.
            # The output nodes lie on the RHS side of the last block.
            input_nodes = blocks[0].srcdata[dgl.NID]
            seeds = blocks[-1].dstdata[dgl.NID]

            # Load the input features as well as output labels
            batch_inputs, batch_labels = load_subtensor(g, labels, seeds, input_nodes, device)

            # Compute loss and prediction
            batch_pred = model(blocks, batch_inputs)
            loss = loss_fcn(batch_pred, batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            iter_tput.append(len(seeds) / (time.time() - tic_step))
            if step % args.log_every == 0:
                acc = compute_acc(batch_pred, batch_labels)
                gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0
                print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MiB'.format(
                    epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), gpu_mem_alloc))

        toc = time.time()
        print('Epoch Time(s): {:.4f}'.format(toc - tic))
        if epoch >= 5:
            avg += toc - tic
        if epoch % args.eval_every == 0 and epoch != 0:
            eval_acc = evaluate(model, g, g.ndata['features'], labels, val_mask, args.batch_size, device)
            print('Eval Acc {:.4f}'.format(eval_acc))

    print('Avg epoch time: {}'.format(avg / (epoch - 4)))
if __name__ == '__main__':
    argparser = argparse.ArgumentParser("multi-gpu training")
    argparser.add_argument('--gpu', type=int, default=0,
                           help="GPU device ID. Use -1 for CPU training")
    argparser.add_argument('--num-epochs', type=int, default=20)
    argparser.add_argument('--num-hidden', type=int, default=16)
    argparser.add_argument('--num-layers', type=int, default=2)
    argparser.add_argument('--fan-out', type=str, default='10,25')
    argparser.add_argument('--batch-size', type=int, default=1000)
    argparser.add_argument('--log-every', type=int, default=20)
    argparser.add_argument('--eval-every', type=int, default=5)
    argparser.add_argument('--lr', type=float, default=0.003)
    argparser.add_argument('--dropout', type=float, default=0.5)
    argparser.add_argument('--num-workers', type=int, default=0,
                           help="Number of sampling processes. Use 0 for no extra process.")
    args = argparser.parse_args()

    if args.gpu >= 0:
        device = th.device('cuda:%d' % args.gpu)
    else:
        device = th.device('cpu')

    # load reddit data
    data = RedditDataset(self_loop=True)
    train_mask = data.train_mask
    val_mask = data.val_mask
    features = th.Tensor(data.features)
    in_feats = features.shape[1]
    labels = th.LongTensor(data.labels)
    n_classes = data.num_labels
    # Construct graph
    g = dgl.graph(data.graph.all_edges())
    g.ndata['features'] = features
    prepare_mp(g)
    # Pack data
    data = train_mask, val_mask, in_feats, labels, n_classes, g
    run(args, device, data)
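For reference, the new script can be launched directly; the flags below match its argparser, and the regression benchmark further down runs it the same way:

```bash
# Train on Reddit with 4 sampling workers; use --gpu -1 for CPU training.
python3 train_sampling.py --gpu 0 --num-epochs 16 --num-workers 4
```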
@@ -15,6 +15,7 @@ from functools import wraps
from dgl.data import RedditDataset
from torch.nn.parallel import DistributedDataParallel
import tqdm
import traceback

#### Neighbor sampler
@@ -50,17 +51,16 @@ class SAGE(nn.Module):
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        self.layers = nn.ModuleList()
        self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
        for i in range(1, n_layers - 1):
            self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
        self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
        self.dropout = nn.Dropout(dropout)
        self.activation = activation

    def forward(self, blocks, x):
        h = x
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            # We need to first copy the representation of nodes on the RHS from the
            # appropriate nodes on the LHS.
            # Note that the shape of h is (num_nodes_LHS, D) and the shape of h_dst
@@ -69,6 +69,9 @@ class SAGE(nn.Module):
            # Then we compute the updated representation on the RHS.
            # The shape of h now becomes (num_nodes_RHS, D)
            h = layer(block, (h, h_dst))
            if l != len(self.layers) - 1:
                h = self.activation(h)
                h = self.dropout(h)
        return h

    def inference(self, g, x, batch_size, device):
@@ -98,6 +101,9 @@ class SAGE(nn.Module):
                h = x[input_nodes].to(device)
                h_dst = h[:block.number_of_nodes(block.dsttype)]
                h = layer(block, (h, h_dst))
                if l != len(self.layers) - 1:
                    h = self.activation(h)
                    h = self.dropout(h)
                y[start:end] = h.cpu()
@@ -181,10 +187,7 @@ def load_subtensor(g, labels, seeds, input_nodes, dev_id):
#### Entry point

def run(proc_id, n_gpus, args, devices, data):
    # Start up distributed training, if enabled.
    dev_id = devices[proc_id]
    if n_gpus > 1:
@@ -217,10 +220,10 @@ def run(proc_id, n_gpus, args, devices, data):
        collate_fn=sampler.sample_blocks,
        shuffle=True,
        drop_last=False,
        num_workers=args.num_workers)

    # Define model and optimizer
    model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
    model = model.to(dev_id)
    if n_gpus > 1:
        model = DistributedDataParallel(model, device_ids=[dev_id], output_device=dev_id)
@@ -284,6 +287,7 @@ def run(proc_id, n_gpus, args, devices, data):
            eval_acc = evaluate(model.module, g, g.ndata['features'], labels, val_mask, args.batch_size, 0)
            print('Eval Acc {:.4f}'.format(eval_acc))

    if n_gpus > 1:
        th.distributed.barrier()
    if proc_id == 0:
@@ -291,7 +295,8 @@ def run(proc_id, n_gpus, args, devices, data):
if __name__ == '__main__':
    argparser = argparse.ArgumentParser("multi-gpu training")
    argparser.add_argument('--gpu', type=str, default='0',
                           help="Comma separated list of GPU device IDs.")
    argparser.add_argument('--num-epochs', type=int, default=20)
    argparser.add_argument('--num-hidden', type=int, default=16)
    argparser.add_argument('--num-layers', type=int, default=2)
@@ -300,7 +305,9 @@ if __name__ == '__main__':
    argparser.add_argument('--log-every', type=int, default=20)
    argparser.add_argument('--eval-every', type=int, default=5)
    argparser.add_argument('--lr', type=float, default=0.003)
    argparser.add_argument('--dropout', type=float, default=0.5)
    argparser.add_argument('--num-workers', type=int, default=0,
                           help="Number of sampling processes. Use 0 for no extra process.")
    args = argparser.parse_args()

    devices = list(map(int, args.gpu.split(',')))
@@ -326,7 +333,8 @@ if __name__ == '__main__':
    else:
        procs = []
        for proc_id in range(n_gpus):
            p = mp.Process(target=thread_wrapped_func(run),
                           args=(proc_id, n_gpus, args, devices, data))
            p.start()
            procs.append(p)
        for p in procs:
@@ -76,7 +76,7 @@ COOMatrix CSRRowWisePick(CSRMatrix mat, IdArray rows,
  if (replace) {
    all_has_fanout = true;
  } else {
#pragma omp parallel for reduction(&&:all_has_fanout)
    for (int64_t i = 0; i < num_rows; ++i) {
      const IdxType rid = rows_data[i];
      const IdxType len = indptr[rid + 1] - indptr[rid];

@@ -84,7 +84,7 @@ COOMatrix CSRRowWisePick(CSRMatrix mat, IdArray rows,
    }
  }

#pragma omp parallel for
  for (int64_t i = 0; i < num_rows; ++i) {
    const IdxType rid = rows_data[i];
    CHECK_LT(rid, mat.num_rows);
@@ -20,6 +20,7 @@ The basic use is execute a script, and get the needed results out of the printed
```bash
docker run --name dgl-reg --rm --hostname=reg-machine --runtime=nvidia -dit dgllib/dgl-ci-gpu:conda /bin/bash
docker cp /home/ubuntu/asv_data dgl-reg:/root/asv_data/
docker exec dgl-reg bash /root/asv_data/run.sh
docker cp dgl-reg:/root/regression/dgl/asv/. /home/ubuntu/asv_data/ # Change /home/ubuntu/asv to the path you want to put the result
docker stop dgl-reg
```
@@ -20,7 +20,7 @@ class GCNBenchmark:
    # self.tmp_dir = Path(tempfile.mkdtemp())

    def setup(self, backend, dataset, gpu_id):
        log_filename = Path("gcn_{}_{}_{}.log".format(backend, dataset, gpu_id))
        if log_filename.exists():
            return
        gcn_path = base_path / "examples/{}/gcn/train.py".format(backend)
# Write the benchmarking functions here.
# See "Writing benchmarks" in the asv docs for more information.
import subprocess
import os
from pathlib import Path
import numpy as np
import tempfile

base_path = Path("~/regression/dgl/")

class SAGEBenchmark:
    params = [['pytorch'], ['0']]
    param_names = ['backend', 'gpu']
    timeout = 1800

    # def setup_cache(self):
    #     self.tmp_dir = Path(tempfile.mkdtemp())

    def setup(self, backend, gpu):
        log_filename = Path("sage_sampling_{}_{}.log".format(backend, gpu))
        if log_filename.exists():
            return
        run_path = base_path / "examples/{}/graphsage/train_sampling.py".format(backend)
        bashCommand = "/opt/conda/envs/{}-ci/bin/python {} --num-workers=4 --num-epochs=16 --gpu={}".format(
            backend, run_path.expanduser(), gpu)
        process = subprocess.Popen(bashCommand.split(), stdout=subprocess.PIPE,
                                   env=dict(os.environ, DGLBACKEND=backend))
        output, error = process.communicate()
        print(str(error))
        log_filename.write_text(str(output))

    def track_sage_time(self, backend, gpu):
        log_filename = Path("sage_sampling_{}_{}.log".format(backend, gpu))
        lines = log_filename.read_text().split("\\n")
        time_list = []
        for line in lines:
            if line.startswith('Epoch Time'):
                time_str = line.strip()[15:]
                time_list.append(float(time_str))
        return np.array(time_list).mean()

    def track_sage_accuracy(self, backend, gpu):
        log_filename = Path("sage_sampling_{}_{}.log".format(backend, gpu))
        lines = log_filename.read_text().split("\\n")
        test_acc = 0.
        for line in lines:
            if line.startswith('Eval Acc'):
                acc_str = line.strip()[9:]
                test_acc = float(acc_str)
        return test_acc * 100

SAGEBenchmark.track_sage_time.unit = 's'
SAGEBenchmark.track_sage_accuracy.unit = '%'
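# Note: asv records the return value of a ``track_*`` method directly as the
# benchmark result; the ``unit`` attribute above labels it in reports, which
# is why no timing decorator is needed here.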
#!/bin/bash
set -x

if [ $# -ne 2 ]; then
    REPO=dmlc
    BRANCH=master
else
    REPO=$1
    BRANCH=$2
fi

docker run --name dgl-reg --rm --hostname=reg-machine --runtime=nvidia -dit dgllib/dgl-ci-gpu:conda /bin/bash
docker cp /home/ubuntu/asv_data dgl-reg:/root/asv_data/
docker exec dgl-reg bash /root/asv_data/run.sh $REPO $BRANCH
docker cp dgl-reg:/root/regression/dgl/asv/. /home/ubuntu/asv_data/
docker stop dgl-reg
\ No newline at end of file
#!/bin/bash
set -e

if [ $# -ne 2 ]; then
    echo "run.sh <repo> <branch>"
    exit 1
fi

REPO=$1
BRANCH=$2

. /opt/conda/etc/profile.d/conda.sh
cd ~
mkdir regression
cd regression
# git config core.filemode false
git clone --recursive https://github.com/$REPO/dgl.git
cd dgl
git checkout $BRANCH
mkdir asv
cp -r ~/asv_data/* asv/
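Assuming the Docker wrapper above is saved as `publish.sh` (the diff does not show its file name), kicking off a regression run against a given repo and branch would look like:

```bash
# Hypothetical file name; the wrapper defaults to dmlc/master when called
# with no arguments, and forwards <repo> <branch> to run.sh inside Docker.
bash publish.sh dmlc master
```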