Unverified Commit 53043d26 authored by Vittorio Caggiano, committed by GitHub

Classification Examples of oss + pipe | tutorials/doc update (#119)



* wip_example

* [wip]mnist_pipe_example

* [wip]mnist_pipe_example

* [wip]mnist_pipe_example

* [wip]mnist_pipe_example

* [wip]mnist_oss_example

* working prototype

* added tutorial script

* update tutorial

* Update mnist_test_oss.py

* Update mnist_test_oss.py

* Update mnist_test_oss.py

* Update mnist_test_pipe.py

* Update tutorial_oss.py

* Update tutorial_pipe.py

* Update tutorial_pipe.py

* Update mnist_test_oss.py

* Update tutorial_pipe.py

* Update mnist_test_pipe.py

* Update tutorial_pipe.py

* fix black

* fix flacke8

* general fixes

* add example oss+pipe

* fix isort

* Update mnist_test_pipe.py

* fix black
Co-authored-by: Vittorio Caggiano <caggiano@devfair0253.h2.fair>
parent c6d9be79
@@ -8,6 +8,33 @@ Welcome to fairscale's documentation!
.. toctree::
:maxdepth: 2
:caption: Contents:
:hidden:
api/index
tutorials/index
*fairscale* is a PyTorch extension library for high-performance, large-scale training, optimizing training on a single machine or across multiple machines/nodes. It extends basic PyTorch capabilities while adding new experimental ones.
Components
----------
* Parallelism:
* `pipeline parallelism <../api/nn/pipe.html>`_
* `tensor parallelism <../api/nn/model_parallel.html>`_
* Optimization:
* `optimizer state sharding <../api/optim/oss.html>`_
.. warning::
This library is under active development.
Please be mindful and create an `issue <https://github.com/facebookresearch/fairscale/issues>`_ if you run into any trouble or have suggestions.
Reference
=========
:ref:`genindex` | :ref:`modindex` | :ref:`search`
@@ -103,4 +103,6 @@ The above `train` function will then need to be run via a `multiprocessing.spawn`
join=True
)
To see it in action, you can test it with the following script `here <../../../examples/tutorial_oss.py>`_.
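
For reference, the only OSS-specific change inside ``train`` is wrapping the base optimizer, as done in that script (a minimal sketch; ``OSS`` comes from ``fairscale.optim.oss``):

.. code-block:: default

    from fairscale.optim.oss import OSS

    base_optimizer = torch.optim.SGD  # any torch.optim optimizer can be used
    base_optimizer_arguments = {"lr": 1e-4}  # optimizer-specific arguments
    optimizer = OSS(params=model.parameters(), optim=base_optimizer, **base_optimizer_arguments)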
@@ -42,3 +42,39 @@ to ``torch.nn.Sequential`` and then wrap it with ``fairscale.nn.Pipe``.
This will run the first two layers on ``cuda:0`` and the last
layer on ``cuda:1``. To learn more, visit the `Pipe <../api/nn/pipe.html>`_ documentation.
You can then define any optimizer and loss function:
.. code-block:: default
import torch.optim as optim
import torch.nn.functional as F
optimizer = optim.SGD(model.parameters(), lr=0.001)
loss_fn = F.nll_loss
optimizer.zero_grad()
target = torch.randint(0, 2, size=(20, 1)).squeeze()
data = torch.randn(20, 10)
Finally, to run the model and compute the loss, make sure that the outputs and the target are on the same device:
.. code-block:: default
device = model.devices[0]
# outputs and target need to be on the same device
# forward step
outputs = model(data.to(device))
# compute loss
loss = loss_fn(outputs.to(device), target.to(device))
# backward + optimize
loss.backward()
optimizer.step()
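
``Pipe`` can also overlap the work of the two devices by splitting each mini-batch into micro-batches through the ``chunks`` argument; the MNIST example added in this commit uses ``chunks=2``. A minimal sketch, assuming ``net`` is the twelve-module ``nn.Sequential`` from that example:

.. code-block:: default

    model = fairscale.nn.Pipe(net, balance=[6, 6], devices=[0, 1], chunks=2)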
Run the example `here <../../../examples/tutorial_pipe.py>`_.
\ No newline at end of file
# adapted from https://github.com/pytorch/examples/blob/master/mnist/main.py
from __future__ import print_function
import argparse
import time
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from fairscale.nn.data_parallel import ShardedDataParallel
WORLD_SIZE = 2
OPTIM = torch.optim.RMSprop
BACKEND = dist.Backend.NCCL if torch.cuda.is_available() else dist.Backend.GLOO
def dist_init(rank, world_size, backend):
print(f"Using backend: {backend}")
dist.init_process_group(backend=backend, init_method="tcp://localhost:29501", rank=rank, world_size=world_size)
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout2d(0.25)
self.dropout2 = nn.Dropout2d(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
def train(rank, args, model, device, train_loader, num_epochs):
##############
# SETUP
dist_init(rank, WORLD_SIZE, BACKEND)
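    # ShardedDataParallel wraps the module and builds a sharded optimizer:
    # each rank only keeps the optimizer state for its own shard of the parameters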
ddp = ShardedDataParallel(
module=model,
optimizer=torch.optim.Adadelta,
optimizer_params={"lr": 1e-4},
world_size=WORLD_SIZE,
broadcast_buffers=True,
)
ddp.train()
optimizer = ddp.optimizer
# Reset the memory use counter
torch.cuda.reset_peak_memory_stats(rank)
# Training loop
torch.cuda.synchronize(rank)
training_start = time.monotonic()
loss_fn = nn.CrossEntropyLoss()
##############
model.train()
measurements = []
for epoch in range(num_epochs):
epoch_start = time.monotonic()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
def closure():
model.zero_grad()
outputs = model(data)
loss = loss_fn(outputs, target)
loss /= WORLD_SIZE
loss.backward()
# if dist.get_rank() == 0:
# print(f"Loss: {loss.item()}")
ddp.reduce() # Send the gradients to the appropriate shards
return loss
optimizer.step(closure)
epoch_end = time.monotonic()
torch.cuda.synchronize(rank)
training_stop = time.monotonic()
print("Total Time:", training_stop - training_start)
def main():
# Training settings
parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
parser.add_argument(
"--batch-size", type=int, default=64, metavar="N", help="input batch size for training (default: 64)"
)
parser.add_argument(
"--test-batch-size", type=int, default=1000, metavar="N", help="input batch size for testing (default: 1000)"
)
parser.add_argument("--epochs", type=int, default=14, metavar="N", help="number of epochs to train (default: 14)")
parser.add_argument("--lr", type=float, default=1.0, metavar="LR", help="learning rate (default: 1.0)")
parser.add_argument("--gamma", type=float, default=0.7, metavar="M", help="Learning rate step gamma (default: 0.7)")
parser.add_argument("--no-cuda", action="store_true", default=False, help="disables CUDA training")
parser.add_argument("--dry-run", action="store_true", default=False, help="quickly check a single pass")
parser.add_argument("--seed", type=int, default=1, metavar="S", help="random seed (default: 1)")
parser.add_argument(
"--log-interval",
type=int,
default=10,
metavar="N",
help="how many batches to wait before logging training status",
)
parser.add_argument("--save-model", action="store_true", default=False, help="For Saving the current Model")
args = parser.parse_args()
use_cuda = not args.no_cuda and torch.cuda.is_available()
torch.manual_seed(args.seed)
device = torch.device("cuda" if use_cuda else "cpu")
kwargs = {"batch_size": args.batch_size}
if use_cuda:
kwargs.update({"num_workers": 1, "pin_memory": True, "shuffle": True},)
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform)
train_loader = torch.utils.data.DataLoader(dataset1, **kwargs)
model = Net().to(device)
mp.spawn(
train, args=(args, model, device, train_loader, args.epochs), nprocs=WORLD_SIZE, join=True,
)
if __name__ == "__main__":
main()
# adapted from https://github.com/pytorch/examples/blob/master/mnist/main.py
from __future__ import print_function
import argparse
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms
from fairscale.nn import Pipe
net = nn.Sequential(
nn.Conv2d(1, 32, 3, 1),
nn.ReLU(),
nn.Conv2d(32, 64, 3, 1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2),
nn.Dropout2d(0.25),
nn.Flatten(1),
nn.Linear(9216, 128),
nn.ReLU(),
nn.Dropout2d(0.5),
nn.Linear(128, 10),
nn.LogSoftmax(dim=1),
)
def train(args, model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output.to(device), target.to(device))
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print(
"Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch,
batch_idx * len(data),
len(train_loader.dataset),
100.0 * batch_idx / len(train_loader),
loss.item(),
)
)
if args.dry_run:
break
def test(model, device, test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(output.to(device), target.to(device), reduction="sum").item() # sum up batch loss
pred = output.argmax(dim=1, keepdim=True).to(device) # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
print(
"\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n".format(
test_loss, correct, len(test_loader.dataset), 100.0 * correct / len(test_loader.dataset)
)
)
def main():
# Training settings
parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
parser.add_argument(
"--batch-size", type=int, default=64, metavar="N", help="input batch size for training (default: 64)"
)
parser.add_argument(
"--test-batch-size", type=int, default=1000, metavar="N", help="input batch size for testing (default: 1000)"
)
parser.add_argument("--epochs", type=int, default=14, metavar="N", help="number of epochs to train (default: 14)")
parser.add_argument("--lr", type=float, default=1.0, metavar="LR", help="learning rate (default: 1.0)")
parser.add_argument("--gamma", type=float, default=0.7, metavar="M", help="Learning rate step gamma (default: 0.7)")
parser.add_argument("--dry-run", action="store_true", default=False, help="quickly check a single pass")
parser.add_argument("--seed", type=int, default=1, metavar="S", help="random seed (default: 1)")
parser.add_argument(
"--log-interval",
type=int,
default=10,
metavar="N",
help="how many batches to wait before logging training status",
)
parser.add_argument("--save-model", action="store_true", default=False, help="For Saving the current Model")
args = parser.parse_args()
torch.manual_seed(args.seed)
kwargs = {"batch_size": args.batch_size}
kwargs.update({"num_workers": 1, "pin_memory": True, "shuffle": True},)
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform)
dataset2 = datasets.MNIST("../data", train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(dataset1, **kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **kwargs)
model = net
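    # Split the 12-module Sequential into two halves of 6 modules each, one per device,
    # and pipeline each mini-batch as 2 micro-batches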
model = Pipe(model, balance=[6, 6], devices=[0, 1], chunks=2)
device = model.devices[0]
optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, args.epochs + 1):
tic = time.perf_counter()
train(args, model, device, train_loader, optimizer, epoch)
toc = time.perf_counter()
print(f">>> TRANING Time {toc - tic:0.4f} seconds")
tic = time.perf_counter()
test(model, device, test_loader)
toc = time.perf_counter()
print(f">>> TESTING Time {toc - tic:0.4f} seconds")
scheduler.step()
if args.save_model:
torch.save(model.state_dict(), "mnist_cnn.pt")
if __name__ == "__main__":
main()
import time
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
from fairscale.optim.oss import OSS
WORLD_SIZE = 2
EPOCHS = 3
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
def dist_init(rank, world_size):
backend = dist.Backend.NCCL if torch.cuda.is_available() else dist.Backend.GLOO # type: ignore
print(f"Using backend: {backend}")
dist.init_process_group(backend=backend, init_method="tcp://localhost:29501", rank=rank, world_size=world_size)
def getModel():
return nn.Sequential(torch.nn.Linear(10, 10), torch.nn.ReLU(), torch.nn.Linear(10, 5))
def getData():
target = torch.randint(0, 2, size=(20, 1)).squeeze()
data = torch.randn(20, 10)
return [(data, target)]
def getLossFun():
return F.nll_loss
def train(rank: int, world_size: int, epochs: int, use_oss: bool):
# DDP
dist_init(rank, world_size)
# Problem statement
model = getModel().to(rank)
dataloader = getData()
loss_fn = getLossFun()
base_optimizer_arguments = {"lr": 1e-4} # any optimizer specific arguments, LR, momentum, etc...
    if not use_oss:  # baseline: plain, non-sharded optimizer
optimizer = torch.optim.SGD(params=model.parameters(), **base_optimizer_arguments)
else:
base_optimizer = torch.optim.SGD
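        # OSS wraps the base optimizer and shards its state across the participating ranks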
optimizer = OSS(params=model.parameters(), optim=base_optimizer, **base_optimizer_arguments)
training_start = time.monotonic()
# Any relevant training loop, nothing specific to OSS. For example:
model.train()
for e in range(epochs):
for (data, target) in dataloader:
data, target = data.to(rank), target.to(rank)
# Train
model.zero_grad()
outputs = model(data)
loss = loss_fn(outputs, target)
loss /= world_size
loss.backward()
optimizer.step()
print(f"Loss: {loss.item()}")
training_end = time.monotonic()
    max_memory = torch.cuda.max_memory_allocated(rank) / 2 ** 20  # bytes -> MiB
print(f"[{dist.get_rank()}] : Training done. {training_end-training_start:.2f} sec")
print(f"[{dist.get_rank()}] : Peak memory {max_memory:.1f}MiB")
if __name__ == "__main__":
training_start1 = time.monotonic()
mp.spawn(train, args=(WORLD_SIZE, EPOCHS, False), nprocs=WORLD_SIZE, join=True)
training_end1 = time.monotonic()
training_start2 = time.monotonic()
mp.spawn(train, args=(WORLD_SIZE, EPOCHS, True), nprocs=WORLD_SIZE, join=True)
training_end2 = time.monotonic()
print("Total Time without:", training_end1 - training_start1)
print("Total Time with:", training_end2 - training_start2)
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import fairscale
model = nn.Sequential(torch.nn.Linear(10, 10), torch.nn.ReLU(), torch.nn.Linear(10, 5))
target = torch.randint(0, 2, size=(20, 1)).squeeze()
data = torch.randn(20, 10)
loss_fn = F.nll_loss
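# Pipe splits the nn.Sequential across devices: with balance=[2, 1], the first
# two modules run on one device and the last module on another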
model = fairscale.nn.Pipe(model, balance=[2, 1])
# define optimizer and loss function
optimizer = optim.SGD(model.parameters(), lr=0.001)
# zero the parameter gradients
optimizer.zero_grad()
device = model.devices[0]
# outputs and target need to be on the same device
# forward step
outputs = model(data.to(device))
# compute loss
loss = loss_fn(outputs.to(device), target.to(device))
# backward + optimize
loss.backward()
optimizer.step()
print("Finished Training Step")
del model