Commit 862d70fc authored by Lawrence McAfee

small fixes.

parent 08ee8ea2
@@ -126,19 +126,11 @@ class DistributedDataParallel(DistributedDataParallelBase):
# the case we use continuous buffers.
# ===================================
self._grad_buffers = None
# >>>
# from collections import defaultdict
# self._grad_buffer_param_offsets = None
self._grad_buffer_param_index_map = None
# <<<
if self.use_contiguous_buffers:
self._grad_buffers = {}
# >>>
# self._grad_buffer_param_offsets = defaultdict(dict)
# self._grad_buffer_param_index_map = defaultdict(dict)
self._grad_buffer_param_index_map = {}
data_parallel_world_size = mpu.get_data_parallel_world_size()
# <<<
# Simple function to define buffer type.
def _get_buffer_type(param):
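For context on the _grad_buffer_param_index_map introduced in the hunk above: it appears to map each parameter to its location inside the contiguous gradient buffer, so the optimizer can look up per-parameter slices of the packed gradients. The sketch below only illustrates that idea; the helper name build_param_index_map and the (start, end) layout are assumptions for illustration, not Megatron-LM's actual API.

import torch

def build_param_index_map(params, dtype=torch.float32):
    # Hypothetical helper: pack parameters into one contiguous buffer and
    # record each parameter's (start, end) index range inside that buffer.
    index_map = {}
    offset = 0
    for p in params:
        index_map[p] = (offset, offset + p.numel())
        offset += p.numel()
    buf = torch.zeros(offset, dtype=dtype)
    return buf, index_map

# Usage: find one parameter's slice inside the shared buffer.
params = [torch.nn.Parameter(torch.randn(4, 4)), torch.nn.Parameter(torch.randn(8))]
buf, index_map = build_param_index_map(params)
start, end = index_map[params[1]]
print(buf[start:end].shape)  # torch.Size([8])
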
@@ -34,7 +34,7 @@ from .clip_grads import clip_grad_norm_fp32, count_zeros_fp32
# >>>
from lutil import pax, tp
DEBUG_ITERATION = 2 # 10
DEBUG_ITERATION = 1 # 10
# <<<
@@ -239,6 +239,9 @@ class MegatronOptimizer(ABC):
torch.distributed.all_reduce(grad, group=mpu.get_position_embedding_group())
def allreduce_embedding_grads(self, args):
# >>>
# return # ** .. TEMPORARY .. **
# <<<
self.allreduce_word_embedding_grads(args)
self.allreduce_position_embedding_grads(args)
@@ -330,58 +333,60 @@ class MixedPrecisionOptimizer(MegatronOptimizer):
return found_inf_flag
# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# @classmethod
# def debug_base(cls, ITERATION, key, value):
# from megatron import get_args
# args = get_args()
# my_rank = torch.distributed.get_rank()
# if ITERATION != DEBUG_ITERATION:
# return
# for r in range(torch.distributed.get_world_size()):
# if my_rank == r:
# print(" + br/%s; [r%d, i%d]; %s, %.12e" % ("fix " if args.use_distributed_optimizer else "main", my_rank, ITERATION, key, value))
# torch.distributed.barrier()
# torch.distributed.barrier()
# # if my_rank == 0:
# # raise Exception("debug.")
# # else:
# # exit(0)
# exit(0)
# def debug_model(self, ITERATION, key, use_grad):
# use_grad = bool(use_grad)
# tensors = [
# (p.main_grad.float() if use_grad else p.float())
# for m in self.models for p in m.parameters()
# ]
# count = sum(t.nelement() for t in tensors)
# return self.debug_base(
# ITERATION,
# "model/%s, %s [count %d]" % (
# "grad" if use_grad else "param",
# key,
# count,
# ),
# # sum(torch.sum(torch.abs(t)) for t in tensors).item() / count,
# sum(torch.sum(torch.abs(t)) for t in tensors),
# )
# def debug_main(self, ITERATION, key, use_grad):
# use_grad = bool(use_grad)
# tensors = [
# p.grad if use_grad else p
# for g in self.optimizer.param_groups
# for p in g["params"]
# ]
# tensors = [ t.float() for t in tensors ]
# count = sum(t.nelement() for t in tensors)
# return self.debug_base(
# ITERATION,
# "main/%s, %s [count %d]" % (
# "grad" if use_grad else "param",
# key,
# count,
# ),
# sum(torch.sum(torch.abs(t)) for t in tensors),
# )
@classmethod
def debug_base(cls, ITERATION, key, value):
from megatron import get_args
args = get_args()
my_rank = torch.distributed.get_rank()
if ITERATION != DEBUG_ITERATION:
return
for r in range(torch.distributed.get_world_size()):
if my_rank == r:
# prefix = " + "
prefix = ""
print("%sbr/%s; [r%d, i%d]; %s, %.12e" % (prefix, "fix " if args.use_distributed_optimizer else "main", my_rank, ITERATION, key, value))
torch.distributed.barrier()
torch.distributed.barrier()
# if my_rank == 0:
# raise Exception("debug.")
# else:
# exit(0)
exit(0)
def debug_model(self, ITERATION, key, use_grad):
use_grad = bool(use_grad)
tensors = [
(p.main_grad.float() if use_grad else p.float())
for m in self.models for p in m.parameters()
]
count = sum(t.nelement() for t in tensors)
return self.debug_base(
ITERATION,
"model/%s, %s [count %d]" % (
"grad" if use_grad else "param",
key,
count,
),
# sum(torch.sum(torch.abs(t)) for t in tensors).item() / count,
sum(torch.sum(torch.abs(t)) for t in tensors),
)
def debug_main(self, ITERATION, key, use_grad):
use_grad = bool(use_grad)
tensors = [
p.grad if use_grad else p
for g in self.optimizer.param_groups
for p in g["params"]
]
tensors = [ t.float() for t in tensors ]
count = sum(t.nelement() for t in tensors)
return self.debug_base(
ITERATION,
"main/%s, %s [count %d]" % (
"grad" if use_grad else "param",
key,
count,
),
sum(torch.sum(torch.abs(t)) for t in tensors),
)
# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
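The debug_base helper enabled above uses a common trick for readable multi-rank output: loop over ranks and barrier after each turn so prints arrive in rank order, then exit once the target DEBUG_ITERATION has been dumped. Below is a minimal standalone sketch of that pattern, assuming torch.distributed has already been initialized via init_process_group; print_in_rank_order and grad_abs_sum are illustrative names, not part of this commit.

import torch.distributed as dist

def print_in_rank_order(message):
    # Serialize printing across ranks: only rank r prints on turn r, and every
    # rank waits at the barrier before the next turn, so output is not interleaved.
    # Assumes the default process group is already initialized.
    rank = dist.get_rank()
    for r in range(dist.get_world_size()):
        if rank == r:
            print("[rank %d] %s" % (rank, message), flush=True)
        dist.barrier()

# Example call (inside an initialized process group), mirroring the prints above:
# print_in_rank_order("model/grad, after reduce; sum(|g|) = %.12e" % grad_abs_sum)
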
@torch.no_grad()
@@ -433,6 +438,11 @@ class MixedPrecisionOptimizer(MegatronOptimizer):
self._copy_main_params_to_model_params(ITERATION)
timers('optimizer-copy-main-to-model-params').stop()
# >>>
# self.debug_model(ITERATION, "after copy param.", 0)
# self.debug_main(ITERATION, "after copy param.", 0)
# <<<
# Successful update.
return True, grad_norm, num_zeros_in_grad
@@ -432,7 +432,7 @@ def train_step(forward_step_func, data_iterator,
torch.cuda.empty_cache()
# >>>
# optimizer.debug_model(ITERATION, "before reduce grads.", 0)
# optimizer.debug_model(ITERATION, "before reduce grads.", 1)
# <<<
# >>>
@@ -451,7 +451,7 @@ def train_step(forward_step_func, data_iterator,
# <<<
# >>>
# optimizer.debug_model(ITERATION, "after gather params.", 0)
optimizer.debug_model(ITERATION, "after gather params.", 0)
# <<<
# Update learning rate.