import dgl
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
import dgl.nn.pytorch as dglnn
import time

from .. import utils


class SAGE(nn.Module):
    def __init__(self,
                 in_feats,
                 n_hidden,
                 n_classes,
                 n_layers,
                 activation,
                 dropout):
        super().__init__()
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        self.layers = nn.ModuleList()
        self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
        for i in range(1, n_layers - 1):
            self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
        self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
        self.dropout = nn.Dropout(dropout)
        self.activation = activation

    def forward(self, blocks, x):
        h = x
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            h = layer(block, h)
            if l != len(self.layers) - 1:
                h = self.activation(h)
                h = self.dropout(h)
        return h

    def inference(self, g, x, batch_size, device):
        """
        Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling).
        g : the entire graph.
        x : the input features of the entire node set.

        The inference code is written in a fashion that it could handle any number of nodes and
        layers.
        """
        # During inference with sampling, multi-layer blocks are very inefficient because
        # lots of computations in the first few layers are repeated.
        # Therefore, we compute the representation of all nodes layer by layer. The nodes
        # on each layer are of course split into batches.
        # TODO: can we standardize this?
        for l, layer in enumerate(self.layers):
            y = th.zeros(g.number_of_nodes(),
                         self.n_hidden if l != len(self.layers) - 1 else self.n_classes)

            sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
            dataloader = dgl.dataloading.NodeDataLoader(
                g,
                th.arange(g.number_of_nodes()),
                sampler,
                batch_size=batch_size,
                shuffle=True,
                drop_last=False,
                num_workers=4)

            for input_nodes, output_nodes, blocks in dataloader:
                block = blocks[0]

                block = block.int().to(device)
                h = x[input_nodes].to(device)
                h = layer(block, h)
                if l != len(self.layers) - 1:
                    h = self.activation(h)
                    h = self.dropout(h)

                y[output_nodes] = h.cpu()

            x = y
        return y


def compute_acc(pred, labels):
    """
    Compute the accuracy of prediction given the labels.
    """
    labels = labels.long()
    return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)


def evaluate(model, g, inputs, labels, val_nid, batch_size, device):
    """
    Evaluate the model on the validation set specified by ``val_nid``.
    g : The entire graph.
    inputs : The features of all the nodes.
    labels : The labels of all the nodes.
    val_nid : the node IDs for validation.
    batch_size : Number of nodes to compute at the same time.
    device : The GPU device to evaluate on.
    """
    model.eval()
    with th.no_grad():
        pred = model.inference(g, inputs, batch_size, device)
    model.train()
    return compute_acc(pred[val_nid], labels[val_nid])


def load_subtensor(g, seeds, input_nodes, device):
    """
    Copies features and labels of a set of nodes onto GPU.
""" batch_inputs = g.ndata['features'][input_nodes].to(device) batch_labels = g.ndata['labels'][seeds].to(device) return batch_inputs, batch_labels @utils.benchmark('acc', 3600) @utils.parametrize('data', ['ogbn-products', "reddit"]) def track_acc(data): data = utils.process_data(data) device = utils.get_bench_device() g = data[0] g.ndata['features'] = g.ndata['feat'] g.ndata['labels'] = g.ndata['label'] in_feats = g.ndata['features'].shape[1] n_classes = data.num_classes # Create csr/coo/csc formats before launching training processes with multi-gpu. # This avoids creating certain formats in each sub-process, which saves momory and CPU. g.create_formats_() num_epochs = 20 num_hidden = 16 num_layers = 2 fan_out = '5,10' batch_size = 1024 lr = 0.003 dropout = 0.5 num_workers = 4 train_nid = th.nonzero(g.ndata['train_mask'], as_tuple=True)[0] # Create PyTorch DataLoader for constructing blocks sampler = dgl.dataloading.MultiLayerNeighborSampler( [int(fanout) for fanout in fan_out.split(',')]) dataloader = dgl.dataloading.NodeDataLoader( g, train_nid, sampler, batch_size=batch_size, shuffle=True, drop_last=False, num_workers=num_workers) # Define model and optimizer model = SAGE(in_feats, num_hidden, n_classes, num_layers, F.relu, dropout) model = model.to(device) loss_fcn = nn.CrossEntropyLoss() loss_fcn = loss_fcn.to(device) optimizer = optim.Adam(model.parameters(), lr=lr) # dry run one epoch for step, (input_nodes, seeds, blocks) in enumerate(dataloader): # Load the input features as well as output labels #batch_inputs, batch_labels = load_subtensor(g, seeds, input_nodes, device) blocks = [block.int().to(device) for block in blocks] batch_inputs = blocks[0].srcdata['features'] batch_labels = blocks[-1].dstdata['labels'] # Compute loss and prediction batch_pred = model(blocks, batch_inputs) loss = loss_fcn(batch_pred, batch_labels) optimizer.zero_grad() loss.backward() optimizer.step() # Training loop for epoch in range(num_epochs): # Loop over the dataloader to sample the computation dependency graph as a list of # blocks. for step, (input_nodes, seeds, blocks) in enumerate(dataloader): # Load the input features as well as output labels #batch_inputs, batch_labels = load_subtensor(g, seeds, input_nodes, device) blocks = [block.int().to(device) for block in blocks] batch_inputs = blocks[0].srcdata['features'] batch_labels = blocks[-1].dstdata['labels'] # Compute loss and prediction batch_pred = model(blocks, batch_inputs) loss = loss_fcn(batch_pred, batch_labels) optimizer.zero_grad() loss.backward() optimizer.step() test_g = g test_nid = th.nonzero( ~(test_g.ndata['train_mask'] | test_g.ndata['val_mask']), as_tuple=True)[0] test_acc = evaluate( model, test_g, test_g.ndata['features'], test_g.ndata['labels'], test_nid, batch_size, device) return test_acc.item()