import dgl import numpy as np import torch as th import torch.nn as nn import torch.nn.functional as F import torch.optim as optim import torch.multiprocessing as mp from torch.utils.data import DataLoader import dgl.function as fn import dgl.nn.pytorch as dglnn import time import argparse from _thread import start_new_thread from functools import wraps from dgl.data import RedditDataset import tqdm import traceback from ogb.nodeproppred import DglNodePropPredDataset from functools import partial from sampler import ClusterIter, subgraph_collate_fn #### Neighbor sampler class SAGE(nn.Module): def __init__(self, in_feats, n_hidden, n_classes, n_layers, activation, dropout): super().__init__() self.n_layers = n_layers self.n_hidden = n_hidden self.n_classes = n_classes self.layers = nn.ModuleList() self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean')) for i in range(1, n_layers - 1): self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean')) self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean')) self.dropout = nn.Dropout(dropout) self.activation = activation def forward(self, g, x): h = x for l, conv in enumerate(self.layers): h = conv(g, h) if l != len(self.layers) - 1: h = self.activation(h) h = self.dropout(h) return h def inference(self, g, x, batch_size, device): """ Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling). g : the entire graph. x : the input of entire node set. The inference code is written in a fashion that it could handle any number of nodes and layers. """ # During inference with sampling, multi-layer blocks are very inefficient because # lots of computations in the first few layers are repeated. # Therefore, we compute the representation of all nodes layer by layer. The nodes # on each layer are of course splitted in batches. # TODO: can we standardize this? h = x for l, conv in enumerate(self.layers): h = conv(g, h) if l != len(self.layers) - 1: h = self.activation(h) return h def compute_acc(pred, labels): """ Compute the accuracy of prediction given the labels. """ return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred) def evaluate(model, g, labels, val_nid, test_nid, batch_size, device): """ Evaluate the model on the validation set specified by ``val_mask``. g : The entire graph. inputs : The features of all the nodes. labels : The labels of all the nodes. val_mask : A 0-1 mask indicating which nodes do we actually compute the accuracy for. batch_size : Number of nodes to compute at the same time. device : The GPU device to evaluate on. """ model.eval() with th.no_grad(): inputs = g.ndata['feat'] model = model.cpu() pred = model.inference(g, inputs, batch_size, device) model.train() return compute_acc(pred[val_nid], labels[val_nid]), compute_acc(pred[test_nid], labels[test_nid]), pred def load_subtensor(g, labels, seeds, input_nodes, device): """ Copys features and labels of a set of nodes onto GPU. """ batch_inputs = g.ndata['feat'][input_nodes].to(device) batch_labels = labels[seeds].to(device) return batch_inputs, batch_labels #### Entry point def run(args, device, data): # Unpack data train_nid, val_nid, test_nid, in_feats, labels, n_classes, g, cluster_iterator = data # Define model and optimizer model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout) model = model.to(device) loss_fcn = nn.CrossEntropyLoss() loss_fcn = loss_fcn.to(device) optimizer = optim.Adam(model.parameters(), lr=args.lr, weight_decay=args.wd) # Training loop avg = 0 iter_tput = [] best_eval_acc = 0 best_test_acc = 0 for epoch in range(args.num_epochs): iter_load = 0 iter_far = 0 iter_back = 0 iter_tl = 0 tic = time.time() # Loop over the dataloader to sample the computation dependency graph as a list of # blocks. tic_start = time.time() for step, cluster in enumerate(cluster_iterator): cluster = cluster.int().to(device) mask = cluster.ndata['train_mask'].to(device) if mask.sum() == 0: continue feat = cluster.ndata['feat'].to(device) batch_labels = cluster.ndata['labels'].to(device) tic_step = time.time() batch_pred = model(cluster, feat) batch_pred = batch_pred[mask] batch_labels = batch_labels[mask] loss = loss_fcn(batch_pred, batch_labels) optimizer.zero_grad() tic_far = time.time() loss.backward() optimizer.step() tic_back = time.time() iter_load += (tic_step - tic_start) iter_far += (tic_far - tic_step) iter_back += (tic_back - tic_far) tic_start = time.time() if step % args.log_every == 0: acc = compute_acc(batch_pred, batch_labels) gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0 print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | GPU {:.1f} MB'.format( epoch, step, loss.item(), acc.item(), gpu_mem_alloc)) toc = time.time() print('Epoch Time(s): {:.4f} Load {:.4f} Forward {:.4f} Backward {:.4f}'.format(toc - tic, iter_load, iter_far, iter_back)) if epoch >= 5: avg += toc - tic if epoch % args.eval_every == 0 and epoch != 0: eval_acc, test_acc, pred = evaluate(model, g, labels, val_nid, test_nid, args.val_batch_size, device) model = model.to(device) if args.save_pred: np.savetxt(args.save_pred + '%02d' % epoch, pred.argmax(1).cpu().numpy(), '%d') print('Eval Acc {:.4f}'.format(eval_acc)) if eval_acc > best_eval_acc: best_eval_acc = eval_acc best_test_acc = test_acc print('Best Eval Acc {:.4f} Test Acc {:.4f}'.format(best_eval_acc, best_test_acc)) print('Avg epoch time: {}'.format(avg / (epoch - 4))) return best_test_acc if __name__ == '__main__': argparser = argparse.ArgumentParser("multi-gpu training") argparser.add_argument('--gpu', type=int, default=0, help="GPU device ID. Use -1 for CPU training") argparser.add_argument('--num-epochs', type=int, default=30) argparser.add_argument('--num-hidden', type=int, default=256) argparser.add_argument('--num-layers', type=int, default=3) argparser.add_argument('--batch-size', type=int, default=32) argparser.add_argument('--val-batch-size', type=int, default=10000) argparser.add_argument('--log-every', type=int, default=20) argparser.add_argument('--eval-every', type=int, default=1) argparser.add_argument('--lr', type=float, default=0.001) argparser.add_argument('--dropout', type=float, default=0.5) argparser.add_argument('--save-pred', type=str, default='') argparser.add_argument('--wd', type=float, default=0) argparser.add_argument('--num_partitions', type=int, default=15000) args = argparser.parse_args() if args.gpu >= 0: device = th.device('cuda:%d' % args.gpu) else: device = th.device('cpu') # load reddit data data = DglNodePropPredDataset(name='ogbn-products') splitted_idx = data.get_idx_split() train_idx, val_idx, test_idx = splitted_idx['train'], splitted_idx['valid'], splitted_idx['test'] graph, labels = data[0] labels = labels[:, 0] num_nodes = train_idx.shape[0] + val_idx.shape[0] + test_idx.shape[0] assert num_nodes == graph.number_of_nodes() graph.ndata['labels'] = labels mask = th.zeros(num_nodes, dtype=th.bool) mask[train_idx] = True graph.ndata['train_mask'] = mask mask = th.zeros(num_nodes, dtype=th.bool) mask[val_idx] = True graph.ndata['valid_mask'] = mask mask = th.zeros(num_nodes, dtype=th.bool) mask[test_idx] = True graph.ndata['test_mask'] = mask graph.in_degree(0) graph.out_degree(0) graph.find_edges(0) cluster_iter_data = ClusterIter( 'ogbn-products', graph, args.num_partitions, args.batch_size, th.cat([train_idx, val_idx, test_idx])) idx = th.arange(args.num_partitions // args.batch_size) cluster_iterator = DataLoader(cluster_iter_data, batch_size=32, shuffle=True, pin_memory=True, num_workers=4, collate_fn=partial(subgraph_collate_fn, graph)) in_feats = graph.ndata['feat'].shape[1] print(in_feats) n_classes = (labels.max() + 1).item() # Pack data data = train_idx, val_idx, test_idx, in_feats, labels, n_classes, graph, cluster_iterator # Run 10 times test_accs = [] for i in range(10): test_accs.append(run(args, device, data)) print('Average test accuracy:', np.mean(test_accs), '±', np.std(test_accs))