"tutorials/git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "8874e83059797c54f3485a123ec69ab54598fee9"
Unverified commit a506ebe8, authored by Chang Liu and committed by GitHub

[Example][Refactor] Multi-gpu graphsage link prediction example refactor (#5091)

parent 9890201d
More Examples for Training GraphSAGE
====================================
### Pure GPU sampling
```bash
python3 pure_gpu_node_classification.py
```
### Unsupervised training
Train w/ mini-batch sampling in an unsupervised fashion (on the Reddit dataset)
```bash
python3 train_sampling_unsupervised.py
```
Notably,
* The loss function is defined by predicting whether an edge exists between two nodes. This matches the official
implementation, and is equivalent to the loss defined in the paper with 1-hop random walks (see the sketch below).
* When computing the score of `(u, v)`, the connections between node `u` and `v` are removed from neighbor sampling.
This trick increases the F1-micro score on the test set by 0.02.
* The performance of the learned embeddings is measured by training a softmax regression with scikit-learn, as described
in the paper.
The micro F1 score reaches 0.9212 on the test set.
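For concreteness, here is a minimal sketch of that loss on toy tensors; the embeddings and the positive/negative node pairs below are made-up placeholders, not the output of a real sampler:
```python
import torch
import torch.nn.functional as F

# Placeholder embeddings: 5 nodes with 8-dimensional GraphSAGE outputs.
h = torch.randn(5, 8)

# Hypothetical positive (observed) and negative (sampled) node pairs.
pos_u, pos_v = torch.tensor([0, 1, 2]), torch.tensor([1, 2, 3])
neg_u, neg_v = torch.tensor([0, 1, 2]), torch.tensor([4, 0, 4])

# Score each pair by the dot product of its endpoint embeddings.
pos_score = (h[pos_u] * h[pos_v]).sum(dim=1)
neg_score = (h[neg_u] * h[neg_v]).sum(dim=1)

# Binary cross entropy: observed edges are labeled 1, negatives 0.
score = torch.cat([pos_score, neg_score])
label = torch.cat([torch.ones_like(pos_score), torch.zeros_like(neg_score)])
loss = F.binary_cross_entropy_with_logits(score, label)
```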
This example also demonstrates the advanced usage of multi-GPU `DDP` training, UVA-based sampling, full-GPU sampling, and fine-grained control over where the graph structure and node features are stored (sketched below).
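As a rough sketch of those placement choices (assuming a CUDA device is available; the graph here is a random placeholder, not the real dataset):
```python
import dgl
import torch

sampler = dgl.dataloading.NeighborSampler([15, 10, 5])
g = dgl.rand_graph(1000, 5000)      # placeholder graph for illustration
train_idx = torch.arange(100)
device = torch.device("cuda")

# Full-GPU sampling would move the whole structure onto the device:
#     g = g.to(device)  and  use_uva=False
# UVA sampling instead keeps the graph in pinned host memory so the GPU
# can read it via zero-copy; the seed indices must live on the GPU.
g.pin_memory_()
loader = dgl.dataloading.DataLoader(
    g, train_idx.to(device), sampler,
    device=device, batch_size=1024, shuffle=True,
    num_workers=0, use_uva=True,
)
```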
### Training with PyTorch Lightning
We provide minibatch training scripts with PyTorch Lightning in `train_lightning_unsupervised.py`.
Requires `pytorch_lightning` and `torchmetrics`.
```bash
python3 train_lightning_unsupervised.py
```
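The Lightning script itself is not reproduced here. As a bare-bones sketch (not the actual script) of how a DGL minibatch loop fits into a `LightningModule`, assuming the `SAGE` model class and a DGL `DataLoader` yielding `(input_nodes, output_nodes, blocks)` as in the code below:
```python
import pytorch_lightning as pl
import torch
import torch.nn.functional as F

class SAGELightning(pl.LightningModule):
    def __init__(self, in_feats, n_hidden, n_classes):
        super().__init__()
        # `SAGE` is assumed to be the model class defined in the script below.
        self.module = SAGE(in_feats, n_hidden, n_classes)

    def training_step(self, batch, batch_idx):
        # Each batch is (input_nodes, output_nodes, blocks) from a DGL DataLoader.
        input_nodes, output_nodes, blocks = batch
        x = blocks[0].srcdata["feat"]
        y = blocks[-1].dstdata["label"]
        loss = F.cross_entropy(self.module(blocks, x), y)
        self.log("train_loss", loss)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.001)

# trainer = pl.Trainer(accelerator="gpu", devices=1, max_epochs=10)
# trainer.fit(SAGELightning(in_feats, 256, n_classes), train_dataloaders=train_dataloader)
```
The first full script below is the pure-GPU node classification example (`pure_gpu_node_classification.py`) referenced above.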
import argparse
import time
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics.functional as MF
import tqdm
from ogb.nodeproppred import DglNodePropPredDataset
import dgl
import dgl.nn as dglnn
class SAGE(nn.Module):
def __init__(self, in_feats, n_hidden, n_classes):
super().__init__()
self.layers = nn.ModuleList()
self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, "mean"))
self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, "mean"))
self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, "mean"))
self.dropout = nn.Dropout(0.5)
self.n_hidden = n_hidden
self.n_classes = n_classes
def forward(self, blocks, x):
h = x
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
h = layer(block, h)
if l != len(self.layers) - 1:
h = F.relu(h)
h = self.dropout(h)
return h
def inference(self, g, device, batch_size, num_workers, buffer_device=None):
# The difference between this inference function and the one in the official
# example is that the intermediate results can also benefit from prefetching.
feat = g.ndata["feat"]
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(
1, prefetch_node_feats=["feat"]
)
dataloader = dgl.dataloading.DataLoader(
g,
torch.arange(g.num_nodes()).to(g.device),
sampler,
device=device,
batch_size=batch_size,
shuffle=False,
drop_last=False,
num_workers=num_workers,
)
if buffer_device is None:
buffer_device = device
for l, layer in enumerate(self.layers):
y = torch.empty(
g.num_nodes(),
self.n_hidden if l != len(self.layers) - 1 else self.n_classes,
device=buffer_device,
# pin only when the buffer is on the host; pinning a CUDA tensor is invalid
pin_memory=(buffer_device != device),
)
feat = feat.to(device)
for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
# use an explicitly contiguous slice
x = feat[input_nodes]
h = layer(blocks[0], x)
if l != len(self.layers) - 1:
h = F.relu(h)
h = self.dropout(h)
# by design, our output nodes are contiguous, so we can take
# advantage of that here
y[output_nodes[0] : output_nodes[-1] + 1] = h.to(buffer_device)
feat = y
return y
dataset = DglNodePropPredDataset("ogbn-products")
graph, labels = dataset[0]
graph.ndata["label"] = labels.squeeze()
split_idx = dataset.get_idx_split()
train_idx, valid_idx, test_idx = (
split_idx["train"],
split_idx["valid"],
split_idx["test"],
)
device = "cuda"
train_idx = train_idx.to(device)
valid_idx = valid_idx.to(device)
test_idx = test_idx.to(device)
graph = graph.to(device)
model = SAGE(graph.ndata["feat"].shape[1], 256, dataset.num_classes).to(device)
opt = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)
sampler = dgl.dataloading.NeighborSampler(
[15, 10, 5], prefetch_node_feats=["feat"], prefetch_labels=["label"]
)
train_dataloader = dgl.dataloading.DataLoader(
graph,
train_idx,
sampler,
device=device,
batch_size=1024,
shuffle=True,
drop_last=False,
num_workers=0,
use_uva=False,
)
valid_dataloader = dgl.dataloading.DataLoader(
graph,
valid_idx,
sampler,
device=device,
batch_size=1024,
shuffle=True,
drop_last=False,
num_workers=0,
use_uva=False,
)
durations = []
for _ in range(10):
model.train()
t0 = time.time()
for it, (input_nodes, output_nodes, blocks) in enumerate(train_dataloader):
x = blocks[0].srcdata["feat"]
y = blocks[-1].dstdata["label"]
y_hat = model(blocks, x)
loss = F.cross_entropy(y_hat, y)
opt.zero_grad()
loss.backward()
opt.step()
if it % 20 == 0:
acc = MF.accuracy(torch.argmax(y_hat, dim=1), y)
mem = torch.cuda.max_memory_allocated() / 1000000
print("Loss", loss.item(), "Acc", acc.item(), "GPU Mem", mem, "MB")
tt = time.time()
print(tt - t0)
durations.append(tt - t0)
model.eval()
ys = []
y_hats = []
for it, (input_nodes, output_nodes, blocks) in enumerate(valid_dataloader):
with torch.no_grad():
x = blocks[0].srcdata["feat"]
ys.append(blocks[-1].dstdata["label"])
y_hats.append(torch.argmax(model(blocks, x), dim=1))
acc = MF.accuracy(torch.cat(y_hats), torch.cat(ys))
print("Validation acc:", acc.item())
print(np.mean(durations[4:]), np.std(durations[4:]))
# Test accuracy and offline inference of all nodes
model.eval()
with torch.no_grad():
pred = model.inference(graph, device, 4096, 0, "cpu")
pred = pred[test_idx].to(device)
label = graph.ndata["label"][test_idx]
acc = MF.accuracy(torch.argmax(pred, dim=1), label)
print("Test acc:", acc.item())
import os
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
import dgl.function as fn
import dgl.nn.pytorch as dglnn
import time
import argparse
from torch.nn.parallel import DistributedDataParallel
from model import SAGE, compute_acc_unsupervised as compute_acc
from negative_sampler import NegativeSampler
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from load_graph import load_reddit, load_ogb
class CrossEntropyLoss(nn.Module):
def forward(self, block_outputs, pos_graph, neg_graph):
with pos_graph.local_scope():
pos_graph.ndata['h'] = block_outputs
pos_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
pos_score = pos_graph.edata['score']
with neg_graph.local_scope():
neg_graph.ndata['h'] = block_outputs
neg_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
neg_score = neg_graph.edata['score']
score = th.cat([pos_score, neg_score])
label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)]).long()
loss = F.binary_cross_entropy_with_logits(score, label.float())
return loss
def evaluate(model, g, nfeat, labels, train_nids, val_nids, test_nids, device, args):
"""
Evaluate the model on the validation set specified by ``val_mask``.
g : The entire graph.
inputs : The features of all the nodes.
labels : The labels of all the nodes.
val_mask : A 0-1 mask indicating which nodes do we actually compute the accuracy for.
device : The GPU device to evaluate on.
"""
model.eval()
with th.no_grad():
# single gpu
if isinstance(model, SAGE):
pred = model.inference(g, nfeat, device, args.batch_size, args.num_workers)
# multi gpu
else:
pred = model.module.inference(g, nfeat, device, args.batch_size, args.num_workers)
model.train()
return compute_acc(pred, labels, train_nids, val_nids, test_nids)
#### Entry point
def run(proc_id, n_gpus, args, devices, data):
# Unpack data
device = th.device(devices[proc_id])
if n_gpus > 0:
th.cuda.set_device(device)
if n_gpus > 1:
dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
master_ip='127.0.0.1', master_port='12345')
world_size = n_gpus
th.distributed.init_process_group(backend="nccl",
init_method=dist_init_method,
world_size=world_size,
rank=proc_id)
train_nid, val_nid, test_nid, n_classes, g, nfeat, labels = data
if args.data_device == 'gpu':
nfeat = nfeat.to(device)
elif args.data_device == 'uva':
nfeat = dgl.contrib.UnifiedTensor(nfeat, device=device)
in_feats = nfeat.shape[1]
# Create PyTorch DataLoader for constructing blocks
n_edges = g.num_edges()
train_seeds = th.arange(n_edges)
if args.graph_device == 'gpu':
train_seeds = train_seeds.to(device)
g = g.to(device)
args.num_workers = 0
elif args.graph_device == 'uva':
train_seeds = train_seeds.to(device)
g.pin_memory_()
args.num_workers = 0
# Create sampler
sampler = dgl.dataloading.NeighborSampler(
[int(fanout) for fanout in args.fan_out.split(',')])
sampler = dgl.dataloading.as_edge_prediction_sampler(
sampler, exclude='reverse_id',
# For each edge with ID e in Reddit dataset, the reverse edge is e ± |E|/2.
reverse_eids=th.cat([
th.arange(n_edges // 2, n_edges),
th.arange(0, n_edges // 2)]).to(train_seeds),
negative_sampler=NegativeSampler(g, args.num_negs, args.neg_share,
device if args.graph_device == 'uva' else None))
dataloader = dgl.dataloading.EdgeDataLoader(
g, train_seeds, sampler,
device=device,
use_ddp=n_gpus > 1,
batch_size=args.batch_size,
shuffle=True,
drop_last=False,
num_workers=args.num_workers,
use_uva=args.graph_device == 'uva')
# Define model and optimizer
model = SAGE(in_feats, args.num_hidden, args.num_hidden, args.num_layers, F.relu, args.dropout)
model = model.to(device)
if n_gpus > 1:
model = DistributedDataParallel(model, device_ids=[device], output_device=device)
loss_fcn = CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=args.lr)
# Training loop
avg = 0
iter_pos = []
iter_neg = []
iter_d = []
iter_t = []
best_eval_acc = 0
best_test_acc = 0
for epoch in range(args.num_epochs):
tic = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
tic_step = time.time()
for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(dataloader):
input_nodes = input_nodes.to(nfeat.device)
batch_inputs = nfeat[input_nodes].to(device)
blocks = [block.int() for block in blocks]
d_step = time.time()
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, pos_graph, neg_graph)
optimizer.zero_grad()
loss.backward()
optimizer.step()
t = time.time()
pos_edges = pos_graph.num_edges()
neg_edges = neg_graph.num_edges()
iter_pos.append(pos_edges / (t - tic_step))
iter_neg.append(neg_edges / (t - tic_step))
iter_d.append(d_step - tic_step)
iter_t.append(t - d_step)
if step % args.log_every == 0 and proc_id == 0:
gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0
print('[{}]Epoch {:05d} | Step {:05d} | Loss {:.4f} | Speed (samples/sec) {:.4f}|{:.4f} | Load {:.4f}| train {:.4f} | GPU {:.1f} MB'.format(
proc_id, epoch, step, loss.item(), np.mean(iter_pos[3:]), np.mean(iter_neg[3:]), np.mean(iter_d[3:]), np.mean(iter_t[3:]), gpu_mem_alloc))
tic_step = time.time()
toc = time.time()
if proc_id == 0:
print('Epoch Time(s): {:.4f}'.format(toc - tic))
if epoch >= 5:
avg += toc - tic
if (epoch + 1) % args.eval_every == 0:
eval_acc, test_acc = evaluate(model, g, nfeat, labels, train_nid, val_nid, test_nid, device, args)
print('Eval Acc {:.4f} Test Acc {:.4f}'.format(eval_acc, test_acc))
if eval_acc > best_eval_acc:
best_eval_acc = eval_acc
best_test_acc = test_acc
print('Best Eval Acc {:.4f} Test Acc {:.4f}'.format(best_eval_acc, best_test_acc))
if n_gpus > 1:
th.distributed.barrier()
if proc_id == 0:
print('Avg epoch time: {}'.format(avg / (epoch - 4)))
def main(args):
devices = list(map(int, args.gpu.split(',')))
n_gpus = len(devices)
# load dataset
if args.dataset == 'reddit':
g, n_classes = load_reddit(self_loop=False)
elif args.dataset == 'ogbn-products':
g, n_classes = load_ogb('ogbn-products')
else:
raise Exception('unknown dataset')
train_nid = g.ndata.pop('train_mask').nonzero().squeeze()
val_nid = g.ndata.pop('val_mask').nonzero().squeeze()
test_nid = g.ndata.pop('test_mask').nonzero().squeeze()
nfeat = g.ndata.pop('features')
labels = g.ndata.pop('labels')
# Create csr/coo/csc formats before launching training processes with multi-gpu.
# This avoids creating certain formats in each sub-process, which saves memory and CPU.
g.create_formats_()
# This is to avoid resource competition overhead on machines with many cores.
# Change it to a proper number on your machine, especially for multi-GPU training.
os.environ['OMP_NUM_THREADS'] = str(mp.cpu_count() // 2 // n_gpus)
# Pack data
data = train_nid, val_nid, test_nid, n_classes, g, nfeat, labels
if devices[0] == -1:
assert args.graph_device == 'cpu', \
f"Must have GPUs to enable {args.graph_device} sampling."
assert args.data_device == 'cpu', \
f"Must have GPUs to enable {args.data_device} feature storage."
run(0, 0, args, ['cpu'], data)
elif n_gpus == 1:
run(0, n_gpus, args, devices, data)
else:
mp.spawn(run, args=(n_gpus, args, devices, data), nprocs=n_gpus)
if __name__ == '__main__':
argparser = argparse.ArgumentParser("multi-gpu training")
argparser.add_argument("--gpu", type=str, default='0',
help="GPU, can be a list of gpus for multi-gpu training,"
" e.g., 0,1,2,3; -1 for CPU")
argparser.add_argument('--dataset', type=str, default='reddit',
choices=('reddit', 'ogbn-products'))
argparser.add_argument('--num-epochs', type=int, default=20)
argparser.add_argument('--num-hidden', type=int, default=16)
argparser.add_argument('--num-layers', type=int, default=2)
argparser.add_argument('--num-negs', type=int, default=1)
argparser.add_argument('--neg-share', default=False, action='store_true',
help="sharing neg nodes for positive nodes")
argparser.add_argument('--fan-out', type=str, default='10,25')
argparser.add_argument('--batch-size', type=int, default=10000)
argparser.add_argument('--log-every', type=int, default=20)
argparser.add_argument('--eval-every', type=int, default=5)
argparser.add_argument('--lr', type=float, default=0.003)
argparser.add_argument('--dropout', type=float, default=0.5)
argparser.add_argument('--num-workers', type=int, default=0,
help="Number of sampling processes. Use 0 for no extra process.")
argparser.add_argument('--graph-device', choices=('cpu', 'gpu', 'uva'), default='cpu',
help="Device to perform the sampling. "
"Must have 0 workers for 'gpu' and 'uva'")
argparser.add_argument('--data-device', choices=('cpu', 'gpu', 'uva'), default='gpu',
help="By default the script puts all node features and labels "
"on GPU when using it to save time for data copy. This may "
"be undesired if they cannot fit in GPU memory at once. "
"Use 'cpu' to keep the features on host memory and "
"'uva' to enable UnifiedTensor (GPU zero-copy access on "
"pinned host memory).")
args = argparser.parse_args()
main(args)
### Node classification
Run with the following on dataset "ogbn-products"
```bash
python3 multi_gpu_node_classification.py
```
```
Test Accuracy: ~0.7632
```
### Link prediction
Run with the following (available datasets: "ogbn-products", "reddit")
```bash
python3 multi_gpu_link_prediction.py --dataset ogbn-products
```
#### __Results__
```
Eval F1-score: ~0.7999 Test F1-score: ~0.6383
```
Notably,
* The loss function is defined by predicting whether an edge exists between two nodes.
* When computing the score of `(u, v)`, the connections between node `u` and `v` are removed from neighbor sampling (see the sketch below).
* The performance of the learned embeddings is measured by training a softmax regression with scikit-learn.
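For illustration, a minimal sketch of the reverse-edge exclusion during sampling, using a random placeholder graph rather than the real dataset:
```python
import dgl
import torch
from dgl.dataloading import NeighborSampler, as_edge_prediction_sampler

# Placeholder graph; the real datasets store each edge e together with its
# reverse at ID e +/- |E|/2, which is what `reverse_eids` encodes below.
g = dgl.rand_graph(100, 400)
n_edges = g.num_edges()
reverse_eids = torch.cat(
    [torch.arange(n_edges // 2, n_edges), torch.arange(0, n_edges // 2)]
)

sampler = as_edge_prediction_sampler(
    NeighborSampler([10, 25]),
    # Exclude the minibatch edges and their reverses from the sampled
    # neighborhood, so the model cannot see the edge it is asked to predict.
    exclude="reverse_id",
    reverse_eids=reverse_eids,
    negative_sampler=dgl.dataloading.negative_sampler.Uniform(1),
)
```
The full multi-GPU script (`multi_gpu_link_prediction.py`) follows.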
import argparse
import os
import time
import dgl.function as fn
import dgl.nn as dglnn
import numpy as np
import sklearn.linear_model as lm
import sklearn.metrics as skm
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import tqdm
from dgl.data import AsNodePredDataset, RedditDataset
from dgl.dataloading import (
as_edge_prediction_sampler,
DataLoader,
MultiLayerFullNeighborSampler,
NeighborSampler,
)
from dgl.multiprocessing import shared_tensor
from ogb.nodeproppred import DglNodePropPredDataset
from torch.nn.parallel import DistributedDataParallel
class SAGE(nn.Module):
def __init__(self, in_size, hid_size, out_size):
super().__init__()
self.layers = nn.ModuleList()
# two-layer GraphSAGE-mean
self.layers.append(dglnn.SAGEConv(in_size, hid_size, "mean"))
self.layers.append(dglnn.SAGEConv(hid_size, out_size, "mean"))
self.dropout = nn.Dropout(0.5)
self.hid_size = hid_size
self.out_size = out_size
def forward(self, blocks, x):
h = x
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
h = layer(block, h)
if l != len(self.layers) - 1:
h = F.relu(h)
h = self.dropout(h)
return h
def inference(self, g, device, batch_size, use_uva):
g.ndata["h"] = g.ndata["feat"]
sampler = MultiLayerFullNeighborSampler(1, prefetch_node_feats=["h"])
for l, layer in enumerate(self.layers):
dataloader = DataLoader(
g,
torch.arange(g.num_nodes(), device=device),
sampler,
device=device,
batch_size=batch_size,
shuffle=False,
drop_last=False,
num_workers=0,
use_ddp=True,
use_uva=use_uva,
)
# in order to prevent running out of GPU memory, allocate a
# shared output tensor 'y' in host memory
y = shared_tensor(
(
g.num_nodes(),
self.hid_size
if l != len(self.layers) - 1
else self.out_size,
)
)
for input_nodes, output_nodes, blocks in (
tqdm.tqdm(dataloader) if dist.get_rank() == 0 else dataloader
):
x = blocks[0].srcdata["h"]
h = layer(blocks[0], x) # len(blocks) = 1
if l != len(self.layers) - 1:
h = F.relu(h)
h = self.dropout(h)
# non_blocking (with pinned memory) to accelerate data transfer
y[output_nodes] = h.to(y.device, non_blocking=True)
# make sure all GPUs are done writing to 'y'
dist.barrier()
g.ndata["h"] = y if use_uva else y.to(device)
g.ndata.pop("h")
return y
class NegativeSampler(object):
def __init__(self, g, k, neg_share=False, device=None):
if device is None:
device = g.device
self.weights = g.in_degrees().float().to(device) ** 0.75
self.k = k
self.neg_share = neg_share
def __call__(self, g, eids):
src, _ = g.find_edges(eids)
n = len(src)
if self.neg_share and n % self.k == 0:
dst = self.weights.multinomial(n, replacement=True)
dst = dst.view(-1, 1, self.k).expand(-1, self.k, -1).flatten()
else:
dst = self.weights.multinomial(n * self.k, replacement=True)
src = src.repeat_interleave(self.k)
return src, dst
class CrossEntropyLoss(nn.Module):
def forward(self, block_outputs, pos_graph, neg_graph):
with pos_graph.local_scope():
pos_graph.ndata["h"] = block_outputs
pos_graph.apply_edges(fn.u_dot_v("h", "h", "score"))
pos_score = pos_graph.edata["score"]
with neg_graph.local_scope():
neg_graph.ndata["h"] = block_outputs
neg_graph.apply_edges(fn.u_dot_v("h", "h", "score"))
neg_score = neg_graph.edata["score"]
score = torch.cat([pos_score, neg_score])
label = torch.cat(
[torch.ones_like(pos_score), torch.zeros_like(neg_score)]
).long()
loss = F.binary_cross_entropy_with_logits(score, label.float())
return loss
def compute_acc_unsupervised(emb, labels, train_nids, val_nids, test_nids):
"""
Compute the accuracy of prediction given the labels.
"""
emb = emb.cpu().numpy()
labels = labels.cpu().numpy()
train_nids = train_nids.cpu().numpy()
train_labels = labels[train_nids]
val_nids = val_nids.cpu().numpy()
val_labels = labels[val_nids]
test_nids = test_nids.cpu().numpy()
test_labels = labels[test_nids]
emb = (emb - emb.mean(0, keepdims=True)) / emb.std(0, keepdims=True)
lr = lm.LogisticRegression(multi_class="multinomial", max_iter=10000)
lr.fit(emb[train_nids], train_labels)
pred = lr.predict(emb)
f1_micro_eval = skm.f1_score(val_labels, pred[val_nids], average="micro")
f1_micro_test = skm.f1_score(test_labels, pred[test_nids], average="micro")
return f1_micro_eval, f1_micro_test
def evaluate(proc_id, model, g, device, use_uva):
model.eval()
batch_size = 10000
with torch.no_grad():
pred = model.module.inference(g, device, batch_size, use_uva)
return pred
def train(
proc_id, nprocs, device, g, train_idx, val_idx, test_idx, model, use_uva
):
# Create PyTorch DataLoader for constructing blocks
n_edges = g.num_edges()
train_seeds = torch.arange(n_edges).to(device)
labels = g.ndata["label"].to("cpu")
sampler = NeighborSampler([10, 25], prefetch_node_feats=["feat"])
sampler = as_edge_prediction_sampler(
sampler,
exclude="reverse_id",
# For each edge with ID e in Reddit dataset, the reverse edge is e ± |E|/2.
reverse_eids=torch.cat(
[torch.arange(n_edges // 2, n_edges), torch.arange(0, n_edges // 2)]
).to(train_seeds),
# num_negs = 1, neg_share = False
negative_sampler=NegativeSampler(
g, 1, False, device if use_uva else None
),
)
train_dataloader = DataLoader(
g,
train_seeds,
sampler,
device=device,
batch_size=10000,
shuffle=True,
drop_last=False,
num_workers=0,
use_ddp=True,
use_uva=use_uva,
)
opt = torch.optim.Adam(model.parameters(), lr=0.003)
loss_fcn = CrossEntropyLoss()
iter_pos = []
iter_neg = []
for epoch in range(10):
tic = time.time()
model.train()
for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(
train_dataloader
):
x = blocks[0].srcdata["feat"]
y_hat = model(blocks, x)
loss = loss_fcn(y_hat, pos_graph, neg_graph)
opt.zero_grad()
loss.backward()
opt.step()
if step % 20 == 0 and proc_id == 0: # log every 20 steps
# peak GPU memory allocated by PyTorch (in MB)
gpu_mem_alloc = (
torch.cuda.max_memory_allocated() / 1000000
if torch.cuda.is_available()
else 0
)
print(
f"Epoch {epoch:05d} | Step {step:05d} | Loss {loss.item():.4f} | GPU {gpu_mem_alloc:.1f} MB"
)
t = time.time() - tic
if proc_id == 0:
print(f"Epoch Time(s): {t:.4f}")
if (epoch + 1) % 5 == 0: # eval every 5 epochs
pred = evaluate(proc_id, model, g, device, use_uva) # in parallel
if proc_id == 0:
# only master proc does the accuracy computation
eval_acc, test_acc = compute_acc_unsupervised(
pred, labels, train_idx, val_idx, test_idx
)
print(
f"Epoch {epoch:05d} | Eval F1-score {eval_acc:.4f} | Test F1-Score {test_acc:.4f}"
)
def run(proc_id, nprocs, devices, g, data, mode):
# find corresponding device for my rank
device = devices[proc_id]
torch.cuda.set_device(device)
# initialize process group and unpack data for sub-processes
dist.init_process_group(
backend="nccl",
init_method="tcp://127.0.0.1:12345",
world_size=nprocs,
rank=proc_id,
)
out_size, train_idx, val_idx, test_idx = data
g = g.to(device if mode == "puregpu" else "cpu")
# create GraphSAGE model (distributed)
in_size = g.ndata["feat"].shape[1]
model = SAGE(in_size, 16, 16).to(device)
model = DistributedDataParallel(
model, device_ids=[device], output_device=device
)
# training + testing
use_uva = mode == "mixed"
train(
proc_id, nprocs, device, g, train_idx, val_idx, test_idx, model, use_uva
)
# cleanup process group
dist.destroy_process_group()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset",
type=str,
default="ogbn-products",
choices=["ogbn-products", "reddit"],
help="name of dataset (default: ogbn-products)",
)
parser.add_argument(
"--mode",
default="mixed",
choices=["mixed", "puregpu"],
help="Training mode. 'mixed' for CPU-GPU mixed training, "
"'puregpu' for pure-GPU training.",
)
parser.add_argument(
"--gpu",
type=str,
default="0",
help="GPU(s) in use. Can be a list of gpu ids for multi-gpu training,"
" e.g., 0,1,2,3.",
)
args = parser.parse_args()
devices = list(map(int, args.gpu.split(",")))
nprocs = len(devices)
assert (
torch.cuda.is_available()
), f"Must have GPUs to enable multi-gpu training."
print(f"Training in {args.mode} mode using {nprocs} GPU(s)")
# load and preprocess dataset
print("Loading data")
if args.dataset == "ogbn-products":
# TODO: could this use AsLinkPredDataset instead?
dataset = AsNodePredDataset(DglNodePropPredDataset("ogbn-products"))
elif args.dataset == "reddit":
dataset = AsNodePredDataset(RedditDataset(self_loop=False))
g = dataset[0]
# avoid creating certain graph formats in each sub-process to save memory
g.create_formats_()
# thread limiting to avoid resource competition
os.environ["OMP_NUM_THREADS"] = str(mp.cpu_count() // 2 // nprocs)
data = (
dataset.num_classes,
dataset.train_idx,
dataset.val_idx,
dataset.test_idx,
)
mp.spawn(run, args=(nprocs, devices, g, data, args.mode), nprocs=nprocs)