import dgl import numpy as np import torch as th import torch.nn as nn import torch.nn.functional as F import torch.optim as optim import torch.multiprocessing as mp from torch.utils.data import DataLoader import dgl.function as fn import dgl.nn.pytorch as dglnn import time import argparse from dgl.data import RedditDataset import tqdm import traceback from ogb.nodeproppred import DglNodePropPredDataset from functools import partial from sampler import ClusterIter, subgraph_collate_fn #### Neighbor sampler class SAGE(nn.Module): def __init__(self, in_feats, n_hidden, n_classes, n_layers, activation, dropout): super().__init__() self.n_layers = n_layers self.n_hidden = n_hidden self.n_classes = n_classes self.layers = nn.ModuleList() self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean')) for i in range(1, n_layers - 1): self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean')) self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean')) self.dropout = nn.Dropout(dropout) self.activation = activation def forward(self, g, x): h = x for l, conv in enumerate(self.layers): h = conv(g, h) if l != len(self.layers) - 1: h = self.activation(h) h = self.dropout(h) return h def inference(self, g, x, batch_size, device): """ Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling). g : the entire graph. x : the input of entire node set. The inference code is written in a fashion that it could handle any number of nodes and layers. """ # During inference with sampling, multi-layer blocks are very inefficient because # lots of computations in the first few layers are repeated. # Therefore, we compute the representation of all nodes layer by layer. The nodes # on each layer are of course splitted in batches. # TODO: can we standardize this? h = x for l, conv in enumerate(self.layers): h = conv(g, h) if l != len(self.layers) - 1: h = self.activation(h) return h def compute_acc(pred, labels): """ Compute the accuracy of prediction given the labels. """ return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred) def evaluate(model, g, labels, val_nid, test_nid, batch_size, device): """ Evaluate the model on the validation set specified by ``val_mask``. g : The entire graph. inputs : The features of all the nodes. labels : The labels of all the nodes. val_mask : A 0-1 mask indicating which nodes do we actually compute the accuracy for. batch_size : Number of nodes to compute at the same time. device : The GPU device to evaluate on. """ model.eval() with th.no_grad(): inputs = g.ndata['feat'] model = model.cpu() pred = model.inference(g, inputs, batch_size, device) model.train() return compute_acc(pred[val_nid], labels[val_nid]), compute_acc(pred[test_nid], labels[test_nid]), pred def load_subtensor(g, labels, seeds, input_nodes, device): """ Copys features and labels of a set of nodes onto GPU. """ batch_inputs = g.ndata['feat'][input_nodes].to(device) batch_labels = labels[seeds].to(device) return batch_inputs, batch_labels #### Entry point def run(args, device, data): # Unpack data train_nid, val_nid, test_nid, in_feats, labels, n_classes, g, cluster_iterator = data # Define model and optimizer model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout) model = model.to(device) loss_fcn = nn.CrossEntropyLoss() loss_fcn = loss_fcn.to(device) optimizer = optim.Adam(model.parameters(), lr=args.lr, weight_decay=args.wd) # Training loop avg = 0 iter_tput = [] best_eval_acc = 0 best_test_acc = 0 for epoch in range(args.num_epochs): iter_load = 0 iter_far = 0 iter_back = 0 iter_tl = 0 tic = time.time() # Loop over the dataloader to sample the computation dependency graph as a list of # blocks. tic_start = time.time() for step, cluster in enumerate(cluster_iterator): cluster = cluster.int().to(device) mask = cluster.ndata['train_mask'].to(device) if mask.sum() == 0: continue feat = cluster.ndata['feat'].to(device) batch_labels = cluster.ndata['labels'].to(device) tic_step = time.time() batch_pred = model(cluster, feat) batch_pred = batch_pred[mask] batch_labels = batch_labels[mask] loss = loss_fcn(batch_pred, batch_labels) optimizer.zero_grad() tic_far = time.time() loss.backward() optimizer.step() tic_back = time.time() iter_load += (tic_step - tic_start) iter_far += (tic_far - tic_step) iter_back += (tic_back - tic_far) tic_start = time.time() if step % args.log_every == 0: acc = compute_acc(batch_pred, batch_labels) gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0 print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | GPU {:.1f} MB'.format( epoch, step, loss.item(), acc.item(), gpu_mem_alloc)) toc = time.time() print('Epoch Time(s): {:.4f} Load {:.4f} Forward {:.4f} Backward {:.4f}'.format(toc - tic, iter_load, iter_far, iter_back)) if epoch >= 5: avg += toc - tic if epoch % args.eval_every == 0 and epoch != 0: eval_acc, test_acc, pred = evaluate(model, g, labels, val_nid, test_nid, args.val_batch_size, device) model = model.to(device) if args.save_pred: np.savetxt(args.save_pred + '%02d' % epoch, pred.argmax(1).cpu().numpy(), '%d') print('Eval Acc {:.4f}'.format(eval_acc)) if eval_acc > best_eval_acc: best_eval_acc = eval_acc best_test_acc = test_acc print('Best Eval Acc {:.4f} Test Acc {:.4f}'.format(best_eval_acc, best_test_acc)) print('Avg epoch time: {}'.format(avg / (epoch - 4))) return best_test_acc if __name__ == '__main__': argparser = argparse.ArgumentParser("multi-gpu training") argparser.add_argument('--gpu', type=int, default=0, help="GPU device ID. Use -1 for CPU training") argparser.add_argument('--num-epochs', type=int, default=30) argparser.add_argument('--num-hidden', type=int, default=256) argparser.add_argument('--num-layers', type=int, default=3) argparser.add_argument('--batch-size', type=int, default=32) argparser.add_argument('--val-batch-size', type=int, default=10000) argparser.add_argument('--log-every', type=int, default=20) argparser.add_argument('--eval-every', type=int, default=1) argparser.add_argument('--lr', type=float, default=0.001) argparser.add_argument('--dropout', type=float, default=0.5) argparser.add_argument('--save-pred', type=str, default='') argparser.add_argument('--wd', type=float, default=0) argparser.add_argument('--num_partitions', type=int, default=15000) args = argparser.parse_args() if args.gpu >= 0: device = th.device('cuda:%d' % args.gpu) else: device = th.device('cpu') # load ogbn-products data data = DglNodePropPredDataset(name='ogbn-products') splitted_idx = data.get_idx_split() train_idx, val_idx, test_idx = splitted_idx['train'], splitted_idx['valid'], splitted_idx['test'] graph, labels = data[0] labels = labels[:, 0] num_nodes = train_idx.shape[0] + val_idx.shape[0] + test_idx.shape[0] assert num_nodes == graph.number_of_nodes() graph.ndata['labels'] = labels mask = th.zeros(num_nodes, dtype=th.bool) mask[train_idx] = True graph.ndata['train_mask'] = mask mask = th.zeros(num_nodes, dtype=th.bool) mask[val_idx] = True graph.ndata['valid_mask'] = mask mask = th.zeros(num_nodes, dtype=th.bool) mask[test_idx] = True graph.ndata['test_mask'] = mask graph.in_degree(0) graph.out_degree(0) graph.find_edges(0) cluster_iter_data = ClusterIter( 'ogbn-products', graph, args.num_partitions, args.batch_size, th.cat([train_idx, val_idx, test_idx])) idx = th.arange(args.num_partitions // args.batch_size) cluster_iterator = DataLoader(cluster_iter_data, batch_size=32, shuffle=True, pin_memory=True, num_workers=4, collate_fn=partial(subgraph_collate_fn, graph)) in_feats = graph.ndata['feat'].shape[1] print(in_feats) n_classes = (labels.max() + 1).item() # Pack data data = train_idx, val_idx, test_idx, in_feats, labels, n_classes, graph, cluster_iterator # Run 10 times test_accs = [] for i in range(10): test_accs.append(run(args, device, data)) print('Average test accuracy:', np.mean(test_accs), '±', np.std(test_accs))