Unverified commit 61c20b44, authored by Jiarui Fang, committed by GitHub
Browse files

[log] local throughput metrics (#811)

* Revert "[zero] add ZeroTensorShardStrategy (#793)"

This reverts commit 88759e28.

* [gemini] set cpu memory capacity

* [log] local throughput collecting

* polish

* polish

* polish

* polish code

* polish
parent dd92b90a
...@@ -124,7 +124,7 @@ class LossMetric(Metric): ...@@ -124,7 +124,7 @@ class LossMetric(Metric):
def get_last_step_value(self) -> str: def get_last_step_value(self) -> str:
"""Returns :attr:`last_step_loss`. """Returns :attr:`last_step_loss`.
""" """
return str(self.last_step_loss) return str(self.last_step_loss.cpu().item())
@staticmethod @staticmethod
def is_better(a, b): def is_better(a, b):
...@@ -207,7 +207,7 @@ class AccuracyMetric(Metric): ...@@ -207,7 +207,7 @@ class AccuracyMetric(Metric):
def get_last_step_value(self) -> str: def get_last_step_value(self) -> str:
self.last_step_sum = all_reduce(self.last_step_sum, ParallelMode.DATA) self.last_step_sum = all_reduce(self.last_step_sum, ParallelMode.DATA)
self.last_step_correct = all_reduce(self.last_step_correct, ParallelMode.DATA) self.last_step_correct = all_reduce(self.last_step_correct, ParallelMode.DATA)
return str(_format_number((self.last_step_correct / self.last_step_sum).item())) return str(_format_number((self.last_step_correct / self.last_step_sum).cpu().item()))
def get_accumulated_value(self): def get_accumulated_value(self):
self.accumulated_sum = all_reduce(self.accumulated_sum, ParallelMode.DATA) self.accumulated_sum = all_reduce(self.accumulated_sum, ParallelMode.DATA)
...@@ -324,7 +324,7 @@ class ThroughputMetric(Metric): ...@@ -324,7 +324,7 @@ class ThroughputMetric(Metric):
epoch_only (bool): Whether the metric only read for the full epoch. epoch_only (bool): Whether the metric only read for the full epoch.
""" """
def __init__(self, epoch_only: bool, ignored_steps: int = 0, tflop_per_step: int = 0): def __init__(self, epoch_only: bool, ignored_steps: int = 0, tflop_per_step: int = 0, use_local: bool = False):
super().__init__(epoch_only=epoch_only) super().__init__(epoch_only=epoch_only)
self.ignored_steps = ignored_steps self.ignored_steps = ignored_steps
self.cur_steps = 0 self.cur_steps = 0
...@@ -333,6 +333,7 @@ class ThroughputMetric(Metric): ...@@ -333,6 +333,7 @@ class ThroughputMetric(Metric):
self.last_step_num_samples = torch.zeros(1, device=get_current_device()) self.last_step_num_samples = torch.zeros(1, device=get_current_device())
self.last_step_used_time = torch.zeros(1, device=get_current_device()) self.last_step_used_time = torch.zeros(1, device=get_current_device())
self._tflop_per_step = tflop_per_step self._tflop_per_step = tflop_per_step
self._use_local = use_local
def reset(self) -> None: def reset(self) -> None:
# self.cur_steps = 0 # self.cur_steps = 0
...@@ -350,9 +351,13 @@ class ThroughputMetric(Metric): ...@@ -350,9 +351,13 @@ class ThroughputMetric(Metric):
self.accumulated_used_time += self.last_step_used_time self.accumulated_used_time += self.last_step_used_time
def get_last_step_value(self) -> str: def get_last_step_value(self) -> str:
if self._use_local:
self.last_step_num_samples *= gpc.get_world_size(ParallelMode.DATA)
else:
self.last_step_used_time = all_reduce(self.last_step_used_time, ParallelMode.DATA) / \ self.last_step_used_time = all_reduce(self.last_step_used_time, ParallelMode.DATA) / \
gpc.get_world_size(ParallelMode.DATA) gpc.get_world_size(ParallelMode.DATA)
self.last_step_num_samples = all_reduce(self.last_step_num_samples, ParallelMode.DATA) self.last_step_num_samples = all_reduce(self.last_step_num_samples, ParallelMode.DATA)
sample_per_sec = _format_number(self.last_step_num_samples / (self.last_step_used_time + 1e-12).item()) sample_per_sec = _format_number(self.last_step_num_samples / (self.last_step_used_time + 1e-12).item())
if self._tflop_per_step > 0: if self._tflop_per_step > 0:
tflops = _format_number(self._tflop_per_step / (self.last_step_used_time.item() + 1e-12)) tflops = _format_number(self._tflop_per_step / (self.last_step_used_time.item() + 1e-12))
...@@ -380,19 +385,23 @@ class ThroughputHook(MetricHook): ...@@ -380,19 +385,23 @@ class ThroughputHook(MetricHook):
priority (int, optional): Priority in the printing, hooks with small priority will be printed in front priority (int, optional): Priority in the printing, hooks with small priority will be printed in front
defaults to 10. If different hooks share same priority, the order of printing would defaults to 10. If different hooks share same priority, the order of printing would
depend on the hooks order in the hook list. depend on the hooks order in the hook list.
tflop_per_step(int, optional): tera floating point operations per step.
use_local (bool, optional): Whether to use local time for throughput calculation.
""" """
def __init__(self, ignored_steps: int = 0, priority: int = 10, tflop_per_step: int = 0): def __init__(self, ignored_steps: int = 0, priority: int = 10, tflop_per_step: int = 0, use_local=False):
super().__init__(priority) super().__init__(priority)
self.ignored_steps = ignored_steps self.ignored_steps = ignored_steps
self._tflop_per_step = tflop_per_step self._tflop_per_step = tflop_per_step
self._use_local = use_local
def after_hook_is_attached(self, trainer): def after_hook_is_attached(self, trainer):
self._check_metric_states_initialization(trainer) self._check_metric_states_initialization(trainer)
if self._is_stage_to_compute: if self._is_stage_to_compute:
self.metric = ThroughputMetric(epoch_only=True, self.metric = ThroughputMetric(epoch_only=True,
ignored_steps=self.ignored_steps, ignored_steps=self.ignored_steps,
tflop_per_step=self._tflop_per_step) tflop_per_step=self._tflop_per_step,
use_local=self._use_local)
# register the metric # register the metric
trainer.states['metrics']['train']['Throughput'] = self.metric trainer.states['metrics']['train']['Throughput'] = self.metric
......
...@@ -23,10 +23,10 @@ def convert_to_zero_v2(model: nn.Module, optimizer: torch.optim.Optimizer, model ...@@ -23,10 +23,10 @@ def convert_to_zero_v2(model: nn.Module, optimizer: torch.optim.Optimizer, model
logger = get_dist_logger('convert_to_zero_v2') logger = get_dist_logger('convert_to_zero_v2')
logger.info(f'optimizer_config is {optimizer_config}') logger.info(f'optimizer_config is {optimizer_config}', ranks=[0])
if optimizer_config is None: if optimizer_config is None:
optimizer_config = dict() optimizer_config = dict()
logger.info(f'model_config is {model_config}') logger.info(f'model_config is {model_config}', ranks=[0])
if model_config is None: if model_config is None:
model_config = dict() model_config = dict()
......
...@@ -122,7 +122,8 @@ class ShardedOptimizerV2(ColossalaiOptimizer): ...@@ -122,7 +122,8 @@ class ShardedOptimizerV2(ColossalaiOptimizer):
self._register_master_weight() self._register_master_weight()
if self.gpu_margin_mem_ratio != 0.0 and not isinstance(sharded_model._tensor_placement_policy, if self.gpu_margin_mem_ratio != 0.0 and not isinstance(sharded_model._tensor_placement_policy,
AutoTensorPlacementPolicy): AutoTensorPlacementPolicy):
self._logger.warning(f'gpu_margin_mem_ratio is meaningless when tensor_placement_policy is not "auto"') self._logger.warning(f'gpu_margin_mem_ratio is meaningless when tensor_placement_policy is not "auto"',
ranks=[0])
if self._verbose: if self._verbose:
self._logger.debug( self._logger.debug(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment