Commit e5d0be82 authored by Thor Johnsen's avatar Thor Johnsen
Browse files

Module test improvements, bug fixes

parent d925763a
import os
import torch
from maskrcnn_benchmark.modeling.backbone.resnet import Bottleneck
from maskrcnn_benchmark.layers.nhwc import nhwc_to_nchw_transform, nchw_to_nhwc_transform
from maskrcnn_benchmark.layers.nhwc.batch_norm import FrozenBatchNorm2d_NHWC
from apex.contrib.bottleneck import Bottleneck as FastBottleneck
from apex.contrib.bottleneck import SpatialBottleneck
from apex.contrib.bottleneck import Bottleneck, SpatialBottleneck
from apex.contrib.bottleneck import HaloExchangerNoComm, HaloExchangerAllGather, HaloExchangerSendRecv, HaloExchangerPeer
from apex.contrib.peer_memory import PeerMemoryPool
# Run one bottleneck configuration through forward+backward and return
# (x, out, grad_out, weights, dgrad, wgrad) — see the unpacking in module_tests.
# `ref`, when not None, carries (x, grad_out, weights) from a previous
# spatial_group_size==1 run so a distributed run reuses identical inputs/weights.
# NOTE(review): the original indentation was lost in this scrape; the helper
# functions that follow appear to be nested inside this function and close over
# its arguments (in_channels, shape, numtype, device, nhwc, ...) — confirm
# against the upstream file before editing.
def single_module_test(ref, rank, world_size, numtype, device, shape, fast, spatial_group_size, in_channels, bottleneck_channels, out_channels, num_groups, stride_in_1x1, stride, dilation, norm_func, nhwc):
# inputs + modules
def ground_truth_bottleneck(C, dtype, explicit_nhwc):
    """Build the reference Bottleneck(C, C, C) on the GPU with synced state.

    All parameters, then all buffers, are broadcast from rank 0 so every
    rank starts from identical weights.
    """
    module = Bottleneck(C, C, C, use_cudnn=True, explicit_nhwc=explicit_nhwc)
    module.to(dtype=dtype, device='cuda')
    # broadcast parameters first, buffers second (same order on all ranks)
    for param in module.parameters():
        torch.distributed.broadcast(param, 0)
    for buf in module.buffers():
        torch.distributed.broadcast(buf, 0)
    return module
def print_bottleneck_p_and_b(bottleneck):
with torch.no_grad():
for n,p in bottleneck.named_parameters():
print("%s :: %s" % (n, str(p.norm(p=2,dtype=torch.float32))))
for n,p in bottleneck.named_buffers():
print("%s :: %s" % (n, str(p.norm(p=2,dtype=torch.float32))))
def has_nan(x):
    """Return True iff any element of x is NaN.

    x may be a tensor, a list/tuple of tensors, or a dict of tensors.

    Bug fix: the original dict branch fell through after the loop and
    implicitly returned None when no value contained NaN; it now returns
    an explicit False (callers only test truthiness, so this is
    backward-compatible).
    """
    if isinstance(x, (list, tuple)):
        return any(bool(torch.any(torch.isnan(xx))) for xx in x)
    elif isinstance(x, dict):
        return any(bool(torch.any(torch.isnan(v))) for v in x.values())
    else:
        return bool(torch.any(torch.isnan(x)))
def rel_diff_t(xx1, xx2):
    """Scalar relative difference of two tensors: ||a-b|| / ||a+b|| in fp32."""
    numerator = (xx1 - xx2).norm(p=2, dtype=torch.float32)
    denominator = (xx1 + xx2).norm(p=2, dtype=torch.float32)
    return (numerator / denominator).item()
def rel_diff(x1, x2):
    """Relative difference for tensors, or per-entry for sequences/dicts of tensors."""
    if isinstance(x1, (list, tuple)):
        return [rel_diff_t(a, b) for a, b in zip(x1, x2)]
    if isinstance(x1, dict):
        # assumes both dicts iterate in the same key order (insertion order)
        paired = zip(x1.items(), x2.items())
        return [rel_diff_t(a, b) for (_, a), (_, b) in paired]
    return rel_diff_t(x1, x2)
# NOTE(review): this span appears to interleave two revisions from a diff —
# a nested fprop_and_bprop helper and the older single_module_test body.
# Symptoms: the incoming `x` argument is discarded and regenerated; the module
# argument `bottleneck` is rebuilt from enclosing-scope config; the module is
# run forward twice (`out = ...` then `y = ...`). Untangle against the
# upstream repository before relying on this structure.
def fprop_and_bprop(x, bottleneck, dy=None):
with torch.no_grad():
# builds a fresh random input from enclosing-scope config (in_channels,
# shape, numtype, device), ignoring the x passed in — diff residue
input_shape = [1, in_channels] + list(shape)
x = torch.randn(input_shape, dtype=numtype, device=device)
if nhwc:
x = nchw_to_nhwc_transform(x).contiguous()
x = x.clone()
x.grad = None
x.requires_grad = True
print(x.shape, x.stride())
#if spatial_group_size > 1:
# fast = False # hack so fast bottleneck can be run against distributed bottleneck
#if spatial_group_size == 1:
# fast = False
# choose the implementation under test: apex FastBottleneck /
# SpatialBottleneck when `fast`, else the reference maskrcnn Bottleneck
if fast:
if spatial_group_size == 1:
bottleneck = FastBottleneck(
in_channels=in_channels,
bottleneck_channels=bottleneck_channels,
out_channels=out_channels,
stride=stride,
dilation=dilation,
explicit_nhwc=nhwc,
use_cudnn=True)
else:
bottleneck = SpatialBottleneck(
in_channels=in_channels,
bottleneck_channels=bottleneck_channels,
out_channels=out_channels,
stride=stride,
dilation=dilation,
explicit_nhwc=nhwc,
use_cudnn=True,
spatial_group_size=spatial_group_size)
else:
bottleneck = Bottleneck(
in_channels,
bottleneck_channels,
out_channels,
num_groups,
stride_in_1x1,
stride,
dilation,
norm_func,
nhwc,
spatial_group_size)
bottleneck = bottleneck.to(dtype=numtype,device=device)
weights = dict(bottleneck.named_parameters())
# when a reference run exists, overwrite this rank's input slice and
# weights so results are directly comparable
if ref is not None:
ref_x, _, ref_weights = ref
Hs,H = x.shape[1], ref_x.shape[1]
assert(Hs*spatial_group_size == H), "Hs not a multiple of H"
ref_x = ref_x[:,rank*Hs:(rank+1)*Hs,:,:]
x.copy_(ref_x)
assert(len(weights) == len(ref_weights)), "Reference weights and weights don't match"
for k in weights.keys():
weights[k].copy_(ref_weights[k])
# forward
out = bottleneck(x)
# gradient output
# NOTE(review): second forward pass over the same input — likely the new
# revision's forward; `out` above is the old revision's
y = bottleneck(x)
if dy is None:
with torch.no_grad():
# small random gradient, identical on all ranks
dy = torch.randn_like(y) / 1e2
torch.distributed.broadcast(dy, 0)
y.backward(dy)
dgrad = x.grad.detach()
wgrad = {}
for n,p in bottleneck.named_parameters():
wgrad[n] = p.grad.detach()
return x, y, dy, dgrad, wgrad
def ground_truth(N, C, H, W, dtype, memory_format, bottleneck):
    """Produce reference fprop+bprop results for a broadcast random NHWC input.

    memory_format: 1 -> explicit NHWC (the only implemented mode);
    2 (native nhwc) and 3 (nchw) raise.
    """
    if memory_format != 1:
        # 2 -> native nhwc, 3 -> nchw
        explicit_nhwc = False  # kept for parity with the original (unused)
        assert(False), "Not implemented yet"
    # 1 -> explicit nhwc
    explicit_nhwc = True  # local only; documents the layout of x below
    with torch.no_grad():
        x = torch.randn([N, H, W, C], dtype=dtype, device='cuda')
        # every rank must start from the same input
        torch.distributed.broadcast(x, 0)
    return fprop_and_bprop(x, bottleneck)
def print_ground_truth(gt):
    """Report whether the ground-truth tuple contains any NaN values."""
    x, y, dy, dgrad, wgrad = gt
    nan_found = has_nan(y) or has_nan(dgrad) or has_nan(wgrad)
    if nan_found:
        print("Error! Ground truth has NAN")
    else:
        print("Ok! No NAN found in ground truth")
# Re-run the ground-truth inputs (x, dy) through a different bottleneck module.
# NOTE(review): diff residue — `out`, `ref`, `rank` and `spatial_group_size`
# are not defined in this scope (`out` is used before any assignment), and the
# two lines after the `return` are unreachable. Only the L153-L157 core looks
# like the intended new implementation.
def apply_to_different_bottleneck(gt, bottleneck):
with torch.no_grad():
grad_out = torch.randn_like(out)
if ref is not None:
_, ref_grad_out, _ = ref
Hs,H = grad_out.shape[1], ref_grad_out.shape[1]
assert(Hs*spatial_group_size == H), "Hs not a multiple of H"
ref_grad_out = ref_grad_out[:,rank*Hs:(rank+1)*Hs,:,:]
grad_out.copy_(ref_grad_out)
# replay the saved input and output-gradient through `bottleneck`
x, y, dy, dgrad, wgrad = gt
x = x.clone()
x.requires_grad = True
dy = dy.clone()
return fprop_and_bprop(x, bottleneck, dy)
# NOTE(review): unreachable — stranded statements from the old revision
# backward
out.backward(grad_out)
def compare_single_field(results, f1, f2, l0, l1, l2):
    """Record results[l0]: which side(s) contain NaN, else the relative diff.

    l0 is the field label ("y", "dgrad", ...); l1/l2 label the two sides
    (e.g. "gt"/"bt") for the NaN messages.
    """
    nan1 = has_nan(f1)
    nan2 = has_nan(f2)
    if nan1 and nan2:
        results[l0] = "both NAN"
    elif nan1:
        results[l0] = "%s.%s NAN" % (l1, l0)
    elif nan2:
        results[l0] = "%s.%s NAN" % (l2, l0)
    else:
        results[l0] = "%s" % (str(rel_diff(f1, f2)))
def compare(gt, bt):
    """Compare ground-truth and test result tuples field by field.

    Prints one rank's result dict at a time (barrier-serialized) so the
    output stays readable in a multi-rank run.
    """
    x1, y1, dy1, dgrad1, wgrad1 = gt
    x2, y2, dy2, dgrad2, wgrad2 = bt
    results = {}
    fields = (("y", y1, y2), ("dy", dy1, dy2),
              ("dgrad", dgrad1, dgrad2), ("wgrad", wgrad1, wgrad2))
    for label, a, b in fields:
        compare_single_field(results, a, b, label, "gt", "bt")
    for i in range(torch.distributed.get_world_size()):
        if i == torch.distributed.get_rank():
            print(i, results)
        torch.distributed.barrier()
# Build a SpatialBottleneck(C,C,C) initialized with the ground-truth module's
# parameters and buffers so both start from identical state.
# NOTE(review): the middle section (L185-L196 region) is diff residue — `x`,
# `bottleneck`, `world_size`, `spatial_group_size`, `grad_out` and `weights`
# are undefined in this scope; it appears to belong to an older revision of a
# different function. Only the construction and the sp/sb copy loops look like
# the intended implementation.
def spatial_parallel_bottleneck(C, dtype, explicit_nhwc, gt_bottleneck, spatial_parallel_args):
spatial_bottleneck = SpatialBottleneck(C,C,C,use_cudnn=True,explicit_nhwc=explicit_nhwc,spatial_parallel_args=spatial_parallel_args)
spatial_bottleneck.to(dtype=dtype, device='cuda')
with torch.no_grad():
dgrad = x.grad.detach()
wgrad = {}
for n,p in bottleneck.named_parameters():
wgrad[n] = p.grad.detach()
if world_size > 1:
if spatial_group_size == 1:
# broadcast x, grad_out and weights from rank 0
with torch.no_grad():
torch.distributed.broadcast(x,0)
torch.distributed.broadcast(grad_out,0)
for k in weights.keys():
torch.distributed.broadcast(weights[k],0)
# copy parameters from the ground-truth module into the spatial module
sp = {}
for n,p in spatial_bottleneck.named_parameters():
sp[n] = p
for n,p in gt_bottleneck.named_parameters():
sp[n].copy_(p)
# and likewise for buffers
sb = {}
for n,b in spatial_bottleneck.named_buffers():
sb[n] = b
for n,b in gt_bottleneck.named_buffers():
sb[n].copy_(b)
return spatial_bottleneck
#class HaloExchangerNoComm(HaloExchanger):
# def __init__(self, world_size, spatial_group_size, rank, comm):
#class HaloExchangerAllGather(HaloExchanger):
# def __init__(self, world_size, spatial_group_size, rank, comm):
#class HaloExchangerSendRecv(HaloExchanger):
# def __init__(self, world_size, spatial_group_size, rank, comm):
#class HaloExchangerPeer(HaloExchanger):
# def __init__(self, world_size, spatial_group_size, rank, comm, peer_pool, explicit_nhwc, numSM=1):
# Run the bottleneck spatially split across all ranks (each rank owns an
# H-slice) and reduce/gather results for comparison with the ground truth.
def n_way_spatial(halex, gt_bottleneck, gt, explicit_nhwc, world_size, rank, fp32_reduce=False):
assert(explicit_nhwc), "Only tested for explicit nhwc"
x, _, dy, _, _ = gt
N, H, W, C = list(x.shape) # Tensor is already shaped properly for n-way parallel
dtype = x.dtype
spatial_group_size = world_size
spatial_group_rank = rank
spatial_communicator = None
spatial_halo_exchanger = halex
spatial_stream = None # Not in use
spatial_parallel_args = (spatial_group_size, spatial_group_rank, spatial_communicator, spatial_halo_exchanger, spatial_stream)
spatial_bottleneck = spatial_parallel_bottleneck(C, dtype, explicit_nhwc, gt_bottleneck, spatial_parallel_args)
with torch.no_grad():
# each rank takes its own H-slice of the input and output gradient
Hs = H // spatial_group_size
xs = x[:,spatial_group_rank*Hs:(spatial_group_rank+1)*Hs,:,:]
dys = dy[:,spatial_group_rank*Hs:(spatial_group_rank+1)*Hs,:,:]
_, y, _, dgrad, wgrad = fprop_and_bprop(xs, spatial_bottleneck, dys)
# gather output pieces
# NOTE(review): diff residue from here down — the for/if/else structure is
# inconsistent (the `else:` pairs with `if fp32_reduce:` inside a loop over
# wgrad but contains whole-tensor gather logic), `out`, `grad_out` and
# `weights` are undefined in this scope, and the return tuple does not match
# the (x, y, dy, dgrad, wgrad) shape used by fprop_and_bprop/compare.
for n,p in wgrad.items():
if fp32_reduce:
# sum weight gradients across ranks in fp32, store back as half
p32 = p.float()
torch.distributed.all_reduce(p32)
p.copy_(p32.half())
else:
# gather dgrad (x.grad), sum wgrad (weights) and out
N,Hs,W,C = dgrad.shape
H = Hs * spatial_group_size
dgrad_gathered = torch.empty((N,H,W,C),dtype=dgrad.dtype,device=dgrad.device)
dgrad_tensors = [dgrad_gathered[:,i*Hs:(i+1)*Hs,:,:] for i in range(spatial_group_size)]
torch.distributed.all_gather(dgrad_tensors, dgrad)
dgrad = dgrad_gathered
N,Hs,W,C = list(out.shape)
H = Hs * spatial_group_size
out_gathered = torch.empty((N,H,W,C),dtype=dgrad.dtype,device=dgrad.device)
out_tensors= [out_gathered[:,i*Hs:(i+1)*Hs,:,:] for i in range(spatial_group_size)]
torch.distributed.all_gather(out_tensors, out)
out = out_gathered
# reduce weight gradients in fp64 for accuracy, then cast back
for k in wgrad.keys():
w = wgrad[k].to(dtype=torch.float64)
torch.distributed.all_reduce(w)
wgrad[k].copy_(w.to(dtype=wgrad[k].dtype))
#torch.distributed.all_reduce(wgrad[k])
return x, out, grad_out, weights, dgrad, wgrad
def module_tests(rank, world_size, numtype, device, fast, spatial_group_sizes, init_args):
    """Run single_module_test for every config and spatial group size.

    The first run of each config must use spatial_group_size == 1; it
    provides the reference (x, grad_out, weights) reused by later runs.
    Returns, for rank 0, a list (per config) of lists of
    (out, dgrad, wgrad) tuples; other ranks collect empty inner lists.
    """
    all_results = []
    for ia in init_args:
        shape, args = ia[0:4], ia[4:]
        per_config = []
        ref = None
        for spatial_group_size in spatial_group_sizes:
            N, H, W, C = shape
            H = H // spatial_group_size  # each rank owns an H-slice
            x, out, grad_out, weights, dgrad, wgrad = single_module_test(
                ref, rank, world_size, numtype, device, [H, W], fast,
                spatial_group_size, *args)
            if ref is None:
                assert(spatial_group_size == 1), "Wrong reference weights"
                ref = x, grad_out, weights
            if rank == 0:
                per_config.append((out, dgrad, wgrad))
            if world_size > 1: torch.distributed.barrier()
        all_results.append(per_config)
    return all_results
# NOTE(review): stranded statements with no enclosing function — apparently the
# tail of an older single_module_test revision (all-reduce of a weight grad,
# all-gather of y and dgrad across the spatial group) left behind by the diff.
# `p`, `y`, `dgrad`, `dy`, `x`, `wgrad` and `spatial_group_size` are undefined
# at module scope, and `return` is illegal here. Dead code — remove upstream.
torch.distributed.all_reduce(p)
ys = [torch.empty_like(y) for _ in range(spatial_group_size)]
torch.distributed.all_gather(ys,y)
y = torch.cat(ys,dim=1)
dgrads = [torch.empty_like(dgrad) for _ in range(spatial_group_size)]
torch.distributed.all_gather(dgrads,dgrad)
dgrad = torch.cat(dgrads,dim=1)
return x, y, dy, dgrad, wgrad
# Entry point: set up (optionally distributed) CUDA, run module_tests over the
# configured bottleneck shapes, and on rank 0 print norms, allclose checks and
# boundary-row dumps comparing the single-GPU and spatially-distributed runs.
def main():
# distributed / device setup
total_num_gpus = int(os.environ["WORLD_SIZE"]) if "WORLD_SIZE" in os.environ else 1
distributed = total_num_gpus > 1
ngpus = torch.cuda.device_count()
if distributed:
torch.distributed.init_process_group("nccl")
rank, world_size = torch.distributed.get_rank(), torch.distributed.get_world_size()
is_master = True if rank == 0 else False
local_rank = rank % ngpus
torch.cuda.set_device(local_rank)
spatial_group_size = total_num_gpus
else:
rank, local_rank, is_master, world_size, spatial_group_size = 0, 0, True, 1, 1
# force deterministic, non-TF32 math so runs are bitwise comparable
torch.use_deterministic_algorithms(True)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False
norm_func = FrozenBatchNorm2d_NHWC
# each tuple: (N, H, W, C, in_channels, bottleneck_channels, out_channels,
# num_groups, stride_in_1x1, stride, dilation, norm_func, nhwc)
# — presumably matching single_module_test's trailing args; confirm order.
init_args = [
(1, 200, 336, 64, 64, 64, 256, 1, True, 1, 1, norm_func, True),
(1, 200, 336, 256, 256, 64, 256, 1, True, 1, 1, norm_func, True),
(1, 200, 336, 256, 256, 128, 512, 1, True, 2, 1, norm_func, True),
(1, 100, 168, 512, 512, 128, 512, 1, True, 1, 1, norm_func, True),
(1, 100, 168, 512, 512, 256, 1024, 1, True, 2, 1, norm_func, True),
(1, 50, 84, 1024, 1024, 256, 1024, 1, True, 1, 1, norm_func, True),
(1, 50, 84, 1024, 1024, 512, 2048, 1, True, 2, 1, norm_func, True),
(1, 25, 42, 2048, 2048, 512, 2048, 1, True, 1, 1, norm_func, True),
(1, 336, 200, 64, 64, 64, 256, 1, True, 1, 1, norm_func, True),
(1, 336, 200, 256, 256, 64, 256, 1, True, 1, 1, norm_func, True),
(1, 336, 200, 256, 256, 128, 512, 1, True, 2, 1, norm_func, True),
(1, 168, 100, 512, 512, 128, 512, 1, True, 1, 1, norm_func, True),
(1, 168, 100, 512, 512, 256, 1024, 1, True, 2, 1, norm_func, True),
(1, 84, 50, 1024, 1024, 256, 1024, 1, True, 1, 1, norm_func, True),
(1, 84, 50, 1024, 1024, 512, 2048, 1, True, 2, 1, norm_func, True),
(1, 42, 25, 2048, 2048, 512, 2048, 1, True, 1, 1, norm_func, True),
]
# only the first config is exercised (debugging restriction)
init_args = init_args[0:1]
# pad H to account for spatial distribution
padded_init_args = []
for ia in init_args:
N,H,W,C = ia[0:4]
# round H up so each rank's slice is evenly sized;
# the 25/42 divisors look tied to the smallest feature maps above — verify
m = spatial_group_size * H // (25 if H < W else 42)
H = ((H + m - 1) // m) * m
args = tuple( [N,H,W,C] + list(ia[4:]) )
padded_init_args.append(args)
init_args = padded_init_args
if rank == 0:
for ia in init_args:
print(ia)
# always run the group_size==1 reference; add the distributed run if available
spatial_group_sizes = [1]
if spatial_group_size > 1:
spatial_group_sizes.append(spatial_group_size)
numtype, device, fast = torch.float16, 'cuda', True
r = module_tests(rank, world_size, numtype, device, fast, spatial_group_sizes, init_args)
if world_size > 1: torch.distributed.barrier()
if rank == 0:
for rr in r:
print("***")
# print fp64 L2 norms of each run's outputs and gradients
for out, dgrad, wgrad in rr:
gr = [("out",out.norm(p=2,dtype=torch.float64).item())]
gr = gr + [("dgrad",dgrad.norm(p=2,dtype=torch.float64).item())]
gr = gr + [(k+".wgrad",wgrad[k].norm(p=2,dtype=torch.float64).item()) for k in wgrad.keys()]
print(gr)
# when both the reference and distributed runs exist, compare them
if len(rr) == 2:
out1, dgrad1, wgrad1 = rr[0]
out2, dgrad2, wgrad2 = rr[1]
# loose tolerance: fp16 math with different reduction orders
rtol = 1e-1
out_atol = out1.abs().max().item() * rtol
dgrad_atol = dgrad1.abs().max().item() * rtol
wgrad_atol = {}
for k in wgrad1.keys():
wgrad_atol[k] = wgrad1[k].abs().max().item() * rtol
gr = [("out",torch.allclose(out1,out2,rtol,out_atol,equal_nan=True))]
gr = gr + [("dgrad",torch.allclose(dgrad1,dgrad2,rtol,dgrad_atol,equal_nan=True))]
gr = gr + [(k+".wgrad",torch.allclose(wgrad1[k],wgrad2[k],rtol,wgrad_atol[k],equal_nan=True)) for k in wgrad1.keys()]
print(gr)
# absolute-difference norms
gr = [("out",(out1-out2).norm(p=2,dtype=torch.float64).item())]
gr = gr + [("dgrad",(dgrad1-dgrad2).norm(p=2,dtype=torch.float64).item())]
gr = gr + [(k+".wgrad",(wgrad1[k]-wgrad2[k]).norm(p=2,dtype=torch.float64).item()) for k in wgrad1.keys()]
print(gr)
# dump rows around the first spatial split boundary (Hs-2 .. Hs+1)
# where halo-exchange errors would show up
N,H,W,C = out1.shape
Hs = H // spatial_group_size
Ht = Hs-2
print("out1@%d:%d=%s" % (Ht,H,str(out1[0,Ht,:8,:5])))
print("out2@%d:%d=%s" % (Ht,H,str(out2[0,Ht,:8,:5])))
Ht = Hs-1
print("out1@%d:%d=%s" % (Ht,H,str(out1[0,Ht,:8,:5])))
print("out2@%d:%d=%s" % (Ht,H,str(out2[0,Ht,:8,:5])))
Ht = Hs
print("out1@%d:%d=%s" % (Ht,H,str(out1[0,Ht,:8,:5])))
print("out2@%d:%d=%s" % (Ht,H,str(out2[0,Ht,:8,:5])))
Ht = Hs+1
print("out1@%d:%d=%s" % (Ht,H,str(out1[0,Ht,:8,:5])))
print("out2@%d:%d=%s" % (Ht,H,str(out2[0,Ht,:8,:5])))
# same boundary dump for the input gradients
N,H,W,C = dgrad1.shape
Hs = H // spatial_group_size
Ht = Hs-2
print("dgrad1@%d:%d=%s" % (Ht,H,str(dgrad1[0,Ht,:8,:5])))
print("dgrad2@%d:%d=%s" % (Ht,H,str(dgrad2[0,Ht,:8,:5])))
Ht = Hs-1
print("dgrad1@%d:%d=%s" % (Ht,H,str(dgrad1[0,Ht,:8,:5])))
print("dgrad2@%d:%d=%s" % (Ht,H,str(dgrad2[0,Ht,:8,:5])))
Ht = Hs
print("dgrad1@%d:%d=%s" % (Ht,H,str(dgrad1[0,Ht,:8,:5])))
print("dgrad2@%d:%d=%s" % (Ht,H,str(dgrad2[0,Ht,:8,:5])))
Ht = Hs+1
print("dgrad1@%d:%d=%s" % (Ht,H,str(dgrad1[0,Ht,:8,:5])))
print("dgrad2@%d:%d=%s" % (Ht,H,str(dgrad2[0,Ht,:8,:5])))
if world_size > 1: torch.distributed.barrier()
# NOTE(review): stranded top-level statements — this looks like the updated
# revision's test driver, flattened outside any function by the scrape. It
# calls helpers (ground_truth_bottleneck, ground_truth, spatial_parallel_bottleneck,
# apply_to_different_bottleneck, compare, n_way_spatial) that appear to be
# nested inside single_module_test above, so it cannot run at module scope
# as-is; it presumably belongs inside the new main().
torch.distributed.init_process_group("nccl")
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
torch.cuda.set_device(rank)
explicit_nhwc = True
dtype = torch.float16
N, C, H, W = 1, 64, 200, 336
# round H up to a multiple of 8*world_size so each rank's slice is even
Hs = ((H+8*world_size-1) // (8*world_size)) * 8
H = Hs*world_size
gt_bottleneck = ground_truth_bottleneck(C, dtype, explicit_nhwc)
gt = ground_truth(N, C, H, W, dtype, 1, gt_bottleneck)
# verify that spatial bottleneck with group_size 1 produces same results as ground truth bottleneck
spatial_bottleneck = spatial_parallel_bottleneck(C, dtype, explicit_nhwc, gt_bottleneck, None)
bt = apply_to_different_bottleneck(gt, spatial_bottleneck)
compare(gt, bt)
#print_bottleneck_p_and_b(gt_bottleneck)
#print_bottleneck_p_and_b(spatial_bottleneck)
# n-way spatial run exercising the peer-memory halo exchanger
spatial_group_size = world_size
spatial_communicator = None
peer_pool = PeerMemoryPool(rank, world_size, spatial_group_size, 64*1024*1024, 2*1024*1024)
#halex = HaloExchangerAllGather(world_size, spatial_group_size, rank, spatial_communicator)
#halex = HaloExchangerSendRecv(world_size, spatial_group_size, rank, spatial_communicator)
halex = HaloExchangerPeer(world_size, spatial_group_size, rank, spatial_communicator, peer_pool, explicit_nhwc, numSM=1)
bt2 = n_way_spatial(halex, gt_bottleneck, gt, explicit_nhwc, world_size, rank, fp32_reduce=True)
compare(gt, bt2)
if __name__ == "__main__":
......
......@@ -2,6 +2,7 @@ import torch
import torch.distributed as dist
from torch import nn
import nccl_p2p as inc
import peer_memory as pm
# Communication free halo exchanger.
# NB! This halo exchanger does not exchange halos with neighbors as it should, it merely swaps the inputs
......@@ -78,15 +79,21 @@ class HaloExchangerPeer(HaloExchanger):
self.numSM = numSM
# Exchange 1d halos with the left/right neighbors via peer-memory push/pull.
# When no input-halo tensors are supplied, fresh ones are allocated and the
# pair (left_input_halo, right_input_halo) is returned; otherwise the exchange
# is done in place into the supplied tensors.
def left_right_halo_exchange(self, left_output_halo, right_output_halo, left_input_halo=None, right_input_halo=None):
inplace = False if left_input_halo is None and right_input_halo is None else True
if not inplace:
# left input comes from the right neighbor's output and vice versa,
# hence the swapped empty_like sources
left_input_halo = torch.empty_like(right_output_halo)
right_input_halo = torch.empty_like(left_output_halo)
channels_last = left_output_halo.is_contiguous(memory_format=torch.channels_last) and not self.explicit_nhwc
# NOTE(review): the next two lines are the pre-fix revision (they reference
# undefined left_out_halo / right_out_halo); the two lines after them are the
# corrected versions from the diff — both got flattened into this scrape.
left_tx = self.peer_pool.allocate_peer_tensors(list(left_out_halo.shape), left_out_halo.dtype, channels_last, True)
right_tx = self.peer_pool.allocate_peer_tensors(list(right_out_halo.shape), right_out_halo.dtype, channels_last, True)
left_tx = self.peer_pool.allocate_peer_tensors(list(left_output_halo.shape), left_output_halo.dtype, channels_last, True)
right_tx = self.peer_pool.allocate_peer_tensors(list(right_output_halo.shape), right_output_halo.dtype, channels_last, True)
pm.push_pull_halos_1d(
self.diagnostics, self.explicit_nhwc, self.numSM,
# NOTE(review): the next three argument lines are the pre-fix revision
# (undefined top_neighbor / btm_neighbor / bare left_neighbor); the three
# lines after them are the corrected self.* versions from the diff.
left_output_halo, left_tx[self.peer_rank], right_tx[top_neighbor], left_input_halo,
right_output_halo, right_tx[self.peer_rank], left_tx[btm_neighbor], right_input_halo,
self.signals[left_neighbor], self.signals[right_neighbor], self.signals[self.peer_rank]
left_output_halo, left_tx[self.peer_rank], right_tx[self.left_neighbor], left_input_halo,
right_output_halo, right_tx[self.peer_rank], left_tx[self.right_neighbor], right_input_halo,
self.signals[self.left_neighbor], self.signals[self.right_neighbor], self.signals[self.peer_rank]
)
if not inplace:
return left_input_halo, right_input_halo
# Class that combines input volume with halos from neighbors (1d).
class HaloPadder:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment