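# Tutorial: running fairscale's MultiProcessPipe across two processes. Each
# rank owns one stage of the pipeline, and the stages coordinate over
# torch.distributed.rpc.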
import os

import torch
import torch.distributed as dist
import torch.distributed.rpc  # make sure the rpc submodule is loaded so dist.rpc resolves
import torch.multiprocessing as mp
import torch.optim as optim

from fairscale.nn.model_parallel import initialize_model_parallel
from fairscale.nn.pipe import MultiProcessPipe

from helpers import dist_init, get_data, get_loss_fun, get_model

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"


def run(rank, world_size):
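    # Each worker joins a regular torch.distributed process group (via
    # dist_init) and also starts an RPC agent; MultiProcessPipe uses RPC to
    # coordinate the pipeline stages (see worker_map below).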
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "10638"
    dist_init(rank, world_size)
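    # Switch to a second port so the RPC rendezvous does not collide with the
    # process group that dist_init() just created.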
    os.environ["MASTER_PORT"] = "10639"
    dist.rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)
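    # Set up fairscale's model-parallel groups: model-parallel size 1 and a
    # pipeline of length world_size, i.e. one pipeline stage per process.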
    initialize_model_parallel(1, world_size)

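    # helpers.py (not shown here) is assumed to provide a small nn.Sequential
    # via get_model() (MultiProcessPipe partitions sequential modules), plus a
    # sample (data, target) batch and a loss function.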
    model = get_model()
    data, target = get_data()[0]
    loss_fn = get_loss_fun()

    device = torch.device("cuda", rank) if DEVICE == "cuda" else torch.device("cpu")

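    # Partition the model across the two workers: balance=[2, 1] places the
    # first two layers of the sequential model on rank 0 and the remaining
    # layer on rank 1; inputs are expected on input_device.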
    model = MultiProcessPipe(
        model,
        balance=[2, 1],
        worker_map={0: "worker0", 1: "worker1"},  # Needed to convert ranks to RPC worker names
        input_device=device,
    ).to(device)

    # define the optimizer (loss_fn was already created above)
    optimizer = optim.SGD(model.parameters(), lr=0.001)

    # zero the parameter gradients
    optimizer.zero_grad()

    # outputs and target need to be on the same device
    # forward step
    outputs = model(data.to(device))
    # compute loss
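    # Only the last pipeline stage (rank 1) holds the real output, so the loss
    # and the backward pass are driven from there; the other rank calls
    # back_helper() so that gradients also flow through its partition.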
    if rank == 1:
        loss = loss_fn(outputs.to(device), target.to(device))

        # backward + optimize
        loss.backward()
        optimizer.step()
    else:
        model.back_helper(outputs)

    print(f"Finished Training Step on {rank}")
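    # rpc.shutdown() blocks until all workers have finished their outstanding
    # RPC work before tearing the agent down.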
    dist.rpc.shutdown()

    del model


if __name__ == "__main__":
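    # Launch one process per pipeline stage; mp.spawn passes the process index
    # (the rank) as the first argument to run().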
    world_size = 2
    mp.spawn(run, args=(world_size,), nprocs=world_size, join=True)