ModelZoo / ResNet50_tensorflow / Commits

Commit 1a21d1d3
Authored May 27, 2021 by Dan Holtmann-Rice
Committed by A. Unique TensorFlower, May 27, 2021

Internal change

PiperOrigin-RevId: 376280713
Parent: 2d3c9afb

Showing 5 changed files with 738 additions and 9 deletions (+738, -9)
orbit/__init__.py         +2    -0
orbit/actions.py          +429  -0
orbit/actions_test.py     +218  -0
orbit/controller.py       +41   -8
orbit/controller_test.py  +48   -1
orbit/__init__.py

@@ -14,8 +14,10 @@
 """Defines exported symbols for the `orbit` package."""
 
+from orbit import actions
 from orbit import utils
+from orbit.controller import Action
 from orbit.controller import Controller
 from orbit.runner import AbstractEvaluator
 ...
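The net effect of this hunk is that the new `actions` module and the `Action`
type alias are re-exported from the top-level package. A quick sketch of what
becomes importable after this commit (assuming the `orbit` package is on the
path):

    import orbit

    orbit.actions.NewBestMetric  # the new module, re-exported here
    orbit.Action                 # alias for Callable[[runner.Output], None]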
orbit/actions.py  0 → 100644

# Copyright 2021 The Orbit Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Defines an "action" abstraction for use with `orbit.Controller`.

"Actions" are simply arbitrary callables that are applied by the `Controller`
to the output of train steps (after each inner loop of `steps_per_loop` steps)
or an evaluation. This provides a hook mechanism, enabling things like
reporting metrics to Vizier, model exporting, additional logging, etc.

The basic `Action` abstraction (just a type alias) is defined in the
`controller` module. This `actions` module adds a `ConditionalAction` utility
class to make it easy to trigger actions conditionally based on reusable
predicates, as well as a small handful of predefined conditions/actions (in
particular, a `NewBestMetric` condition and an `ExportSavedModel` action).

One example of using actions to do metric-conditional export:

    new_best_metric = orbit.actions.NewBestMetric('accuracy')
    export_action = orbit.actions.ConditionalAction(
        condition=lambda x: x['accuracy'] > 0.9 and new_best_metric(x),
        action=orbit.actions.ExportSavedModel(
            model,
            orbit.actions.ExportFileManager(
                base_name=f'{FLAGS.model_dir}/saved_model',
                next_id_fn=trainer.global_step.numpy),
            signatures=model.infer))

    controller = orbit.Controller(
        strategy=strategy,
        trainer=trainer,
        evaluator=evaluator,
        eval_actions=[export_action],
        global_step=trainer.global_step,
        steps_per_loop=FLAGS.steps_per_loop,
        checkpoint_manager=checkpoint_manager,
        summary_interval=1000)

Note: In multi-client settings where each client runs its own `Controller`
instance, some care should be taken in deciding which clients should run
certain actions. Isolating actions to an individual client (say client 0) can
be achieved using `ConditionalAction` as follows:

    client_0_actions = orbit.actions.ConditionalAction(
        condition=lambda _: client_id() == 0,
        action=[
            ...
        ])

In particular, the `NewBestMetric` condition may be used in multi-client
settings if all clients are guaranteed to compute the same metric (ensuring
this is up to client code, not Orbit). However, when saving metrics it may be
helpful to avoid unnecessary writes by setting the `write_value` parameter to
`False` for most clients.
"""
import json
import os
import sys
from typing import Any, Callable, Optional, Sequence, Union
import uuid

from orbit import controller
from orbit import runner
from orbit import utils

import tensorflow as tf


Condition = Callable[[runner.Output], Union[bool, tf.Tensor]]


def _as_sequence(maybe_sequence: Union[Any, Sequence[Any]]) -> Sequence[Any]:
  if isinstance(maybe_sequence, Sequence):
    return maybe_sequence
  return [maybe_sequence]
class ConditionalAction:
  """Represents an action that is only taken when a given condition is met.

  This class is itself an `Action` (a callable that can be applied to train or
  eval outputs), but is intended to make it easier to write modular and
  reusable conditions by decoupling "when" something happens (the condition)
  from "what" happens (the action).
  """

  def __init__(
      self,
      condition: Condition,
      action: Union[controller.Action, Sequence[controller.Action]],
  ):
    """Initializes the instance.

    Args:
      condition: A callable accepting train or eval outputs and returning a
        bool.
      action: The action (or optionally sequence of actions) to perform when
        `condition` is met.
    """
    self.condition = condition
    self.action = action

  def __call__(self, output: runner.Output) -> None:
    if self.condition(output):
      for action in _as_sequence(self.action):
        action(output)
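As a concrete illustration of this decoupling, one reusable predicate can gate
several actions at once. A sketch, where `client_id()` is the hypothetical
helper from the module docstring and the two actions are placeholders:

    chief_only = orbit.actions.ConditionalAction(
        condition=lambda _: client_id() == 0,  # reusable "am I client 0?" test
        action=[export_action, log_action])    # both run only when it holds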
MetricFn = Callable[[runner.Output], Union[float, tf.Tensor]]
class NewBestMetric:
  """Condition that is satisfied when a new best metric is achieved.

  This class keeps track of the best metric value seen so far, optionally in a
  persistent (preemption-safe) way.

  Two methods are provided, which each satisfy the `Action` protocol: `test`
  for only testing whether a new best metric is achieved by a given train/eval
  output, and `commit`, which both tests and records the new best metric value
  if it is achieved. These separate methods enable the same `NewBestMetric`
  instance to be reused as a condition multiple times, and can also provide
  additional preemption/failure safety. For example, to avoid updating the
  best metric if a model export fails or is pre-empted:

      new_best_metric = orbit.actions.NewBestMetric(
          'accuracy', filename='/model/dir/best_metric')
      action = orbit.actions.ConditionalAction(
          condition=new_best_metric.test,
          action=[
              orbit.actions.ExportSavedModel(...),
              new_best_metric.commit
          ])

  The default `__call__` implementation is equivalent to `commit`.

  This class is safe to use in multi-client settings if all clients can be
  guaranteed to compute the same metric. However, when saving metrics it may
  be helpful to avoid unnecessary writes by setting the `write_value`
  parameter to `False` for most clients.

  Attributes:
    metric: The metric passed to __init__ (may be a string key or a callable
      that can be applied to train/eval output).
    higher_is_better: Whether higher metric values are better.
  """

  def __init__(self,
               metric: Union[str, MetricFn],
               higher_is_better: bool = True,
               filename: Optional[str] = None,
               write_metric=True):
    """Initializes the instance.

    Args:
      metric: Either a string key name to use to look up a metric (assuming
        the train/eval output is a dictionary), or a callable that accepts
        the train/eval output and returns a metric value.
      higher_is_better: Whether higher metric values are better. If `True`, a
        new best metric is achieved when the metric value is strictly greater
        than the previous best metric. If `False`, a new best metric is
        achieved when the metric value is strictly less than the previous
        best metric.
      filename: A filename to use for storage of the best metric value seen
        so far, to allow persistence of the value across preemptions. If
        `None` (default), values aren't persisted.
      write_metric: If `filename` is set, this controls whether this instance
        will write new best metric values to the file, or just read from the
        file to obtain the initial value. Setting this to `False` for most
        clients in some multi-client setups can avoid unnecessary file
        writes. Has no effect if `filename` is `None`.
    """
    self.metric = metric
    self.higher_is_better = higher_is_better
    float_max = sys.float_info.max
    self._best_value = JSONPersistedValue(
        initial_value=-float_max if higher_is_better else float_max,
        filename=filename,
        write_value=write_metric)

  def __call__(self, output: runner.Output) -> bool:
    """Tests `output` and updates the current best value if necessary.

    This is equivalent to `commit` below.

    Args:
      output: The train or eval output to test.

    Returns:
      `True` if `output` contains a new best metric value, `False` otherwise.
    """
    return self.commit(output)

  def metric_value(self, output: runner.Output) -> float:
    """Computes the metric value for the given `output`."""
    if callable(self.metric):
      value = self.metric(output)
    else:
      value = output[self.metric]
    return float(utils.get_value(value))

  @property
  def best_value(self) -> float:
    """Returns the best metric value seen so far."""
    return self._best_value.read()

  def test(self, output: runner.Output) -> bool:
    """Tests `output` to see if it contains a new best metric value.

    If `output` does contain a new best metric value, this method does *not*
    save it (i.e., calling this method multiple times in a row with the same
    `output` will continue to return `True`).

    Args:
      output: The train or eval output to test.

    Returns:
      `True` if `output` contains a new best metric value, `False` otherwise.
    """
    metric_value = self.metric_value(output)
    if self.higher_is_better:
      if metric_value > self.best_value:
        return True
    else:
      # Lower is better.
      if metric_value < self.best_value:
        return True
    return False

  def commit(self, output: runner.Output) -> bool:
    """Tests `output` and updates the current best value if necessary.

    Unlike `test` above, if `output` does contain a new best metric value,
    this method *does* save it (i.e., subsequent calls to this method with
    the same `output` will return `False`).

    Args:
      output: The train or eval output to test.

    Returns:
      `True` if `output` contains a new best metric value, `False` otherwise.
    """
    if self.test(output):
      self._best_value.write(self.metric_value(output))
      return True
    return False
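A short sketch of the `test`/`commit` split described above, using a callable
metric so a bare float can serve as the "output" (this mirrors the unit tests
added in this commit):

    new_best = orbit.actions.NewBestMetric(lambda output: output)
    assert new_best.test(1.0)      # a new best, but nothing is recorded...
    assert new_best.test(1.0)      # ...so testing again still returns True
    assert new_best.commit(1.0)    # records 1.0 as the best value
    assert not new_best.test(1.0)  # strictly greater is now required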
class JSONPersistedValue:
  """Represents a value that is persisted via a file-based backing store.

  The value must be JSON-serializable. Each time the value is updated, it
  will be written to the backing file. It is only read from the file at
  initialization.
  """

  def __init__(self,
               initial_value: Any,
               filename: str,
               write_value: bool = True):
    """Initializes the instance.

    Args:
      initial_value: The initial value to use if no backing file exists or
        was given. This must be a JSON-serializable value (possibly nested
        combination of lists, dicts, and primitive values).
      filename: The path to use for persistent storage of the value. This
        may be `None`, in which case the value is not stable across
        preemptions.
      write_value: If `True`, new values will be written to `filename` on
        calls to `write()`. If `False`, `filename` is only read once to
        restore any persisted value, and new values will not be written to
        it. This can be useful in certain multi-client settings to avoid
        race conditions or excessive file writes. If `filename` is `None`,
        this parameter has no effect.
    """
    self._value = None
    self._filename = filename
    self._write_value = write_value
    if self._filename is not None:
      if tf.io.gfile.exists(self._filename):
        if tf.io.gfile.stat(self._filename).length > 0:
          with tf.io.gfile.GFile(self._filename, 'r') as f:
            self._value = json.loads(f.read())
      elif self._write_value:
        tf.io.gfile.makedirs(os.path.dirname(self._filename))
    if self._value is None:
      self.write(initial_value)

  def read(self):
    """Returns the value."""
    return self._value

  def write(self, value):
    """Writes the value, updating the backing store if one was provided."""
    self._value = value
    if self._filename is not None and self._write_value:
      # To achieve atomic writes, we first write to a temporary file, and
      # then rename it to `self._filename`.
      tmp_filename = f'{self._filename}.tmp.{uuid.uuid4().hex}'
      with tf.io.gfile.GFile(tmp_filename, 'w') as f:
        json.dump(self._value, f)
      tf.io.gfile.rename(tmp_filename, self._filename, overwrite=True)
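A quick sketch of the read-at-init, write-through behavior (the path here is
hypothetical):

    state = orbit.actions.JSONPersistedValue({'step': 0}, '/tmp/state.json')
    state.write({'step': 100})  # written through to /tmp/state.json atomically

    # A fresh instance ignores its initial_value because the backing file is
    # non-empty, and restores the persisted value instead.
    restored = orbit.actions.JSONPersistedValue('ignored', '/tmp/state.json')
    assert restored.read() == {'step': 100}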
class _CounterIdFn:
  """Implements a counter-based ID function for `ExportFileManager`."""

  def __init__(self, base_name: str):
    filenames = tf.io.gfile.glob(f'{base_name}-*')
    max_counter = -1
    for filename in filenames:
      try:
        _, file_number = filename.rsplit('-', maxsplit=1)
        max_counter = max(max_counter, int(file_number))
      except ValueError:
        continue
    self.value = max_counter + 1

  def __call__(self):
    output = self.value
    self.value += 1
    return output
class ExportFileManager:
  """Utility class that manages a group of files with a shared base name.

  For actions like SavedModel exporting, there are potentially many different
  file naming and cleanup strategies that may be desirable. This class
  provides a basic interface allowing SavedModel export to be decoupled from
  these details, and a default implementation that should work for many basic
  scenarios. Users may subclass this class to alter behavior and define more
  customized naming and cleanup strategies.
  """

  def __init__(self,
               base_name: str,
               max_to_keep: int = 5,
               next_id_fn: Optional[Callable[[], int]] = None):
    """Initializes the instance.

    Args:
      base_name: A shared base name for file names generated by this class.
      max_to_keep: The maximum number of files matching `base_name` to keep
        after each call to `clean_up`. The most recent (as determined by file
        modification time) `max_to_keep` files are preserved; the rest are
        deleted. If < 0, all files are preserved.
      next_id_fn: An optional callable that returns integer IDs to append to
        the base name (formatted as `'{base_name}-{id}'`). The order of
        integers is used to sort files to determine the oldest ones deleted
        by `clean_up`. If not supplied, a default ID based on an incrementing
        counter is used. One common alternative may be to use the current
        global step count, for instance passing
        `next_id_fn=global_step.numpy`.
    """
    self._base_name = base_name
    self._max_to_keep = max_to_keep
    self._next_id_fn = next_id_fn or _CounterIdFn(base_name)

  @property
  def managed_files(self):
    """Returns all files managed by this instance, in sorted order.

    Returns:
      The list of files matching the `base_name` provided when constructing
      this `ExportFileManager` instance, sorted in increasing integer order
      of the IDs returned by `next_id_fn`.
    """

    def id_key(name):
      _, id_num = name.rsplit('-', maxsplit=1)
      return int(id_num)

    filenames = tf.io.gfile.glob(f'{self._base_name}-*')
    return sorted(filenames, key=id_key)

  def clean_up(self):
    """Cleans up old files matching `{base_name}-*`.

    The most recent `max_to_keep` files are preserved.
    """
    if self._max_to_keep < 0:
      return
    for filename in self.managed_files[:-self._max_to_keep]:
      tf.io.gfile.rmtree(filename)

  def next_name(self) -> str:
    """Returns a new file name based on `base_name` and `next_id_fn()`."""
    return f'{self._base_name}-{self._next_id_fn()}'
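As the docstring notes, `next_id_fn` can be pointed at the global step so that
exports are named by training progress rather than a plain counter. A sketch,
assuming `global_step` is the trainer's `tf.Variable` step counter and the
directory is hypothetical:

    file_manager = orbit.actions.ExportFileManager(
        base_name='/model/dir/saved_model',
        max_to_keep=3,
        next_id_fn=global_step.numpy)
    file_manager.next_name()  # e.g. '/model/dir/saved_model-1000' at step 1000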
class ExportSavedModel:
  """Action that exports the given model as a SavedModel."""

  def __init__(self,
               model: tf.Module,
               file_manager: ExportFileManager,
               signatures,
               options: Optional[tf.saved_model.SaveOptions] = None):
    """Initializes the instance.

    Args:
      model: The model to export.
      file_manager: An instance of `ExportFileManager` (or a subclass), that
        provides file naming and cleanup functionality.
      signatures: The signatures to forward to `tf.saved_model.save()`.
      options: Optional options to forward to `tf.saved_model.save()`.
    """
    self.model = model
    self.file_manager = file_manager
    self.signatures = signatures
    self.options = options

  def __call__(self, _):
    """Exports the SavedModel."""
    export_dir = self.file_manager.next_name()
    tf.saved_model.save(self.model, export_dir, self.signatures, self.options)
    self.file_manager.clean_up()
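Since an `Action` is just a callable taking the train/eval output, even a
plain function qualifies. A minimal sketch of a custom logging action (the
'accuracy' key is hypothetical and depends on what the evaluator returns):

    def log_accuracy(output):
      # `output` is the structure returned by `evaluator.evaluate`.
      print('accuracy =', output['accuracy'])

    controller = orbit.Controller(..., eval_actions=[log_accuracy], ...)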
orbit/actions_test.py  0 → 100644

# Copyright 2021 The Orbit Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for orbit.actions."""

import os

from orbit import actions

import tensorflow as tf


def _id_key(name):
  _, id_num = name.rsplit('-', maxsplit=1)
  return int(id_num)


def _id_sorted_file_base_names(dir_path):
  return sorted(tf.io.gfile.listdir(dir_path), key=_id_key)


class TestModel(tf.Module):

  def __init__(self):
    self.value = tf.Variable(0)

  @tf.function(input_signature=[])
  def __call__(self):
    return self.value


class ActionsTest(tf.test.TestCase):

  def test_conditional_action(self):
    # Define a function to raise an AssertionError, since we can't in a
    # lambda.
    def raise_assertion(arg):
      raise AssertionError(str(arg))

    conditional_action = actions.ConditionalAction(
        condition=lambda x: x, action=raise_assertion)

    conditional_action(False)  # Nothing is raised.
    with self.assertRaises(AssertionError) as ctx:
      conditional_action(True)
    self.assertEqual(str(ctx.exception), 'True')

  def test_new_best_metric_higher_is_better(self):
    new_best_metric = actions.NewBestMetric(
        lambda x: x, higher_is_better=True)
    self.assertTrue(new_best_metric.test(0.0))
    self.assertTrue(new_best_metric.commit(0.0))
    self.assertFalse(new_best_metric.test(0.0))
    self.assertTrue(new_best_metric.test(1.0))

  def test_new_best_metric_lower_is_better(self):
    new_best_metric = actions.NewBestMetric(
        lambda x: x, higher_is_better=False)
    self.assertTrue(new_best_metric.test(0.0))
    self.assertTrue(new_best_metric.commit(0.0))
    self.assertFalse(new_best_metric.test(0.0))
    self.assertTrue(new_best_metric.test(-1.0))

  def test_new_best_metric_persistence(self):
    backing_file = self.create_tempfile()
    new_best_metric = actions.NewBestMetric(
        lambda x: x,
        higher_is_better=True,
        filename=backing_file.full_path,
        write_metric=False)
    self.assertTrue(new_best_metric.test(0.0))
    self.assertTrue(new_best_metric.commit(0.0))
    self.assertFalse(new_best_metric.test(0.0))
    new_best_metric = actions.NewBestMetric(
        lambda x: x, higher_is_better=True, filename=backing_file.full_path)
    self.assertLess(new_best_metric.best_value, 0.0)
    self.assertTrue(new_best_metric.commit(5.0))
    self.assertEqual(new_best_metric.best_value, 5.0)
    new_best_metric = actions.NewBestMetric(
        lambda x: x, higher_is_better=True, filename=backing_file.full_path)
    self.assertEqual(new_best_metric.best_value, 5.0)

  def test_json_persisted_value(self):
    tempfile = self.create_tempfile().full_path
    value = {'a': 1, 'b': 2}
    persisted_value = actions.JSONPersistedValue(value, tempfile)
    # The initial value is used since tempfile is empty.
    self.assertEqual(persisted_value.read(), value)
    persisted_value = actions.JSONPersistedValue('ignored', tempfile)
    # Initial value of 'ignored' is ignored, since there's a value in
    # tempfile.
    self.assertEqual(persisted_value.read(), value)
    value = [1, 2, 3]
    persisted_value.write(value)
    # Now that a new value is written, it gets read on initialization.
    persisted_value = actions.JSONPersistedValue(['also ignored'], tempfile)
    self.assertEqual(persisted_value.read(), value)
    # Writes can be disabled.
    persisted_value = actions.JSONPersistedValue(
        'ignored', tempfile, write_value=False)
    self.assertEqual(persisted_value.read(), value)
    persisted_value.write("won't get persisted")
    persisted_value = actions.JSONPersistedValue(
        'ignored', tempfile, write_value=False)
    self.assertEqual(persisted_value.read(), value)

  def test_json_persisted_value_create_dirs(self):
    tempfile = os.path.join(self.create_tempdir().full_path, 'subdir/value')
    value = {'a': 1, 'b': 2}
    # The directory is not created if write_value=False.
    actions.JSONPersistedValue(value, tempfile, write_value=False)
    self.assertFalse(tf.io.gfile.exists(os.path.dirname(tempfile)))
    actions.JSONPersistedValue(value, tempfile)
    self.assertTrue(tf.io.gfile.exists(tempfile))

  def test_export_file_manager_default_ids(self):
    directory = self.create_tempdir()
    base_name = os.path.join(directory.full_path, 'basename')
    manager = actions.ExportFileManager(base_name, max_to_keep=3)
    self.assertLen(tf.io.gfile.listdir(directory.full_path), 0)
    directory.create_file(manager.next_name())
    manager.clean_up()  # Shouldn't do anything...
    self.assertLen(tf.io.gfile.listdir(directory.full_path), 1)
    directory.create_file(manager.next_name())
    manager.clean_up()  # Shouldn't do anything...
    self.assertLen(tf.io.gfile.listdir(directory.full_path), 2)
    directory.create_file(manager.next_name())
    manager.clean_up()  # Shouldn't do anything...
    self.assertLen(tf.io.gfile.listdir(directory.full_path), 3)
    directory.create_file(manager.next_name())
    self.assertLen(tf.io.gfile.listdir(directory.full_path), 4)
    self.assertEqual(
        _id_sorted_file_base_names(directory.full_path),
        ['basename-0', 'basename-1', 'basename-2', 'basename-3'])
    manager.clean_up()  # Should delete file with lowest ID.
    self.assertEqual(
        _id_sorted_file_base_names(directory.full_path),
        ['basename-1', 'basename-2', 'basename-3'])
    manager = actions.ExportFileManager(base_name, max_to_keep=3)
    self.assertEqual(os.path.basename(manager.next_name()), 'basename-4')

  def test_export_file_manager_custom_ids(self):
    directory = self.create_tempdir()
    base_name = os.path.join(directory.full_path, 'basename')

    id_num = 0

    def next_id():
      return id_num

    manager = actions.ExportFileManager(
        base_name, max_to_keep=2, next_id_fn=next_id)
    self.assertLen(tf.io.gfile.listdir(directory.full_path), 0)
    id_num = 30
    directory.create_file(manager.next_name())
    self.assertLen(tf.io.gfile.listdir(directory.full_path), 1)
    manager.clean_up()  # Shouldn't do anything...
    self.assertEqual(
        _id_sorted_file_base_names(directory.full_path), ['basename-30'])
    id_num = 200
    directory.create_file(manager.next_name())
    self.assertLen(tf.io.gfile.listdir(directory.full_path), 2)
    manager.clean_up()  # Shouldn't do anything...
    self.assertEqual(
        _id_sorted_file_base_names(directory.full_path),
        ['basename-30', 'basename-200'])
    id_num = 1000
    directory.create_file(manager.next_name())
    self.assertLen(tf.io.gfile.listdir(directory.full_path), 3)
    self.assertEqual(
        _id_sorted_file_base_names(directory.full_path),
        ['basename-30', 'basename-200', 'basename-1000'])
    manager.clean_up()  # Should delete file with lowest ID.
    self.assertLen(tf.io.gfile.listdir(directory.full_path), 2)
    self.assertEqual(
        _id_sorted_file_base_names(directory.full_path),
        ['basename-200', 'basename-1000'])

  def test_export_saved_model(self):
    directory = self.create_tempdir()
    base_name = os.path.join(directory.full_path, 'basename')
    file_manager = actions.ExportFileManager(base_name, max_to_keep=2)
    model = TestModel()
    export_action = actions.ExportSavedModel(
        model, file_manager=file_manager, signatures=model.__call__)
    model.value.assign(3)
    self.assertEqual(model(), 3)
    self.assertEmpty(file_manager.managed_files)
    export_action({})
    self.assertLen(file_manager.managed_files, 1)
    reloaded_model = tf.saved_model.load(file_manager.managed_files[-1])
    self.assertEqual(reloaded_model(), 3)
    model.value.assign(5)
    self.assertEqual(model(), 5)
    export_action({})
    self.assertLen(file_manager.managed_files, 2)
    reloaded_model = tf.saved_model.load(file_manager.managed_files[-1])
    self.assertEqual(reloaded_model(), 5)
    model.value.assign(7)
    self.assertEqual(model(), 7)
    export_action({})
    self.assertLen(file_manager.managed_files, 2)  # Still 2, due to clean up.
    reloaded_model = tf.saved_model.load(file_manager.managed_files[-1])
    self.assertEqual(reloaded_model(), 7)


if __name__ == '__main__':
  tf.test.main()
orbit/controller.py

@@ -17,7 +17,7 @@
 import pprint
 import time
 
-from typing import Callable, Optional, Union
+from typing import Callable, List, Optional, Union
 
 from absl import logging
 ...
@@ -46,6 +46,9 @@ def _format_output(output, indent=4):
   return "\n" + "\n".join(lines)
 
 
+Action = Callable[[runner.Output], None]
+
+
 class Controller:
   """Class that controls the outer loop of model training and evaluation.
 ...
@@ -53,10 +56,9 @@ class Controller:
   loops are implemented by users in the form of `AbstractTrainer` and
   `AbstractEvaluator` subclasses, and define how to run a given number of
   training or evaluation steps. The outer loop is provided by this `Controller`,
-  and interleaves calls to the user provided inner loops with additional actions
-  such as saving checkpoints, running evaluations, and writing summaries
-  (depending on the arguments passed to `Controller.__init__` and the method
-  being called).
+  and interleaves calls to the user-provided inner loops with additional actions
+  such as saving checkpoints, running evaluations, writing summaries, as well as
+  (optionally) user provided `Action`s (see below).
 
   There are four top-level "outer loops" provided:
 ...
@@ -70,6 +72,15 @@ class Controller:
   training and evaluation use cases, the internal details and method
   implementations are also intended to be simple enough to make subclassing or
   other custom outer loop implementations easy to achieve.
+
+  Some additional customization can be achieved by supplying `train_actions` or
+  `eval_actions` when constructing the `Controller`. These are just lists of
+  arbitrary callables that are applied by the `Controller` to the output of
+  train steps (after each inner loop of `steps_per_loop` steps) or an
+  evaluation. This provides a hook mechanism, enabling things like reporting
+  metrics to Vizier, model exporting, additional logging, etc. See the
+  `orbit.actions` package for a small handful of predefined actions and some
+  utility classes that may be useful in defining your own.
   """
 
   def __init__(
@@ -79,6 +90,9 @@ class Controller:
       trainer: Optional[runner.AbstractTrainer] = None,
      evaluator: Optional[runner.AbstractEvaluator] = None,
       strategy: Optional[tf.distribute.Strategy] = None,
+      # Actions
+      train_actions: Optional[List[Action]] = None,
+      eval_actions: Optional[List[Action]] = None,
       # Train related
       steps_per_loop: Optional[int] = None,
       checkpoint_manager: Optional[tf.train.CheckpointManager] = None,
@@ -86,7 +100,8 @@ class Controller:
       summary_interval: Optional[int] = None,
       summary_dir: Optional[str] = None,
       # Evaluation related
-      eval_summary_dir: Optional[str] = None):
+      eval_summary_dir: Optional[str] = None,
+  ):
     """Initializes a `Controller` instance.
 
     Note that if `checkpoint_manager` is provided and there are checkpoints in
 ...
@@ -110,6 +125,12 @@ class Controller:
       strategy: An instance of `tf.distribute.Strategy`. If not provided, the
         strategy will be initialized from the current in-scope strategy using
         `tf.distribute.get_strategy()`.
+      train_actions: An optional list of `orbit.Action`s to call after each
+        block of `steps_per_loop` training steps are run. These will be called
+        with the output of `trainer.train`.
+      eval_actions: An optional list of `orbit.Action`s to call after each
+        evaluation. These will be called with the output of
+        `evaluator.evaluate`.
       steps_per_loop: The number of steps to run in each inner loop of training
         (passed as the `num_steps` parameter of `trainer.train`).
       checkpoint_manager: An instance of `tf.train.CheckpointManager`. If
 ...
@@ -138,6 +159,7 @@ class Controller:
     """
     if trainer is None and evaluator is None:
       raise ValueError("`trainer` and `evaluator` should not both be `None`.")
+
     if trainer is not None:
       if steps_per_loop is None:
         raise ValueError(
 ...
@@ -163,6 +185,9 @@ class Controller:
     self.strategy = strategy or tf.distribute.get_strategy()
+    self.train_actions = train_actions or []
+    self.eval_actions = eval_actions or []
     self.global_step = global_step
     self.checkpoint_manager = checkpoint_manager
 ...
@@ -255,9 +280,13 @@ class Controller:
     with self.eval_summary_manager.summary_writer().as_default():
       steps_tensor = tf.convert_to_tensor(steps, dtype=tf.int32)
       eval_output = self.evaluator.evaluate(steps_tensor)
-      eval_output = tf.nest.map_structure(utils.get_value, eval_output or {})
 
     elapsed = time.time() - start
 
+    eval_output = eval_output or {}
+    for action in self.eval_actions:
+      action(eval_output)
+    eval_output = tf.nest.map_structure(utils.get_value, eval_output)
+
     _log(f" eval | step: {current_step: 6d} | "
          f"eval time: {elapsed: 6.1f} sec | "
          f"output: {_format_output(eval_output)}")
 ...
@@ -408,7 +437,6 @@ class Controller:
     with tf.summary.record_if(should_record):
       num_steps_tensor = tf.convert_to_tensor(num_steps, dtype=tf.int32)
       train_output = self.trainer.train(num_steps_tensor)
-      train_output = tf.nest.map_structure(utils.get_value, train_output or {})
 
     # Verify that global_step was updated properly, then update current_step.
     expected_step = current_step + num_steps
 ...
@@ -420,6 +448,11 @@ class Controller:
       logging.warning(message)
       return
 
+    train_output = train_output or {}
+    for action in self.train_actions:
+      action(train_output)
+    train_output = tf.nest.map_structure(utils.get_value, train_output)
+
     current_step = expected_step
     steps_per_second = self.step_timer.steps_per_second()
     _log(f"train | step: {current_step: 6d} | "
 ...
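Taken together, these changes let callers attach hooks without subclassing
`Controller`; note that actions receive the raw inner-loop output, before it
is converted via `utils.get_value` for logging. A sketch of the wiring
(trainer/evaluator construction elided; the `my_*` names are placeholders):

    recorded = []
    ctrl = orbit.Controller(
        trainer=my_trainer,
        evaluator=my_evaluator,
        global_step=my_trainer.global_step,
        steps_per_loop=100,
        train_actions=[recorded.append],  # any callable works
        eval_actions=[orbit.actions.ConditionalAction(
            condition=orbit.actions.NewBestMetric('accuracy').test,
            action=my_export_action)])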
orbit/controller_test.py

@@ -583,7 +583,7 @@ class ControllerTest(tf.test.TestCase, parameterized.TestCase):
     test_runner = TestRunner()
 
     class EarlyStopController(controller.Controller):
-      """A subclass of Controller supports early stopping."""
+      """A subclass of Controller that supports early stopping."""
 
       def train_and_evaluate(self,
                              train_steps: int = None,
 ...
@@ -724,5 +724,52 @@ class ControllerTest(tf.test.TestCase, parameterized.TestCase):
         summaries_with_matching_keyword(
             "accuracy", os.path.join(self.model_dir, "dataset2")))
 
+  def test_actions(self):
+    test_runner = TestRunner()
+    checkpoint = tf.train.Checkpoint(
+        model=test_runner.model, optimizer=test_runner.optimizer)
+    checkpoint_manager = tf.train.CheckpointManager(
+        checkpoint,
+        self.model_dir,
+        max_to_keep=None,
+        step_counter=test_runner.global_step,
+        checkpoint_interval=10)
+
+    class OutputRecorderAction:
+      """Simple `Action` that just saves the outputs passed to `__call__`."""
+
+      def __init__(self):
+        self.outputs = []
+
+      def __call__(self, output):
+        self.outputs.append(output)
+
+    train_output_recorder = OutputRecorderAction()
+    eval_output_recorder = OutputRecorderAction()
+
+    test_controller = controller.Controller(
+        trainer=test_runner,
+        evaluator=test_runner,
+        train_actions=[train_output_recorder],
+        eval_actions=[eval_output_recorder],
+        global_step=test_runner.global_step,
+        steps_per_loop=2,
+        summary_dir=os.path.join(self.model_dir, "summaries/train"),
+        checkpoint_manager=checkpoint_manager,
+        eval_summary_dir=os.path.join(self.model_dir, "summaries/eval"))
+    test_controller.train_and_evaluate(
+        train_steps=10, eval_steps=2, eval_interval=6)
+
+    self.assertLen(train_output_recorder.outputs, 5)
+    for output in train_output_recorder.outputs:
+      self.assertIn("loss", output)
+      self.assertGreaterEqual(output["loss"], 0)
+
+    self.assertLen(eval_output_recorder.outputs, 2)
+    for output in eval_output_recorder.outputs:
+      self.assertIn("eval_loss", output)
+      self.assertGreaterEqual(output["eval_loss"], 0)
+
 
 if __name__ == "__main__":
   tf.test.main()