Commit fad78c16 authored by Michael Carilli

Stashing work

parent 8db3f95c
from .amp import init, half_function, float_function, promote_function,\
register_half_function, register_float_function, register_promote_function,\
register
from .multi_tensor_apply import MultiTensorApply
from . import compat, rnn_compat, utils, wrap
from .handle import AmpHandle, NoOpHandle
from .lists import functional_overrides, torch_overrides, tensor_overrides
from ..fp16_utils import FP16_Optimizer
from .frontend import *
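For orientation, a minimal usage sketch of the registration API re-exported above. The functions come from the imports in this hunk; the specific torch ops chosen (bmm, softmax, cat) are illustrative assumptions, not part of this commit:

import torch
from apex import amp

# Registrations must happen before amp.init() patches the torch namespace.
amp.register_half_function(torch, 'bmm')       # always run torch.bmm in fp16
amp.register_float_function(torch, 'softmax')  # always run torch.softmax in fp32
amp.register_promote_function(torch, 'cat')    # promote mixed-precision inputs to the widest type
handle = amp.init(enabled=True)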
import functools
@@ -65,7 +64,7 @@ def register_promote_function(module, name):
# Top-level function to insert _all_ the hooks.
def init(enabled=True, enable_caching=True, verbose=False, allow_banned=False):
def init(enabled=True, loss_scale="dynamic", enable_caching=True, verbose=False, allow_banned=False):
global _DECORATOR_HANDLE
if not enabled:
@@ -73,7 +72,7 @@ def init(enabled=True, enable_caching=True, verbose=False, allow_banned=False):
_DECORATOR_HANDLE = handle
return handle
handle = AmpHandle(enable_caching, verbose)
handle = AmpHandle(loss_scale, enable_caching, verbose)
# 0) Force-{fp16, fp32} for user-annotated functions
for mod, fn, cast_fn in _USER_CAST_REGISTRY:
......
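A quick sketch of how the new loss_scale argument to init() might be used from a training script, based only on the signature above (the numeric value is illustrative):

from apex import amp

handle = amp.init(enabled=True, loss_scale="dynamic")  # dynamic loss scaling (the default)
# or pin the scale to a fixed value:
# handle = amp.init(enabled=True, loss_scale=128.0)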
@@ -199,7 +199,7 @@ def register(enabled=False,
for k, v in amp.opt_properties.options:
print("{:20} : {}", k, v)
initialize(optimizers, models)
return initialize(optimizers, models)
def check_option_consistency(enabled=False,
......
@@ -4,14 +4,14 @@ import warnings
from . import utils
from .opt import OptimWrapper
from .scaler import LossScaler
from .scaler import LossScaler, iter_params
class AmpHandle(object):
def __init__(self, enable_caching=True, verbose=False):
def __init__(self, loss_scale="dynamic", enable_caching=True, verbose=False):
self._enable_caching = enable_caching
self._verbose = verbose
self._cache = dict()
self._default_scaler = LossScaler()
self._default_scaler = LossScaler(loss_scale)
self._is_active = True
self._all_wrappers = []
@@ -44,7 +44,9 @@ class AmpHandle(object):
yield loss * loss_scale
should_skip = self._default_scaler.unscale_and_update(
optimizer.param_groups, loss_scale)
iter_params(optimizer.param_groups),
iter_params(optimizer.param_groups),
loss_scale)
if should_skip:
optimizer_step = optimizer.step
def skip_step():
......
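The hunk above only changes what the handle feeds to the scaler (separate model and master parameter iterators); the user-facing pattern stays the same. A minimal sketch, assuming the usual scale_loss context manager shown in this file:

optimizer.zero_grad()
with handle.scale_loss(loss, optimizer) as scaled_loss:
    scaled_loss.backward()
optimizer.step()  # patched to a no-op for this iteration if the scaler saw an overflow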
import torch
from torch._six import container_abcs, string_classes
import functools
from apex.fp16_utils import convert_network
def to_type(dtype, t):
@@ -36,7 +37,7 @@ def initialize(optimizers, models, properties):
if properties.cast_model_type is not None:
if properties.cast_batchnorm is not None:
for model in models:
model.to(properties.cast_model_type)
convert_network(model, properties.cast_model_type)
else:
for model in models:
model.to(properties.cast_model_type)
@@ -55,3 +56,15 @@ def initialize(optimizers, models, properties):
# State dict trick to recast any preexisting per-param state tensors
for optimizer in optimizers:
optimizer.load_state_dict(optimizer.state_dict())
if properties.master_weights:
for i, optimizer in enumerate(optimizers):
if properties.loss_scale == "dynamic":
optimizers[i] = FP16_Optimizer(optimizer, dynamic_loss_scale=True)
else:
optimizers[i] = FP16_Optimizer(optimizer, static_loss_scale=properties.loss_scale)
if properties.cast_torch_functions:
handle = amp.init() # the handle is also globally accessible as amp._DECORATOR_HANDLE
return optimizers, models
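For reference, a rough sketch of what the convert_network call above is assumed to do, in contrast to a plain model.to(dtype): cast parameters and buffers module by module while leaving batchnorm in fp32. This mirrors apex.fp16_utils.convert_network as commonly described, not this commit's source:

import torch

def convert_network_sketch(model, dtype):
    for module in model.modules():
        if isinstance(module, torch.nn.modules.batchnorm._BatchNorm):
            continue  # keep batchnorm weights and running stats in fp32
        for param in module.parameters(recurse=False):
            param.data = param.data.to(dtype)
            if param.grad is not None:
                param.grad.data = param.grad.data.to(dtype)
        for name, buf in module._buffers.items():
            if buf is not None:
                module._buffers[name] = buf.to(dtype)
    return model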
import torch
from amp_C import prep_multi_tensor_launch
class MultiTensorApply(object):
def __init__(self, max_blocks, max_tensors, max_depth, chunk_size):
self.chunk_size = chunk_size
self.reallocate(max_blocks, max_tensors, max_depth)
def __call__(self, op, noop_flag_buffer, tensor_lists, *args):
self.assign_blocks(tensor_lists)
# print(self.gpu_block_to_tensor)
# print(self.gpu_block_to_chunk)
# print(self.gpu_tensor_sizes)
return op(self.nblocks,
noop_flag_buffer,
self.cpu_tensor_addresses,
self.gpu_block_to_tensor,
self.gpu_block_to_chunk,
self.gpu_tensor_sizes,
self.gpu_tensor_addresses,
self.chunk_size,
tensor_lists,
*args)
# print()
# print([[p.data_ptr() for p in l] for l in tensor_lists])
# print()
# print(self.gpu_tensor_addresses)
def assign_blocks(self, tensor_lists):
needs_reallocate = False
# The pure-Python block-assignment loop (kept commented out below for reference)
# proved prohibitively expensive, so it has been moved to C++ as prep_multi_tensor_launch.
torch.cuda.nvtx.range_push("assign_blocks loop")
# list0 = tensor_lists[0]
# self.nblocks = 0
# for t, tensor in enumerate(list0):
# blocks_this_tensor = (tensor.numel() +
# self.chunk_size - 1)//self.chunk_size
# if not needs_reallocate:
# self.cpu_tensor_sizes[t] = tensor.numel()
# for chunk in range(blocks_this_tensor):
# if self.nblocks >= self.max_blocks:
# needs_reallocate = True
# if not needs_reallocate:
# self.cpu_block_to_tensor[self.nblocks] = t
# self.cpu_block_to_chunk[self.nblocks] = chunk
# self.nblocks += 1
needs_reallocate, self.nblocks = prep_multi_tensor_launch(self.cpu_block_to_tensor,
self.cpu_block_to_chunk,
self.cpu_tensor_sizes,
self.gpu_block_to_tensor,
self.gpu_block_to_chunk,
self.gpu_tensor_sizes,
self.chunk_size,
self.max_depth,
self.max_tensors,
self.max_blocks,
tensor_lists)
torch.cuda.nvtx.range_pop()
print(self.nblocks)
if self.nblocks > self.max_blocks:
self.max_blocks = self.nblocks
if len(tensor_lists) > self.max_depth:
self.max_depth = len(tensor_lists)
if len(tensor_lists[0]) > self.max_tensors:
self.max_tensors = len(tensor_lists[0])
if needs_reallocate:
self.reallocate(self.max_blocks, self.max_tensors, self.max_depth)
needs_reallocate, self.nblocks = prep_multi_tensor_launch(self.cpu_block_to_tensor,
self.cpu_block_to_chunk,
self.cpu_tensor_sizes,
self.gpu_block_to_tensor,
self.gpu_block_to_chunk,
self.gpu_tensor_sizes,
self.chunk_size,
self.max_depth,
self.max_tensors,
self.max_blocks,
tensor_lists)
assert needs_reallocate == 0, "Should not need reallocate on second attempt."
assert self.nblocks <= self.max_blocks, "Should not need to increase blocks again."
def reallocate(self, max_blocks, max_tensors, max_depth):
self.max_blocks = max_blocks
self.max_tensors = max_tensors
self.max_depth = max_depth
self.cpu_block_to_tensor = torch.IntTensor(max_blocks).pin_memory()
self.cpu_block_to_chunk = torch.IntTensor(max_blocks).pin_memory()
self.cpu_tensor_sizes = torch.IntTensor(max_tensors).pin_memory()
self.cpu_tensor_addresses = torch.LongTensor(max_depth, max_tensors).pin_memory()
self.gpu_block_to_tensor = torch.cuda.IntTensor(max_blocks)
self.gpu_block_to_chunk = torch.cuda.IntTensor(max_blocks)
self.gpu_tensor_sizes = torch.cuda.IntTensor(max_tensors)
self.gpu_tensor_addresses = torch.cuda.LongTensor(max_depth, max_tensors)
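A hedged usage sketch for MultiTensorApply together with the multi_tensor_unscale binding added later in this commit. The capacities, the gradient lists (model_grads, master_grads), and loss_scale are placeholders for illustration:

import torch
import amp_C

applier = MultiTensorApply(max_blocks=1024, max_tensors=256, max_depth=2, chunk_size=2048*32)
overflow_buf = torch.cuda.IntTensor([0])

# tensor_lists[0]: fp16 model grads, tensor_lists[1]: fp32 master grads (matching shapes)
applier(amp_C.multi_tensor_unscale,
        overflow_buf,
        [model_grads, master_grads],
        1.0 / loss_scale)
found_overflow = overflow_buf.item()  # nonzero if any gradient contained inf/nan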
@@ -38,7 +38,9 @@ class OptimWrapper(object):
yield loss * loss_scale
self._skip_next[self._loss_idx] = self._cur_loss_scaler().unscale_and_update(
self._optimizer.param_groups, loss_scale)
iter_params(self._optimizer.param_groups),
iter_params(self._optimizer.param_groups),
loss_scale)
self._loss_idx += 1
if len(cached_grads) > 0:
......
@@ -3,7 +3,7 @@ import logging
# from apex_C import scale_check_overflow
def scale_check_overflow_python(d_grads, scale):
def scale_check_overflow_python(model_grad, scale, master_grad):
# Exception handling for 18.04 compatibility
try:
cpu_sum = float(model_grad.float().sum())
@@ -14,7 +14,9 @@ def scale_check_overflow_python(d_grads, scale):
else:
if cpu_sum == float('inf') or cpu_sum == -float('inf') or cpu_sum != cpu_sum:
return True
d_grads.mul_(scale)
if master_grad is not model_grad:
master_grad.copy_(model_grad)
master_grad.mul_(scale)
return False
class LossScaler(object):
@@ -22,10 +24,19 @@ class LossScaler(object):
warned_fp16_grad = False
has_fused_kernel = False
def __init__(self):
self._loss_scale = 2.**16
def __init__(self,
loss_scale,
init_scale=2.**16,
scale_factor=2.,
scale_window=2000):
if loss_scale == "dynamic":
self.dynamic = True
self._loss_scale = init_scale
else:
self.dynamic = False
self._loss_scale = loss_scale
self._max_loss_scale = 2.**24
self._scale_seq_len = 2000
self._scale_seq_len = scale_window
self._unskipped = 0
self._has_overflow = False
try:
@@ -44,35 +55,37 @@ class LossScaler(object):
def loss_scale(self):
return self._loss_scale
def unscale_and_update(self, param_groups, scale):
def unscale_and_update(self, model_params, master_params, scale):
if LossScaler.has_fused_kernel:
self._overflow_buf.zero_()
self._has_overflow = False
for p in iter_params(param_groups):
if p.grad is not None:
if LossScaler.has_fused_kernel and p.grad.data.type() == "torch.cuda.FloatTensor":
LossScaler.scale_check_overflow_cuda(p.grad.data,
for model, master in zip(model_params, master_params):
if model.grad is not None:
if LossScaler.has_fused_kernel and master.grad.data.type() == "torch.cuda.FloatTensor":
LossScaler.scale_check_overflow_cuda(model.grad.data,
1./scale,
self._overflow_buf,
p.grad.data)
master.grad.data)
else:
if (p.grad.data.type() != "torch.cuda.FloatTensor"
if (master.grad.data.type() != "torch.cuda.FloatTensor"
and not LossScaler.warned_fp16_grad):
logger = logging.getLogger("apex.amp")
logger.warning("Incoming grads are not fp32 (not master grads). "
logger.warning(
"Attempting to downscale {} grads. ".format(master.grad.data.type()) +
"Downscaling non-fp32 grads may indicate an error. "
"When using Amp, you don't need to call .half() on your model.")
LossScaler.warned_fp16_grad = True
self._has_overflow = scale_check_overflow_python(p.grad.data,
1./scale)
if self._has_overflow:
self._has_overflow = scale_check_overflow_python(model.grad.data,
1./scale,
master.grad.data)
if self._has_overflow and self.dynamic:
break
# If the fused kernel is available, we only need one D2H memcopy and sync.
if LossScaler.has_fused_kernel and not self._has_overflow:
if LossScaler.has_fused_kernel and self.dynamic and not self._has_overflow:
self._has_overflow = self._overflow_buf.item()
if self._has_overflow:
if self._has_overflow and self.dynamic:
should_skip = True
self._loss_scale /= 2.
self._unskipped = 0
@@ -80,7 +93,7 @@ class LossScaler(object):
should_skip = False
self._unskipped += 1
if self._unskipped == self._scale_seq_len:
if self._unskipped == self._scale_seq_len and self.dynamic:
self._loss_scale = min(self._max_loss_scale, self._loss_scale * 2.)
self._unskipped = 0
......
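Pulling the dynamic policy out of the hunk above into a compact sketch (halve and skip on overflow, double after scale_window clean steps, capped at 2**24); the function name and return convention are mine:

def update_loss_scale_sketch(scale, overflow, unskipped,
                             scale_window=2000, max_scale=2.**24):
    if overflow:
        return scale / 2., 0, True   # halved scale, reset counter, skip this step
    unskipped += 1
    if unskipped == scale_window:
        return min(max_scale, scale * 2.), 0, False
    return scale, unskipped, False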
@@ -4,7 +4,7 @@ from torch.autograd import Variable
from torch.nn.parameter import Parameter
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
from .loss_scaler import DynamicLossScaler, LossScaler
from ..amp.scaler import LossScaler
from .fp16util import model_grads_to_master_grads, master_params_to_model_params, clip_grad_norm
# TODO: Update overflow check + downscale to use Carl's fused kernel.
@@ -162,9 +162,9 @@ class FP16_Optimizer(object):
if dynamic_loss_scale:
self.dynamic_loss_scale = True
if dynamic_loss_args is not None:
self.loss_scaler = DynamicLossScaler(**dynamic_loss_args)
self.loss_scaler = LossScaler("dynamic", **dynamic_loss_args)
else:
self.loss_scaler = DynamicLossScaler()
self.loss_scaler = LossScaler("dynamic")
else:
self.dynamic_loss_scale = False
self.loss_scaler = LossScaler(static_loss_scale)
@@ -480,7 +480,7 @@ class FP16_Optimizer(object):
# a loss scale that works. After you find a loss scale that works, do a final dummy
# backward pass with retain_graph=False to tear down the graph. Doing this would avoid
# discarding the iteration, but probably wouldn't improve overall efficiency.
self.loss_scaler.backward(loss.float(), retain_graph=retain_graph)
(loss.float() * self.loss_scaler.loss_scale()).backward(retain_graph=retain_graph)
if update_master_grads:
self.update_master_grads()
......
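The hunks above route FP16_Optimizer onto the unified amp LossScaler. A minimal usage sketch, assuming the usual FP16_Optimizer interface (backward/zero_grad/step) and a model and loss from the surrounding training loop:

import torch
from apex.fp16_utils import FP16_Optimizer

optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
optimizer = FP16_Optimizer(optimizer, dynamic_loss_scale=True)

optimizer.zero_grad()
optimizer.backward(loss)  # scales the loss internally before backprop
optimizer.step()          # unscales master grads and updates, or skips on overflow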
@@ -2,17 +2,26 @@ import torch
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
import ctypes
lib = ctypes.cdll.LoadLibrary(None)
lib.THCudaHalfTensor_normall.argtypes=[ctypes.c_void_p, ctypes.c_void_p]
lib.THCudaHalfTensor_normall.restype = ctypes.c_float
stashed_err = None
try:
lib = ctypes.cdll.LoadLibrary(None)
lib.THCudaHalfTensor_normall.argtypes=[ctypes.c_void_p, ctypes.c_void_p]
lib.THCudaHalfTensor_normall.restype = ctypes.c_float
def fused_norm(input):
def fused_norm(input):
if input.type() == 'torch.cuda.HalfTensor':
# 16384 == 0x4000, the IEEE fp16 bit pattern for 2.0, i.e. the norm order p=2 passed as a half
return lib.THCudaHalfTensor_normall(torch.cuda._state_cdata,
input._cdata, 16384)
else:
return input.norm()
except TypeError as err:
stashed_err = err
def fused_norm(input):
raise RuntimeError("Failed to create fused_norm. This may happen on Windows "
"because of lib = ctypes.cdll.LoadLibrary(None): you can't "
"LoadLibrary with None. Original exception message was ",
stashed_err)
class FP16_Optimizer(object):
"""
......
@@ -322,7 +322,6 @@ class DistributedDataParallel(Module):
grad_acc = param_tmp.grad_fn.next_functions[0][0]
def allreduce_hook(*unused):
print("hook fired")
if self.delay_allreduce or self.needs_refresh:
# TODO: How do we want to handle multiple backward passes between
# each forward, e.g., backward passes with retain_graph=True?
......
#include <ATen/ATen.h>
#include <ATen/AccumulateType.h>
#include <ATen/cuda/CUDAContext.h>
#include <ATen/cuda/Exceptions.h>
#include <assert.h>
#include <cuda_runtime.h>
template<typename T, typename... ArgTypes>
__global__ void multi_tensor_apply_kernel(
volatile int* noop_flag,
int* block_to_tensor,
int* block_to_chunk, // could also get this from scan
int* tensor_sizes,
int chunk_size,
void** addresses,
int addresses_x,
T callable,
ArgTypes... args) // in_t** in, float** out, float scale
{
__shared__ int noop;
__shared__ int chunk_idx;
__shared__ int tensor_idx;
__shared__ int n;
if(threadIdx.x == 0)
{
noop = *noop_flag;
tensor_idx = block_to_tensor[blockIdx.x];
chunk_idx = block_to_chunk[blockIdx.x];
n = tensor_sizes[tensor_idx];
}
__syncthreads();
if(noop == 1)
return;
// Hand the chunk information to the user-supplied functor to process however it likes.
callable(
noop_flag,
tensor_idx,
chunk_idx,
chunk_size,
n,
addresses,
addresses_x,
args...);
}
#include <ATen/ATen.h>
#include <ATen/AccumulateType.h>
#include <ATen/cuda/CUDAContext.h>
#include <ATen/cuda/Exceptions.h>
#include "multi_tensor_apply.h"
#include <assert.h>
#include <cuda_runtime.h>
#define BLOCK_SIZE 256
#define ILP 4
template<typename in_t>
struct UnscaleFunctor
{
__device__ __forceinline__ void operator()(
volatile int* noop_flag,
int tensor_idx,
int chunk_idx,
int chunk_size,
int n,
void** addresses,
int addresses_x,
float scale)
{
__shared__ int noop;
in_t* in = (in_t*)addresses[tensor_idx];
in += chunk_idx*chunk_size;
float* out = (float*)addresses[addresses_x + tensor_idx];
out += chunk_idx*chunk_size;
n -= chunk_idx*chunk_size;
// Non-divergent exit condition for the __syncthreads
float incoming_vals[ILP];
for(int i_start = 0;
i_start < n && i_start < chunk_size;
i_start += blockDim.x*ILP)
{
if(threadIdx.x == 0)
noop = *noop_flag;
__syncthreads();
if(noop == 1)
break;
#pragma unroll
for(int ii = 0; ii < ILP; ii++)
{
incoming_vals[ii] = 0;
int i = i_start + threadIdx.x + ii*blockDim.x;
if(i < n)
incoming_vals[ii] = static_cast<float>(in[i]);
}
#pragma unroll
for(int ii = 0; ii < ILP; ii++)
{
int i = i_start + threadIdx.x + ii*blockDim.x;
if(i < n)
if(isfinite(incoming_vals[ii]))
out[i] = incoming_vals[ii]*scale;
else
*noop_flag = 1; // Blindly fire off a write. These will race but that's ok.
} // This is NOT guaranteed to be seen immediately by thread 0 on the next iteration.
} // I wonder if there's a way we can rig the short-circuiting with only one syncthreads.
} // It's possible we can just lean on the cache (no smem or syncs) and still be fast.
};
void multi_tensor_unscale_cuda(
int nblocks,
at::Tensor noop_flag,
at::Tensor cpu_tensor_addresses,
at::Tensor gpu_block_to_tensor,
at::Tensor gpu_block_to_chunk,
at::Tensor gpu_tensor_sizes,
at::Tensor gpu_tensor_addresses,
int chunk_size,
std::vector<std::vector<at::Tensor>> tensor_lists,
float scale)
{
using namespace at;
int addresses_x = gpu_tensor_addresses.size(1);
// <.< >.> i don't see any cops. i'm going to access the pointers directly.
// auto addresses_a = cpu_tensor_addresses.accessor<int64_t, 2>();
// This logic could be moved to prep_multi_tensor_launch, but we might need to
// pick which kernel instantiation to launch based on the RTTI of tensor_lists,
// so we may as well accept tensor_lists and extract the pointers here.
void** addresses_a = (void**)cpu_tensor_addresses.data_ptr();
int len0 = tensor_lists[0].size();
for(unsigned int l = 0; l < tensor_lists.size(); l++)
{
AT_CHECK(tensor_lists[l].size() == len0, "Lengths of tensor lists do not match.");
for(unsigned int t = 0; t < tensor_lists[l].size(); t++)
{
AT_CHECK(tensor_lists[l][t].numel() == tensor_lists[0][t].numel(),
"Numel mismatch in corresponding tensors in different lists.");
addresses_a[l*addresses_x + t] = tensor_lists[l][t].data_ptr();
// addresses_a[l][t] = (void*)tensor_lists[l][t].data<float>();
}
}
cudaStream_t stream = at::cuda::getCurrentCUDAStream();
gpu_tensor_addresses.copy_(cpu_tensor_addresses, 1/*non_blocking*/);
// Lock the output (downscaled) type to float.
AT_DISPATCH_FLOATING_TYPES_AND_HALF(tensor_lists[0][0].type(),
"multi_tensor_unscale_cuda",
[&]
{
// using accscalar_t = acc_type<scalar_t, true>;
multi_tensor_apply_kernel<<<nblocks, BLOCK_SIZE, 0, stream>>>(
noop_flag.data<int>(),
gpu_block_to_tensor.data<int>(),
gpu_block_to_chunk.data<int>(),
gpu_tensor_sizes.data<int>(),
chunk_size,
(void**)gpu_tensor_addresses.data_ptr(),
addresses_x,
UnscaleFunctor<scalar_t>(),
scale);
});
AT_CUDA_CHECK(cudaGetLastError());
// AT_CUDA_CHECK(cudaDeviceSynchronize());
}
#include <torch/extension.h>
void multi_tensor_unscale_cuda(
int nblocks,
at::Tensor noop_flag,
at::Tensor cpu_tensor_addresses,
at::Tensor gpu_block_to_tensor,
at::Tensor gpu_block_to_chunk,
at::Tensor gpu_tensor_sizes,
at::Tensor gpu_tensor_addresses,
int chunk_size,
std::vector<std::vector<at::Tensor>> tensor_lists,
float scale);
std::vector<int> prep_multi_tensor_launch(
at::Tensor cpu_block_to_tensor,
at::Tensor cpu_block_to_chunk,
at::Tensor cpu_tensor_sizes,
at::Tensor gpu_block_to_tensor,
at::Tensor gpu_block_to_chunk,
at::Tensor gpu_tensor_sizes,
int chunk_size,
int max_depth,
int max_tensors,
int max_blocks,
std::vector<std::vector<at::Tensor>> tensor_lists)
{
int needs_reallocate = 0;
if(tensor_lists.size() > max_depth || tensor_lists[0].size() > max_tensors)
needs_reallocate = 1;
auto cpu_tensor_sizes_a = cpu_tensor_sizes.accessor<int,1>();
auto cpu_block_to_tensor_a = cpu_block_to_tensor.accessor<int,1>();
auto cpu_block_to_chunk_a = cpu_block_to_chunk.accessor<int,1>();
int nblocks = 0;
for(int t = 0; t < tensor_lists[0].size(); t++)
{
int blocks_this_tensor = (tensor_lists[0][t].numel() + chunk_size - 1)/chunk_size;
if(!needs_reallocate)
cpu_tensor_sizes_a[t] = tensor_lists[0][t].numel();
for(int chunk = 0; chunk < blocks_this_tensor; chunk++)
{
if(nblocks >= max_blocks)
needs_reallocate = 1;
if(!needs_reallocate)
{
cpu_block_to_tensor_a[nblocks] = t;
cpu_block_to_chunk_a[nblocks] = chunk;
}
nblocks++;
}
}
if(!needs_reallocate)
{
gpu_block_to_tensor.copy_(cpu_block_to_tensor, 1);
gpu_block_to_chunk.copy_(cpu_block_to_chunk, 1);
gpu_tensor_sizes.copy_(cpu_tensor_sizes, 1);
}
return std::vector<int>{needs_reallocate, nblocks};
}
void scale_check_overflow_cuda(const at::Tensor& grads,
float scale,
const at::Tensor& d_buf,
@@ -27,4 +90,7 @@ void scale_check_overflow(at::Tensor grads,
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
m.def("scale_check_overflow", &scale_check_overflow, "Fused overflow check + scale for FP32 tensors");
m.def("prep_multi_tensor_launch", &prep_multi_tensor_launch, "Prepare multitensor launch");
m.def("multi_tensor_unscale", &multi_tensor_unscale_cuda,
"Fused overflow check + unscale for a list of contiguous tensors");
}
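A hedged sketch of how Python code might probe for these fused kernels, mirroring the has_fused_kernel and scale_check_overflow_cuda attributes used in scaler.py (the exact assignment is an assumption):

try:
    import amp_C
    LossScaler.has_fused_kernel = True
    LossScaler.scale_check_overflow_cuda = amp_C.scale_check_overflow
except ImportError:
    LossScaler.has_fused_kernel = False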
@@ -39,7 +39,12 @@ if "--cuda_ext" in sys.argv:
ext_modules.append(
CUDAExtension(name='amp_C',
sources=['csrc/scale_check_overflow.cpp',
'csrc/scale_check_overflow_kernel.cu']))
'csrc/scale_check_overflow_kernel.cu',
'csrc/multi_tensor_unscale_kernel.cu'],
extra_compile_args={'cxx': ['-O3',],
'nvcc':['-lineinfo',
'-O3',
'--use_fast_math']}))
ext_modules.append(
CUDAExtension(name='fused_adam_cuda',
sources=['apex/optimizers/csrc/fused_adam_cuda.cpp',
......
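For completeness, a hedged example of how the amp_C extension above might be built. This assumes setup.py strips the custom flag from sys.argv before handing off to setuptools, which this hunk does not show, so the exact invocation may differ:

python setup.py install --cuda_ext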