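"""Multi-GPU training of GraphSAGE with neighbor sampling on the Reddit dataset.

One worker process is launched per GPU listed in ``--gpu``. Example invocation
(device IDs are illustrative):

    python train_sampling_multi_gpu.py --gpu 0,1,2,3
"""
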
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
import dgl.nn.pytorch as dglnn
import time
import math
import argparse
from torch.nn.parallel import DistributedDataParallel
import tqdm

from model import SAGE
from utils import thread_wrapped_func
from load_graph import load_reddit, inductive_split

def compute_acc(pred, labels):
    """
    Compute the accuracy of prediction given the labels.
    """
    return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)

def evaluate(model, g, nfeat, labels, val_nid, device):
    """
    Evaluate the model on the validation set specified by ``val_nid``.

    g : The entire graph.
    nfeat : The features of all the nodes.
    labels : The labels of all the nodes.
    val_nid : A node ID tensor indicating which nodes to compute the accuracy for.
    device : The GPU device to evaluate on.
    """
    model.eval()
    with th.no_grad():
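        # NOTE: this relies on the module-level ``args`` parsed in __main__,
        # which the worker processes inherit when they are forked.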
        pred = model.inference(g, nfeat, device, args.batch_size, args.num_workers)
    model.train()
    return compute_acc(pred[val_nid], labels[val_nid])

def load_subtensor(nfeat, labels, seeds, input_nodes, dev_id):
    """
    Extracts features and labels for a subset of nodes.
    """
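    # ``input_nodes`` is the receptive field of the mini-batch (every node whose
    # features are needed); ``seeds`` are the output nodes being predicted.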
    batch_inputs = nfeat[input_nodes].to(dev_id)
    batch_labels = labels[seeds].to(dev_id)
    return batch_inputs, batch_labels

#### Entry point

def run(proc_id, n_gpus, args, devices, data):
    # Start up distributed training, if enabled.
    dev_id = devices[proc_id]
    if n_gpus > 1:
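        # All processes rendezvous at a fixed localhost TCP address; the port
        # (12345) is arbitrary and only needs to be free on this machine.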
        dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
            master_ip='127.0.0.1', master_port='12345')
        world_size = n_gpus
        th.distributed.init_process_group(backend="nccl",
                                          init_method=dist_init_method,
                                          world_size=world_size,
                                          rank=proc_id)
    th.cuda.set_device(dev_id)

    # Unpack data
    n_classes, train_g, val_g, test_g = data

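    # In the inductive setting the three graphs are disjoint subgraphs carrying
    # their own feature/label tensors; in the transductive setting they all alias
    # the same full graph, so 'features'/'labels' are popped from it only once.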
    if args.inductive:
        train_nfeat = train_g.ndata.pop('features')
        val_nfeat = val_g.ndata.pop('features')
        test_nfeat = test_g.ndata.pop('features')
        train_labels = train_g.ndata.pop('labels')
        val_labels = val_g.ndata.pop('labels')
        test_labels = test_g.ndata.pop('labels')
    else:
        train_nfeat = val_nfeat = test_nfeat = train_g.ndata.pop('features')
        train_labels = val_labels = test_labels = train_g.ndata.pop('labels')

    if not args.data_cpu:
        train_nfeat = train_nfeat.to(dev_id)
        train_labels = train_labels.to(dev_id)

    in_feats = train_nfeat.shape[1]

    train_mask = train_g.ndata['train_mask']
    val_mask = val_g.ndata['val_mask']
    test_mask = ~(test_g.ndata['train_mask'] | test_g.ndata['val_mask'])
    train_nid = train_mask.nonzero().squeeze()
    val_nid = val_mask.nonzero().squeeze()
    test_nid = test_mask.nonzero().squeeze()

    # Split train_nid
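    # Manual data parallelism: each process takes an equal slice of the training
    # node IDs, so no DistributedSampler is needed.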
    train_nid = th.split(train_nid, math.ceil(len(train_nid) / n_gpus))[proc_id]

    # Create PyTorch DataLoader for constructing blocks
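    # ``--fan-out`` lists one neighbor-sampling fanout per GNN layer,
    # e.g. the default '10,25' for the two-layer model used here.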
    sampler = dgl.dataloading.MultiLayerNeighborSampler(
        [int(fanout) for fanout in args.fan_out.split(',')])
    dataloader = dgl.dataloading.NodeDataLoader(
        train_g,
        train_nid,
        sampler,
        batch_size=args.batch_size,
        shuffle=True,
        drop_last=False,
        num_workers=args.num_workers)

    # Define model and optimizer
    model = SAGE(in_feats, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
    model = model.to(dev_id)
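    # With multiple GPUs, wrap the model in DistributedDataParallel so that
    # gradients are averaged across processes during backward().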
    if n_gpus > 1:
        model = DistributedDataParallel(model, device_ids=[dev_id], output_device=dev_id)
    loss_fcn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    # Training loop
    avg = 0
    iter_tput = []
    for epoch in range(args.num_epochs):
        tic = time.time()

        # Loop over the dataloader to sample the computation dependency graph as a list of
        # blocks.
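        # Each block is a bipartite graph mapping one layer's input nodes to its
        # output nodes.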
        for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
            if proc_id == 0:
                tic_step = time.time()

            # Load the input features as well as output labels
            batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels,
                                                        seeds, input_nodes, dev_id)
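            # Cast the blocks' index dtype to int32 and move them to the GPU.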
            blocks = [block.int().to(dev_id) for block in blocks]
            # Compute loss and prediction
            batch_pred = model(blocks, batch_inputs)
            loss = loss_fcn(batch_pred, batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if proc_id == 0:
                iter_tput.append(len(seeds) * n_gpus / (time.time() - tic_step))
            if step % args.log_every == 0 and proc_id == 0:
                acc = compute_acc(batch_pred, batch_labels)
                print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB'.format(
                    epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), th.cuda.max_memory_allocated() / 1000000))

        if n_gpus > 1:
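            # Synchronize so rank 0's epoch timer reflects the slowest process.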
            th.distributed.barrier()

        toc = time.time()
        if proc_id == 0:
            print('Epoch Time(s): {:.4f}'.format(toc - tic))
            if epoch >= 5:
                avg += toc - tic
            if epoch % args.eval_every == 0 and epoch != 0:
                if n_gpus == 1:
                    eval_acc = evaluate(
                        model, val_g, val_nfeat, val_labels, val_nid, devices[0])
                    test_acc = evaluate(
                        model, test_g, test_nfeat, test_labels, test_nid, devices[0])
                else:
                    eval_acc = evaluate(
                        model.module, val_g, val_nfeat, val_labels, val_nid, devices[0])
                    test_acc = evaluate(
                        model.module, test_g, test_nfeat, test_labels, test_nid, devices[0])
                print('Eval Acc {:.4f}'.format(eval_acc))
                print('Test Acc: {:.4f}'.format(test_acc))

    if n_gpus > 1:
        th.distributed.barrier()
    if proc_id == 0:
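        # Epochs 0-4 are treated as warm-up and excluded from the average.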
        print('Avg epoch time: {}'.format(avg / (epoch - 4)))

if __name__ == '__main__':
    argparser = argparse.ArgumentParser("multi-gpu training")
    argparser.add_argument('--gpu', type=str, default='0',
                           help="Comma separated list of GPU device IDs.")
    argparser.add_argument('--num-epochs', type=int, default=20)
    argparser.add_argument('--num-hidden', type=int, default=16)
    argparser.add_argument('--num-layers', type=int, default=2)
    argparser.add_argument('--fan-out', type=str, default='10,25')
    argparser.add_argument('--batch-size', type=int, default=1000)
    argparser.add_argument('--log-every', type=int, default=20)
    argparser.add_argument('--eval-every', type=int, default=5)
    argparser.add_argument('--lr', type=float, default=0.003)
    argparser.add_argument('--dropout', type=float, default=0.5)
    argparser.add_argument('--num-workers', type=int, default=0,
                           help="Number of sampling processes. Use 0 for no extra process.")
    argparser.add_argument('--inductive', action='store_true',
                           help="Inductive learning setting")
    argparser.add_argument('--data-cpu', action='store_true',
                           help="By default the script moves all node features and labels "
                                "to GPU to avoid a data copy per mini-batch. This may be "
                                "undesirable if they cannot fit in GPU memory at once. "
                                "This flag keeps them on CPU instead.")
    args = argparser.parse_args()
    
    devices = list(map(int, args.gpu.split(',')))
    n_gpus = len(devices)

    g, n_classes = load_reddit()
    # Construct graph
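    # as_heterograph casts the loaded graph to the heterograph-based
    # representation expected by dgl.dataloading.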
    g = dgl.as_heterograph(g)

    if args.inductive:
        train_g, val_g, test_g = inductive_split(g)
    else:
        train_g = val_g = test_g = g

    # Create csr/coo/csc formats before launching training processes with multi-gpu.
    # This avoids creating certain formats in each sub-process, which saves memory and CPU.
    train_g.create_formats_()
    val_g.create_formats_()
    test_g.create_formats_()
    # Pack data
    data = n_classes, train_g, val_g, test_g

    if n_gpus == 1:
        run(0, n_gpus, args, devices, data)
    else:
        procs = []
        for proc_id in range(n_gpus):
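            # thread_wrapped_func runs the target in a fresh thread, the DGL
            # examples' workaround for fork/OpenMP deadlocks.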
            p = mp.Process(target=thread_wrapped_func(run),
                           args=(proc_id, n_gpus, args, devices, data))
            p.start()
            procs.append(p)
        for p in procs:
            p.join()