Unverified Commit 6456bcad authored by user4543's avatar user4543 Committed by GitHub
Browse files

Bug - Fix bugs in sync results on root rank for e2e model benchmarks (#342)

**Description**
Fix bugs in sync results on root rank for e2e model benchmarks.

Bugs:
 - results were not replaced with the synced results (grammar)
 - synced results were not applied to all ranks, only the root rank
 - output result was emitted on local_rank 0, not on the global root rank
parent c770ed5d
...@@ -35,6 +35,7 @@ def __init__(self, name, parameters=''): ...@@ -35,6 +35,7 @@ def __init__(self, name, parameters=''):
self._benchmark_type = BenchmarkType.MODEL self._benchmark_type = BenchmarkType.MODEL
self._world_size = 1 self._world_size = 1
self._local_rank = None self._local_rank = None
self._global_rank = None
self._dataset = None self._dataset = None
self._dataloader = None self._dataloader = None
self._model = None self._model = None
...@@ -242,7 +243,8 @@ def __train(self, precision): ...@@ -242,7 +243,8 @@ def __train(self, precision):
# The unit of step time should be millisecond. # The unit of step time should be millisecond.
step_times = self._train_step(precision) step_times = self._train_step(precision)
if not self.__process_model_result(ModelAction.TRAIN, precision, step_times): step_times = self.__process_model_result(ModelAction.TRAIN, precision, step_times)
if not step_times:
self._result.set_return_code(ReturnCode.INVALID_BENCHMARK_RESULT) self._result.set_return_code(ReturnCode.INVALID_BENCHMARK_RESULT)
return False return False
...@@ -266,7 +268,8 @@ def __inference(self, precision): ...@@ -266,7 +268,8 @@ def __inference(self, precision):
self._create_model(precision) self._create_model(precision)
# The unit of step time should be millisecond. # The unit of step time should be millisecond.
step_times = self._inference_step(precision) step_times = self._inference_step(precision)
if not self.__process_model_result(ModelAction.INFERENCE, precision, step_times): step_times = self.__process_model_result(ModelAction.INFERENCE, precision, step_times)
if not step_times:
self._result.set_return_code(ReturnCode.INVALID_BENCHMARK_RESULT) self._result.set_return_code(ReturnCode.INVALID_BENCHMARK_RESULT)
return False return False
...@@ -369,9 +372,9 @@ def _sync_result(self, result): ...@@ -369,9 +372,9 @@ def _sync_result(self, result):
result (list): The result data to sync. result (list): The result data to sync.
Return: Return:
True if reduce result data successfully. Result if reduce result data successfully, otherwise None.
""" """
return True return result
def __process_model_result(self, model_action, precision, step_times): def __process_model_result(self, model_action, precision, step_times):
"""Function to process raw results and save the summarized results. """Function to process raw results and save the summarized results.
...@@ -382,7 +385,7 @@ def __process_model_result(self, model_action, precision, step_times): ...@@ -382,7 +385,7 @@ def __process_model_result(self, model_action, precision, step_times):
step_times (list): The step time list of every training/inference step, unit is millisecond. step_times (list): The step time list of every training/inference step, unit is millisecond.
Return: Return:
True if step_times list is not empty. step_times if step_times list is not empty, otherwise None.
""" """
if len(step_times) == 0: if len(step_times) == 0:
logger.error( logger.error(
...@@ -390,7 +393,7 @@ def __process_model_result(self, model_action, precision, step_times): ...@@ -390,7 +393,7 @@ def __process_model_result(self, model_action, precision, step_times):
self._curr_run_index, self._name, model_action, precision self._curr_run_index, self._name, model_action, precision
) )
) )
return False return None
precision_metric = {'float16': 'fp16', 'float32': 'fp32', 'float64': 'fp64', 'bfloat16': 'bf16'} precision_metric = {'float16': 'fp16', 'float32': 'fp32', 'float64': 'fp64', 'bfloat16': 'bf16'}
if precision.value in precision_metric.keys(): if precision.value in precision_metric.keys():
...@@ -404,9 +407,10 @@ def __process_model_result(self, model_action, precision, step_times): ...@@ -404,9 +407,10 @@ def __process_model_result(self, model_action, precision, step_times):
self._result.add_raw_data(metric_t, throughput, self._args.log_raw_data) self._result.add_raw_data(metric_t, throughput, self._args.log_raw_data)
if model_action == ModelAction.TRAIN: if model_action == ModelAction.TRAIN:
if not self._sync_result(step_times): step_times = self._sync_result(step_times)
return False if not step_times:
if self._local_rank is None or self._local_rank == 0: return None
if self._local_rank is None or self._global_rank == 0:
self._result.add_result(metric_s, statistics.mean(step_times)) self._result.add_result(metric_s, statistics.mean(step_times))
throughput = [millisecond_per_second / step_time * self._args.batch_size for step_time in step_times] throughput = [millisecond_per_second / step_time * self._args.batch_size for step_time in step_times]
self._result.add_result(metric_t, statistics.mean(throughput)) self._result.add_result(metric_t, statistics.mean(throughput))
...@@ -416,7 +420,7 @@ def __process_model_result(self, model_action, precision, step_times): ...@@ -416,7 +420,7 @@ def __process_model_result(self, model_action, precision, step_times):
self._process_percentile_result(metric_s, step_times) self._process_percentile_result(metric_s, step_times)
self._process_percentile_result(metric_t, throughput) self._process_percentile_result(metric_t, throughput)
return True return step_times
@abstractmethod @abstractmethod
def _cal_params_count(self): def _cal_params_count(self):
......
...@@ -60,6 +60,7 @@ def _init_distributed_setting(self): ...@@ -60,6 +60,7 @@ def _init_distributed_setting(self):
hvd.init() hvd.init()
self._world_size = int(hvd.size()) self._world_size = int(hvd.size())
self._local_rank = int(hvd.local_rank()) self._local_rank = int(hvd.local_rank())
self._global_rank = int(hvd.rank())
elif self._args.distributed_impl == DistributedImpl.DDP: elif self._args.distributed_impl == DistributedImpl.DDP:
if os.environ.get('WORLD_SIZE') is None or os.environ.get('LOCAL_RANK') is None: if os.environ.get('WORLD_SIZE') is None or os.environ.get('LOCAL_RANK') is None:
logger.error( logger.error(
...@@ -70,17 +71,17 @@ def _init_distributed_setting(self): ...@@ -70,17 +71,17 @@ def _init_distributed_setting(self):
# torch >= 1.9.0a0 torch.distributed.elastic is used by default # torch >= 1.9.0a0 torch.distributed.elastic is used by default
port = int(os.environ['MASTER_PORT']) + 1 port = int(os.environ['MASTER_PORT']) + 1
addr = os.environ['MASTER_ADDR'] addr = os.environ['MASTER_ADDR']
global_rank = int(os.environ['RANK']) self._global_rank = int(os.environ['RANK'])
self._local_rank = int(os.environ['LOCAL_RANK']) self._local_rank = int(os.environ['LOCAL_RANK'])
self._world_size = int(os.environ['WORLD_SIZE']) self._world_size = int(os.environ['WORLD_SIZE'])
logger.debug('ip:{},port:{},rank:{},world:{}'.format(addr, port, global_rank, self._world_size)) logger.debug('ip:{},port:{},rank:{},world:{}'.format(addr, port, self._global_rank, self._world_size))
store = PrefixStore( store = PrefixStore(
self._name, TCPStore(addr, port, self._world_size, global_rank == 0, timedelta(seconds=300)) self._name, TCPStore(addr, port, self._world_size, self._global_rank == 0, timedelta(seconds=300))
) )
torch.distributed.init_process_group( torch.distributed.init_process_group(
backend=self._args.distributed_backend.value, backend=self._args.distributed_backend.value,
timeout=timedelta(seconds=300), timeout=timedelta(seconds=300),
rank=global_rank, rank=self._global_rank,
world_size=self._world_size, world_size=self._world_size,
store=store store=store
) )
...@@ -195,10 +196,11 @@ def _sync_result(self, result): ...@@ -195,10 +196,11 @@ def _sync_result(self, result):
result (list): The result data to sync. result (list): The result data to sync.
Return: Return:
True if reduce result data successfully. Result if reduce result data successfully, otherwise None.
""" """
if not super()._sync_result(result): result = super()._sync_result(result)
return False if not result:
return None
try: try:
if self._args.distributed_impl == DistributedImpl.DDP: if self._args.distributed_impl == DistributedImpl.DDP:
...@@ -206,7 +208,7 @@ def _sync_result(self, result): ...@@ -206,7 +208,7 @@ def _sync_result(self, result):
tensor = torch.as_tensor(result).cuda() tensor = torch.as_tensor(result).cuda()
else: else:
tensor = torch.as_tensor(result) tensor = torch.as_tensor(result)
torch.distributed.reduce(tensor, 0, op=torch.distributed.ReduceOp.MAX) torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.MAX)
result = tensor.tolist() result = tensor.tolist()
except BaseException as e: except BaseException as e:
logger.error( logger.error(
...@@ -214,9 +216,9 @@ def _sync_result(self, result): ...@@ -214,9 +216,9 @@ def _sync_result(self, result):
self._name, self._args.distributed_impl, str(e) self._name, self._args.distributed_impl, str(e)
) )
) )
return False return None
return True return result
def _postprocess(self): def _postprocess(self):
"""Postprocess/cleanup operations after the benchmarking. """Postprocess/cleanup operations after the benchmarking.
......
...@@ -233,6 +233,13 @@ def test_pytorch_base(): ...@@ -233,6 +233,13 @@ def test_pytorch_base():
benchmark._optimizer_type = None benchmark._optimizer_type = None
assert (benchmark._create_optimizer() is False) assert (benchmark._create_optimizer() is False)
# Test _sync_result().
step_time = [2.0, 2.0]
benchmark._args.distributed_impl = DistributedImpl.DDP
step_time = benchmark._sync_result(step_time)
assert (not step_time)
benchmark._args.distributed_impl = None
# Test _postprocess(). # Test _postprocess().
assert (benchmark._postprocess()) assert (benchmark._postprocess())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment