import argparse
import math
import time

import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import tqdm
from torch.nn.parallel import DistributedDataParallel

import dgl
import dgl.multiprocessing as mp
import dgl.nn.pytorch as dglnn

from model import SAGE
from load_graph import load_reddit, inductive_split

def compute_acc(pred, labels):
    """
    Compute the accuracy of prediction given the labels.
    """
    return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)

def evaluate(model, g, nfeat, labels, val_nid, device):
    """
    Evaluate the model on the validation set specified by ``val_nid``.

    g : The entire graph.
    nfeat : The features of all the nodes.
    labels : The labels of all the nodes.
    val_nid : A tensor of node IDs for which the accuracy is computed.
    device : The GPU device to evaluate on.
    """
    model.eval()
    with th.no_grad():
        # Full-graph inference; relies on the module-level ``args`` defined
        # under ``__main__`` (inherited by the forked worker processes).
        pred = model.inference(g, nfeat, device, args.batch_size, args.num_workers)
    model.train()
    return compute_acc(pred[val_nid], labels[val_nid])

def load_subtensor(nfeat, labels, seeds, input_nodes, dev_id):
    """
    Extracts features and labels for a subset of nodes.
    """
    batch_inputs = nfeat[input_nodes].to(dev_id)
    batch_labels = labels[seeds].to(dev_id)
    return batch_inputs, batch_labels

#### Entry point

def run(proc_id, n_gpus, args, devices, data):
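    # One training process per GPU: ``proc_id`` indexes into ``devices`` and is
    # also used as the distributed rank when ``n_gpus > 1``.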
    # Start up distributed training, if enabled.
    dev_id = devices[proc_id]
    if n_gpus > 1:
        dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
            master_ip='127.0.0.1', master_port='12345')
        world_size = n_gpus
        th.distributed.init_process_group(backend="nccl",
                                          init_method=dist_init_method,
                                          world_size=world_size,
                                          rank=proc_id)
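    # Make this process's GPU the default CUDA device so that subsequent
    # allocations land on it.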
    th.cuda.set_device(dev_id)

    # Unpack data
    n_classes, train_g, val_g, test_g = data

    if args.inductive:
        train_nfeat = train_g.ndata.pop('features')
        val_nfeat = val_g.ndata.pop('features')
        test_nfeat = test_g.ndata.pop('features')
        train_labels = train_g.ndata.pop('labels')
        val_labels = val_g.ndata.pop('labels')
        test_labels = test_g.ndata.pop('labels')
    else:
        # In the transductive setting train_g, val_g and test_g all refer to the
        # same graph, so one feature/label tensor is shared across the splits.
        train_nfeat = val_nfeat = test_nfeat = train_g.ndata.pop('features')
        train_labels = val_labels = test_labels = train_g.ndata.pop('labels')

    if not args.data_cpu:
        train_nfeat = train_nfeat.to(dev_id)
        train_labels = train_labels.to(dev_id)

    in_feats = train_nfeat.shape[1]

    train_mask = train_g.ndata['train_mask']
    val_mask = val_g.ndata['val_mask']
    test_mask = ~(test_g.ndata['train_mask'] | test_g.ndata['val_mask'])
    train_nid = train_mask.nonzero().squeeze()
    val_nid = val_mask.nonzero().squeeze()
    test_nid = test_mask.nonzero().squeeze()

    # Split train_nid
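    # Each process trains on its own contiguous chunk of the training node IDs,
    # so the GPUs consume disjoint mini-batches (simple data parallelism).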
    train_nid = th.split(train_nid, math.ceil(len(train_nid) / n_gpus))[proc_id]

    # Create PyTorch DataLoader for constructing blocks
    sampler = dgl.dataloading.MultiLayerNeighborSampler(
        [int(fanout) for fanout in args.fan_out.split(',')])
    dataloader = dgl.dataloading.NodeDataLoader(
        train_g,
        train_nid,
        sampler,
        batch_size=args.batch_size,
        shuffle=True,
        drop_last=False,
        num_workers=args.num_workers)

    # Define model and optimizer
    model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
    model = model.to(dev_id)
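    # DistributedDataParallel averages gradients across processes during
    # backward, keeping the per-GPU model replicas in sync.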
    if n_gpus > 1:
        model = DistributedDataParallel(model, device_ids=[dev_id], output_device=dev_id)
    loss_fcn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    # Training loop
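    # ``avg`` accumulates epoch times after a 5-epoch warm-up; ``iter_tput``
    # records per-iteration throughput in seed nodes per second, scaled by
    # ``n_gpus`` to approximate the total across GPUs.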
    avg = 0
    iter_tput = []
    for epoch in range(args.num_epochs):
        tic = time.time()

        # Loop over the dataloader to sample the computation dependency graph as a list of
        # blocks.
        for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
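            # ``input_nodes``: all nodes whose input features this batch needs;
            # ``seeds``: the output nodes to predict; ``blocks``: one bipartite
            # block (MFG) per GraphSAGE layer.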
            if proc_id == 0:
                tic_step = time.time()

            # Load the input features as well as output labels
            batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels,
                                                        seeds, input_nodes, dev_id)
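            # Cast the block index dtype to int32 and copy the sampled blocks to the GPU.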
            blocks = [block.int().to(dev_id) for block in blocks]
            # Compute loss and prediction
            batch_pred = model(blocks, batch_inputs)
            loss = loss_fcn(batch_pred, batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if proc_id == 0:
                iter_tput.append(len(seeds) * n_gpus / (time.time() - tic_step))
            if step % args.log_every == 0 and proc_id == 0:
                acc = compute_acc(batch_pred, batch_labels)
                print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB'.format(
                    epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), th.cuda.max_memory_allocated() / 1000000))

        if n_gpus > 1:
            th.distributed.barrier()

        toc = time.time()
        if proc_id == 0:
            print('Epoch Time(s): {:.4f}'.format(toc - tic))
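            # The first 5 epochs are treated as warm-up and excluded from the
            # average epoch time.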
            if epoch >= 5:
                avg += toc - tic
            if epoch % args.eval_every == 0 and epoch != 0:
                if n_gpus == 1:
                    eval_acc = evaluate(
                        model, val_g, val_nfeat, val_labels, val_nid, devices[0])
                    test_acc = evaluate(
                        model, test_g, test_nfeat, test_labels, test_nid, devices[0])
                else:
                    # ``model`` is wrapped in DistributedDataParallel, which does not
                    # expose the wrapped module's custom ``inference`` method, so unwrap it.
                    eval_acc = evaluate(
                        model.module, val_g, val_nfeat, val_labels, val_nid, devices[0])
                    test_acc = evaluate(
                        model.module, test_g, test_nfeat, test_labels, test_nid, devices[0])
                print('Eval Acc {:.4f}'.format(eval_acc))
                print('Test Acc: {:.4f}'.format(test_acc))

    if n_gpus > 1:
        th.distributed.barrier()
    if proc_id == 0:
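        # Average over the non-warm-up epochs; assumes --num-epochs > 5.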
        print('Avg epoch time: {}'.format(avg / (epoch - 4)))

if __name__ == '__main__':
    argparser = argparse.ArgumentParser("multi-gpu training")
    argparser.add_argument('--gpu', type=str, default='0',
                           help="Comma separated list of GPU device IDs.")
    argparser.add_argument('--num-epochs', type=int, default=20)
    argparser.add_argument('--num-hidden', type=int, default=16)
    argparser.add_argument('--num-layers', type=int, default=2)
    argparser.add_argument('--fan-out', type=str, default='10,25')
    argparser.add_argument('--batch-size', type=int, default=1000)
    argparser.add_argument('--log-every', type=int, default=20)
    argparser.add_argument('--eval-every', type=int, default=5)
    argparser.add_argument('--lr', type=float, default=0.003)
    argparser.add_argument('--dropout', type=float, default=0.5)
    argparser.add_argument('--num-workers', type=int, default=0,
                           help="Number of sampling processes. Use 0 for no extra process.")
    argparser.add_argument('--inductive', action='store_true',
                           help="Inductive learning setting")
    argparser.add_argument('--data-cpu', action='store_true',
                           help="By default the script puts all node features and labels "
                                "on GPU when using it to save time for data copy. This may "
                                "be undesired if they cannot fit in GPU memory at once. "
                                "This flag disables that.")
    args = argparser.parse_args()
    
    devices = list(map(int, args.gpu.split(',')))
    n_gpus = len(devices)

    g, n_classes = load_reddit()
    # Construct graph
    g = dgl.as_heterograph(g)

    if args.inductive:
        train_g, val_g, test_g = inductive_split(g)
    else:
        train_g = val_g = test_g = g

    # Create csr/coo/csc formats before launching training processes with multi-gpu.
    # This avoids creating certain formats in each sub-process, which saves memory and CPU.
    train_g.create_formats_()
    val_g.create_formats_()
    test_g.create_formats_()
    # Pack data
    data = n_classes, train_g, val_g, test_g

    if n_gpus == 1:
        run(0, n_gpus, args, devices, data)
    else:
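        # Spawn one worker process per GPU and wait for all of them to finish.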
        procs = []
        for proc_id in range(n_gpus):
            p = mp.Process(target=run, args=(proc_id, n_gpus, args, devices, data))
            p.start()
            procs.append(p)
        for p in procs:
            p.join()