Commit 1f387c2c authored by Lawrence McAfee

fixed bug in the no-interleave pipeline schedule

parent 9a8b89ac
......@@ -28,6 +28,10 @@ from megatron.model import DistributedDataParallel as LocalDDP
from megatron.model import Float16Module
from megatron.model import ModelType
# >>>
from lutil import pax, tp, KEY_RANK
# <<<
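# Debug-only import: 'pax' and 'tp' are used solely by the commented-out
# inspection blocks further down; they are not required by the schedules
# themselves.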
def get_forward_backward_func():
    args = get_args()
    if mpu.get_pipeline_model_parallel_world_size() > 1:
......@@ -42,19 +46,52 @@ def get_forward_backward_func():
        forward_backward_func = forward_backward_no_pipelining
    return forward_backward_func
def free_output_tensor(output_tensors, deallocate_pipeline_outputs):
# >>>
# def free_output_tensor(output_tensors, deallocate_pipeline_outputs):
#     '''Pseudo-free (i.e., set to scalar) the output tensor's '.data' field.
#     This method should be called right after the output tensor has been
#     sent to the next pipeline stage. At this point, the output tensor is
#     only useful for its '.grad_fn' field, and not its '.data'.
#     '''
#     # >>>
#     # raise Exception("hi.")
#     # <<<
#     if not deallocate_pipeline_outputs or output_tensors is None:
#         return
#     if isinstance(output_tensors, torch.Tensor):
#         output_tensors = [output_tensors]
#     for output_tensor in output_tensors:
#         # >>>
#         # if output_tensor.nelement() < 10:
#         #     # raise Exception("interesting.")
#         #     continue
#         # <<<
#         # >>>
#         # output_tensor.data = torch.cuda.FloatTensor([0])
#         output_tensor.data = torch.empty(
#             (1,),
#             device = torch.cuda.current_device(),
#             dtype = output_tensor.dtype,
#         )
#         # <<<
# <<<
def free_output_tensor(out, deallocate_pipeline_outputs):
    '''Pseudo-free (i.e., set to scalar) the output tensor's '.data' field.
    This method should be called right after the output tensor has been
    sent to the next pipeline stage. At this point, the output tensor is
    only useful for its '.grad_fn' field, and not its '.data'.
    '''
    if not deallocate_pipeline_outputs or output_tensors is None:
        return
    if isinstance(output_tensors, torch.Tensor):
        output_tensors = [output_tensors]
    for output_tensor in output_tensors:
        output_tensor.data = torch.cuda.FloatTensor([0])
    # Honor the deallocation flag; nothing to free for a missing tensor.
    if not deallocate_pipeline_outputs or out is None:
        return
    assert isinstance(out, torch.Tensor), \
        "expected Tensor, found %s." % type(out).__name__
    assert out._base is None, \
        "counter-productive to free a view of another tensor."
    out.data = torch.empty(
        (1,),
        device = out.device,
        dtype = out.dtype,
    )
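# Usage sketch (illustrative; the exact surrounding calls vary by schedule):
# the output is freed immediately after it has been handed to the next stage,
# e.g.
#
#     output_tensor = forward_step(...)
#     send_forward(output_tensor, ...)
#     free_output_tensor(output_tensor[0], args.deallocate_pipeline_outputs)
#
# Only '.data' is replaced by a 1-element placeholder; '.grad_fn' survives, so
# the backward pass can still be driven through this tensor (see
# custom_backward below).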
def custom_backward(output, grad_output):
    '''Directly call C++ autograd engine.
......@@ -81,6 +118,8 @@ def custom_backward(output, grad_output):
        )
    # Call c++ engine [ see torch/csrc/autograd/python_engine.cpp ]
    # >>>
    try:
        Variable._execution_engine.run_backward(
            tensors = (output,),
            grad_tensors = (grad_output,),
......@@ -90,6 +129,11 @@ def custom_backward(output, grad_output):
            allow_unreachable=True,
            accumulate_grad=True,
        )
    except Exception as e:
        print(">>>> rank = %d. <<<<" % torch.distributed.get_rank())
        raise e
    # <<<
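    # Rationale: torch.autograd.backward() verifies that 'output' and
    # 'grad_output' have matching shapes, which fails once free_output_tensor()
    # has shrunk '.data' to a 1-element placeholder; calling the C++ engine
    # directly skips that check and relies only on the intact '.grad_fn'.
    # The try/except above is a debugging aid that reports the failing
    # pipeline rank before re-raising.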
def forward_step(forward_step_func, data_iterator, model, input_tensor, losses_reduced):
"""Forward step for passed-in model.
......@@ -119,6 +163,14 @@ def forward_step(forward_step_func, data_iterator, model, input_tensor, losses_r
        losses_reduced.append(loss_reduced)
    timers('forward-compute').stop()
    # >>>
    # if torch.distributed.get_rank() == 4:
    #     pax(4, {
    #         "output_tensor" : tp(output_tensor),
    #         "input_tensor[-1]" : tp(input_tensor[-1]),
    #     })
    # <<<
    # If T5 model (or other model with encoder and decoder)
    # and in decoder stack, then send encoder_hidden_state
    # downstream as well.
......@@ -165,6 +217,9 @@ def backward_step(optimizer, input_tensor, output_tensor, output_tensor_grad):
    if output_tensor_grad[0] is None:
        output_tensor = optimizer.scale_loss(output_tensor[0])
    if args.deallocate_pipeline_outputs:
        # >>>
        # pax(4, {"output_tensor": output_tensor})
        # <<<
        custom_backward(output_tensor[0], output_tensor_grad[0])
    else:
        torch.autograd.backward(output_tensor[0],
......@@ -617,7 +672,10 @@ def forward_backward_pipelining_without_interleaving(forward_step_func, data_ite
        if not forward_only:
            input_tensors.append(input_tensor)
            output_tensors.append(output_tensor)
            free_output_tensor(output_tensor, args.deallocate_pipeline_outputs)
            # >>>
            # pax(2, {"output_tensor": output_tensor})
            # <<<
            free_output_tensor(output_tensor[0], args.deallocate_pipeline_outputs)
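            # forward_step() returns its output wrapped in a list, while the
            # rewritten free_output_tensor() expects a single Tensor, hence the
            # '[0]' indexing at the call sites.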
    # Before running 1F1B, need to receive first forward tensor.
    # If all microbatches are run in warmup / cooldown phase, then no need to
......@@ -646,7 +704,14 @@ def forward_backward_pipelining_without_interleaving(forward_step_func, data_ite
            # Add input_tensor and output_tensor to end of list.
            input_tensors.append(input_tensor)
            output_tensors.append(output_tensor)
            free_output_tensor(output_tensor, args.deallocate_pipeline_outputs)
            # >>>
            # if torch.distributed.get_rank() == 3:
            #     pax({"output_tensor": output_tensor})
            # <<<
            # >>>
            # free_output_tensor(output_tensor, args.deallocate_pipeline_outputs)
            free_output_tensor(output_tensor[0], args.deallocate_pipeline_outputs)
            # <<<
            # Pop input_tensor and output_tensor from the start of the list for
            # the backward pass.
......
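A minimal, self-contained sketch of the pattern used above. Names, shapes, and the simplified free_output_tensor are illustrative assumptions; only the direct engine call mirrors custom_backward from this commit.

import torch
from torch.autograd import Variable

def free_output_tensor(out, deallocate_pipeline_outputs):
    '''Replace '.data' with a 1-element placeholder; keep '.grad_fn'.'''
    if not deallocate_pipeline_outputs or out is None:
        return
    assert isinstance(out, torch.Tensor)
    assert out._base is None
    out.data = torch.empty((1,), device = out.device, dtype = out.dtype)

# Illustrative tensors standing in for one pipeline stage's activations.
weight = torch.randn(8, 8, requires_grad=True)
hidden = torch.randn(4, 8, requires_grad=True)
output_tensor = [hidden @ weight]                  # forward_step() returns a list

grad_output = torch.ones_like(output_tensor[0])    # grad arriving from downstream
free_output_tensor(output_tensor[0], True)         # '.data' is now a 1-element tensor

# Equivalent to custom_backward(output_tensor[0], grad_output): call the C++
# engine directly, since torch.autograd.backward() would reject the mismatch
# between the freed '.data' shape and the incoming gradient's shape.
Variable._execution_engine.run_backward(
    tensors = (output_tensor[0],),
    grad_tensors = (grad_output,),
    keep_graph = False,
    create_graph = False,
    inputs = tuple(),
    allow_unreachable=True,
    accumulate_grad=True,
)
assert weight.grad is not None and hidden.grad is not None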