Unverified Commit 09642d7c authored by Mufei Li, committed by GitHub

[Tutorial] Single Machine Multi-GPU Minibatch Graph Classification (#2940)



* Update

* Update

* Update tutorials/multi/1_graph_classification.py
Co-authored-by: Tong He <hetong007@gmail.com>

* Update

* Update

* Update
Co-authored-by: Ubuntu <ubuntu@ip-172-31-4-21.us-west-2.compute.internal>
Co-authored-by: Tong He <hetong007@gmail.com>
parent 9a0e13ac
@@ -25,6 +25,7 @@ Welcome to Deep Graph Library Tutorials and Documentation
    guide/index
    guide_cn/index
    tutorials/large/index
+   tutorials/multi/index
    tutorials/models/index
 .. toctree::
...
"""
Single Machine Multi-GPU Minibatch Graph Classification
=======================================================
In this tutorial, you will learn how to use multiple GPUs in training a
graph neural network (GNN) for graph classification. This tutorial assumes
knowledge in GNNs for graph classification and we recommend you to check
:doc:`Training a GNN for Graph Classification <../blitz/5_graph_classification>` otherwise.
(Time estimate: 8 minutes)
To use a single GPU in training a GNN, we need to put the model, graph(s), and other
tensors (e.g. labels) on the same GPU:
"""
"""
import torch
# Use the first GPU
device = torch.device("cuda:0")
model = model.to(device)
graph = graph.to(device)
labels = labels.to(device)
"""
###############################################################################
# The node and edge features in the graphs, if any, will also be on the GPU.
# After that, the forward computation, backward computation and parameter
# update will take place on the GPU. For graph classification, this repeats
# for each minibatch gradient descent.
#
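# To make this concrete, one minibatch update on a single GPU looks roughly
# like the sketch below (assuming `model`, `criterion`, `optimizer` and the
# minibatch tensors `graph`, `feats` and `labels` have already been created
# and moved to `device`):
#
# .. code:: python
#
#    pred = model(graph, feats)
#    loss = criterion(pred, labels)
#    optimizer.zero_grad()
#    loss.backward()
#    optimizer.step()
#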
# Using multiple GPUs allows performing more computation per unit of time. It
# is like having a team work together, where each GPU is a team member. We need
# to distribute the computation workload across GPUs and let them synchronize
# the efforts regularly. PyTorch provides convenient APIs for this task with
# multiple processes, one per GPU, and we can use them in conjunction with DGL.
#
# Intuitively, we can distribute the workload along the dimension of data. This
# allows multiple GPUs to perform the forward and backward computation of
# multiple gradient descents in parallel. To distribute a dataset across
# multiple GPUs, we need to partition it into multiple mutually exclusive
# subsets of a similar size, one per GPU. We need to repeat the random
# partition every epoch to guarantee randomness. We can use
# :func:`~dgl.dataloading.pytorch.GraphDataLoader`, which wraps some PyTorch
# APIs and handles this data loading job for graph classification, as sketched
# below.
#
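# For intuition, setting `use_ddp=True` corresponds roughly to the following
# plain PyTorch sketch; the exact classes and arguments DGL uses internally
# may differ, and :func:`~dgl.dataloading.pytorch.GraphDataLoader`
# additionally takes care of batching graphs:
#
# .. code:: python
#
#    from torch.utils.data import DataLoader
#    from torch.utils.data.distributed import DistributedSampler
#
#    # Each process (GPU) gets a mutually exclusive subset of a similar size.
#    sampler = DistributedSampler(train_set, shuffle=True)
#    loader = DataLoader(train_set, batch_size=32, sampler=sampler)
#
#    # Called at the start of every epoch (inside the training loop) so that
#    # the random partition changes from epoch to epoch.
#    sampler.set_epoch(epoch)
#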
# Once all GPUs have finished the backward computation for their minibatches,
# we need to synchronize the model parameter update across them. Specifically,
# this involves collecting gradients from all GPUs, averaging them and updating
# the model parameters on each GPU. We can wrap a PyTorch model with
# :func:`~torch.nn.parallel.DistributedDataParallel` so that the model
# parameter update will invoke gradient synchronization first under the hood.
#
# .. image:: https://data.dgl.ai/tutorial/mgpu_gc.png
# :width: 450px
# :align: center
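#
# Conceptually, this synchronization amounts to averaging the gradients across
# processes, as in the sketch below; DDP's actual implementation buckets
# gradients and overlaps the communication with the backward pass:
#
# .. code:: python
#
#    import torch.distributed as dist
#
#    # After loss.backward() in each process
#    for param in model.parameters():
#        if param.grad is not None:
#            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
#            param.grad /= dist.get_world_size()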
#
# That’s the core idea behind this tutorial. We will explore it in more detail
# a complete example below.
#
# Distributed Process Group Initialization
# ----------------------------------------
#
# For communication between multiple processes in multi-GPU training, we need
# to start the distributed backend at the beginning of each process. We use
# `world_size` to refer to the number of processes and `rank` to refer to the
# process ID, which should be an integer from `0` to `world_size - 1`.
#
import torch.distributed as dist

def init_process_group(world_size, rank):
    dist.init_process_group(
        backend='nccl',     # NCCL is the standard backend for GPU communication
        init_method='tcp://127.0.0.1:12345',
        world_size=world_size,
        rank=rank)
###############################################################################
# Data Loader Preparation
# -----------------------
#
# We split the dataset into training, validation and test subsets. When
# splitting the dataset, we need to use the same random seed across processes
# to ensure the same split. We follow the common practice of training with
# multiple GPUs and evaluating with a single GPU, thus we only set `use_ddp`
# to True in the
# :func:`~dgl.dataloading.pytorch.GraphDataLoader` for the training set, where
# `ddp` stands for :func:`~torch.nn.parallel.DistributedDataParallel`.
#
from dgl.data import split_dataset
from dgl.dataloading import GraphDataLoader

def get_dataloaders(dataset, seed, batch_size=32):
    # Use an 80:10:10 train-val-test split
    train_set, val_set, test_set = split_dataset(dataset,
                                                 frac_list=[0.8, 0.1, 0.1],
                                                 shuffle=True,
                                                 random_state=seed)
    # Only the training loader needs to be aware of DDP
    train_loader = GraphDataLoader(train_set, use_ddp=True, batch_size=batch_size, shuffle=True)
    val_loader = GraphDataLoader(val_set, batch_size=batch_size)
    test_loader = GraphDataLoader(test_set, batch_size=batch_size)

    return train_loader, val_loader, test_loader
###############################################################################
# Model Initialization
# --------------------
#
# For this tutorial, we use a simplified Graph Isomorphism Network (GIN).
#
import torch.nn as nn
import torch.nn.functional as F
from dgl.nn.pytorch import GINConv, SumPooling

class GIN(nn.Module):
    def __init__(self, input_size=1, num_classes=2):
        super(GIN, self).__init__()

        self.conv1 = GINConv(nn.Linear(input_size, num_classes), aggregator_type='sum')
        self.conv2 = GINConv(nn.Linear(num_classes, num_classes), aggregator_type='sum')
        self.pool = SumPooling()

    def forward(self, g, feats):
        feats = self.conv1(g, feats)
        feats = F.relu(feats)
        feats = self.conv2(g, feats)

        return self.pool(g, feats)
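###############################################################################
# As a quick sanity check of the model's input and output shapes, here is a
# sketch on a toy random graph (not part of the training script):
#
# .. code:: python
#
#    import dgl
#    import torch
#
#    g = dgl.rand_graph(10, 20)             # 10 nodes, 20 random edges
#    feats = torch.ones(g.num_nodes(), 1)   # matches input_size=1
#    logits = GIN()(g, feats)               # shape (1, 2): one graph, two classes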
###############################################################################
# To ensure the same initial model parameters across processes, we need to set the
# same random seed before model initialization. Once we construct a model
# instance, we wrap it with :func:`~torch.nn.parallel.DistributedDataParallel`.
#
import torch
from torch.nn.parallel import DistributedDataParallel

def init_model(seed, device):
    torch.manual_seed(seed)
    model = GIN().to(device)
    model = DistributedDataParallel(model, device_ids=[device], output_device=device)

    return model
###############################################################################
# Main Function for Each Process
# ------------------------------
#
# Define the model evaluation function as in the single-GPU setting.
#
def evaluate(model, dataloader, device):
    model.eval()

    total = 0
    total_correct = 0
    for bg, labels in dataloader:
        bg = bg.to(device)
        labels = labels.to(device)
        # Get input node features
        feats = bg.ndata.pop('attr')
        with torch.no_grad():
            pred = model(bg, feats)
        _, pred = torch.max(pred, 1)
        total += len(labels)
        total_correct += (pred == labels).sum().cpu().item()

    return 1.0 * total_correct / total
###############################################################################
# Define the main function for each process.
#
from torch.optim import Adam

def main(rank, world_size, dataset, seed=0):
    init_process_group(world_size, rank)

    # Assume the GPU ID to be the same as the process ID
    device = torch.device('cuda:{:d}'.format(rank))
    torch.cuda.set_device(device)

    model = init_model(seed, device)
    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=0.01)

    train_loader, val_loader, test_loader = get_dataloaders(dataset, seed)

    for epoch in range(5):
        model.train()
        # The line below ensures all processes use a different
        # random ordering in data loading for each epoch.
        train_loader.set_epoch(epoch)

        total_loss = 0
        for bg, labels in train_loader:
            bg = bg.to(device)
            labels = labels.to(device)
            feats = bg.ndata.pop('attr')
            pred = model(bg, feats)
            loss = criterion(pred, labels)
            total_loss += loss.cpu().item()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        print('Loss: {:.4f}'.format(total_loss))

        val_acc = evaluate(model, val_loader, device)
        print('Val acc: {:.4f}'.format(val_acc))

    test_acc = evaluate(model, test_loader, device)
    print('Test acc: {:.4f}'.format(test_acc))
    dist.destroy_process_group()
###############################################################################
# Finally we load the dataset and launch the processes.
#
if __name__ == '__main__':
    import torch.multiprocessing as mp

    from dgl.data import GINDataset

    num_gpus = 4
    procs = []
    dataset = GINDataset(name='IMDBBINARY', self_loop=False)
    for rank in range(num_gpus):
        p = mp.Process(target=main, args=(rank, num_gpus, dataset))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
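###############################################################################
# As an aside, the same processes could also be launched with
# :func:`torch.multiprocessing.spawn`, which automatically passes the process
# rank as the first argument to the target function. A minimal sketch,
# assuming the same `main` and `dataset` as above:
#
# .. code:: python
#
#    if __name__ == '__main__':
#        import torch.multiprocessing as mp
#
#        from dgl.data import GINDataset
#
#        num_gpus = 4
#        dataset = GINDataset(name='IMDBBINARY', self_loop=False)
#        mp.spawn(main, args=(num_gpus, dataset), nprocs=num_gpus)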
Training on Multiple GPUs
=========================