"git@developer.sourcefind.cn:OpenDAS/autoawq_kernels.git" did not exist on "cb74e4eec6bd1446cc84d3d343eef96720a42d0d"
Unverified commit 68440264, authored by Aidyn-A, committed by GitHub

[transformer] add loss comparison to test_pipeline_parallel_fwd_bwd (#1374)

* add loss comparison to test_pipeline_parallel_fwd_bwd

* applied some suggested changes

* update test_pipeline_parallel_fwd_bwd.py

* update test_pipeline_parallel_fwd_bwd.py 2

* minor update

* update test_pipeline_parallel_fwd_bwd.py 3
parent c3018b13
@@ -29,11 +29,35 @@ from apex.transformer.testing import commons as testing_utils
 logging.getLogger("apex").setLevel(logging.WARNING)
 
+weight_coeff = 1024
+
+
+@torch.no_grad()
+def init_weights(m):
+    rank = torch.distributed.get_rank()
+    if isinstance(m, torch.nn.Linear):
+        m.weight.fill_((rank + 1.0) / weight_coeff)
+        m.bias.fill_(1.0)
+
+
+def get_target_loss(hidden_size: int, microbatch_size: int, parallel_model_world_size: int, world_size: int) -> float:
+    layers_per_rank = world_size // parallel_model_world_size
+    data = torch.arange(start=0, end=layers_per_rank, dtype=torch.int) + 1
+    w = (torch.arange(world_size, dtype=torch.float) + 1.0) / weight_coeff
+    b = torch.ones(world_size, dtype=torch.int)
+    w = hidden_size * w
+    for s_id in range(0, world_size, layers_per_rank):
+        e_id = s_id + layers_per_rank
+        data = w[s_id:e_id] * data + b[s_id:e_id]
+    return hidden_size * hidden_size * torch.sum(data).item() * microbatch_size / layers_per_rank
+
+
 class PipelineParallelForwardBackwardTest(DistributedTestBase):
     GLOBAL_BATCH_SIZE = 16
-    MICRO_BATCH_SIZE = 1
+    MICRO_BATCH_SIZE = 2
     HIDDEN_SIZE = 32
 
     @property
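The closed form above rests on a single identity: a `torch.nn.Linear(hidden_size, hidden_size)` whose weights are all a constant `c` and whose bias is all ones maps an input with every entry equal to `v` to an output with every entry equal to `hidden_size * c * v + 1.0`, since each output coordinate sums `hidden_size` copies of `c * v` before the bias is added. That is why `w` is rescaled by `hidden_size` before the recurrence runs. A minimal sketch checking the identity in isolation (the constants are illustrative, not the test's configuration):

```python
import torch

# Linear(h, h) with all weights c and bias 1.0 maps a constant-v input
# to a constant output: every coordinate equals h * c * v + 1.0.
h, c, v = 32, 2.0 / 1024, 3.0  # illustrative values
layer = torch.nn.Linear(h, h)
with torch.no_grad():
    layer.weight.fill_(c)
    layer.bias.fill_(1.0)

out = layer(v * torch.ones(1, h))
torch.testing.assert_close(out, (h * c * v + 1.0) * torch.ones_like(out))
```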
@@ -57,12 +81,11 @@ class PipelineParallelForwardBackwardTest(DistributedTestBase):
         )
         tensor_model_parallel_world_size = 1
         data_parallel_size = 1 + (self.world_size >= 8 and self.world_size % 2 == 0)
-        pipeline_model_parallel_world_size = (
-            self.world_size
-            // (tensor_model_parallel_world_size * data_parallel_size)
-            if pipeline_model_parallel_world_size is None
-            else 1
-        )
+        if pipeline_model_parallel_world_size is None:
+            pipeline_model_parallel_world_size = self.world_size // (tensor_model_parallel_world_size * data_parallel_size)
+        else:
+            data_parallel_size = self.world_size // (tensor_model_parallel_world_size * pipeline_model_parallel_world_size)
         parallel_state.initialize_model_parallel(
             tensor_model_parallel_size_=tensor_model_parallel_world_size,
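The rewritten branch fixes a quirk of the old expression, which discarded an explicitly passed pipeline size and overwrote it with 1. Now an explicit pipeline size is respected and the data-parallel size is re-derived from it, preserving the invariant `world_size == tensor × pipeline × data-parallel`. A standalone sketch of the same arithmetic (`resolve_parallel_sizes` is an invented name for illustration):

```python
def resolve_parallel_sizes(world_size, pipeline_mp=None):
    # Mirrors the branch above; tensor parallelism is fixed at 1 in this test.
    tensor_mp = 1
    data_parallel = 1 + (world_size >= 8 and world_size % 2 == 0)
    if pipeline_mp is None:
        pipeline_mp = world_size // (tensor_mp * data_parallel)
    else:
        data_parallel = world_size // (tensor_mp * pipeline_mp)
    assert world_size == tensor_mp * pipeline_mp * data_parallel
    return tensor_mp, pipeline_mp, data_parallel

print(resolve_parallel_sizes(8))     # (1, 4, 2): pipeline size derived
print(resolve_parallel_sizes(8, 2))  # (1, 2, 4): data-parallel size re-derived
```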
@@ -83,7 +106,8 @@ class PipelineParallelForwardBackwardTest(DistributedTestBase):
             PipelineParallelForwardBackwardTest.HIDDEN_SIZE,
             PipelineParallelForwardBackwardTest.HIDDEN_SIZE,
         )
-        batch = (torch.randn(global_batch_shape).cuda(),)
+        batch = (((self.rank + 1) * torch.ones(global_batch_shape)).cuda(),)
         model = build_model(
             testing_utils.model_provider_func,
@@ -91,12 +115,16 @@ class PipelineParallelForwardBackwardTest(DistributedTestBase):
             virtual_pipeline_model_parallel_size=virtual_pipeline_model_parallel_size,
             hidden_size=PipelineParallelForwardBackwardTest.HIDDEN_SIZE,
         )
+        for model_module in model:
+            model_module.apply(init_weights)
+
         _param_groups = _get_params_for_weight_decay_optimization(model)
         optimizer = torch.optim.Adam(_param_groups, lr=1e-3)
         pp_utils.update_num_microbatches(0)
-        fwd_bwd_func(
+        loss = fwd_bwd_func(
             testing_utils.fwd_step_func,
             batch,
             model,
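Capturing the schedule's return value in `loss` is what enables the new check: the forward-backward function hands back one reduced-loss entry per microbatch, and in this test each entry is a dict with an `'avg'` key (see the next hunk). A minimal mock of that shape (the function and the value 2291.0, which matches the worked example after the next hunk, are illustrative, not apex's API):

```python
import torch

def mock_fwd_bwd_func(num_microbatches=8, avg=2291.0):
    # Stand-in for the schedule's return shape: one dict per microbatch.
    return [{"avg": torch.tensor(avg)} for _ in range(num_microbatches)]

loss = mock_fwd_bwd_func()
assert all(item["avg"].item() == 2291.0 for item in loss)
```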
@@ -112,6 +140,15 @@ class PipelineParallelForwardBackwardTest(DistributedTestBase):
             deallocate_pipeline_output=deallocate_pipeline_outputs,
         )
+        if dtype == torch.float32:
+            hidden_size = PipelineParallelForwardBackwardTest.HIDDEN_SIZE
+            microbatch_size = PipelineParallelForwardBackwardTest.MICRO_BATCH_SIZE
+            target_loss = get_target_loss(hidden_size, microbatch_size, pipeline_model_parallel_world_size, self.world_size)
+            for loss_item in loss:
+                x = loss_item['avg']
+                torch.testing.assert_close(x, target_loss * torch.ones_like(x))
+
         if not forward_only:
             for m in model:
                 for p in m.parameters():
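To make the asserted value concrete, here is the closed form evaluated outside the harness with the test's `HIDDEN_SIZE = 32` and `MICRO_BATCH_SIZE = 2`, assuming a world size of 4 split across 2 pipeline stages (the actual world size depends on how the test is launched, so treat these numbers as an example):

```python
import torch

hidden_size, microbatch_size = 32, 2  # test constants
world_size, pp_size = 4, 2            # assumed launch configuration
weight_coeff = 1024

layers_per_rank = world_size // pp_size  # 2
data = torch.arange(start=0, end=layers_per_rank, dtype=torch.int) + 1
w = hidden_size * (torch.arange(world_size, dtype=torch.float) + 1.0) / weight_coeff
b = torch.ones(world_size, dtype=torch.int)
for s_id in range(0, world_size, layers_per_rank):
    e_id = s_id + layers_per_rank
    data = w[s_id:e_id] * data + b[s_id:e_id]  # one affine step per chunk of layers

target_loss = hidden_size * hidden_size * torch.sum(data).item() * microbatch_size / layers_per_rank
print(target_loss)  # 2291.0 for this configuration
```

Note that the check only runs for `torch.float32`: with the all-dyadic constants chosen here the fp32 arithmetic is exact, whereas reduced-precision runs would accumulate rounding error against the closed form.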
@@ -139,12 +176,12 @@ class PipelineParallelForwardBackwardTest(DistributedTestBase):
     def test_pipelining_with_interleaving(self):
         self._forward_backward_test_impl(
-            False, _forward_backward_pipelining_with_interleaving, 2, None
+            False, _forward_backward_pipelining_with_interleaving, None, None
         )
 
     def test_pipelining_with_interleaving_inference(self):
         self._forward_backward_test_impl(
-            True, _forward_backward_pipelining_with_interleaving, 2, None
+            True, _forward_backward_pipelining_with_interleaving, None, None
         )