Commit 9556ebcc authored by Ruoxin Sang's avatar Ruoxin Sang Committed by A. Unique TensorFlower
Browse files

Internal change

PiperOrigin-RevId: 323518930
parent a540c3d7
...@@ -207,7 +207,7 @@ class Controller: ...@@ -207,7 +207,7 @@ class Controller:
else: else:
logging.info("Evaluating at train step: %s", current_step) logging.info("Evaluating at train step: %s", current_step)
with self.eval_summary_manager.summary_writer.as_default(): with self.eval_summary_manager.summary_writer().as_default():
eval_outputs = self.evaluator.evaluate(steps) eval_outputs = self.evaluator.evaluate(steps)
if eval_outputs: if eval_outputs:
...@@ -339,7 +339,7 @@ class Controller: ...@@ -339,7 +339,7 @@ class Controller:
current_step += num_steps current_step += num_steps
num_steps = tf.convert_to_tensor(num_steps, dtype=tf.int32) num_steps = tf.convert_to_tensor(num_steps, dtype=tf.int32)
with self.summary_manager.summary_writer.as_default(): with self.summary_manager.summary_writer().as_default():
# Create a lambda that returns true when summaries should be written. # Create a lambda that returns true when summaries should be written.
should_record = False # Allows static optimization in no-summary cases. should_record = False # Allows static optimization in no-summary cases.
if self.summary_interval: if self.summary_interval:
......
...@@ -158,6 +158,57 @@ class TestEvaluator(standard_runner.StandardEvaluator): ...@@ -158,6 +158,57 @@ class TestEvaluator(standard_runner.StandardEvaluator):
} }
class TestEvaluatorWithNestedSummary(standard_runner.StandardEvaluator):
  """Implements the training and evaluation APIs for the test model."""

  def __init__(self):
    self.strategy = tf.distribute.get_strategy()
    self.model = create_model()
    # One pair of (loss, accuracy) metrics per evaluation dataset; the two
    # datasets share the same metric names so their curves overlay in
    # TensorBoard under separate sub-directories.
    self.loss = tf.keras.metrics.Mean("loss", dtype=tf.float32)
    self.accuracy = tf.keras.metrics.CategoricalAccuracy(
        "accuracy", dtype=tf.float32)
    self.loss2 = tf.keras.metrics.Mean("loss", dtype=tf.float32)
    self.accuracy2 = tf.keras.metrics.CategoricalAccuracy(
        "accuracy", dtype=tf.float32)
    distribute = self.strategy.experimental_distribute_datasets_from_function
    super().__init__(
        eval_dataset={
            "dataset": distribute(dataset_fn),
            "dataset2": distribute(dataset_fn),
        })

  def eval_step(self, iterator):
    def _replicated_step(loss, accuracy, inputs):
      """Replicated evaluation step."""
      features, targets = inputs
      predictions = self.model(features)
      loss.update_state(tf.keras.losses.MSE(targets, predictions))
      accuracy.update_state(targets, predictions)

    # Run one step per dataset, updating that dataset's dedicated metrics.
    # Default-bound lambda args avoid the late-binding closure pitfall.
    for key, loss_metric, acc_metric in (
        ("dataset", self.loss, self.accuracy),
        ("dataset2", self.loss2, self.accuracy2)):
      self.strategy.run(
          lambda inputs, l=loss_metric, a=acc_metric: _replicated_step(
              l, a, inputs),
          args=(next(iterator[key]),))

  def eval_end(self):
    # Nested result dict: each top-level key becomes its own summary
    # sub-directory when written by the summary manager.
    return {
        "dataset": {
            "loss": self.loss.result(),
            "accuracy": self.accuracy.result(),
        },
        "dataset2": {
            "loss": self.loss2.result(),
            "accuracy": self.accuracy2.result(),
        },
    }
class TestTrainerWithSummaries(standard_runner.StandardTrainer): class TestTrainerWithSummaries(standard_runner.StandardTrainer):
"""A Trainer model with summaries for testing purposes.""" """A Trainer model with summaries for testing purposes."""
...@@ -570,6 +621,31 @@ class ControllerTest(tf.test.TestCase, parameterized.TestCase): ...@@ -570,6 +621,31 @@ class ControllerTest(tf.test.TestCase, parameterized.TestCase):
self.assertLen( self.assertLen(
summaries_with_matching_keyword("eval_loss", self.model_dir), 2) summaries_with_matching_keyword("eval_loss", self.model_dir), 2)
def test_evaluate_with_nested_summaries(self):
test_evaluator = TestEvaluatorWithNestedSummary()
test_controller = controller.Controller(
evaluator=test_evaluator,
global_step=tf.Variable(0, dtype=tf.int64),
eval_summary_dir=self.model_dir)
test_controller.evaluate(steps=5)
self.assertNotEmpty(
tf.io.gfile.listdir(os.path.join(self.model_dir, "dataset")))
self.assertNotEmpty(
summaries_with_matching_keyword(
"loss", os.path.join(self.model_dir, "dataset")))
self.assertNotEmpty(
summaries_with_matching_keyword(
"accuracy", os.path.join(self.model_dir, "dataset")))
self.assertNotEmpty(
tf.io.gfile.listdir(os.path.join(self.model_dir, "dataset2")))
self.assertNotEmpty(
summaries_with_matching_keyword(
"loss", os.path.join(self.model_dir, "dataset2")))
self.assertNotEmpty(
summaries_with_matching_keyword(
"accuracy", os.path.join(self.model_dir, "dataset2")))
if __name__ == "__main__": if __name__ == "__main__":
tf.test.main() tf.test.main()
...@@ -45,7 +45,8 @@ class AbstractTrainer(tf.Module, metaclass=abc.ABCMeta): ...@@ -45,7 +45,8 @@ class AbstractTrainer(tf.Module, metaclass=abc.ABCMeta):
Returns: Returns:
The function may return a dictionary of `Tensors` or numpy arrays, which The function may return a dictionary of `Tensors` or numpy arrays, which
will be written to logs and as TensorBoard summaries. will be written to logs and as TensorBoard summaries. It can also be a
nested dictionary, yielding a hierarchy of summary directories.
""" """
pass pass
...@@ -67,6 +68,7 @@ class AbstractEvaluator(tf.Module, metaclass=abc.ABCMeta): ...@@ -67,6 +68,7 @@ class AbstractEvaluator(tf.Module, metaclass=abc.ABCMeta):
Returns: Returns:
The function may return a dictionary of `Tensors` or numpy arrays, which The function may return a dictionary of `Tensors` or numpy arrays, which
will be written to logs and as TensorBoard summaries. will be written to logs and as TensorBoard summaries. It can also be a
nested dictionary, yielding a hierarchy of summary directories.
""" """
pass pass
...@@ -144,7 +144,8 @@ class StandardTrainer(runner.AbstractTrainer, metaclass=abc.ABCMeta): ...@@ -144,7 +144,8 @@ class StandardTrainer(runner.AbstractTrainer, metaclass=abc.ABCMeta):
Returns: Returns:
The function may return a dictionary of `Tensors`, which will be The function may return a dictionary of `Tensors`, which will be
written to logs and as TensorBoard summaries. written to logs and as TensorBoard summaries. It can also be a
nested dictionary, yielding a hierarchy of summary directories.
""" """
pass pass
...@@ -261,7 +262,8 @@ class StandardEvaluator(runner.AbstractEvaluator, metaclass=abc.ABCMeta): ...@@ -261,7 +262,8 @@ class StandardEvaluator(runner.AbstractEvaluator, metaclass=abc.ABCMeta):
Returns: Returns:
The function may return a dictionary of `Tensors`, which will be The function may return a dictionary of `Tensors`, which will be
written to logs and as TensorBoard summaries. written to logs and as TensorBoard summaries. It can also be a
nested dictionary, yielding a hierarchy of summary directories.
""" """
pass pass
......
...@@ -20,6 +20,7 @@ import contextlib ...@@ -20,6 +20,7 @@ import contextlib
import functools import functools
import inspect import inspect
import os
import numpy as np import numpy as np
import tensorflow as tf import tensorflow as tf
...@@ -153,44 +154,80 @@ class SummaryManager: ...@@ -153,44 +154,80 @@ class SummaryManager:
self._enabled = (summary_dir is not None) self._enabled = (summary_dir is not None)
self._summary_dir = summary_dir self._summary_dir = summary_dir
self._summary_fn = summary_fn self._summary_fn = summary_fn
self._summary_writer = None self._summary_writers = {}
if global_step is None: if global_step is None:
self._global_step = tf.summary.experimental.get_step() self._global_step = tf.summary.experimental.get_step()
else: else:
self._global_step = global_step self._global_step = global_step
@property def summary_writer(self, relative_path=""):
def summary_writer(self): """Returns the underlying summary writer.
"""Returns the underlying summary writer."""
if self._summary_writer is not None: Args:
return self._summary_writer relative_path: The current path in which to write summaries, relative to
the summary directory. By default it is empty, which specifies the root
directory.
"""
if self._summary_writers and relative_path in self._summary_writers:
return self._summary_writers[relative_path]
if self._enabled: if self._enabled:
self._summary_writer = tf.summary.create_file_writer(self._summary_dir) self._summary_writers[relative_path] = tf.summary.create_file_writer(
os.path.join(self._summary_dir, relative_path))
else: else:
self._summary_writer = tf.summary.create_noop_writer() self._summary_writers[relative_path] = tf.summary.create_noop_writer()
return self._summary_writer return self._summary_writers[relative_path]
def flush(self): def flush(self):
"""Flush the underlying summary writer.""" """Flush the underlying summary writers."""
if self._enabled: if self._enabled:
tf.summary.flush(self.summary_writer) tf.nest.map_structure(tf.summary.flush, self._summary_writers)
def write_summaries(self, items): def write_summaries(self, summary_dict):
"""Write a bulk of summaries. """Write summaries for the given values.
This recursively creates sub-directories for any nested dictionaries
provided in `summary_dict`, yielding a hierarchy of directories which will
then be reflected in the TensorBoard UI as different colored curves.
E.g. users may evaluate on multiple datasets and return `summary_dict` as a
nested dictionary:
```
{
"dataset": {
"loss": loss,
"accuracy": accuracy
},
"dataset2": {
"loss": loss2,
"accuracy": accuracy2
},
}
```
This will create two sub-directories, "dataset" and "dataset2", inside the
summary root directory, and each directory will contain both "loss" and
"accuracy" summaries.
Args: Args:
items: a dictionary of `Tensors` for writing summaries. summary_dict: A dictionary of values. If any value in `summary_dict` is
itself a dictionary, then the function will recursively create
subdirectories with names given by the keys in the dictionary. The
Tensor values are summarized using the summary writer instance specific
to the parent relative path.
""" """
# TODO(rxsang): Support writing summaries with nested structure, so users
# can split the summaries into different directories for nicer visualization
# in Tensorboard, like train and eval metrics.
if not self._enabled: if not self._enabled:
return return
self._write_summaries(summary_dict)
with self.summary_writer.as_default(): def _write_summaries(self, summary_dict, relative_path=""):
for name, tensor in items.items(): for name, value in summary_dict.items():
self._summary_fn(name, tensor, step=self._global_step) if isinstance(value, dict):
self._write_summaries(
value, relative_path=os.path.join(relative_path, name))
else:
with self.summary_writer(relative_path).as_default():
self._summary_fn(name, value, step=self._global_step)
class Trigger(metaclass=abc.ABCMeta): class Trigger(metaclass=abc.ABCMeta):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment