import os
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import dgl.multiprocessing as mp
import dgl.nn.pytorch as dglnn
import time
import math
import argparse
from torch.nn.parallel import DistributedDataParallel
import tqdm

from model import SAGE
from load_graph import load_reddit, inductive_split, load_ogb

def compute_acc(pred, labels):
    """
    Compute the accuracy of prediction given the labels.
    """
    return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)

def evaluate(model, g, nfeat, labels, val_nid, device):
    """
    Evaluate the model on the validation set specified by ``val_nid``.
    g : The entire graph.
    nfeat : The features of all the nodes.
    labels : The labels of all the nodes.
    val_nid : A node ID tensor indicating which nodes we actually compute the accuracy for.
    device : The GPU device to evaluate on.
    """
    model.eval()
    with th.no_grad():
        pred = model.inference(g, nfeat, device, args.batch_size, args.num_workers)
    model.train()
    return compute_acc(pred[val_nid], labels[val_nid])

def load_subtensor(nfeat, labels, seeds, input_nodes, dev_id):
    """
    Extracts features and labels for a subset of nodes.
    """
    batch_inputs = nfeat[input_nodes].to(dev_id)
    batch_labels = labels[seeds].to(dev_id)
    return batch_inputs, batch_labels

#### Entry point

def run(proc_id, n_gpus, args, devices, data):
    # Start up distributed training, if enabled.
    device = th.device(devices[proc_id])
    if n_gpus > 0:
        th.cuda.set_device(device)
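    # With multiple GPUs, initialize a NCCL process group so that DistributedDataParallel
    # can all-reduce gradients across the per-GPU worker processes. The master address and
    # port are hard-coded for single-machine training.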
    if n_gpus > 1:
        dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
            master_ip='127.0.0.1', master_port='12345')
        world_size = n_gpus
        th.distributed.init_process_group(backend="nccl",
                                          init_method=dist_init_method,
                                          world_size=world_size,
                                          rank=proc_id)

    # Unpack data
    n_classes, train_g, val_g, test_g, train_nfeat, val_nfeat, test_nfeat, \
    train_labels, val_labels, test_labels, train_nid, val_nid, test_nid = data

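    # Where to keep the training features/labels: 'gpu' copies them into device memory up
    # front, 'uva' wraps them in a UnifiedTensor so the GPU reads pinned host memory
    # zero-copy, and 'cpu' leaves them in host memory to be sliced there per mini-batch.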
    if args.data_device == 'gpu':
        train_nfeat = train_nfeat.to(device)
        train_labels = train_labels.to(device)
    elif args.data_device == 'uva':
        train_nfeat = dgl.contrib.UnifiedTensor(train_nfeat, device=device)
        train_labels = dgl.contrib.UnifiedTensor(train_labels, device=device)

    in_feats = train_nfeat.shape[1]

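    # Where to sample from: 'gpu' keeps only the CSC format and copies the graph structure
    # to the device so neighbor sampling runs on the GPU; 'uva' pins the graph in host
    # memory for zero-copy GPU access. Both modes sample in-process, so num_workers is
    # forced to 0.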
    if args.graph_device == 'gpu':
        train_nid = train_nid.to(device)
        train_g = train_g.formats(['csc'])
        train_g = train_g.to(device)
        args.num_workers = 0
    elif args.graph_device == 'uva':
        train_nid = train_nid.to(device)
        train_g.pin_memory_()
        args.num_workers = 0

    # Create PyTorch DataLoader for constructing blocks
    sampler = dgl.dataloading.MultiLayerNeighborSampler(
        [int(fanout) for fanout in args.fan_out.split(',')])
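    # use_ddp makes the dataloader partition train_nid across the DDP ranks so that each
    # process trains on a disjoint set of seed nodes in every epoch.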
    dataloader = dgl.dataloading.NodeDataLoader(
        train_g,
        train_nid,
        sampler,
        use_ddp=n_gpus > 1,
        device=device,
        batch_size=args.batch_size,
        shuffle=True,
        drop_last=False,
        num_workers=args.num_workers)

    # Define model and optimizer
    model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
    model = model.to(device)
    if n_gpus > 1:
        model = DistributedDataParallel(model, device_ids=[device], output_device=device)
    loss_fcn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    # Training loop
    avg = 0
    iter_tput = []
    for epoch in range(args.num_epochs):
        tic = time.time()

        # Loop over the dataloader to sample the computation dependency graph as a list of
        # blocks.
        for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
            if proc_id == 0:
                tic_step = time.time()

            # Load the input features as well as output labels
            batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels,
                                                        seeds, input_nodes, device)
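            # Cast the sampled blocks to int32 indices and move them to the training device
            # (the copy is a no-op when the blocks were already sampled on the GPU).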
            blocks = [block.int().to(device) for block in blocks]
            # Compute loss and prediction
            batch_pred = model(blocks, batch_inputs)
            loss = loss_fcn(batch_pred, batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if proc_id == 0:
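                # Throughput is measured on rank 0 only and scaled by n_gpus, assuming all
                # ranks process batches of roughly the same size.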
                iter_tput.append(len(seeds) * n_gpus / (time.time() - tic_step))
            if step % args.log_every == 0 and proc_id == 0:
                acc = compute_acc(batch_pred, batch_labels)
                print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB'.format(
                    epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), th.cuda.max_memory_allocated() / 1000000))

        if n_gpus > 1:
            th.distributed.barrier()

        toc = time.time()
        if proc_id == 0:
            print('Epoch Time(s): {:.4f}'.format(toc - tic))
            if epoch >= 5:
                avg += toc - tic
            if epoch % args.eval_every == 0 and epoch != 0:
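                # Evaluation runs single-process on the first device; with DDP the wrapped
                # module is unwrapped so full-graph inference bypasses the DDP wrapper.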
                if n_gpus == 1:
                    eval_acc = evaluate(
                        model, val_g, val_nfeat, val_labels, val_nid, devices[0])
                    test_acc = evaluate(
                        model, test_g, test_nfeat, test_labels, test_nid, devices[0])
                else:
                    eval_acc = evaluate(
                        model.module, val_g, val_nfeat, val_labels, val_nid, devices[0])
                    test_acc = evaluate(
                        model.module, test_g, test_nfeat, test_labels, test_nid, devices[0])
                print('Eval Acc {:.4f}'.format(eval_acc))
                print('Test Acc: {:.4f}'.format(test_acc))

    if n_gpus > 1:
        th.distributed.barrier()
    if proc_id == 0:
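        # The first 5 epochs are treated as warm-up and excluded, hence the division by (epoch - 4).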
        print('Avg epoch time: {}'.format(avg / (epoch - 4)))

if __name__ == '__main__':
    argparser = argparse.ArgumentParser("multi-gpu training")
    argparser.add_argument('--gpu', type=str, default='0',
                           help="Comma separated list of GPU device IDs.")
    argparser.add_argument('--dataset', type=str, default='reddit')
    argparser.add_argument('--num-epochs', type=int, default=20)
    argparser.add_argument('--num-hidden', type=int, default=16)
    argparser.add_argument('--num-layers', type=int, default=2)
    argparser.add_argument('--fan-out', type=str, default='10,25')
    argparser.add_argument('--batch-size', type=int, default=1000)
    argparser.add_argument('--log-every', type=int, default=20)
    argparser.add_argument('--eval-every', type=int, default=5)
    argparser.add_argument('--lr', type=float, default=0.003)
    argparser.add_argument('--dropout', type=float, default=0.5)
    argparser.add_argument('--num-workers', type=int, default=0,
                           help="Number of sampling processes. Use 0 for no extra process.")
    argparser.add_argument('--inductive', action='store_true',
                           help="Inductive learning setting")
    argparser.add_argument('--graph-device', choices=('cpu', 'gpu', 'uva'), default='cpu',
                           help="Device to perform the sampling. "
                                "Must have 0 workers for 'gpu' and 'uva'.")
    argparser.add_argument('--data-device', choices=('cpu', 'gpu', 'uva'), default='gpu',
                           help="By default the script puts all node features and labels "
                                "on the GPU to save time on data copies. This may be "
                                "undesired if they cannot fit in GPU memory at once. "
                                "Use 'cpu' to keep the features in host memory and "
                                "'uva' to enable UnifiedTensor (GPU zero-copy access on "
                                "pinned host memory).")
    args = argparser.parse_args()

    devices = list(map(int, args.gpu.split(',')))
    n_gpus = len(devices)

    if args.dataset == 'reddit':
        g, n_classes = load_reddit()
    elif args.dataset == 'ogbn-products':
        g, n_classes = load_ogb('ogbn-products')
    elif args.dataset == 'ogbn-papers100M':
        g, n_classes = load_ogb('ogbn-papers100M')
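        # ogbn-papers100M is a directed citation graph; add reverse edges so that
        # messages can flow in both directions during neighbor sampling.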
        g = dgl.add_reverse_edges(g)
        # convert labels to integer
        g.ndata['labels'] = th.as_tensor(g.ndata['labels'], dtype=th.int64)
        g.ndata.pop('year')
    else:
        raise Exception('unknown dataset')

    if args.inductive:
        train_g, val_g, test_g = inductive_split(g)
        train_nfeat = train_g.ndata.pop('features')
        val_nfeat = val_g.ndata.pop('features')
        test_nfeat = test_g.ndata.pop('features')
        train_labels = train_g.ndata.pop('labels')
        val_labels = val_g.ndata.pop('labels')
        test_labels = test_g.ndata.pop('labels')
    else:
        train_g = val_g = test_g = g
        train_nfeat = val_nfeat = test_nfeat = g.ndata.pop('features')
        train_labels = val_labels = test_labels = g.ndata.pop('labels')

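    # Derive node ID tensors from the boolean masks. If the dataset has no 'test_mask',
    # every node outside the training and validation sets is treated as a test node.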
    test_nid = test_g.ndata.pop('test_mask',
        ~(test_g.ndata['train_mask'] | test_g.ndata['val_mask'])).nonzero().squeeze()
    train_nid = train_g.ndata.pop('train_mask').nonzero().squeeze()
    val_nid = val_g.ndata.pop('val_mask').nonzero().squeeze()

    # Create csr/coo/csc formats before launching training processes with multi-GPU.
    # This avoids creating certain formats in each sub-process, which saves memory and CPU.
    train_g.create_formats_()
    val_g.create_formats_()
    test_g.create_formats_()

    # Limit the number of OpenMP threads per process to avoid competition overhead on machines with many cores.
    # Change it to a proper number on your machine, especially for multi-GPU training.
    os.environ['OMP_NUM_THREADS'] = str(mp.cpu_count() // 2 // n_gpus)
    if n_gpus > 1:
        # Copy the graph to shared memory explicitly before pinning.
        # In other cases, we can just rely on fork's copy-on-write.
        # TODO: the original train_g is not freed.
        if args.graph_device == 'uva':
            train_g = train_g.shared_memory('train_g')
        if args.data_device == 'uva':
            train_nfeat = train_nfeat.share_memory_()
            train_labels = train_labels.share_memory_()

    # Pack data
    data = n_classes, train_g, val_g, test_g, train_nfeat, val_nfeat, test_nfeat, \
           train_labels, val_labels, test_labels, train_nid, val_nid, test_nid

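    # Launch training: CPU-only when --gpu is -1, in-process for a single GPU, and one
    # worker process per device (launched via dgl.multiprocessing) for multiple GPUs.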
    if devices[0] == -1:
        assert args.graph_device == 'cpu', \
               f"Must have GPUs to enable {args.graph_device} sampling."
        assert args.data_device == 'cpu', \
               f"Must have GPUs to enable {args.data_device} feature storage."
        run(0, 0, args, ['cpu'], data)
    elif n_gpus == 1:
        run(0, n_gpus, args, devices, data)
    else:
        procs = []
        for proc_id in range(n_gpus):
            p = mp.Process(target=run, args=(proc_id, n_gpus, args, devices, data))
            p.start()
            procs.append(p)
        for p in procs:
            p.join()