Commit c13c0a3e authored by Lawrence McAfee

debugging; localized issue to gather_params()

parent 9b7854e4
@@ -251,26 +251,29 @@ class BaseFloat16Optimizer(MegatronOptimizer):
main_grads, self.found_inf, self.grad_scaler.inv_scale)
# Update across all model parallel instances.
# >>>
# torch.distributed.all_reduce(self.found_inf,
# op=torch.distributed.ReduceOp.MAX,
# group=mpu.get_model_parallel_group())
# +++
torch.distributed.all_reduce(self.found_inf,
op=torch.distributed.ReduceOp.MAX,
group=mpu.get_model_parallel_group())
op=torch.distributed.ReduceOp.MAX)
# <<<
# Check for nan.
found_inf_flag = (self.found_inf.item() > 0)
# raise Exception("hi.")
return found_inf_flag
@torch.no_grad()
def step(self):
def step(self, ITERATION):
timers = get_timers()
# Copy gradients from model params to main params.
timers('optimizer-copy-to-main-grad').start()
self._copy_model_grads_to_main_grads()
self._copy_model_grads_to_main_grads(ITERATION)
timers('optimizer-copy-to-main-grad').stop()
# pax(0, {
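This hunk does two things: the `found_inf` all-reduce drops `group=mpu.get_model_parallel_group()` and therefore runs over the default (world) process group, and `step()` / `_copy_model_grads_to_main_grads()` gain an `ITERATION` argument used only for debug reporting. A minimal sketch of the check as it reads after the change, reusing the tensor names from the surrounding code (an illustration, not the full Megatron method):

```python
import torch

def unscale_and_check_for_nan(main_grads, found_inf, inv_scale):
    """Unscale fp32 grads in place; return True if any grad is inf/nan."""
    found_inf.fill_(0.0)
    # Multiplies every grad by inv_scale and sets found_inf to 1.0 if any
    # element is non-finite (the private PyTorch AMP helper used above).
    torch._amp_foreach_non_finite_check_and_unscale_(
        main_grads, found_inf, inv_scale)
    # Max-reduce over the default (world) group so every rank agrees on
    # whether to skip the update.
    if torch.distributed.is_initialized():
        torch.distributed.all_reduce(found_inf,
                                     op=torch.distributed.ReduceOp.MAX)
    return found_inf.item() > 0
```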
@@ -293,7 +296,11 @@ class BaseFloat16Optimizer(MegatronOptimizer):
# If we found inf/nan, skip the update.
if found_inf_flag:
# pax(0, {"found_inf_flag": found_inf_flag})
pax(0, {
"main params" : self.get_main_params(),
"main grads" : self.get_main_grads(),
"found_inf_flag" : found_inf_flag,
})
return False, None, None
# Clip the main gradients.
@@ -312,23 +319,23 @@ class BaseFloat16Optimizer(MegatronOptimizer):
# >>>
# pax(0, {
# # "optimizer / state" :
# # { hash(k):tp(v) for k,v in self.optimizer.state.items() },
# # "optimizer / state / len" : len(self.optimizer.state),
# # "optimizer / state / 0" : list(self.optimizer.state.values())[0],
# **{"optimizer / state / %s" % hash(k) : tp(v) for k, v in self.optimizer.state.items() },
# "params" : sum(
# s["exp_avg"].numel()
# for s in self.optimizer.state.values()
# ),
# "main params" : self.get_main_params(),
# "main grads" : self.get_main_grads(),
# })
# <<<
# Update params from main params.
timers('optimizer-copy-main-to-model-params').start()
self._copy_main_params_to_model_params()
self._copy_main_params_to_model_params(ITERATION)
timers('optimizer-copy-main-to-model-params').stop()
# >>>
# pax(1, {
# "ITERATION" : ITERATION,
# "model_params" : [ p for m in self.models for p in m.parameters() ],
# })
# <<<
# Successful update.
return True, grad_norm, num_zeros_in_grad
@@ -978,9 +985,6 @@ class Float16DistributedOptimizer(BaseFloat16Optimizer):
# })
# <<<
# Initialize main params.
self._copy_model_params_to_main_params()
# Update optimizer groups.
# - Also, leverage state_dict() and load_state_dict() to
# recast preexisting per-param state tensors.
@@ -988,9 +992,58 @@ class Float16DistributedOptimizer(BaseFloat16Optimizer):
[ g["orig_group"] for g in self.opt_group_shards ]
self.optimizer.load_state_dict(self.optimizer.state_dict())
# pax(1, {
# "opt_group_shards" : self.opt_group_shards,
# "param_groups" : self.optimizer.param_groups,
# })
# Initialize main params.
self._copy_model_params_to_main_params()
@staticmethod
def has_nan_debug(tensors):
if isinstance(tensors, torch.Tensor):
tensors = [ tensors ]
assert isinstance(tensors, list)
has_nans = [ (not torch.all(torch.isfinite(t)).item()) for t in tensors ]
has_nan = any(has_nans)
return has_nan
def get_local_model_param_views(self):
'''** FOR DEBUGGING. **'''
model_param_views = []
for group_index, opt_group_shard in enumerate(self.opt_group_shards):
for param, opt_shard in opt_group_shard["param_map"].items():
model_index, dtype = self.param_gbuf_map[param]
gbuf_shard_map = \
self.model_gbuf_shards[model_index][dtype]["param_map"][param]
model_param_shard = gbuf_shard_map["param"]
model_param_views.append(
param.view(-1)[model_param_shard.start:model_param_shard.end])
return model_param_views
def get_local_model_grad_views(self):
'''** FOR DEBUGGING. **'''
model_grad_views = []
for group_index, opt_group_shard in enumerate(self.opt_group_shards):
for param, opt_shard in opt_group_shard["param_map"].items():
model_index, dtype = self.param_gbuf_map[param]
gbuf = self.models[model_index]._grad_buffers[dtype].data
gbuf_shard_map = \
self.model_gbuf_shards[model_index][dtype]["param_map"][param]
gbuf_world_shard = gbuf_shard_map["gbuf_world"]
model_grad_views.append(
gbuf[gbuf_world_shard.start:gbuf_world_shard.end])
return model_grad_views
def get_world_model_params(self):
'''** FOR DEBUGGING. **'''
return [ p for m in self.models for p in m.parameters() ]
def get_main_params(self):
return [ g["params"][0] for g in self.optimizer.param_groups ]
def get_main_grads(self):
return [ p.grad for p in self.get_main_params() ]
def get_main_param(self, group_index):
return self.optimizer.param_groups[group_index]["params"][0]
# return self.optimizer.param_groups[group_index]["params"][0]
return self.get_main_params()[group_index]
def get_main_grad(self, group_index):
return self.get_main_param(group_index).grad
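The new `has_nan_debug()` helper and the `get_local_model_*` / `get_main_*` accessors exist so the same non-finite check can be pointed at either side of the fp16-model / fp32-main copy. Below is a small self-contained illustration of that pattern; the flat buffer and shard ranges are stand-ins for the real `model_gbuf_shards` / `opt_group_shards` bookkeeping:

```python
import torch

def has_nan_debug(tensors):
    """True if any tensor in `tensors` contains inf or nan."""
    if isinstance(tensors, torch.Tensor):
        tensors = [tensors]
    return any(not torch.all(torch.isfinite(t)).item() for t in tensors)

# Stand-in for a DDP contiguous grad buffer; the real accessors slice it by
# the (start, end) ranges recorded per param in the shard maps.
gbuf = torch.randn(32)
gbuf[20] = float("nan")                        # simulate one bad gradient value
shard_ranges = [(0, 16), (16, 32)]             # hypothetical per-param ranges
grad_views = [gbuf[s:e] for s, e in shard_ranges]

print([has_nan_debug(v) for v in grad_views])  # [False, True] -> which shard is bad
print(has_nan_debug(grad_views))               # True for the whole list
```

In the debug flow the list would come from `get_local_model_grad_views()` (slices of the DDP grad buffer) or `get_main_grads()` (the fp32 shards owned by this rank), which is how the commit narrows down where the bad values first appear.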
@@ -1101,21 +1154,24 @@ class Float16DistributedOptimizer(BaseFloat16Optimizer):
# for p, d in self.param_gbuf_map.items()
# ],
# })
pax(1, {
"main params" : self.get_main_params(),
"model params / world" : self.get_world_model_params(),
"gbuf_view_item" : tp(gbuf_view[data_parallel_rank]),
# "model params / local" : self.get_local_model_param_views(),
})
def _collect_main_grad_data_for_unscaling(self):
# return [ p.grad.data for p in self.main_param_shards ]
# return [ p.grad.data for p in self.main_param_shards if p is not None ]
return [ self.get_main_grad(gi).data
for gi in range(len(self.opt_group_shards)) ]
# return [ self.get_main_grad(gi).data
# for gi in range(len(self.opt_group_shards)) ]
return [ g.data for g in self.get_main_grads() ]
def _copy_model_params_to_main_params(self):
for group_index, group_shard in enumerate(self.opt_group_shards):
# main_param = self.main_param_shards[group_index]
# main_param = self.optimizer.param_groups[group_index]["params"][0]
main_param = self.get_main_param(group_index)
# if group_index > 0:
# pax({"main_param": tp(main_param)})
for model_param, main_shard in group_shard["param_map"].items():
# Model shard.
@@ -1140,15 +1196,29 @@ class Float16DistributedOptimizer(BaseFloat16Optimizer):
# "model_shard" : str(model_shard),
# })
# pax(1, {
# pax(0, {
# **{
# "opt_group_shards / %d" % i : s
# for i, s in enumerate(self.opt_group_shards)
# },
# "main_param_shards" : self.main_param_shards,
# "main_params" : self.get_main_params(),
# })
def _copy_model_grads_to_main_grads(self):
def _copy_model_grads_to_main_grads(self, ITERATION):
# >>>
model_grads = self.get_local_model_grad_views()
model_has_nan = self.has_nan_debug(model_grads)
if model_has_nan:
pax(1, {
"ITERATION" : ITERATION,
"model grads" : model_grads,
"model_has_nan" : model_has_nan,
"model params / local" : self.get_local_model_param_views(),
# "model params / world" : [ list(self.param_gbuf_map),
# "main grads" : self.get_main_grads(),
})
# <<<
for group_index, group_shard in enumerate(self.opt_group_shards):
for model_param, main_shard in group_shard["param_map"].items():
@@ -1161,7 +1231,6 @@ class Float16DistributedOptimizer(BaseFloat16Optimizer):
# Copy from DDP's contiguous buffer to main shard's grad.
model_grad = self.models[model_index]._grad_buffers[dtype].data
# main_grad = self.main_param_shards[group_index].grad
main_grad = self.get_main_grad(group_index)
# Copy sub-range within tensor.
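The actual copy statement falls just outside this hunk, but from the names above (`model_grad` is the DDP contiguous grad buffer, `main_grad` the rank-local fp32 shard, and each param carries a `gbuf_world` range and a `main_shard` range) the "copy sub-range within tensor" step plausibly looks like the sketch below. The concrete ranges are hypothetical; only the slicing pattern is taken from the accessor code earlier in the diff:

```python
import torch

# Stand-ins: a contiguous fp16 grad buffer on the model side and this
# rank's flat fp32 main grad on the optimizer side.
model_grad = torch.randn(16, dtype=torch.float16)   # DDP grad buffer (gbuf)
main_grad = torch.zeros(4, dtype=torch.float32)     # fp32 shard owned here

# Hypothetical ranges; in the real code they come from
# model_gbuf_shards[...]["param_map"][param]["gbuf_world"] and
# opt_group_shards[group_index]["param_map"][model_param].
gbuf_world_start, gbuf_world_end = 8, 12
main_start, main_end = 0, 4

# Copy the sub-range, upcasting fp16 -> fp32 as part of copy_().
main_grad[main_start:main_end].copy_(
    model_grad[gbuf_world_start:gbuf_world_end])
```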
@@ -1185,22 +1254,42 @@ class Float16DistributedOptimizer(BaseFloat16Optimizer):
# })
# >>>
# pax(0, {
# "model_gbuf_shards" : self.model_gbuf_shards,
# "opt_group_shards" : self.opt_group_shards,
# pax(1, {
# # "model_gbuf_shards" : self.model_gbuf_shards,
# **{
# "opt_group_shards / %d" % i : s
# for i, s in enumerate(self.opt_group_shards)
# },
# "main_grads" : self.get_main_grads(),
# })
# for param in self.main_param_shards:
# grad = param.grad
# is_nan = torch.any(torch.isnan(grad)).item()
# for group_index, main_grad in enumerate(self.get_main_grads()):
# # is_nan = torch.any(torch.isnan(main_grad)).item()
# if is_nan:
# # opt_group_shard = self.opt_group_shards[group_index]
# # param_views = []
# # for param, shard in opt_group_shard["param_map"].items():
# # ddd
# pax(0, {
# "grad" : tp(grad),
# "opt_group_shard" : self.opt_group_shards[group_index],
# "param_map" : [ (str(p.shape), str(d)) for p, d in self.opt_group_shards[group_index]["param_map"].items() ],
# "gbufs" : [ b.data for m in self.models for d, b in m._grad_buffers.items() ],
# "group_index" : group_index,
# "main_param" : tp(self.get_main_param(group_index)),
# "main_grad" : tp(main_grad),
# "is_nan" : is_nan,
# })
main_grads = self.get_main_grads()
main_has_nan = self.has_nan_debug(main_grads)
if main_has_nan:
raise Exception("hi.")
# pax(1, {
# "model grads" : self.get_local_model_grad_views(),
# })
# <<<
def _copy_main_params_to_model_params(self):
def _copy_main_params_to_model_params(self, ITERATION):
for group_index, group_shard in enumerate(self.opt_group_shards):
for model_param, main_shard in group_shard["param_map"].items():
@@ -1213,7 +1302,6 @@ class Float16DistributedOptimizer(BaseFloat16Optimizer):
# Use DDP's contiguous buffer to temporarily hold params.
model_param = self.models[model_index]._grad_buffers[dtype].data
# main_param = self.main_param_shards[group_index]
main_param = self.get_main_param(group_index)
# Copy sub-range within tensor.
@@ -1243,12 +1331,20 @@ class Float16DistributedOptimizer(BaseFloat16Optimizer):
# })
# >>>
for param in self.param_gbuf_map:
is_nan = torch.any(torch.isnan(param)).item()
# is_nan = torch.any(torch.isnan(param)).item()
is_nan = not torch.all(torch.isfinite(param)).item()
if is_nan:
pax({
"param" : tp(param),
"is_nan" : is_nan,
})
# model_param_views = self.get_local_model_param_views()
# pax(1, {
# "ITERATION" : ITERATION,
# "main params" : self.get_main_params(),
# "model params / local" : self.get_local_model_param_views(),
# })
# <<<
# <<<
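The param check in this hunk swaps `torch.any(torch.isnan(param))` for `not torch.all(torch.isfinite(param))`, so the guard now also fires on ±inf (overflow), not only on NaN. A short comparison of the two checks:

```python
import torch

t = torch.tensor([1.0, float("inf"), 2.0])
print(torch.any(torch.isnan(t)).item())          # False: inf is not nan
print(not torch.all(torch.isfinite(t)).item())   # True: inf is non-finite
```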
@@ -403,7 +403,8 @@ def setup_model_and_optimizer(model_provider_func,
def train_step(forward_step_func, data_iterator,
model, optimizer, opt_param_scheduler):
model, optimizer, opt_param_scheduler,
ITERATION):
"""Single training step."""
args = get_args()
timers = get_timers()
@@ -441,7 +442,7 @@ def train_step(forward_step_func, data_iterator,
# Update parameters.
timers('optimizer').start()
update_successful, grad_norm, num_zeros_in_grad = optimizer.step()
update_successful, grad_norm, num_zeros_in_grad = optimizer.step(ITERATION)
timers('optimizer').stop()
# >>>
@@ -687,7 +688,10 @@ def train(forward_step_func, model, optimizer, opt_param_scheduler,
train_data_iterator,
model,
optimizer,
opt_param_scheduler)
opt_param_scheduler
# >>>
,ITERATION = iteration)
# <<<
iteration += 1
args.consumed_train_samples += mpu.get_data_parallel_world_size() * \
args.micro_batch_size * \
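The training-side hunks only thread the loop counter down the call chain (`train()` → `train_step()` → `optimizer.step(ITERATION)`) so the pax debug dumps can report at which iteration the non-finite values first show up. A toy sketch of that plumbing with placeholder bodies (nothing here is Megatron API beyond the argument threading shown in the diff):

```python
class ToyOptimizer:
    def step(self, ITERATION):
        # The real optimizer only uses ITERATION for debug output
        # (e.g. tagging which iteration first produced bad grads).
        print(f"optimizer.step at iteration {ITERATION}")
        return True, 0.0, 0   # update_successful, grad_norm, num_zeros_in_grad

def train_step(optimizer, ITERATION):
    update_successful, grad_norm, num_zeros_in_grad = optimizer.step(ITERATION)
    return update_successful

def train(optimizer, num_iterations=3):
    iteration = 0
    while iteration < num_iterations:
        train_step(optimizer, ITERATION=iteration)
        iteration += 1

train(ToyOptimizer())
```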