Commit d2ac4872 authored by Michael Carilli's avatar Michael Carilli
Browse files

Remove deprecated examples and update Docker guidance

parent 1b8303d8
...@@ -69,6 +69,8 @@ It's often convenient to use Apex in Docker containers. Compatible options incl ...@@ -69,6 +69,8 @@ It's often convenient to use Apex in Docker containers. Compatible options incl
* [NVIDIA Pytorch containers from NGC](https://ngc.nvidia.com/catalog/containers/nvidia%2Fpytorch), which come with Apex preinstalled. To use the latest Amp API, you may need to `pip uninstall apex` then reinstall Apex using the **Quick Start** commands below. * [NVIDIA Pytorch containers from NGC](https://ngc.nvidia.com/catalog/containers/nvidia%2Fpytorch), which come with Apex preinstalled. To use the latest Amp API, you may need to `pip uninstall apex` then reinstall Apex using the **Quick Start** commands below.
* [official Pytorch -devel Dockerfiles](https://hub.docker.com/r/pytorch/pytorch/tags), e.g. `docker pull pytorch/pytorch:nightly-devel-cuda10.0-cudnn7`, in which you can install Apex using the **Quick Start** commands. * [official Pytorch -devel Dockerfiles](https://hub.docker.com/r/pytorch/pytorch/tags), e.g. `docker pull pytorch/pytorch:nightly-devel-cuda10.0-cudnn7`, in which you can install Apex using the **Quick Start** commands.
See the [Docker example folder](https://github.com/NVIDIA/apex/tree/master/examples/docker) for details.
# Quick Start # Quick Start
### Linux ### Linux
......
...@@ -65,6 +65,8 @@ def convert_network(network, dtype): ...@@ -65,6 +65,8 @@ def convert_network(network, dtype):
if isinstance(module, torch.nn.modules.batchnorm._BatchNorm) and module.affine is True: if isinstance(module, torch.nn.modules.batchnorm._BatchNorm) and module.affine is True:
continue continue
convert_module(module, dtype) convert_module(module, dtype)
if isinstance(module, torch.nn.RNNBase) or isinstance(module, torch.nn.modules.rnn.RNNBase):
module.flatten_parameters()
return network return network
......
# Simple examples of FP16_Optimizer functionality
To use `FP16_Optimizer` on a half-precision model, or a model with a mixture of
half and float parameters, only two lines of your training script need to change:
1. Construct an `FP16_Optimizer` instance from an existing optimizer.
2. Replace `loss.backward()` with `optimizer.backward(loss)`.
#### [Full API Documentation](https://nvidia.github.io/apex/fp16_utils.html#automatic-management-of-master-params-loss-scaling)
See "Other Options" at the bottom of this page for some cases that require special treatment.
#### Minimal Working Sample
`minimal.py` shows the basic usage of `FP16_Optimizer` with either static or dynamic loss scaling. Test via `python minimal.py`.
#### Closures
`FP16_Optimizer` supports closures with the same control flow as ordinary Pytorch optimizers.
`closure.py` shows an example. Test via `python closure.py`.
See [the API documentation](https://nvidia.github.io/apex/fp16_utils.html#apex.fp16_utils.FP16_Optimizer.step) for more details.
#### Serialization/Deserialization
`FP16_Optimizer` supports saving and loading with the same control flow as ordinary Pytorch optimizers.
`save_load.py` shows an example. Test via `python save_load.py`.
See [the API documentation](https://nvidia.github.io/apex/fp16_utils.html#apex.fp16_utils.FP16_Optimizer.load_state_dict) for more details.
#### Distributed
**distributed_apex** shows an example using `FP16_Optimizer` with Apex DistributedDataParallel.
The usage of `FP16_Optimizer` with distributed does not need to change from ordinary single-process
usage. Test via
```bash
cd distributed_apex
bash run.sh
```
**distributed_pytorch** shows an example using `FP16_Optimizer` with Pytorch DistributedDataParallel.
Again, the usage of `FP16_Optimizer` with distributed does not need to change from ordinary
single-process usage. Test via
```bash
cd distributed_pytorch
bash run.sh
```
#### Other Options
Gradient clipping requires that calls to `torch.nn.utils.clip_grad_norm`
be replaced with [fp16_optimizer_instance.clip_master_grads()](https://nvidia.github.io/apex/fp16_utils.html#apex.fp16_utils.FP16_Optimizer.clip_master_grads). The [word_language_model example](https://github.com/NVIDIA/apex/blob/master/examples/word_language_model/main_fp16_optimizer.py) uses this feature.
Multiple losses will work if you simply replace
```bash
loss1.backward()
loss2.backward()
```
with
```bash
optimizer.backward(loss1)
optimizer.backward(loss2)
```
but `FP16_Optimizer` can be told to handle this more efficiently using the
[update_master_grads()](https://nvidia.github.io/apex/fp16_utils.html#apex.fp16_utils.FP16_Optimizer.update_master_grads) option.
import torch
from apex.fp16_utils import FP16_Optimizer
torch.backends.cudnn.benchmark = True
N, D_in, D_out = 64, 1024, 16
x = torch.randn(N, D_in, device='cuda', dtype=torch.half)
y = torch.randn(N, D_out, device='cuda', dtype=torch.half)
model = torch.nn.Linear(D_in, D_out).cuda().half()
optimizer = torch.optim.LBFGS(model.parameters())
### Construct FP16_Optimizer
optimizer = FP16_Optimizer(optimizer, static_loss_scale=128.0)
###
loss_fn = torch.nn.MSELoss()
for t in range(5):
def closure():
optimizer.zero_grad()
y_pred = model(x)
loss = loss_fn(y_pred.float(), y.float())
### Change loss.backward() within the closure to: ###
optimizer.backward(loss)
###
return loss
loss = optimizer.step(closure)
print("final loss = ", loss)
**distributed_data_parallel.py** and **run.sh** show an example using `FP16_Optimizer` with
`apex.parallel.DistributedDataParallel` and the Pytorch multiprocess launcher script,
[torch.distributed.launch](https://pytorch.org/docs/master/distributed.html#launch-utility).
The usage of `FP16_Optimizer` with distributed does not need to change from ordinary
single-process usage. Test via
```bash
bash run.sh
```
import torch
import argparse
from apex.parallel import DistributedDataParallel as DDP
from apex.fp16_utils import FP16_Optimizer
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=0, type=int)
args = parser.parse_args()
torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend='nccl',
init_method='env://')
torch.backends.cudnn.benchmark = True
N, D_in, D_out = 64, 1024, 16
x = torch.randn(N, D_in, device='cuda', dtype=torch.half)
y = torch.randn(N, D_out, device='cuda', dtype=torch.half)
model = torch.nn.Linear(D_in, D_out).cuda().half()
model = DDP(model)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
### Construct FP16_Optimizer ###
optimizer = FP16_Optimizer(optimizer)
###
loss_fn = torch.nn.MSELoss()
for t in range(500):
optimizer.zero_grad()
y_pred = model(x)
loss = loss_fn(y_pred.float(), y.float())
### Change loss.backward() to: ###
optimizer.backward(loss)
###
optimizer.step()
print("final loss = ", loss)
#!/bin/bash
python -m torch.distributed.launch --nproc_per_node=2 distributed_data_parallel.py
**distributed_data_parallel.py** and **run.sh** show an example using `FP16_Optimizer` with
`torch.nn.parallel.DistributedDataParallel` and the Pytorch multiprocess launcher script,
[torch.distributed.launch](https://pytorch.org/docs/master/distributed.html#launch-utility).
The usage of `FP16_Optimizer` with distributed does not need to change from ordinary
single-process usage. Test via
```bash
bash run.sh
```
import torch
import argparse
from apex.fp16_utils import FP16_Optimizer
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=0, type=int)
args = parser.parse_args()
torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend='nccl',
init_method='env://')
torch.backends.cudnn.benchmark = True
N, D_in, D_out = 64, 1024, 16
x = torch.randn(N, D_in, device='cuda', dtype=torch.half)
y = torch.randn(N, D_out, device='cuda', dtype=torch.half)
model = torch.nn.Linear(D_in, D_out).cuda().half()
model = torch.nn.parallel.DistributedDataParallel(model,
device_ids=[args.local_rank],
output_device=args.local_rank)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
### Construct FP16_Optimizer ###
optimizer = FP16_Optimizer(optimizer)
###
loss_fn = torch.nn.MSELoss()
for t in range(500):
optimizer.zero_grad()
y_pred = model(x)
loss = loss_fn(y_pred.float(), y.float())
### Change loss.backward() to: ###
optimizer.backward(loss)
###
optimizer.step()
print("final loss = ", loss)
#!/bin/bash
python -m torch.distributed.launch --nproc_per_node=2 distributed_data_parallel.py
import torch
from apex.fp16_utils import FP16_Optimizer
torch.backends.cudnn.benchmark = True
N, D_in, D_out = 64, 1024, 16
x = torch.randn(N, D_in, device='cuda', dtype=torch.half)
y = torch.randn(N, D_out, device='cuda', dtype=torch.half)
model = torch.nn.Linear(D_in, D_out).cuda().half()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)
### Construct FP16_Optimizer
### FP16_Optimizer will ingest and remember the original optimizer's param_groups.
###
### Construct with static loss scaling...
optimizer = FP16_Optimizer(optimizer, static_loss_scale=128.0)
### ...or dynamic loss scaling
# optimizer = FP16_Optimizer(optimizer,
# dynamic_loss_scale=True,
# dynamic_loss_args={'scale_factor' : 2})
### dynamic_loss_args is optional, for "power users," and unnecessary in most cases.
loss_fn = torch.nn.MSELoss()
for t in range(200):
optimizer.zero_grad()
y_pred = model(x)
loss = loss_fn(y_pred.float(), y.float())
### Change loss.backward() to:
optimizer.backward(loss)
###
optimizer.step()
print("final loss = ", loss)
import torch
from apex.fp16_utils import FP16_Optimizer
torch.backends.cudnn.benchmark = True
N, D_in, D_out = 64, 1024, 16
x = torch.randn(N, D_in, device='cuda', dtype=torch.half)
y = torch.randn(N, D_out, device='cuda', dtype=torch.half)
model = torch.nn.Linear(D_in, D_out).cuda().half()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)
### Construct FP16_Optimizer with static loss scaling...
optimizer = FP16_Optimizer(optimizer, static_loss_scale=128.0)
### ...or dynamic loss scaling
# optimizer = FP16_Optimizer(optimizer, dynamic_loss_scale=True)
loss_fn = torch.nn.MSELoss()
# The checkpointing shown here is identical to what you'd use without FP16_Optimizer.
#
# We save/load checkpoints within local scopes, so the "checkpoint" object
# does not persist. This helps avoid dangling references to intermediate deserialized data,
# and is good practice for Pytorch in general, not just with FP16_Optimizer.
def save_checkpoint():
checkpoint = {}
checkpoint['model'] = model.state_dict()
checkpoint['optimizer'] = optimizer.state_dict()
torch.save(checkpoint, 'saved.pth')
def load_checkpoint():
checkpoint = torch.load('saved.pth',
map_location = lambda storage, loc: storage.cuda(torch.cuda.current_device()))
model.load_state_dict(checkpoint['model'])
optimizer.load_state_dict(checkpoint['optimizer'])
for t in range(100):
optimizer.zero_grad()
y_pred = model(x)
loss = loss_fn(y_pred.float(), y.float())
optimizer.backward(loss) ### formerly loss.backward()
optimizer.step()
save_checkpoint()
load_checkpoint()
for t in range(100):
optimizer.zero_grad()
y_pred = model(x)
loss = loss_fn(y_pred.float(), y.float())
optimizer.backward(loss) ### formerly loss.backward()
optimizer.step()
print("final loss = ", loss)
## Contents:
**distributed**: Walkthrough of apex distributed data parallel utilities.
**FP16_Optimizer_simple**: Simple examples demonstrating various use cases of `FP16_Optimizer` to automatically manage master parameters and static or dynamic loss scaling.
**imagenet**: Example based on [https://github.com/pytorch/examples/tree/master/imagenet](https://github.com/pytorch/examples/tree/master/imagenet) showing the use of `FP16_Optimizer`, as well as manual management of master parameters and loss scaling for illustration/comparison.
**word_language_model**: Example based on [https://github.com/pytorch/examples/tree/master/word_language_model](https://github.com/pytorch/examples/tree/master/word_language_model) showing the use of `FP16_Optimizer`, as well as manual management of master parameters and loss scaling for illustration/comparison.
**docker**: Example of a minimal Dockerfile that installs Apex on top of an existing container.
# Multiprocess Example based on pytorch/examples/mnist
main.py demonstrates how to modify a simple model to enable multiprocess distributed data parallel
training using the module wrapper `apex.parallel.DistributedDataParallel`
(similar to `torch.nn.parallel.DistributedDataParallel`).
Multiprocess distributed data parallel training frequently outperforms single-process
data parallel training (such as that offered by `torch.nn.DataParallel`) because each process has its
own python interpreter. Therefore, driving multiple GPUs with multiple processes reduces
global interpreter lock contention versus having a single process (with a single GIL) drive all GPUs.
`apex.parallel.DistributedDataParallel` is optimized for use with NCCL. It achieves high performance by
overlapping communication with computation during ``backward()`` and bucketing smaller gradient
transfers to reduce the total number of transfers required.
#### [API Documentation](https://nvidia.github.io/apex/parallel.html)
#### [Source Code](https://github.com/NVIDIA/apex/tree/master/apex/parallel)
#### [Another example: Imagenet with mixed precision](https://github.com/NVIDIA/apex/tree/master/examples/imagenet)
#### [Simple example with FP16_Optimizer](https://github.com/NVIDIA/apex/tree/master/examples/FP16_Optimizer_simple/distributed_apex)
## Getting started
Prior to running please run
```pip install -r requirements.txt```
To download the dataset, run
```python main.py```
without any arguments. Once you have downloaded the dataset, you should not need to do this again.
`main.py` runs multiprocess distributed data parallel jobs using the Pytorch launcher script
[torch.distributed.launch](https://pytorch.org/docs/master/distributed.html#launch-utility).
Jobs are launched via
```bash
python -m torch.distributed.launch --nproc_per_node=N main.py args...
```
`torch.distributed.launch` spawns `N` processes, each of which runs as
`python main.py args... --local_rank <rank>`.
The `local_rank` argument for each process is determined and appended by `torch.distributed.launch`,
and varies between 0 and `N-1`. `torch.distributed.launch` also provides environment variables
for each process.
Internally, each process calls `set_device` according to its local
rank and `init_process_group` with `init_method=`env://' to ingest the provided environment
variables.
For best performance, set `N` equal to the number of visible CUDA devices on the node.
## Converting your own model
To understand how to convert your own model, please see all sections of main.py within ```#=====START: ADDED FOR DISTRIBUTED======``` and ```#=====END: ADDED FOR DISTRIBUTED======``` flags.
## Requirements
Pytorch with NCCL available as a distributed backend. Pytorch 0.4+, installed as a pip or conda package, should have this by default. Otherwise, you can build Pytorch from source, in an environment where NCCL is installed and visible.
from __future__ import print_function
import argparse
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from apex.fp16_utils import to_python_float
#=====START: ADDED FOR DISTRIBUTED======
'''Add custom module for distributed'''
try:
from apex.parallel import DistributedDataParallel as DDP
except ImportError:
raise ImportError("Please install apex from https://www.github.com/nvidia/apex to run this example.")
'''Import distributed data loader'''
import torch.utils.data
import torch.utils.data.distributed
'''Import torch.distributed'''
import torch.distributed as dist
#=====END: ADDED FOR DISTRIBUTED======
# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
help='number of epochs to train (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
help='SGD momentum (default: 0.5)')
parser.add_argument('--no-cuda', action='store_true', default=False,
help='disables CUDA training')
parser.add_argument('--seed', type=int, default=1, metavar='S',
help='random seed (default: 1)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
help='how many batches to wait before logging training status')
#======START: ADDED FOR DISTRIBUTED======
'''
Add some distributed options. For explanation of dist-url and dist-backend please see
http://pytorch.org/tutorials/intermediate/dist_tuto.html
--local_rank will be supplied by the Pytorch launcher wrapper (torch.distributed.launch)
'''
parser.add_argument("--local_rank", default=0, type=int)
#=====END: ADDED FOR DISTRIBUTED======
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
#======START: ADDED FOR DISTRIBUTED======
'''Add a convenience flag to see if we are running distributed'''
args.distributed = False
if 'WORLD_SIZE' in os.environ:
args.distributed = int(os.environ['WORLD_SIZE']) > 1
if args.distributed:
'''Check that we are running with cuda, as distributed is only supported for cuda.'''
assert args.cuda, "Distributed mode requires running with CUDA."
'''
Set cuda device so everything is done on the right GPU.
THIS MUST BE DONE AS SOON AS POSSIBLE.
'''
torch.cuda.set_device(args.local_rank)
'''Initialize distributed communication'''
torch.distributed.init_process_group(backend='nccl',
init_method='env://')
#=====END: ADDED FOR DISTRIBUTED======
torch.manual_seed(args.seed)
kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}
#=====START: ADDED FOR DISTRIBUTED======
'''
Change sampler to distributed if running distributed.
Shuffle data loader only if distributed.
'''
train_dataset = datasets.MNIST('../data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
if args.distributed:
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
else:
train_sampler = None
train_loader = torch.utils.data.DataLoader(
train_dataset, sampler=train_sampler,
batch_size=args.batch_size, shuffle=(train_sampler is None), **kwargs
)
#=====END: ADDED FOR DISTRIBUTED======
test_loader = torch.utils.data.DataLoader(
datasets.MNIST('../data', train=False, transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=args.test_batch_size, shuffle=True, **kwargs)
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1)
model = Net()
if args.cuda:
model.cuda()
#=====START: ADDED FOR DISTRIBUTED======
'''
Wrap model in our version of DistributedDataParallel.
This must be done AFTER the model is converted to cuda.
'''
if args.distributed:
model = DDP(model)
#=====END: ADDED FOR DISTRIBUTED======
optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
def train(epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
if args.cuda:
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0 and args.local_rank == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), to_python_float(loss.data)))
def test():
model.eval()
test_loss = 0
correct = 0
for data, target in test_loader:
with torch.no_grad():
if args.cuda:
data, target = data.cuda(), target.cuda()
output = model(data)
test_loss += to_python_float(F.nll_loss(output, target, size_average=False).data) # sum up batch loss
pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
correct += pred.eq(target.data.view_as(pred)).float().cpu().sum()
test_loss /= len(test_loader.dataset)
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, len(test_loader.dataset),
100. * correct / len(test_loader.dataset)))
for epoch in range(1, args.epochs + 1):
#=====START: ADDED FOR DISTRIBUTED======
if args.distributed:
train_sampler.set_epoch(epoch)
#=====END: ADDED FOR DISTRIBUTED======
train(epoch)
if args.local_rank == 0:
test()
# ImageNet training in PyTorch
This example is based on [https://github.com/pytorch/examples/tree/master/imagenet](https://github.com/pytorch/examples/tree/master/imagenet).
It implements training of popular model architectures, such as ResNet, AlexNet, and VGG on the ImageNet dataset.
`main.py` with the `--fp16` argument demonstrates mixed precision training with manual management of master parameters and loss scaling.
`main_fp16_optimizer.py` with `--fp16` demonstrates use of `apex.fp16_utils.FP16_Optimizer` to automatically manage master parameters and loss scaling.
`main_amp.py` with `--fp16` demonstrates use of Amp to automatically perform all FP16-friendly operations in half precision under the hood. Notice that with Amp:
..* you don't need to explicitly convert your model, or the input data, to half(). Conversions will occur on-the-fly internally within the Amp-patched torch functions.
..* dynamic loss scaling is always used under the hood.
`main_reducer.py` is identical to `main.py`, except that it shows the use of [apex.parallel.Reduce](https://nvidia.github.io/apex/parallel.html#apex.parallel.Reducer) instead of `DistributedDataParallel`.
## Requirements
- `pip install -r requirements.txt`
- Download the ImageNet dataset and move validation images to labeled subfolders
- To do this, you can use the following script: https://raw.githubusercontent.com/soumith/imagenetloader.torch/master/valprep.sh
## Training
To train a model, run `main.py` with the desired model architecture and the path to the ImageNet dataset.
The default learning rate schedule starts at 0.1 and decays by a factor of 10 every 30 epochs. This is appropriate for ResNet and models with batch normalization, but too high for AlexNet and VGG. Use 0.01 as the initial learning rate for AlexNet or VGG:
```bash
python main.py -a alexnet --lr 0.01 /path/to/imagenet/folder
```
The directory at /path/to/imagenet/directory should contain two subdirectories called "train"
and "val" that contain the training and validation data respectively.
## Distributed training
`main.py` and `main_fp16_optimizer.py` have been modified to use the `DistributedDataParallel` module in Apex instead of the one in upstream PyTorch. `apex.parallel.DistributedDataParallel`
is a drop-in replacement for `torch.nn.parallel.DistribtuedDataParallel` (see our [distributed example](https://github.com/NVIDIA/apex/tree/master/examples/distributed)).
The scripts can interact with
[torch.distributed.launch](https://pytorch.org/docs/master/distributed.html#launch-utility)
to spawn multiprocess jobs using the following syntax:
```
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS main.py args...
```
`NUM_GPUS` should be less than or equal to the number of visible GPU devices on the node.
Optionally one can run imagenet with sync batch normalization by adding
`--sync_bn` into the `args...`
## Example commands
(note: batch size `--b 224` assumes your GPUs have >=16GB of onboard memory)
```bash
### Softlink training dataset into current directory
$ ln -sf /data/imagenet/train-jpeg/ train
### Softlink validation dataset into current directory
$ ln -sf /data/imagenet/val-jpeg/ val
### Single-process training
$ python main.py -a resnet50 --fp16 --b 224 --workers 4 --static-loss-scale 128.0 ./
### Single-process training with Amp. Amp's casting causes it to use a bit more memory,
### hence the batch size 128.
$ python main_amp.py -a resnet50 --fp16 --b 128 --workers 4 ./
### Multi-process training (uses all visible GPUs on the node)
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS main.py -a resnet50 --fp16 --b 224 --workers 4 --static-loss-scale 128.0 ./
### Multi-process training on GPUs 0 and 1 only
$ export CUDA_VISIBLE_DEVICES=0,1
$ python -m torch.distributed.launch --nproc_per_node=2 main.py -a resnet50 --fp16 --b 224 --workers 4 ./
### Multi-process training with FP16_Optimizer, static loss scale 128.0 (still uses FP32 master params)
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS main_fp16_optimizer.py -a resnet50 --fp16 --b 224 --static-loss-scale 128.0 --workers 4 ./
### Multi-process training with FP16_Optimizer, dynamic loss scaling
$ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS main_fp16_optimizer.py -a resnet50 --fp16 --b 224 --dynamic-loss-scale --workers 4 ./
```
## Usage for `main.py` and `main_fp16_optimizer.py`
`main_fp16_optimizer.py` also accepts the optional flag
```bash
--dynamic-loss-scale Use dynamic loss scaling. If supplied, this argument
supersedes --static-loss-scale.
```
import argparse
import os
import shutil
import time
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
import numpy as np
try:
from apex.parallel import DistributedDataParallel as DDP
from apex.fp16_utils import *
except ImportError:
raise ImportError("Please install apex from https://www.github.com/nvidia/apex to run this example.")
model_names = sorted(name for name in models.__dict__
if name.islower() and not name.startswith("__")
and callable(models.__dict__[name]))
parser = argparse.ArgumentParser(description='PyTorch ImageNet Training')
parser.add_argument('data', metavar='DIR',
help='path to dataset')
parser.add_argument('--arch', '-a', metavar='ARCH', default='resnet18',
choices=model_names,
help='model architecture: ' +
' | '.join(model_names) +
' (default: resnet18)')
parser.add_argument('-j', '--workers', default=4, type=int, metavar='N',
help='number of data loading workers (default: 4)')
parser.add_argument('--epochs', default=90, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('--start-epoch', default=0, type=int, metavar='N',
help='manual epoch number (useful on restarts)')
parser.add_argument('-b', '--batch-size', default=256, type=int,
metavar='N', help='mini-batch size per process (default: 256)')
parser.add_argument('--lr', '--learning-rate', default=0.1, type=float,
metavar='LR', help='Initial learning rate. Will be scaled by <global batch size>/256: args.lr = args.lr*float(args.batch_size*args.world_size)/256. A warmup schedule will also be applied over the first 5 epochs.')
parser.add_argument('--momentum', default=0.9, type=float, metavar='M',
help='momentum')
parser.add_argument('--weight-decay', '--wd', default=1e-4, type=float,
metavar='W', help='weight decay (default: 1e-4)')
parser.add_argument('--print-freq', '-p', default=10, type=int,
metavar='N', help='print frequency (default: 10)')
parser.add_argument('--resume', default='', type=str, metavar='PATH',
help='path to latest checkpoint (default: none)')
parser.add_argument('-e', '--evaluate', dest='evaluate', action='store_true',
help='evaluate model on validation set')
parser.add_argument('--pretrained', dest='pretrained', action='store_true',
help='use pre-trained model')
parser.add_argument('--fp16', action='store_true',
help='Run model fp16 mode.')
parser.add_argument('--static-loss-scale', type=float, default=1,
help='Static loss scale, positive power of 2 values can improve fp16 convergence.')
parser.add_argument('--prof', dest='prof', action='store_true',
help='Only run 10 iterations for profiling.')
parser.add_argument('--deterministic', action='store_true')
parser.add_argument("--local_rank", default=0, type=int)
parser.add_argument('--sync_bn', action='store_true',
help='enabling apex sync BN.')
cudnn.benchmark = True
def fast_collate(batch):
imgs = [img[0] for img in batch]
targets = torch.tensor([target[1] for target in batch], dtype=torch.int64)
w = imgs[0].size[0]
h = imgs[0].size[1]
tensor = torch.zeros( (len(imgs), 3, h, w), dtype=torch.uint8 )
for i, img in enumerate(imgs):
nump_array = np.asarray(img, dtype=np.uint8)
tens = torch.from_numpy(nump_array)
if(nump_array.ndim < 3):
nump_array = np.expand_dims(nump_array, axis=-1)
nump_array = np.rollaxis(nump_array, 2)
tensor[i] += torch.from_numpy(nump_array)
return tensor, targets
best_prec1 = 0
args = parser.parse_args()
if args.deterministic:
cudnn.benchmark = False
cudnn.deterministic = True
torch.manual_seed(args.local_rank)
def main():
global best_prec1, args
args.distributed = False
if 'WORLD_SIZE' in os.environ:
args.distributed = int(os.environ['WORLD_SIZE']) > 1
args.gpu = 0
args.world_size = 1
if args.distributed:
args.gpu = args.local_rank % torch.cuda.device_count()
torch.cuda.set_device(args.gpu)
torch.distributed.init_process_group(backend='nccl',
init_method='env://')
args.world_size = torch.distributed.get_world_size()
if args.fp16:
assert torch.backends.cudnn.enabled, "fp16 mode requires cudnn backend to be enabled."
if args.static_loss_scale != 1.0:
if not args.fp16:
print("Warning: static_loss_scale != 1.0 is only necessary with --fp16. "
"Resetting static_loss_scale to 1.0")
args.static_loss_scale = 1.0
# create model
if args.pretrained:
print("=> using pre-trained model '{}'".format(args.arch))
model = models.__dict__[args.arch](pretrained=True)
else:
print("=> creating model '{}'".format(args.arch))
model = models.__dict__[args.arch]()
if args.sync_bn:
import apex
print("using apex synced BN")
model = apex.parallel.convert_syncbn_model(model)
model = model.cuda()
if args.fp16:
model = network_to_half(model)
if args.distributed:
# By default, apex.parallel.DistributedDataParallel overlaps communication with
# computation in the backward pass.
# model = DDP(model)
# delay_allreduce delays all communication to the end of the backward pass.
model = DDP(model, delay_allreduce=True)
global model_params, master_params
if args.fp16:
model_params, master_params = prep_param_lists(model)
else:
master_params = list(model.parameters())
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda()
# Scale learning rate based on global batch size
args.lr = args.lr*float(args.batch_size*args.world_size)/256.
optimizer = torch.optim.SGD(master_params, args.lr,
momentum=args.momentum,
weight_decay=args.weight_decay)
# Optionally resume from a checkpoint
if args.resume:
# Use a local scope to avoid dangling references
def resume():
if os.path.isfile(args.resume):
print("=> loading checkpoint '{}'".format(args.resume))
checkpoint = torch.load(args.resume, map_location = lambda storage, loc: storage.cuda(args.gpu))
args.start_epoch = checkpoint['epoch']
best_prec1 = checkpoint['best_prec1']
model.load_state_dict(checkpoint['state_dict'])
if args.fp16:
saved_master_params = checkpoint['master_params']
for master, saved in zip(master_params, saved_master_params):
master.data.copy_(saved.data)
optimizer.load_state_dict(checkpoint['optimizer'])
print("=> loaded checkpoint '{}' (epoch {})"
.format(args.resume, checkpoint['epoch']))
else:
print("=> no checkpoint found at '{}'".format(args.resume))
resume()
# Data loading code
traindir = os.path.join(args.data, 'train')
valdir = os.path.join(args.data, 'val')
if(args.arch == "inception_v3"):
crop_size = 299
val_size = 320 # I chose this value arbitrarily, we can adjust.
else:
crop_size = 224
val_size = 256
train_dataset = datasets.ImageFolder(
traindir,
transforms.Compose([
transforms.RandomResizedCrop(crop_size),
transforms.RandomHorizontalFlip(),
# transforms.ToTensor(), Too slow
# normalize,
]))
val_dataset = datasets.ImageFolder(valdir, transforms.Compose([
transforms.Resize(val_size),
transforms.CenterCrop(crop_size),
]))
train_sampler = None
val_sampler = None
if args.distributed:
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
val_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset)
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
num_workers=args.workers, pin_memory=True, sampler=train_sampler, collate_fn=fast_collate)
val_loader = torch.utils.data.DataLoader(
val_dataset,
batch_size=args.batch_size, shuffle=False,
num_workers=args.workers, pin_memory=True,
sampler=val_sampler,
collate_fn=fast_collate)
if args.evaluate:
validate(val_loader, model, criterion)
return
for epoch in range(args.start_epoch, args.epochs):
if args.distributed:
train_sampler.set_epoch(epoch)
# train for one epoch
train(train_loader, model, criterion, optimizer, epoch)
if args.prof:
break
# evaluate on validation set
prec1 = validate(val_loader, model, criterion)
# remember best prec@1 and save checkpoint
if args.local_rank == 0:
is_best = prec1 > best_prec1
best_prec1 = max(prec1, best_prec1)
# Use local scope to avoid dangling references
def create_and_save_checkpoint():
checkpoint_dict = {
'epoch': epoch + 1,
'arch': args.arch,
'state_dict': model.state_dict(),
'best_prec1': best_prec1,
'optimizer' : optimizer.state_dict(),
}
if args.fp16:
checkpoint_dict['master_params'] = master_params
save_checkpoint(checkpoint_dict, is_best)
create_and_save_checkpoint()
class data_prefetcher():
def __init__(self, loader):
self.loader = iter(loader)
self.stream = torch.cuda.Stream()
self.mean = torch.tensor([0.485 * 255, 0.456 * 255, 0.406 * 255]).cuda().view(1,3,1,1)
self.std = torch.tensor([0.229 * 255, 0.224 * 255, 0.225 * 255]).cuda().view(1,3,1,1)
if args.fp16:
self.mean = self.mean.half()
self.std = self.std.half()
self.preload()
def preload(self):
try:
self.next_input, self.next_target = next(self.loader)
except StopIteration:
self.next_input = None
self.next_target = None
return
with torch.cuda.stream(self.stream):
self.next_input = self.next_input.cuda(non_blocking=True)
self.next_target = self.next_target.cuda(non_blocking=True)
if args.fp16:
self.next_input = self.next_input.half()
else:
self.next_input = self.next_input.float()
self.next_input = self.next_input.sub_(self.mean).div_(self.std)
def next(self):
torch.cuda.current_stream().wait_stream(self.stream)
input = self.next_input
target = self.next_target
self.preload()
return input, target
def train(train_loader, model, criterion, optimizer, epoch):
batch_time = AverageMeter()
data_time = AverageMeter()
losses = AverageMeter()
top1 = AverageMeter()
top5 = AverageMeter()
# switch to train mode
model.train()
end = time.time()
prefetcher = data_prefetcher(train_loader)
input, target = prefetcher.next()
i = -1
while input is not None:
i += 1
adjust_learning_rate(optimizer, epoch, i, len(train_loader))
if args.prof:
if i > 10:
break
# measure data loading time
data_time.update(time.time() - end)
# compute output
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss
prec1, prec5 = accuracy(output.data, target, topk=(1, 5))
if args.distributed:
reduced_loss = reduce_tensor(loss.data)
prec1 = reduce_tensor(prec1)
prec5 = reduce_tensor(prec5)
else:
reduced_loss = loss.data
losses.update(to_python_float(reduced_loss), input.size(0))
top1.update(to_python_float(prec1), input.size(0))
top5.update(to_python_float(prec5), input.size(0))
loss = loss*args.static_loss_scale
# compute gradient and do SGD step
if args.fp16:
model.zero_grad()
loss.backward()
model_grads_to_master_grads(model_params, master_params)
if args.static_loss_scale != 1:
for param in master_params:
param.grad.data = param.grad.data/args.static_loss_scale
optimizer.step()
master_params_to_model_params(model_params, master_params)
else:
optimizer.zero_grad()
loss.backward()
optimizer.step()
torch.cuda.synchronize()
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
input, target = prefetcher.next()
if args.local_rank == 0 and i % args.print_freq == 0 and i > 1:
print('Epoch: [{0}][{1}/{2}]\t'
'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
'Speed {3:.3f} ({4:.3f})\t'
'Data {data_time.val:.3f} ({data_time.avg:.3f})\t'
'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
epoch, i, len(train_loader),
args.world_size * args.batch_size / batch_time.val,
args.world_size * args.batch_size / batch_time.avg,
batch_time=batch_time,
data_time=data_time, loss=losses, top1=top1, top5=top5))
def validate(val_loader, model, criterion):
batch_time = AverageMeter()
losses = AverageMeter()
top1 = AverageMeter()
top5 = AverageMeter()
# switch to evaluate mode
model.eval()
end = time.time()
prefetcher = data_prefetcher(val_loader)
input, target = prefetcher.next()
i = -1
while input is not None:
i += 1
# compute output
with torch.no_grad():
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss
prec1, prec5 = accuracy(output.data, target, topk=(1, 5))
if args.distributed:
reduced_loss = reduce_tensor(loss.data)
prec1 = reduce_tensor(prec1)
prec5 = reduce_tensor(prec5)
else:
reduced_loss = loss.data
losses.update(to_python_float(reduced_loss), input.size(0))
top1.update(to_python_float(prec1), input.size(0))
top5.update(to_python_float(prec5), input.size(0))
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
if args.local_rank == 0 and i % args.print_freq == 0:
print('Test: [{0}/{1}]\t'
'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
'Speed {2:.3f} ({3:.3f})\t'
'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
i, len(val_loader),
args.world_size * args.batch_size / batch_time.val,
args.world_size * args.batch_size / batch_time.avg,
batch_time=batch_time, loss=losses,
top1=top1, top5=top5))
input, target = prefetcher.next()
print(' * Prec@1 {top1.avg:.3f} Prec@5 {top5.avg:.3f}'
.format(top1=top1, top5=top5))
return top1.avg
def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
torch.save(state, filename)
if is_best:
shutil.copyfile(filename, 'model_best.pth.tar')
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self):
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def adjust_learning_rate(optimizer, epoch, step, len_epoch):
"""LR schedule that should yield 76% converged accuracy with batch size 256"""
factor = epoch // 30
if epoch >= 80:
factor = factor + 1
lr = args.lr*(0.1**factor)
"""Warmup"""
if epoch < 5:
lr = lr*float(1 + step + epoch*len_epoch)/(5.*len_epoch)
# if(args.local_rank == 0):
# print("epoch = {}, step = {}, lr = {}".format(epoch, step, lr))
for param_group in optimizer.param_groups:
param_group['lr'] = lr
def accuracy(output, target, topk=(1,)):
"""Computes the precision@k for the specified values of k"""
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.view(1, -1).expand_as(pred))
res = []
for k in topk:
correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
res.append(correct_k.mul_(100.0 / batch_size))
return res
def reduce_tensor(tensor):
rt = tensor.clone()
dist.all_reduce(rt, op=dist.reduce_op.SUM)
rt /= args.world_size
return rt
if __name__ == '__main__':
main()
import argparse
import os
import shutil
import time
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
import numpy as np
try:
from apex.parallel import DistributedDataParallel as DDP
from apex.fp16_utils import *
from apex import amp
except ImportError:
raise ImportError("Please install apex from https://www.github.com/nvidia/apex to run this example.")
model_names = sorted(name for name in models.__dict__
if name.islower() and not name.startswith("__")
and callable(models.__dict__[name]))
parser = argparse.ArgumentParser(description='PyTorch ImageNet Training')
parser.add_argument('data', metavar='DIR',
help='path to dataset')
parser.add_argument('--arch', '-a', metavar='ARCH', default='resnet18',
choices=model_names,
help='model architecture: ' +
' | '.join(model_names) +
' (default: resnet18)')
parser.add_argument('-j', '--workers', default=4, type=int, metavar='N',
help='number of data loading workers (default: 4)')
parser.add_argument('--epochs', default=90, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('--start-epoch', default=0, type=int, metavar='N',
help='manual epoch number (useful on restarts)')
parser.add_argument('-b', '--batch-size', default=256, type=int,
metavar='N', help='mini-batch size per process (default: 256)')
parser.add_argument('--lr', '--learning-rate', default=0.1, type=float,
metavar='LR', help='Initial learning rate. Will be scaled by <global batch size>/256: args.lr = args.lr*float(args.batch_size*args.world_size)/256. A warmup schedule will also be applied over the first 5 epochs.')
parser.add_argument('--momentum', default=0.9, type=float, metavar='M',
help='momentum')
parser.add_argument('--weight-decay', '--wd', default=1e-4, type=float,
metavar='W', help='weight decay (default: 1e-4)')
parser.add_argument('--print-freq', '-p', default=10, type=int,
metavar='N', help='print frequency (default: 10)')
parser.add_argument('--resume', default='', type=str, metavar='PATH',
help='path to latest checkpoint (default: none)')
parser.add_argument('-e', '--evaluate', dest='evaluate', action='store_true',
help='evaluate model on validation set')
parser.add_argument('--pretrained', dest='pretrained', action='store_true',
help='use pre-trained model')
parser.add_argument('--fp16', action='store_true',
help='Run model fp16 mode.')
parser.add_argument('--prof', dest='prof', action='store_true',
help='Only run 10 iterations for profiling.')
parser.add_argument('--deterministic', action='store_true')
parser.add_argument("--local_rank", default=0, type=int)
parser.add_argument('--sync_bn', action='store_true',
help='enabling apex sync BN.')
cudnn.benchmark = True
def fast_collate(batch):
imgs = [img[0] for img in batch]
targets = torch.tensor([target[1] for target in batch], dtype=torch.int64)
w = imgs[0].size[0]
h = imgs[0].size[1]
tensor = torch.zeros( (len(imgs), 3, h, w), dtype=torch.uint8 )
for i, img in enumerate(imgs):
nump_array = np.asarray(img, dtype=np.uint8)
tens = torch.from_numpy(nump_array)
if(nump_array.ndim < 3):
nump_array = np.expand_dims(nump_array, axis=-1)
nump_array = np.rollaxis(nump_array, 2)
tensor[i] += torch.from_numpy(nump_array)
return tensor, targets
best_prec1 = 0
args = parser.parse_args()
if args.deterministic:
cudnn.benchmark = False
cudnn.deterministic = True
torch.manual_seed(args.local_rank)
torch.set_printoptions(precision=10)
# Initialize Amp
amp_handle = amp.init(enabled=args.fp16)
def main():
global best_prec1, args
args.distributed = False
if 'WORLD_SIZE' in os.environ:
args.distributed = int(os.environ['WORLD_SIZE']) > 1
args.gpu = 0
args.world_size = 1
if args.distributed:
args.gpu = args.local_rank % torch.cuda.device_count()
torch.cuda.set_device(args.gpu)
torch.distributed.init_process_group(backend='nccl',
init_method='env://')
args.world_size = torch.distributed.get_world_size()
if args.fp16:
assert torch.backends.cudnn.enabled, "fp16 mode requires cudnn backend to be enabled."
# create model
if args.pretrained:
print("=> using pre-trained model '{}'".format(args.arch))
model = models.__dict__[args.arch](pretrained=True)
else:
print("=> creating model '{}'".format(args.arch))
model = models.__dict__[args.arch]()
if args.sync_bn:
import apex
print("using apex synced BN")
model = apex.parallel.convert_syncbn_model(model)
model = model.cuda()
if args.distributed:
# By default, apex.parallel.DistributedDataParallel overlaps communication with
# computation in the backward pass.
# model = DDP(model)
# delay_allreduce delays all communication to the end of the backward pass.
model = DDP(model, delay_allreduce=True)
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda()
# Scale learning rate based on global batch size
args.lr = args.lr*float(args.batch_size*args.world_size)/256.
optimizer = torch.optim.SGD(model.parameters(), args.lr,
momentum=args.momentum,
weight_decay=args.weight_decay)
# Optionally resume from a checkpoint
if args.resume:
# Use a local scope to avoid dangling references
def resume():
if os.path.isfile(args.resume):
print("=> loading checkpoint '{}'".format(args.resume))
checkpoint = torch.load(args.resume, map_location = lambda storage, loc: storage.cuda(args.gpu))
args.start_epoch = checkpoint['epoch']
best_prec1 = checkpoint['best_prec1']
model.load_state_dict(checkpoint['state_dict'])
optimizer.load_state_dict(checkpoint['optimizer'])
print("=> loaded checkpoint '{}' (epoch {})"
.format(args.resume, checkpoint['epoch']))
else:
print("=> no checkpoint found at '{}'".format(args.resume))
resume()
# Data loading code
traindir = os.path.join(args.data, 'train')
valdir = os.path.join(args.data, 'val')
if(args.arch == "inception_v3"):
crop_size = 299
val_size = 320 # I chose this value arbitrarily, we can adjust.
else:
crop_size = 224
val_size = 256
train_dataset = datasets.ImageFolder(
traindir,
transforms.Compose([
transforms.RandomResizedCrop(crop_size),
transforms.RandomHorizontalFlip(),
# transforms.ToTensor(), Too slow
# normalize,
]))
val_dataset = datasets.ImageFolder(valdir, transforms.Compose([
transforms.Resize(val_size),
transforms.CenterCrop(crop_size),
]))
train_sampler = None
val_sampler = None
if args.distributed:
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
val_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset)
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
num_workers=args.workers, pin_memory=True, sampler=train_sampler, collate_fn=fast_collate)
val_loader = torch.utils.data.DataLoader(
val_dataset,
batch_size=args.batch_size, shuffle=False,
num_workers=args.workers, pin_memory=True,
sampler=val_sampler,
collate_fn=fast_collate)
if args.evaluate:
validate(val_loader, model, criterion)
return
for epoch in range(args.start_epoch, args.epochs):
if args.distributed:
train_sampler.set_epoch(epoch)
# train for one epoch
train(train_loader, model, criterion, optimizer, epoch)
if args.prof:
break
# evaluate on validation set
prec1 = validate(val_loader, model, criterion)
# remember best prec@1 and save checkpoint
if args.local_rank == 0:
is_best = prec1 > best_prec1
best_prec1 = max(prec1, best_prec1)
save_checkpoint({
'epoch': epoch + 1,
'arch': args.arch,
'state_dict': model.state_dict(),
'best_prec1': best_prec1,
'optimizer' : optimizer.state_dict(),
}, is_best)
class data_prefetcher():
def __init__(self, loader):
self.loader = iter(loader)
self.stream = torch.cuda.Stream()
self.mean = torch.tensor([0.485 * 255, 0.456 * 255, 0.406 * 255]).cuda().view(1,3,1,1)
self.std = torch.tensor([0.229 * 255, 0.224 * 255, 0.225 * 255]).cuda().view(1,3,1,1)
# With Amp, it isn't necessary to manually convert data to half.
# Type conversions are done internally on the fly within patched torch functions.
# if args.fp16:
# self.mean = self.mean.half()
# self.std = self.std.half()
self.preload()
def preload(self):
try:
self.next_input, self.next_target = next(self.loader)
except StopIteration:
self.next_input = None
self.next_target = None
return
with torch.cuda.stream(self.stream):
self.next_input = self.next_input.cuda(non_blocking=True)
self.next_target = self.next_target.cuda(non_blocking=True)
# With Amp, it isn't necessary to manually convert data to half.
# Type conversions are done internally on the fly within patched torch functions.
# if args.fp16:
# self.next_input = self.next_input.half()
# else:
self.next_input = self.next_input.float()
self.next_input = self.next_input.sub_(self.mean).div_(self.std)
def next(self):
torch.cuda.current_stream().wait_stream(self.stream)
input = self.next_input
target = self.next_target
self.preload()
return input, target
def train(train_loader, model, criterion, optimizer, epoch):
batch_time = AverageMeter()
data_time = AverageMeter()
losses = AverageMeter()
top1 = AverageMeter()
top5 = AverageMeter()
# switch to train mode
model.train()
end = time.time()
prefetcher = data_prefetcher(train_loader)
input, target = prefetcher.next()
i = -1
while input is not None:
i += 1
adjust_learning_rate(optimizer, epoch, i, len(train_loader))
if args.prof:
if i > 10:
break
# measure data loading time
data_time.update(time.time() - end)
# compute output
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss
prec1, prec5 = accuracy(output.data, target, topk=(1, 5))
if args.distributed:
reduced_loss = reduce_tensor(loss.data)
prec1 = reduce_tensor(prec1)
prec5 = reduce_tensor(prec5)
else:
reduced_loss = loss.data
losses.update(to_python_float(reduced_loss), input.size(0))
top1.update(to_python_float(prec1), input.size(0))
top5.update(to_python_float(prec5), input.size(0))
# compute gradient and do SGD step
optimizer.zero_grad()
with amp_handle.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
optimizer.step()
torch.cuda.synchronize()
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
input, target = prefetcher.next()
if args.local_rank == 0 and i % args.print_freq == 0 and i > 1:
print('Epoch: [{0}][{1}/{2}]\t'
'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
'Speed {3:.3f} ({4:.3f})\t'
'Data {data_time.val:.3f} ({data_time.avg:.3f})\t'
'Loss {loss.val:.10f} ({loss.avg:.4f})\t'
'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
epoch, i, len(train_loader),
args.world_size * args.batch_size / batch_time.val,
args.world_size * args.batch_size / batch_time.avg,
batch_time=batch_time,
data_time=data_time, loss=losses, top1=top1, top5=top5))
def validate(val_loader, model, criterion):
batch_time = AverageMeter()
losses = AverageMeter()
top1 = AverageMeter()
top5 = AverageMeter()
# switch to evaluate mode
model.eval()
end = time.time()
prefetcher = data_prefetcher(val_loader)
input, target = prefetcher.next()
i = -1
while input is not None:
i += 1
# compute output
with torch.no_grad():
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss
prec1, prec5 = accuracy(output.data, target, topk=(1, 5))
if args.distributed:
reduced_loss = reduce_tensor(loss.data)
prec1 = reduce_tensor(prec1)
prec5 = reduce_tensor(prec5)
else:
reduced_loss = loss.data
losses.update(to_python_float(reduced_loss), input.size(0))
top1.update(to_python_float(prec1), input.size(0))
top5.update(to_python_float(prec5), input.size(0))
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
if args.local_rank == 0 and i % args.print_freq == 0:
print('Test: [{0}/{1}]\t'
'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
'Speed {2:.3f} ({3:.3f})\t'
'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
i, len(val_loader),
args.world_size * args.batch_size / batch_time.val,
args.world_size * args.batch_size / batch_time.avg,
batch_time=batch_time, loss=losses,
top1=top1, top5=top5))
input, target = prefetcher.next()
print(' * Prec@1 {top1.avg:.3f} Prec@5 {top5.avg:.3f}'
.format(top1=top1, top5=top5))
return top1.avg
def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
torch.save(state, filename)
if is_best:
shutil.copyfile(filename, 'model_best.pth.tar')
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self):
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def adjust_learning_rate(optimizer, epoch, step, len_epoch):
"""LR schedule that should yield 76% converged accuracy with batch size 256"""
factor = epoch // 30
if epoch >= 80:
factor = factor + 1
lr = args.lr*(0.1**factor)
"""Warmup"""
if epoch < 5:
lr = lr*float(1 + step + epoch*len_epoch)/(5.*len_epoch)
# if(args.local_rank == 0):
# print("epoch = {}, step = {}, lr = {}".format(epoch, step, lr))
for param_group in optimizer.param_groups:
param_group['lr'] = lr
def accuracy(output, target, topk=(1,)):
"""Computes the precision@k for the specified values of k"""
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.view(1, -1).expand_as(pred))
res = []
for k in topk:
correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
res.append(correct_k.mul_(100.0 / batch_size))
return res
def reduce_tensor(tensor):
rt = tensor.clone()
dist.all_reduce(rt, op=dist.reduce_op.SUM)
rt /= args.world_size
return rt
if __name__ == '__main__':
main()
import argparse
import os
import shutil
import time
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
import numpy as np
try:
from apex.parallel import DistributedDataParallel as DDP
from apex.fp16_utils import *
except ImportError:
raise ImportError("Please install apex from https://www.github.com/nvidia/apex to run this example.")
model_names = sorted(name for name in models.__dict__
if name.islower() and not name.startswith("__")
and callable(models.__dict__[name]))
parser = argparse.ArgumentParser(description='PyTorch ImageNet Training')
parser.add_argument('data', metavar='DIR',
help='path to dataset')
parser.add_argument('--arch', '-a', metavar='ARCH', default='resnet18',
choices=model_names,
help='model architecture: ' +
' | '.join(model_names) +
' (default: resnet18)')
parser.add_argument('-j', '--workers', default=4, type=int, metavar='N',
help='number of data loading workers (default: 4)')
parser.add_argument('--epochs', default=90, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('--start-epoch', default=0, type=int, metavar='N',
help='manual epoch number (useful on restarts)')
parser.add_argument('-b', '--batch-size', default=256, type=int,
metavar='N', help='mini-batch size per process (default: 256)')
parser.add_argument('--lr', '--learning-rate', default=0.1, type=float,
metavar='LR', help='Initial learning rate. Will be scaled by <global batch size>/256: args.lr = args.lr*float(args.batch_size*args.world_size)/256. A warmup schedule will also be applied over the first 5 epochs.')
parser.add_argument('--momentum', default=0.9, type=float, metavar='M',
help='momentum')
parser.add_argument('--weight-decay', '--wd', default=1e-4, type=float,
metavar='W', help='weight decay (default: 1e-4)')
parser.add_argument('--print-freq', '-p', default=10, type=int,
metavar='N', help='print frequency (default: 10)')
parser.add_argument('--resume', default='', type=str, metavar='PATH',
help='path to latest checkpoint (default: none)')
parser.add_argument('-e', '--evaluate', dest='evaluate', action='store_true',
help='evaluate model on validation set')
parser.add_argument('--pretrained', dest='pretrained', action='store_true',
help='use pre-trained model')
parser.add_argument('--fp16', action='store_true',
help='Run model fp16 mode.')
parser.add_argument('--static-loss-scale', type=float, default=1,
help='Static loss scale, positive power of 2 values can improve fp16 convergence.')
parser.add_argument('--dynamic-loss-scale', action='store_true',
help='Use dynamic loss scaling. If supplied, this argument supersedes ' +
'--static-loss-scale.')
parser.add_argument('--prof', dest='prof', action='store_true',
help='Only run 10 iterations for profiling.')
parser.add_argument('--deterministic', action='store_true')
parser.add_argument("--local_rank", default=0, type=int)
parser.add_argument('--sync_bn', action='store_true',
help='enabling apex sync BN.')
cudnn.benchmark = True
def fast_collate(batch):
imgs = [img[0] for img in batch]
targets = torch.tensor([target[1] for target in batch], dtype=torch.int64)
w = imgs[0].size[0]
h = imgs[0].size[1]
tensor = torch.zeros( (len(imgs), 3, h, w), dtype=torch.uint8 )
for i, img in enumerate(imgs):
nump_array = np.asarray(img, dtype=np.uint8)
tens = torch.from_numpy(nump_array)
if(nump_array.ndim < 3):
nump_array = np.expand_dims(nump_array, axis=-1)
nump_array = np.rollaxis(nump_array, 2)
tensor[i] += torch.from_numpy(nump_array)
return tensor, targets
best_prec1 = 0
args = parser.parse_args()
if args.deterministic:
cudnn.benchmark = False
cudnn.deterministic = True
torch.manual_seed(args.local_rank)
torch.set_printoptions(precision=10)
def main():
global best_prec1, args
args.distributed = False
if 'WORLD_SIZE' in os.environ:
args.distributed = int(os.environ['WORLD_SIZE']) > 1
args.gpu = 0
args.world_size = 1
if args.distributed:
args.gpu = args.local_rank % torch.cuda.device_count()
torch.cuda.set_device(args.gpu)
torch.distributed.init_process_group(backend='nccl',
init_method='env://')
args.world_size = torch.distributed.get_world_size()
if args.fp16:
assert torch.backends.cudnn.enabled, "fp16 mode requires cudnn backend to be enabled."
if args.static_loss_scale != 1.0:
if not args.fp16:
print("Warning: if --fp16 is not used, static_loss_scale will be ignored.")
# create model
if args.pretrained:
print("=> using pre-trained model '{}'".format(args.arch))
model = models.__dict__[args.arch](pretrained=True)
else:
print("=> creating model '{}'".format(args.arch))
model = models.__dict__[args.arch]()
if args.sync_bn:
import apex
print("using apex synced BN")
model = apex.parallel.convert_syncbn_model(model)
model = model.cuda()
if args.fp16:
model = FP16Model(model)
if args.distributed:
# By default, apex.parallel.DistributedDataParallel overlaps communication with
# computation in the backward pass.
# model = DDP(model)
# delay_allreduce delays all communication to the end of the backward pass.
model = DDP(model, delay_allreduce=True)
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda()
# Scale learning rate based on global batch size
args.lr = args.lr*float(args.batch_size*args.world_size)/256.
optimizer = torch.optim.SGD(model.parameters(), args.lr,
momentum=args.momentum,
weight_decay=args.weight_decay)
if args.fp16:
optimizer = FP16_Optimizer(optimizer,
static_loss_scale=args.static_loss_scale,
dynamic_loss_scale=args.dynamic_loss_scale)
# Optionally resume from a checkpoint
if args.resume:
# Use a local scope to avoid dangling references
def resume():
if os.path.isfile(args.resume):
print("=> loading checkpoint '{}'".format(args.resume))
checkpoint = torch.load(args.resume, map_location = lambda storage, loc: storage.cuda(args.gpu))
args.start_epoch = checkpoint['epoch']
best_prec1 = checkpoint['best_prec1']
model.load_state_dict(checkpoint['state_dict'])
# An FP16_Optimizer instance's state dict internally stashes the master params.
optimizer.load_state_dict(checkpoint['optimizer'])
print("=> loaded checkpoint '{}' (epoch {})"
.format(args.resume, checkpoint['epoch']))
else:
print("=> no checkpoint found at '{}'".format(args.resume))
resume()
# Data loading code
traindir = os.path.join(args.data, 'train')
valdir = os.path.join(args.data, 'val')
if(args.arch == "inception_v3"):
crop_size = 299
val_size = 320 # I chose this value arbitrarily, we can adjust.
else:
crop_size = 224
val_size = 256
train_dataset = datasets.ImageFolder(
traindir,
transforms.Compose([
transforms.RandomResizedCrop(crop_size),
transforms.RandomHorizontalFlip(),
# transforms.ToTensor(), Too slow
# normalize,
]))
val_dataset = datasets.ImageFolder(valdir, transforms.Compose([
transforms.Resize(val_size),
transforms.CenterCrop(crop_size),
]))
train_sampler = None
val_sampler = None
if args.distributed:
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
val_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset)
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
num_workers=args.workers, pin_memory=True, sampler=train_sampler, collate_fn=fast_collate)
val_loader = torch.utils.data.DataLoader(
val_dataset,
batch_size=args.batch_size, shuffle=False,
num_workers=args.workers, pin_memory=True,
sampler=val_sampler,
collate_fn=fast_collate)
if args.evaluate:
validate(val_loader, model, criterion)
return
for epoch in range(args.start_epoch, args.epochs):
if args.distributed:
train_sampler.set_epoch(epoch)
# train for one epoch
train(train_loader, model, criterion, optimizer, epoch)
if args.prof:
break
# evaluate on validation set
prec1 = validate(val_loader, model, criterion)
# remember best prec@1 and save checkpoint
if args.local_rank == 0:
is_best = prec1 > best_prec1
best_prec1 = max(prec1, best_prec1)
save_checkpoint({
'epoch': epoch + 1,
'arch': args.arch,
'state_dict': model.state_dict(),
'best_prec1': best_prec1,
'optimizer' : optimizer.state_dict(),
}, is_best)
class data_prefetcher():
def __init__(self, loader):
self.loader = iter(loader)
self.stream = torch.cuda.Stream()
self.mean = torch.tensor([0.485 * 255, 0.456 * 255, 0.406 * 255]).cuda().view(1,3,1,1)
self.std = torch.tensor([0.229 * 255, 0.224 * 255, 0.225 * 255]).cuda().view(1,3,1,1)
if args.fp16:
self.mean = self.mean.half()
self.std = self.std.half()
self.preload()
def preload(self):
try:
self.next_input, self.next_target = next(self.loader)
except StopIteration:
self.next_input = None
self.next_target = None
return
with torch.cuda.stream(self.stream):
self.next_input = self.next_input.cuda(non_blocking=True)
self.next_target = self.next_target.cuda(non_blocking=True)
if args.fp16:
self.next_input = self.next_input.half()
else:
self.next_input = self.next_input.float()
self.next_input = self.next_input.sub_(self.mean).div_(self.std)
def next(self):
torch.cuda.current_stream().wait_stream(self.stream)
input = self.next_input
target = self.next_target
self.preload()
return input, target
def train(train_loader, model, criterion, optimizer, epoch):
batch_time = AverageMeter()
data_time = AverageMeter()
losses = AverageMeter()
top1 = AverageMeter()
top5 = AverageMeter()
# switch to train mode
model.train()
end = time.time()
prefetcher = data_prefetcher(train_loader)
input, target = prefetcher.next()
i = -1
while input is not None:
i += 1
adjust_learning_rate(optimizer, epoch, i, len(train_loader))
if args.prof:
if i > 10:
break
# measure data loading time
data_time.update(time.time() - end)
# compute output
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss
prec1, prec5 = accuracy(output.data, target, topk=(1, 5))
if args.distributed:
reduced_loss = reduce_tensor(loss.data)
prec1 = reduce_tensor(prec1)
prec5 = reduce_tensor(prec5)
else:
reduced_loss = loss.data
losses.update(to_python_float(reduced_loss), input.size(0))
top1.update(to_python_float(prec1), input.size(0))
top5.update(to_python_float(prec5), input.size(0))
# compute gradient and do SGD step
optimizer.zero_grad()
if args.fp16:
optimizer.backward(loss)
else:
loss.backward()
optimizer.step()
torch.cuda.synchronize()
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
input, target = prefetcher.next()
if args.local_rank == 0 and i % args.print_freq == 0 and i > 1:
print('Epoch: [{0}][{1}/{2}]\t'
'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
'Speed {3:.3f} ({4:.3f})\t'
'Data {data_time.val:.3f} ({data_time.avg:.3f})\t'
'Loss {loss.val:.10f} ({loss.avg:.4f})\t'
'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
epoch, i, len(train_loader),
args.world_size * args.batch_size / batch_time.val,
args.world_size * args.batch_size / batch_time.avg,
batch_time=batch_time,
data_time=data_time, loss=losses, top1=top1, top5=top5))
def validate(val_loader, model, criterion):
batch_time = AverageMeter()
losses = AverageMeter()
top1 = AverageMeter()
top5 = AverageMeter()
# switch to evaluate mode
model.eval()
end = time.time()
prefetcher = data_prefetcher(val_loader)
input, target = prefetcher.next()
i = -1
while input is not None:
i += 1
# compute output
with torch.no_grad():
output = model(input)
loss = criterion(output, target)
# measure accuracy and record loss
prec1, prec5 = accuracy(output.data, target, topk=(1, 5))
if args.distributed:
reduced_loss = reduce_tensor(loss.data)
prec1 = reduce_tensor(prec1)
prec5 = reduce_tensor(prec5)
else:
reduced_loss = loss.data
losses.update(to_python_float(reduced_loss), input.size(0))
top1.update(to_python_float(prec1), input.size(0))
top5.update(to_python_float(prec5), input.size(0))
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
if args.local_rank == 0 and i % args.print_freq == 0:
print('Test: [{0}/{1}]\t'
'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
'Speed {2:.3f} ({3:.3f})\t'
'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
i, len(val_loader),
args.world_size * args.batch_size / batch_time.val,
args.world_size * args.batch_size / batch_time.avg,
batch_time=batch_time, loss=losses,
top1=top1, top5=top5))
input, target = prefetcher.next()
print(' * Prec@1 {top1.avg:.3f} Prec@5 {top5.avg:.3f}'
.format(top1=top1, top5=top5))
return top1.avg
def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
torch.save(state, filename)
if is_best:
shutil.copyfile(filename, 'model_best.pth.tar')
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self):
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def adjust_learning_rate(optimizer, epoch, step, len_epoch):
"""LR schedule that should yield 76% converged accuracy with batch size 256"""
factor = epoch // 30
if epoch >= 80:
factor = factor + 1
lr = args.lr*(0.1**factor)
"""Warmup"""
if epoch < 5:
lr = lr*float(1 + step + epoch*len_epoch)/(5.*len_epoch)
# if(args.local_rank == 0):
# print("epoch = {}, step = {}, lr = {}".format(epoch, step, lr))
for param_group in optimizer.param_groups:
param_group['lr'] = lr
def accuracy(output, target, topk=(1,)):
"""Computes the precision@k for the specified values of k"""
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.view(1, -1).expand_as(pred))
res = []
for k in topk:
correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
res.append(correct_k.mul_(100.0 / batch_size))
return res
def reduce_tensor(tensor):
rt = tensor.clone()
dist.all_reduce(rt, op=dist.reduce_op.SUM)
rt /= args.world_size
return rt
if __name__ == '__main__':
main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment