Unverified Commit fce96140 authored by Chang Liu, committed by GitHub

[Example][Refactor] Refactor graphsage multigpu and full-graph example (#4430)

* Add refactors for multi-gpu and full-graph example

* Fix format

* Update

* Update

* Update
parent 2dd06b21
"""
Inductive Representation Learning on Large Graphs
Paper: http://papers.nips.cc/paper/6703-inductive-representation-learning-on-large-graphs.pdf
Code: https://github.com/williamleif/graphsage-simple
Simple reference implementation of GraphSAGE.
"""
import argparse

import torch
import torch.nn as nn
import torch.nn.functional as F

import dgl.nn as dglnn
from dgl import AddSelfLoop
from dgl.data import CoraGraphDataset, CiteseerGraphDataset, PubmedGraphDataset

class SAGE(nn.Module):
    def __init__(self, in_size, hid_size, out_size):
        super().__init__()
        self.layers = nn.ModuleList()
        # two-layer GraphSAGE with the 'gcn' aggregator
        self.layers.append(dglnn.SAGEConv(in_size, hid_size, 'gcn'))
        self.layers.append(dglnn.SAGEConv(hid_size, out_size, 'gcn'))
        self.dropout = nn.Dropout(0.5)

    def forward(self, graph, x):
        h = self.dropout(x)
        for l, layer in enumerate(self.layers):
            h = layer(graph, h)
            if l != len(self.layers) - 1:
                h = F.relu(h)
                h = self.dropout(h)
        return h
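
# Shape sketch for intuition (illustrative, assuming the Cora dataset with
# 2708 nodes and 1433 input features, and hid_size=16): x is [2708, 1433],
# layer 0 maps it to [2708, 16], and layer 1 to [2708, num_classes].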

def evaluate(g, features, labels, mask, model):
    model.eval()
    with torch.no_grad():
        logits = model(g, features)
        logits = logits[mask]
        labels = labels[mask]
        _, indices = torch.max(logits, dim=1)
        correct = torch.sum(indices == labels)
        return correct.item() * 1.0 / len(labels)
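
# For illustration only: the accuracy computed above is equivalent to the
# one-liner (logits.argmax(dim=1) == labels).float().mean().item()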

def train(g, features, labels, masks, model):
    # define train/val samples, loss function and optimizer
    train_mask, val_mask = masks
    loss_fcn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-2, weight_decay=5e-4)
    # training loop
    for epoch in range(200):
        model.train()
        logits = model(g, features)
        loss = loss_fcn(logits[train_mask], labels[train_mask])
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        acc = evaluate(g, features, labels, val_mask, model)
        print("Epoch {:05d} | Loss {:.4f} | Accuracy {:.4f} "
              .format(epoch, loss.item(), acc))

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='GraphSAGE')
    parser.add_argument("--dataset", type=str, default="cora",
                        help="Dataset name ('cora', 'citeseer', 'pubmed')")
    args = parser.parse_args()
    print('Training with DGL built-in GraphSage module')
    # load and preprocess dataset
    transform = AddSelfLoop()  # by default, it first removes existing self-loops to prevent duplication
    if args.dataset == 'cora':
        data = CoraGraphDataset(transform=transform)
    elif args.dataset == 'citeseer':
        data = CiteseerGraphDataset(transform=transform)
    elif args.dataset == 'pubmed':
        data = PubmedGraphDataset(transform=transform)
    else:
        raise ValueError('Unknown dataset: {}'.format(args.dataset))
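    # (Illustrative note, not from the original example: because AddSelfLoop
    # removes existing self-loops first, the transform is idempotent and each
    # node ends up with exactly one self-loop.)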
    g = data[0]
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    g = g.int().to(device)
    features = g.ndata['feat']
    labels = g.ndata['label']
    masks = g.ndata['train_mask'], g.ndata['val_mask']
    # create GraphSAGE model
    in_size = features.shape[1]
    out_size = data.num_classes
    model = SAGE(in_size, 16, out_size).to(device)
    # model training
    print('Training...')
    train(g, features, labels, masks, model)
    # test the model
    print('Testing...')
    acc = evaluate(g, features, labels, g.ndata['test_mask'], model)
    print("Test accuracy {:.4f}".format(acc))

import argparse
import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics.functional as MF
import tqdm
from torch.nn.parallel import DistributedDataParallel

import dgl.nn as dglnn
from dgl.data import AsNodePredDataset
from dgl.dataloading import DataLoader, NeighborSampler, MultiLayerFullNeighborSampler
from dgl.multiprocessing import shared_tensor
from ogb.nodeproppred import DglNodePropPredDataset

class SAGE(nn.Module):
    def __init__(self, in_size, hid_size, out_size):
        super().__init__()
        self.layers = nn.ModuleList()
        # three-layer GraphSAGE-mean
        self.layers.append(dglnn.SAGEConv(in_size, hid_size, 'mean'))
        self.layers.append(dglnn.SAGEConv(hid_size, hid_size, 'mean'))
        self.layers.append(dglnn.SAGEConv(hid_size, out_size, 'mean'))
        self.dropout = nn.Dropout(0.5)
        self.hid_size = hid_size
        self.out_size = out_size

    def forward(self, blocks, x):
        h = x
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            h = layer(block, h)
            if l != len(self.layers) - 1:
                h = F.relu(h)
                h = self.dropout(h)
        return h

    def inference(self, g, device, batch_size, use_uva):
        """
        Perform inference in layer-major order rather than batch-major order.
        That is, infer the first layer for the entire graph, and store the
        intermediate values h_0, before inferring the second layer to generate
        h_1. This is done for two reasons: 1) it limits the effect of node
        degree on the amount of memory used as it only processes 1-hop
        neighbors at a time, and 2) it reduces the total amount of computation
        required as each node is only processed once per layer.

        Parameters
        ----------
        g : DGLGraph
            The graph to perform inference on.
        device : context
            The device this process should use for inference.
        batch_size : int
            The number of items to collect in a batch.
        use_uva : bool
            Whether to use unified virtual addressing (UVA), letting the GPU
            directly access graph data pinned in host memory.

        Returns
        -------
        tensor
            The predictions for all nodes in the graph.
        """
        g.ndata['h'] = g.ndata['feat']
        sampler = MultiLayerFullNeighborSampler(1, prefetch_node_feats=['h'])
        for l, layer in enumerate(self.layers):
            dataloader = DataLoader(
                g, torch.arange(g.num_nodes(), device=device), sampler, device=device,
                batch_size=batch_size, shuffle=False, drop_last=False,
                num_workers=0, use_ddp=True, use_uva=use_uva)
            # in order to prevent running out of GPU memory, allocate a
            # shared output tensor 'y' in host memory
            y = shared_tensor(
                (g.num_nodes(), self.hid_size if l != len(self.layers) - 1 else self.out_size))
            for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader) \
                    if dist.get_rank() == 0 else dataloader:
                x = blocks[0].srcdata['h']
                h = layer(blocks[0], x)  # len(blocks) = 1
                if l != len(self.layers) - 1:
                    h = F.relu(h)
                    h = self.dropout(h)
                # non_blocking (with pinned memory) to accelerate data transfer
                y[output_nodes] = h.to(y.device, non_blocking=True)
            # make sure all GPUs are done writing to 'y'
            dist.barrier()
            # assign the output features of this layer as the input features
            # for the next layer
            g.ndata['h'] = y if use_uva else y.to(device)
        # remove the intermediate data from the graph
        g.ndata.pop('h')
        return y
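
# Rough cost intuition behind layer-major inference (illustrative, not from the
# original comments): with L layers, every node is computed exactly L times in
# total, whereas batch-major inference through multi-hop blocks recomputes a
# node once per seed whose receptive field contains it.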

def evaluate(model, g, dataloader):
    model.eval()
    ys = []
    y_hats = []
    for it, (input_nodes, output_nodes, blocks) in enumerate(dataloader):
        with torch.no_grad():
            x = blocks[0].srcdata['feat']
            ys.append(blocks[-1].dstdata['label'])
            y_hats.append(model(blocks, x))
    return MF.accuracy(torch.cat(y_hats), torch.cat(ys))

def layerwise_infer(proc_id, device, g, nid, model, use_uva, batch_size=2**16):
    model.eval()
    with torch.no_grad():
        # since inference is done one layer at a time, a very large batch size
        # can be used
        pred = model.module.inference(g, device, batch_size, use_uva)
        pred = pred[nid]
        labels = g.ndata['label'][nid].to(pred.device)
        if proc_id == 0:
            acc = MF.accuracy(pred, labels)
            print("Test Accuracy {:.4f}".format(acc.item()))

def train(proc_id, nprocs, device, g, train_idx, val_idx, model, use_uva):
    sampler = NeighborSampler([10, 10, 10],
                              prefetch_node_feats=['feat'],
                              prefetch_labels=['label'])
    # each process/GPU gets its own shard of train_idx/val_idx through
    # use_ddp=True and generates mini-batches independently; the only
    # communication necessary during training is the gradient all-reduce
    # performed by the DDP wrapper
    train_dataloader = DataLoader(g, train_idx, sampler, device=device,
                                  batch_size=1024, shuffle=True,
                                  drop_last=False, num_workers=0,
                                  use_ddp=True, use_uva=use_uva)
    val_dataloader = DataLoader(g, val_idx, sampler, device=device,
                                batch_size=1024, shuffle=True,
                                drop_last=False, num_workers=0,
                                use_ddp=True, use_uva=use_uva)
    opt = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)
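    # (Illustrative numbers, assuming ogbn-products: with 4 GPUs and roughly
    # 196k training seeds, each rank draws about 49k seeds, i.e. ~48
    # mini-batches of 1024, per epoch.)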
    # training loop
    for epoch in range(10):
        model.train()
        total_loss = 0
        for it, (input_nodes, output_nodes, blocks) in enumerate(train_dataloader):
            x = blocks[0].srcdata['feat']
            y = blocks[-1].dstdata['label']
            y_hat = model(blocks, x)
            loss = F.cross_entropy(y_hat, y)
            opt.zero_grad()
            loss.backward()
            opt.step()
            # accumulate with .item() so the running total does not keep the
            # autograd graph alive
            total_loss += loss.item()
        # each rank evaluates on its own shard of val_idx; dividing by nprocs
        # and reducing with the default SUM op leaves the mean accuracy on rank 0
        acc = evaluate(model, g, val_dataloader).to(device) / nprocs
        dist.reduce(acc, 0)
        if proc_id == 0:
            print("Epoch {:05d} | Loss {:.4f} | Accuracy {:.4f} "
                  .format(epoch, total_loss / (it + 1), acc.item()))

def run(proc_id, nprocs, devices, g, data, mode):
    # find the corresponding device for this rank
    device = devices[proc_id]
    torch.cuda.set_device(device)
    # initialize process group and unpack data for sub-processes
    dist.init_process_group(backend="nccl", init_method='tcp://127.0.0.1:12345',
                            world_size=nprocs, rank=proc_id)
    out_size, train_idx, val_idx, test_idx = data
    train_idx = train_idx.to(device)
    val_idx = val_idx.to(device)
    g = g.to(device if mode == 'puregpu' else 'cpu')
    # create GraphSAGE model (distributed)
    in_size = g.ndata['feat'].shape[1]
    model = SAGE(in_size, 256, out_size).to(device)
    model = DistributedDataParallel(model, device_ids=[device], output_device=device)
    # training + testing
    use_uva = (mode == 'mixed')
    train(proc_id, nprocs, device, g, train_idx, val_idx, model, use_uva)
    layerwise_infer(proc_id, device, g, test_idx, model, use_uva)
    # clean up the process group
    dist.destroy_process_group()

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", default='mixed', choices=['mixed', 'puregpu'],
                        help="Training mode. 'mixed' for CPU-GPU mixed training, "
                             "'puregpu' for pure-GPU training.")
    parser.add_argument("--gpu", type=str, default='0',
                        help="GPU(s) in use. Can be a list of gpu ids for multi-gpu"
                             " training, e.g., 0,1,2,3.")
    args = parser.parse_args()
    devices = list(map(int, args.gpu.split(',')))
    nprocs = len(devices)
    assert torch.cuda.is_available(), "Must have GPUs to enable multi-gpu training."
    print(f'Training in {args.mode} mode using {nprocs} GPU(s)')

    # load and preprocess dataset
    print('Loading data')
    dataset = AsNodePredDataset(DglNodePropPredDataset('ogbn-products'))
    g = dataset[0]
    # avoid creating certain graph formats in each sub-process to save memory;
    # this must be called before mp.spawn()
    g.create_formats_()
    # limit threads to avoid resource competition among sub-processes
    os.environ['OMP_NUM_THREADS'] = str(mp.cpu_count() // 2 // nprocs)
    data = dataset.num_classes, dataset.train_idx, dataset.val_idx, dataset.test_idx
    mp.spawn(run, args=(nprocs, devices, g, data, args.mode), nprocs=nprocs)
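
# Example invocation (the filename multi_gpu_node_classification.py is an
# assumption, not part of the commit):
#   python multi_gpu_node_classification.py --gpu 0,1,2,3 --mode mixed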