import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import dgl.multiprocessing as mp
import dgl.nn.pytorch as dglnn
import time
import math
import argparse
from torch.nn.parallel import DistributedDataParallel
import tqdm

from model import SAGE
from load_graph import load_reddit, inductive_split, load_ogb

def compute_acc(pred, labels):
    """
    Compute the accuracy of prediction given the labels.
    """
    return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)

def evaluate(model, g, nfeat, labels, val_nid, device):
    """
    Evaluate the model on the validation set specified by ``val_nid``.

    g : The entire graph.
    nfeat : The features of all the nodes.
    labels : The labels of all the nodes.
    val_nid : A node ID tensor indicating the nodes on which accuracy is computed.
    device : The GPU device to evaluate on.
    """
    model.eval()
    with th.no_grad():
        pred = model.inference(g, nfeat, device, args.batch_size, args.num_workers)
    model.train()
    return compute_acc(pred[val_nid], labels[val_nid])

def load_subtensor(nfeat, labels, seeds, input_nodes, dev_id):
    """
    Extracts features and labels for a subset of nodes.
    """
    batch_inputs = nfeat[input_nodes].to(dev_id)
    batch_labels = labels[seeds].to(dev_id)
    return batch_inputs, batch_labels

#### Entry point

def run(proc_id, n_gpus, args, devices, data):
    # Start up distributed training, if enabled.
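    # One worker process drives each GPU; with more than one GPU the processes join
    # a NCCL process group via a fixed TCP rendezvous on 127.0.0.1:12345, ranked by proc_id.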
    dev_id = devices[proc_id]
    if n_gpus > 1:
        dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
            master_ip='127.0.0.1', master_port='12345')
        world_size = n_gpus
        th.distributed.init_process_group(backend="nccl",
                                          init_method=dist_init_method,
                                          world_size=world_size,
                                          rank=proc_id)
    th.cuda.set_device(dev_id)

    # Unpack data
    n_classes, train_g, val_g, test_g = data

    if args.inductive:
        train_nfeat = train_g.ndata.pop('features')
        val_nfeat = val_g.ndata.pop('features')
        test_nfeat = test_g.ndata.pop('features')
        train_labels = train_g.ndata.pop('labels')
        val_labels = val_g.ndata.pop('labels')
        test_labels = test_g.ndata.pop('labels')
    else:
        # train_g, val_g and test_g all alias the full graph here, so pop the shared
        # tensors from train_g instead of relying on the module-level ``g``.
        train_nfeat = val_nfeat = test_nfeat = train_g.ndata.pop('features')
        train_labels = val_labels = test_labels = train_g.ndata.pop('labels')

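    # Unless --data-cpu is given, keep the training features/labels resident on the GPU
    # so the per-batch slicing in load_subtensor does not incur a host-to-device copy.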
    if not args.data_cpu:
        train_nfeat = train_nfeat.to(dev_id)
        train_labels = train_labels.to(dev_id)

    in_feats = train_nfeat.shape[1]

    train_mask = train_g.ndata['train_mask']
    val_mask = val_g.ndata['val_mask']
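    # Test nodes are those belonging to neither the training nor the validation mask.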
    test_mask = ~(test_g.ndata['train_mask'] | test_g.ndata['val_mask'])
    train_nid = train_mask.nonzero().squeeze()
    val_nid = val_mask.nonzero().squeeze()
    test_nid = test_mask.nonzero().squeeze()

    # Create PyTorch DataLoader for constructing blocks
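    # With use_ddp=True the dataloader partitions the seed nodes across the DDP
    # ranks every epoch, so each GPU trains on its own shard of train_nid.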
    sampler = dgl.dataloading.MultiLayerNeighborSampler(
        [int(fanout) for fanout in args.fan_out.split(',')])
    dataloader = dgl.dataloading.NodeDataLoader(
        train_g,
        train_nid,
        sampler,
        use_ddp=n_gpus > 1,
        batch_size=args.batch_size,
        shuffle=True,
        drop_last=False,
        num_workers=args.num_workers)

    # Define model and optimizer
    model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
    model = model.to(dev_id)
    if n_gpus > 1:
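        # DistributedDataParallel averages gradients across ranks with an all-reduce
        # during backward(), so every process applies the same update.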
        model = DistributedDataParallel(model, device_ids=[dev_id], output_device=dev_id)
    loss_fcn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    # Training loop
    avg = 0
    iter_tput = []
    for epoch in range(args.num_epochs):
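        # When use_ddp is enabled the dataloader must be told the epoch number so the
        # per-rank shards are reshuffled, analogous to DistributedSampler.set_epoch.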
        if n_gpus > 1:
            dataloader.set_epoch(epoch)
        tic = time.time()

        # Loop over the dataloader to sample the computation dependency graph as a list of
        # blocks.
        for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
            if proc_id == 0:
                tic_step = time.time()

            # Load the input features as well as output labels
            batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels,
                                                        seeds, input_nodes, dev_id)
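            # Cast block index dtypes to int32 and move the sampled blocks to the GPU.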
            blocks = [block.int().to(dev_id) for block in blocks]
            # Compute loss and prediction
            batch_pred = model(blocks, batch_inputs)
            loss = loss_fcn(batch_pred, batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if proc_id == 0:
                iter_tput.append(len(seeds) * n_gpus / (time.time() - tic_step))
            if step % args.log_every == 0 and proc_id == 0:
                acc = compute_acc(batch_pred, batch_labels)
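                # Throughput averages iter_tput[3:] to skip warm-up iterations; GPU memory
                # is reported in MB and logging happens on rank 0 only.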
                print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB'.format(
                    epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), th.cuda.max_memory_allocated() / 1000000))

        if n_gpus > 1:
            th.distributed.barrier()

        toc = time.time()
        if proc_id == 0:
            print('Epoch Time(s): {:.4f}'.format(toc - tic))
            if epoch >= 5:
                avg += toc - tic
            if epoch % args.eval_every == 0 and epoch != 0:
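                # Evaluation runs on a single device (devices[0]); when the model is
                # wrapped in DistributedDataParallel, the underlying module is passed.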
                if n_gpus == 1:
                    eval_acc = evaluate(
                        model, val_g, val_nfeat, val_labels, val_nid, devices[0])
                    test_acc = evaluate(
                        model, test_g, test_nfeat, test_labels, test_nid, devices[0])
                else:
                    eval_acc = evaluate(
                        model.module, val_g, val_nfeat, val_labels, val_nid, devices[0])
                    test_acc = evaluate(
                        model.module, test_g, test_nfeat, test_labels, test_nid, devices[0])
                print('Eval Acc {:.4f}'.format(eval_acc))
                print('Test Acc: {:.4f}'.format(test_acc))

    if n_gpus > 1:
        th.distributed.barrier()
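    # Epochs 0-4 are treated as warm-up and excluded from the averaged epoch time.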
    if proc_id == 0:
        print('Avg epoch time: {}'.format(avg / (epoch - 4)))

if __name__ == '__main__':
    argparser = argparse.ArgumentParser("multi-gpu training")
    argparser.add_argument('--gpu', type=str, default='0',
                           help="Comma separated list of GPU device IDs.")
    argparser.add_argument('--dataset', type=str, default='reddit')
    argparser.add_argument('--num-epochs', type=int, default=20)
    argparser.add_argument('--num-hidden', type=int, default=16)
    argparser.add_argument('--num-layers', type=int, default=2)
    argparser.add_argument('--fan-out', type=str, default='10,25')
    argparser.add_argument('--batch-size', type=int, default=1000)
    argparser.add_argument('--log-every', type=int, default=20)
    argparser.add_argument('--eval-every', type=int, default=5)
    argparser.add_argument('--lr', type=float, default=0.003)
    argparser.add_argument('--dropout', type=float, default=0.5)
    argparser.add_argument('--num-workers', type=int, default=0,
                           help="Number of sampling processes. Use 0 for no extra process.")
    argparser.add_argument('--inductive', action='store_true',
                           help="Inductive learning setting")
    argparser.add_argument('--data-cpu', action='store_true',
                           help="By default the script puts all node features and labels "
                                "on GPU when using it to save time for data copy. This may "
                                "be undesired if they cannot fit in GPU memory at once. "
                                "This flag disables that.")
    args = argparser.parse_args()
    
    devices = list(map(int, args.gpu.split(',')))
    n_gpus = len(devices)

    if args.dataset == 'reddit':
        g, n_classes = load_reddit()
    elif args.dataset == 'ogbn-products':
        g, n_classes = load_ogb('ogbn-products')
    else:
        raise Exception('unknown dataset')

    # Construct graph
    g = dgl.as_heterograph(g)

    if args.inductive:
        train_g, val_g, test_g = inductive_split(g)
    else:
        train_g = val_g = test_g = g

    # Create csr/coo/csc formats before launching training processes with multi-gpu.
    # This avoids creating certain formats in each sub-process, which saves memory and CPU.
    train_g.create_formats_()
    val_g.create_formats_()
    test_g.create_formats_()
    # Pack data
    data = n_classes, train_g, val_g, test_g

    if n_gpus == 1:
        run(0, n_gpus, args, devices, data)
    else:
        procs = []
        for proc_id in range(n_gpus):
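            # dgl.multiprocessing.Process is a drop-in replacement for the standard
            # multiprocessing Process; one child process is launched per GPU.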
            p = mp.Process(target=run, args=(proc_id, n_gpus, args, devices, data))
            p.start()
            procs.append(p)
        for p in procs:
            p.join()