import dgl import numpy as np import torch as th import torch.nn as nn import torch.nn.functional as F import torch.optim as optim import dgl.nn.pytorch as dglnn import time import argparse import tqdm from load_graph import load_reddit, inductive_split, load_ogb class SAGE(nn.Module): def __init__(self, in_feats, n_hidden, n_classes, n_layers, activation, dropout): super().__init__() self.n_layers = n_layers self.n_hidden = n_hidden self.n_classes = n_classes self.layers = nn.ModuleList() self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean')) for i in range(1, n_layers - 1): self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean')) self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean')) self.dropout = nn.Dropout(dropout) self.activation = activation def forward(self, blocks, x): h = x for l, (layer, block) in enumerate(zip(self.layers, blocks)): h = layer(block, h) if l != len(self.layers) - 1: h = self.activation(h) h = self.dropout(h) return h def inference(self, g, x, device): """ Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling). g : the entire graph. x : the input of entire node set. The inference code is written in a fashion that it could handle any number of nodes and layers. """ # During inference with sampling, multi-layer blocks are very inefficient because # lots of computations in the first few layers are repeated. # Therefore, we compute the representation of all nodes layer by layer. The nodes # on each layer are of course splitted in batches. # TODO: can we standardize this? for l, layer in enumerate(self.layers): y = th.zeros(g.num_nodes(), self.n_hidden if l != len(self.layers) - 1 else self.n_classes) sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1) dataloader = dgl.dataloading.NodeDataLoader( g, th.arange(g.num_nodes()), sampler, batch_size=args.batch_size, shuffle=True, drop_last=False, num_workers=args.num_workers) for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader): block = blocks[0] block = block.int().to(device) h = x[input_nodes].to(device) h = layer(block, h) if l != len(self.layers) - 1: h = self.activation(h) h = self.dropout(h) y[output_nodes] = h.cpu() x = y return y def compute_acc(pred, labels): """ Compute the accuracy of prediction given the labels. """ labels = labels.long() return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred) def evaluate(model, g, nfeat, labels, val_nid, device): """ Evaluate the model on the validation set specified by ``val_nid``. g : The entire graph. inputs : The features of all the nodes. labels : The labels of all the nodes. val_nid : the node Ids for validation. device : The GPU device to evaluate on. """ model.eval() with th.no_grad(): pred = model.inference(g, nfeat, device) model.train() return compute_acc(pred[val_nid], labels[val_nid].to(pred.device)) def load_subtensor(nfeat, labels, seeds, input_nodes, device): """ Extracts features and labels for a subset of nodes """ batch_inputs = nfeat[input_nodes].to(device) batch_labels = labels[seeds].to(device) return batch_inputs, batch_labels #### Entry point def run(args, device, data): # Unpack data n_classes, train_g, val_g, test_g, train_nfeat, train_labels, \ val_nfeat, val_labels, test_nfeat, test_labels = data in_feats = train_nfeat.shape[1] train_nid = th.nonzero(train_g.ndata['train_mask'], as_tuple=True)[0] val_nid = th.nonzero(val_g.ndata['val_mask'], as_tuple=True)[0] test_nid = th.nonzero(~(test_g.ndata['train_mask'] | test_g.ndata['val_mask']), as_tuple=True)[0] dataloader_device = th.device('cpu') if args.sample_gpu: train_nid = train_nid.to(device) # copy only the csc to the GPU train_g = train_g.formats(['csc']) train_g = train_g.to(device) dataloader_device = device # Create PyTorch DataLoader for constructing blocks sampler = dgl.dataloading.MultiLayerNeighborSampler( [int(fanout) for fanout in args.fan_out.split(',')]) dataloader = dgl.dataloading.NodeDataLoader( train_g, train_nid, sampler, device=dataloader_device, batch_size=args.batch_size, shuffle=True, drop_last=False, num_workers=args.num_workers) # Define model and optimizer model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout) model = model.to(device) loss_fcn = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=args.lr) # Training loop avg = 0 iter_tput = [] for epoch in range(args.num_epochs): tic = time.time() # Loop over the dataloader to sample the computation dependency graph as a list of # blocks. tic_step = time.time() for step, (input_nodes, seeds, blocks) in enumerate(dataloader): # Load the input features as well as output labels batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels, seeds, input_nodes, device) blocks = [block.int().to(device) for block in blocks] # Compute loss and prediction batch_pred = model(blocks, batch_inputs) loss = loss_fcn(batch_pred, batch_labels) optimizer.zero_grad() loss.backward() optimizer.step() iter_tput.append(len(seeds) / (time.time() - tic_step)) if step % args.log_every == 0: acc = compute_acc(batch_pred, batch_labels) gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0 print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB'.format( epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), gpu_mem_alloc)) tic_step = time.time() toc = time.time() print('Epoch Time(s): {:.4f}'.format(toc - tic)) if epoch >= 5: avg += toc - tic if epoch % args.eval_every == 0 and epoch != 0: eval_acc = evaluate(model, val_g, val_nfeat, val_labels, val_nid, device) print('Eval Acc {:.4f}'.format(eval_acc)) test_acc = evaluate(model, test_g, test_nfeat, test_labels, test_nid, device) print('Test Acc: {:.4f}'.format(test_acc)) print('Avg epoch time: {}'.format(avg / (epoch - 4))) if __name__ == '__main__': argparser = argparse.ArgumentParser("multi-gpu training") argparser.add_argument('--gpu', type=int, default=0, help="GPU device ID. Use -1 for CPU training") argparser.add_argument('--dataset', type=str, default='reddit') argparser.add_argument('--num-epochs', type=int, default=20) argparser.add_argument('--num-hidden', type=int, default=16) argparser.add_argument('--num-layers', type=int, default=2) argparser.add_argument('--fan-out', type=str, default='10,25') argparser.add_argument('--batch-size', type=int, default=1000) argparser.add_argument('--log-every', type=int, default=20) argparser.add_argument('--eval-every', type=int, default=5) argparser.add_argument('--lr', type=float, default=0.003) argparser.add_argument('--dropout', type=float, default=0.5) argparser.add_argument('--num-workers', type=int, default=4, help="Number of sampling processes. Use 0 for no extra process.") argparser.add_argument('--sample-gpu', action='store_true', help="Perform the sampling process on the GPU. Must have 0 workers.") argparser.add_argument('--inductive', action='store_true', help="Inductive learning setting") argparser.add_argument('--data-cpu', action='store_true', help="By default the script puts all node features and labels " "on GPU when using it to save time for data copy. This may " "be undesired if they cannot fit in GPU memory at once. " "This flag disables that.") args = argparser.parse_args() if args.gpu >= 0: device = th.device('cuda:%d' % args.gpu) else: device = th.device('cpu') if args.dataset == 'reddit': g, n_classes = load_reddit() elif args.dataset == 'ogbn-products': g, n_classes = load_ogb('ogbn-products') else: raise Exception('unknown dataset') if args.inductive: train_g, val_g, test_g = inductive_split(g) train_nfeat = train_g.ndata.pop('features') val_nfeat = val_g.ndata.pop('features') test_nfeat = test_g.ndata.pop('features') train_labels = train_g.ndata.pop('labels') val_labels = val_g.ndata.pop('labels') test_labels = test_g.ndata.pop('labels') else: train_g = val_g = test_g = g train_nfeat = val_nfeat = test_nfeat = g.ndata.pop('features') train_labels = val_labels = test_labels = g.ndata.pop('labels') if not args.data_cpu: train_nfeat = train_nfeat.to(device) train_labels = train_labels.to(device) # Pack data data = n_classes, train_g, val_g, test_g, train_nfeat, train_labels, \ val_nfeat, val_labels, test_nfeat, test_labels run(args, device, data)