"ml/vscode:/vscode.git/clone" did not exist on "f67a6df110cfa37157bb2f393f56b60c3eff3180"
Commit bfc36ef8 authored by A. Unique TensorFlower's avatar A. Unique TensorFlower
Browse files

Internal change

PiperOrigin-RevId: 421210856
parent 4ccce0d4
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compound layers, which are composition of common layers."""
import enum
from typing import Callable, Optional, Text, Tuple, Union
import pyglove as pg
from pyglove.tensorflow import keras
from pyglove.tensorflow import selections
from pyglove.tensorflow.keras import layers
from pyglove.tensorflow.keras.layers import modeling_utils
import tensorflow as tf
class OpOrder(enum.Enum):
"""Enum for operation order."""
# Order in a sequence of operation, normalization and activation.
OP_NORM_ACT = 0
# Order in a sequence of operation, activation and normalization.
OP_ACT_NORM = 1
# Order in a sequence of activation, operation and normalization.
ACT_OP_NORM = 2
def _op_sequence(op: tf.keras.layers.Layer,
norm: Optional[tf.keras.layers.Layer],
activation: Optional[tf.keras.layers.Layer],
op_order: OpOrder,
name: Optional[Text] = None):
"""Create a sequence of conv, norm, activation layers according the op_order.
Args:
op: A convolutional or linear layer.
norm: An optional normalization layer.
activation: An optional activation layer.
op_order: A string of enum 'op-norm-activation', 'op-activation-norm' or
'activation-op-norm'.
name: Name of the graph block.
Returns:
`layer` if `norm` and `activation` are None, or a sequence of `layer`,
`norm`, `activation` ordered according to the value of `op_order`.
"""
if op_order == OpOrder.OP_NORM_ACT:
net = [op, norm, activation]
elif op_order == OpOrder.OP_ACT_NORM:
net = [op, activation, norm]
elif op_order == OpOrder.ACT_OP_NORM:
net = [activation, op, norm]
else:
raise ValueError('Unsupported OpOrder %r' % op_order)
net = [l for l in net if l is not None]
if len(net) == 1:
return net[0]
return layers.Sequential(net, name=name)
Filters = Union[int, selections.IntSelection, Callable[[tf.Tensor], tf.Tensor]]
# The kernel selection should follow the following value specs
# - pg.typing.Int(min_value=1),
# - pg.typing.Tuple([pg.typing.Int(min_value=1),pg.typing.Int(min_value=1)])
KernelSize = Union[int, Tuple[int, int], selections.IntSelection,
selections.Selection]
@layers.compound
def conv2d(filters: Filters,
kernel_size: KernelSize,
strides: Union[int, Tuple[int, int]] = (1, 1),
padding: Text = 'same',
groups: Union[int, selections.IntSelection] = 1,
kernel_initializer='glorot_uniform',
kernel_regularizer=None,
use_bias: bool = True,
bias_initializer='zeros',
bias_regularizer=None,
normalization: Optional[tf.keras.layers.Layer] = None,
activation: Optional[tf.keras.layers.Layer] = None,
op_order: OpOrder = OpOrder.OP_NORM_ACT,
data_format: Text = 'channels_last',
name: Optional[Text] = None):
"""Create a Conv2D-Normalization-Activation layer."""
if not selections.is_fixed(kernel_size):
candidates = []
for ks in selections.selection_candidates(kernel_size):
candidates.append(
conv2d(
kernel_size=ks,
filters=filters,
strides=strides,
groups=groups,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=use_bias,
bias_initializer=bias_initializer,
bias_regularizer=bias_regularizer,
normalization=normalization,
activation=activation,
data_format=data_format,
op_order=op_order,
name='branch_{:}'.format(len(candidates))))
return layers.Switch(
candidates=candidates,
selected_index=selections.selection_index(kernel_size),
name=name)
if not selections.is_fixed(groups):
candidates = []
for gs in selections.selection_candidates(groups):
candidates.append(
conv2d(
kernel_size=kernel_size,
filters=filters,
strides=strides,
groups=gs,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=use_bias,
bias_initializer=bias_initializer,
bias_regularizer=bias_regularizer,
normalization=normalization,
activation=activation,
op_order=op_order,
name='group_branch_{:}'.format(len(candidates))))
return layers.Switch(
candidates=candidates,
selected_index=selections.selection_index(groups),
name=name)
conv = layers.Conv2D(
filters=filters,
kernel_size=kernel_size,
strides=strides,
padding=padding,
groups=groups,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=use_bias,
bias_initializer=bias_initializer,
bias_regularizer=bias_regularizer,
data_format=data_format,
name='conv2d')
if normalization is not None:
normalization = normalization.clone(override={'name': 'normalization'})
return _op_sequence(conv, normalization, activation, op_order)
@layers.compound
def depthwise_conv2d(
kernel_size: KernelSize,
strides: Union[int, Tuple[int, int]] = (1, 1),
padding: Text = 'same',
depthwise_initializer='glorot_uniform',
depthwise_regularizer=None,
use_bias: bool = True,
bias_initializer='zeros',
bias_regularizer=None,
normalization: Optional[tf.keras.layers.Layer] = None,
activation: Optional[tf.keras.layers.Layer] = None,
op_order: OpOrder = OpOrder.OP_NORM_ACT,
data_format: Text = 'channels_last',
name: Optional[Text] = None):
"""Creates a DepthwiseConv2D-Normalization-Activation layer."""
if not selections.is_fixed(kernel_size):
candidates = []
for i, ks in enumerate(selections.selection_candidates(kernel_size)):
candidates.append(
depthwise_conv2d(
kernel_size=ks,
strides=strides,
depthwise_initializer=depthwise_initializer,
depthwise_regularizer=depthwise_regularizer,
use_bias=use_bias,
bias_initializer=bias_initializer,
bias_regularizer=bias_regularizer,
normalization=normalization,
activation=activation,
op_order=op_order,
data_format=data_format,
name='branch_%d' % i))
return layers.Switch(
candidates=candidates,
selected_index=selections.selection_index(kernel_size),
name=name)
depthwise = layers.DepthwiseConv2D(
kernel_size=kernel_size,
strides=strides,
padding=padding,
depthwise_initializer=depthwise_initializer,
depthwise_regularizer=depthwise_regularizer,
use_bias=use_bias,
bias_initializer=bias_initializer,
bias_regularizer=bias_regularizer,
data_format=data_format,
name='depthwise_conv2d')
if normalization is not None:
normalization = normalization.clone(override={'name': 'normalization'})
return _op_sequence(depthwise, normalization, activation, op_order)
@pg.symbolize
def _expand_filters(
input_filters_mask: tf.Tensor,
is_input_filters_masked: bool,
expansion_factor: float) -> Tuple[tf.Tensor, bool]:
"""Returns input filters mask multiplied by a factor."""
assert input_filters_mask.shape.rank == 1, input_filters_mask
output_filters_mask = tf.sequence_mask(
tf.math.reduce_sum(
tf.cast(input_filters_mask, tf.dtypes.int32)) * expansion_factor,
input_filters_mask.shape[-1] * expansion_factor)
return output_filters_mask, is_input_filters_masked
@layers.compound
def inverted_bottleneck(
filters: Filters,
kernel_size: KernelSize,
expansion_factor: Union[int, selections.IntSelection] = 1,
strides: Union[int, Tuple[int, int]] = (1, 1),
normalization: tf.keras.layers.Layer = layers.BatchNormalization(),
activation: tf.keras.layers.Layer = layers.ReLU(),
kernel_initializer='glorot_uniform',
kernel_regularizer=None,
depthwise_initializer='glorot_uniform',
depthwise_regularizer=None,
post_expansion: Optional[tf.keras.layers.Layer] = None,
post_depthwise: Optional[tf.keras.layers.Layer] = None,
post_projection: Optional[tf.keras.layers.Layer] = None,
collapsed: bool = False,
data_format: Text = 'channels_last',
name: Optional[Text] = None):
"""Creates inverted bottleneck layer.
Args:
filters: output filters
kernel_size: kernel size for the depthwise Conv2D.
expansion_factor: The filters multiplier for the first Conv2D. If 1, the
first Conv2D will be omitted.
strides: Strides for the depthwise Conv2D.
normalization: An optional normalization layer.
activation: An optional activation layer.
kernel_initializer: Kernel initializer used for Conv2D units.
kernel_regularizer: Kernel regularizer for Conv2D units.
depthwise_initializer: Kernel initializer used for depthwise Conv2D units.
depthwise_regularizer: Kernel regularizer for depthwise Conv2D units.
post_expansion: An optional layer that will be inserted after the first
Conv2D.
post_depthwise: An optional layer that will be inserted afther the depthwise
Conv2D.
post_projection: An optional layer that will be inserted after the last
Conv2D.
collapsed: If True, graph will collapse at convolutional units
level on different kernel sizes.
data_format: Data format used for Conv2D and depthwise Conv2D.
name: Name of the layer.
Returns:
An inverted bottleneck layer as a compound layer.
"""
if (not selections.is_fixed(expansion_factor)
and 1 in selections.selection_candidates(expansion_factor)):
raise ValueError(
'Tunable `expansion_factor` with candidates 1 and values greater than 1'
'is not supported: %r.' % expansion_factor)
if not selections.is_fixed(kernel_size) and not collapsed:
candidates = []
for i, ks in enumerate(selections.selection_candidates(kernel_size)):
candidates.append(inverted_bottleneck(
kernel_size=ks,
filters=filters,
expansion_factor=expansion_factor,
strides=strides,
normalization=normalization,
activation=activation,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
depthwise_initializer=depthwise_initializer,
depthwise_regularizer=depthwise_regularizer,
post_expansion=post_expansion,
post_depthwise=post_depthwise,
post_projection=post_projection,
data_format=data_format,
name='branch%d' % i))
return layers.Switch(
candidates=candidates,
selected_index=selections.selection_index(kernel_size),
name=name)
if expansion_factor != 1:
children = [
conv2d( # pylint: disable=unexpected-keyword-arg
filters=_expand_filters( # pylint: disable=no-value-for-parameter
expansion_factor=expansion_factor),
kernel_size=(1, 1),
strides=(1, 1),
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=False,
normalization=normalization,
activation=activation,
data_format=data_format,
name='expansion')
]
else:
children = []
if post_expansion:
children.append(post_expansion)
children.append(depthwise_conv2d( # pylint: disable=unexpected-keyword-arg
kernel_size=kernel_size,
strides=strides,
depthwise_initializer=depthwise_initializer,
depthwise_regularizer=depthwise_regularizer,
use_bias=False,
normalization=normalization,
activation=activation,
data_format=data_format,
name='depthwise'))
if post_depthwise:
children.append(post_depthwise)
children.append(conv2d( # pylint: disable=unexpected-keyword-arg
filters=filters,
kernel_size=(1, 1),
strides=(1, 1),
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=False,
normalization=normalization,
activation=None,
data_format=data_format,
name='projection'))
if post_projection:
children.append(post_projection)
return layers.Sequential(children)
def inverted_bottleneck_with_se(
filters: Filters,
kernel_size: KernelSize,
expansion_factor: Union[int, selections.IntSelection] = 1,
strides: Union[int, Tuple[int, int]] = (1, 1),
se_ratio: Optional[float] = None,
filters_base: int = 8,
normalization: tf.keras.layers.Layer = layers.BatchNormalization(),
activation: tf.keras.layers.Layer = layers.ReLU(),
kernel_initializer='glorot_uniform',
kernel_regularizer=None,
depthwise_initializer='glorot_uniform',
depthwise_regularizer=None,
name: Optional[Text] = None):
"""An inverted bottleneck layer with possibly squeeze excite."""
post_depthwise = None
if se_ratio:
post_depthwise = SqueezeExcitation(
ratio=se_ratio,
filters_base=filters_base,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
name=name+'_se')
return inverted_bottleneck(
filters=filters,
kernel_size=kernel_size,
expansion_factor=expansion_factor,
strides=strides,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
depthwise_initializer=depthwise_initializer,
depthwise_regularizer=depthwise_regularizer,
normalization=normalization,
activation=activation,
post_depthwise=post_depthwise,
name=name)
@layers.compound
def fused_inverted_bottleneck(
filters: Filters,
kernel_size: KernelSize,
expansion_factor: Union[int, selections.IntSelection] = 1,
strides: Union[int, Tuple[int, int]] = (1, 1),
normalization: tf.keras.layers.Layer = layers.BatchNormalization(),
activation: tf.keras.layers.Layer = layers.ReLU(),
kernel_initializer='glorot_uniform',
kernel_regularizer=None,
post_fusion: Optional[tf.keras.layers.Layer] = None,
post_projection: Optional[tf.keras.layers.Layer] = None,
collapsed: bool = False,
data_format: Text = 'channels_last',
name: Optional[Text] = None):
"""Fused inverted bottleneck.
Reference: https://arxiv.org/pdf/2003.02838.pdf
Args:
filters: output filters
kernel_size: kernel size for the depthwise Conv2D.
expansion_factor: The filters multiplier for the first Conv2D. If 1, the
first Conv2D will be omitted.
strides: Strides for the depthwise Conv2D.
normalization: An optional normalization layer.
activation: An optional activation layer.
kernel_initializer: Kernel initializer used for Conv2D units.
kernel_regularizer: Kernel regularizer for Conv2D units.
post_fusion: An optional layer that will be inserted after the first
Conv2D.
post_projection: An optional layer that will be inserted after the last
Conv2D.
collapsed: If True, graph will collapse at convolutional units
level on different kernel sizes.
data_format: Data format used for Conv2D and depthwise Conv2D.
name: Name of the layer.
Returns:
A fused inverted bottleneck layer as a compound layer.
"""
if (not selections.is_fixed(expansion_factor)
and 1 in selections.selection_candidates(expansion_factor)):
raise ValueError(
'Tunable `expansion_factor` with candidates 1 and values greater than 1'
'is not supported: %r.' % expansion_factor)
if not selections.is_fixed(kernel_size) and not collapsed:
candidates = []
for i, ks in enumerate(selections.selection_candidates(kernel_size)):
candidates.append(fused_inverted_bottleneck(
kernel_size=ks,
filters=filters,
expansion_factor=expansion_factor,
strides=strides,
normalization=normalization,
activation=activation,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
post_fusion=post_fusion,
post_projection=post_projection,
data_format=data_format,
name='branch%d' % i))
return layers.Switch(
candidates=candidates,
selected_index=selections.selection_index(kernel_size),
name=name)
if expansion_factor != 1:
children = [
conv2d( # pylint: disable=unexpected-keyword-arg
filters=_expand_filters( # pylint: disable=no-value-for-parameter
expansion_factor=expansion_factor),
kernel_size=kernel_size,
strides=strides,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=False,
normalization=normalization,
activation=activation,
data_format=data_format,
name='expansion')
]
else:
children = []
if post_fusion:
children.append(post_fusion)
children.append(conv2d( # pylint: disable=unexpected-keyword-arg
filters=filters,
kernel_size=(1, 1),
strides=(1, 1),
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=False,
normalization=normalization,
activation=None,
data_format=data_format,
name='fusion'))
if post_projection:
children.append(post_projection)
return layers.Sequential(children)
@pg.symbolize
def _scale_filters(
input_filters_mask: tf.Tensor,
is_input_filters_masked: bool,
ratio: Union[float, selections.FloatSelection],
base: int) -> Tuple[tf.Tensor, bool]:
"""Returns input filters mask multiplied by a factor."""
assert input_filters_mask.shape.rank == 1, input_filters_mask
max_filters = modeling_utils.scale_filters(
int(input_filters_mask.shape[-1]), ratio, base)
effective_filters = modeling_utils.scale_filters(
tf.math.reduce_sum(tf.cast(input_filters_mask, tf.dtypes.int32)),
ratio, base)
output_filters_mask = tf.sequence_mask(effective_filters, max_filters)
return output_filters_mask, is_input_filters_masked
@layers.compound
def tucker_bottleneck(
filters: Filters,
kernel_size: Union[int, Tuple[int, int]],
input_scale_ratio: Union[float, selections.FloatSelection],
output_scale_ratio: Union[float, selections.FloatSelection],
strides: Union[int, Tuple[int, int]] = (1, 1),
normalization: tf.keras.layers.Layer = layers.BatchNormalization(),
activation: tf.keras.layers.Layer = layers.ReLU(),
kernel_initializer='glorot_uniform',
kernel_regularizer=None,
data_format: Text = 'channels_last',
scale_filters_base: int = 8):
"""Fused inverted bottleneck.
Reference: https://arxiv.org/pdf/2003.02838.pdf
Args:
filters: output filters
kernel_size: kernel size for the depthwise Conv2D.
input_scale_ratio: The filters scale ratio for the first Conv2D.
If 0, the first Conv2D will be omitted.
output_scale_ratio: The filters scale ratio for the last Conv2D.
strides: Strides for the depthwise Conv2D.
normalization: An optional normalization layer.
activation: An optional activation layer.
kernel_initializer: Kernel initializer used for Conv2D units.
kernel_regularizer: Kernel regularizer for Conv2D units.
data_format: Data format used for Conv2D and depthwise Conv2D.
scale_filters_base: An integer as the base for scaling the filters. The
scaled filters will always be multiple of the base.
Returns:
A fused inverted bottleneck layer as a compound layer.
"""
if (not selections.is_fixed(input_scale_ratio)
and 0 in selections.selection_candidates(input_scale_ratio)):
raise ValueError(
'Tunable `input_scale_ratio` with candidates 0 and values greater than '
'0 is not supported: %r.' % input_scale_ratio)
if (not selections.is_fixed(input_scale_ratio)
or selections.selection_value(input_scale_ratio) > 0):
children = [
conv2d( # pylint: disable=unexpected-keyword-arg
filters=_scale_filters( # pylint: disable=no-value-for-parameter
ratio=input_scale_ratio, base=scale_filters_base),
kernel_size=(1, 1),
strides=(1, 1),
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=False,
normalization=normalization,
activation=activation,
data_format=data_format,
name='input_expansion')
]
else:
children = []
children.append(conv2d( # pylint: disable=unexpected-keyword-arg
filters=modeling_utils.scale_filters(
filters, output_scale_ratio, scale_filters_base),
kernel_size=kernel_size,
strides=strides,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=False,
normalization=normalization,
activation=activation,
data_format=data_format,
name='output_expansion'))
children.append(conv2d( # pylint: disable=unexpected-keyword-arg
filters=filters,
kernel_size=(1, 1),
strides=(1, 1),
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=False,
normalization=normalization,
activation=None, # We do not have activation on the last Conv2D.
data_format=data_format,
name='projection'))
return layers.Sequential(children)
class ScaleFiltersSaver(object):
"""Scale filters based on ratio and base, while save the input filters."""
def __init__(self, ratio, base):
self._input_filters_mask = None
self._is_input_filters_masked = None
self._call = _scale_filters(ratio=ratio, base=base) # pylint: disable=no-value-for-parameter
@property
def value(self):
if self._input_filters_mask is None:
raise ValueError('self._input_filters is None.')
return self._input_filters_mask, self._is_input_filters_masked
@property
def call(self):
return self._call
def __call__(self,
input_filters_mask: tf.Tensor,
is_input_filters_masked: bool):
self._input_filters_mask = input_filters_mask
self._is_input_filters_masked = is_input_filters_masked
return self._call(input_filters_mask, is_input_filters_masked) # pylint: disable=not-callable
def __eq__(self, other: 'ScaleFiltersSaver'):
if not isinstance(self, type(other)):
return False
else:
return self.call == other.call
@pg.symbolize
def _return_saver_value(x, y, z):
del x, y
return z.value
@pg.symbolize
class SqueezeExcitation(keras.Model):
"""Mobile block."""
def __init__(
self,
ratio: float,
filters_base: int = 8,
hidden_activation: tf.keras.layers.Layer = layers.ReLU(),
output_activation: tf.keras.layers.Layer = layers.Activation('sigmoid'),
kernel_initializer='glorot_uniform',
kernel_regularizer=None,
bias_initializer='zeros',
bias_regularizer=None,
name: Optional[Text] = None,
**kwargs):
"""Mobile block.
Args:
ratio: Ratio to scale filters.
filters_base: The number of filters will be rounded to a multiple of
this value.
hidden_activation: Activation for hidden convolutional layer.
output_activation: Activation for output convolutional layer.
kernel_initializer: Initializer for the kernel variables.
kernel_regularizer: Regularizer function applied to the `kernel`
weights matrix'.
bias_initializer: Initializer for the `bias` weights matrix
bias_regularizer: Regularizer function applied to the `bias`
weights matrix.
name: Name of the block.
**kwargs: keyword arguments to be passed.
Returns:
A tuple of 2 tensors (block output, features-before-downsampling)
"""
super().__init__(name=name, **kwargs)
scale_filter_saver = ScaleFiltersSaver(ratio=ratio, base=filters_base)
self._gap = layers.GlobalAveragePooling2D(keepdims=True)
self._se_reduce = layers.Conv2D(
kernel_size=(1, 1),
filters=scale_filter_saver,
strides=(1, 1),
use_bias=True,
activation=hidden_activation,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
bias_initializer=bias_initializer,
bias_regularizer=bias_regularizer,
name=name + '-squeeze' if name is not None else None)
self._se_expand = layers.Conv2D(
kernel_size=(1, 1),
filters=_return_saver_value( # pylint: disable=no-value-for-parameter,unexpected-keyword-arg
z=scale_filter_saver, override_args=True),
strides=(1, 1),
use_bias=True,
activation=output_activation,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
bias_initializer=bias_initializer,
bias_regularizer=bias_regularizer,
name=name + '-excite' if name is not None else None)
self._multiply = layers.Multiply()
def call(self, inputs, training=None):
x = self._gap(inputs)
x = self._se_reduce(x)
x = self._se_expand(x)
return self._multiply([x, inputs])
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for nn_blocks."""
from pyglove.tensorflow import keras
from pyglove.tensorflow import selections
from pyglove.tensorflow.keras import layers
from pyglove.tensorflow.keras.layers import modeling_utils
import tensorflow as tf
from official.projects.tunas.modeling.layers import nn_blocks
class Conv2DTest(tf.test.TestCase):
"""Tests for `nn_blocks.conv2d`."""
def setUp(self):
super().setUp()
bsz, h, w, c = 8, 32, 32, 32
self.input_tensor = tf.random.uniform(shape=[bsz, h, w, c])
def testBareConv2D(self):
"""Test for bare conv2d without normalization and activation."""
self.assertAllClose(
nn_blocks.conv2d(
kernel_size=(3, 3),
filters=8,
strides=(1, 1),
name='Conv',
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1))(self.input_tensor),
layers.Conv2D(
kernel_size=(3, 3),
filters=8,
strides=(1, 1),
padding='same',
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1),
name='Conv')(self.input_tensor))
def testConv2DWithNormAndActivation(self):
"""Test conv2d with normalization and activation."""
# Conv2d-BN-Relu using layers objects.
self.assertAllClose(
nn_blocks.conv2d(
kernel_size=(3, 3),
filters=8,
strides=(2, 2),
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1),
name='Conv')(self.input_tensor),
layers.Sequential([
layers.Conv2D(
kernel_size=(3, 3),
filters=8,
strides=(2, 2),
padding='same',
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
layers.BatchNormalization(),
layers.ReLU()
], name='Conv')(self.input_tensor))
def testConv2DWithTunableKernelSize(self):
"""Test conv2d with normalization and activation."""
# Conv2d-BN-Relu using layers objects.
kernel_size = selections.select(
[(3, 3), (5, 5)], tf.constant(0, dtype=tf.int32))
self.assertAllClose(
nn_blocks.conv2d(
kernel_size=kernel_size,
filters=8,
strides=(2, 2),
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1),
name='Conv')(self.input_tensor),
layers.Switch(
candidates=[
layers.Sequential([
layers.Conv2D(
kernel_size=(3, 3),
filters=8,
strides=(2, 2),
padding='same',
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
layers.BatchNormalization(),
layers.ReLU()
], name='branch_0'),
layers.Sequential([
layers.Conv2D(
kernel_size=(5, 5),
filters=8,
strides=(2, 2),
padding='same',
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
layers.BatchNormalization(),
layers.ReLU()
], name='branch_1')],
selected_index=kernel_size.index,
name='Conv')(self.input_tensor))
def testConv2DWithTunableGroups(self):
"""Test conv2d with normalization and activation."""
# Conv2d-BN-Relu using layers objects.
groups = selections.select(
[1, 2], tf.constant(0, dtype=tf.int32))
self.assertAllClose(
nn_blocks.conv2d(
kernel_size=3,
filters=8,
strides=(2, 2),
groups=groups,
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1),
name='Conv')(self.input_tensor),
layers.Switch(
candidates=[
layers.Sequential([
layers.Conv2D(
kernel_size=3,
filters=8,
strides=(2, 2),
padding='same',
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1),
groups=1),
layers.BatchNormalization(),
layers.ReLU()
], name='group_branch_0'),
layers.Sequential([
layers.Conv2D(
kernel_size=3,
filters=8,
strides=(2, 2),
padding='same',
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1),
groups=2),
layers.BatchNormalization(),
layers.ReLU()
], name='group_branch_1')],
selected_index=groups.index,
name='Conv')(self.input_tensor))
class DepthwiseConv2DTest(tf.test.TestCase):
"""Tests for `nn_blocks.depthwise_conv2d`."""
def setUp(self):
super().setUp()
bsz, h, w, c = 8, 32, 32, 32
self.input_tensor = tf.random.uniform(shape=[bsz, h, w, c])
def testBareDepthwiseConv2D(self):
"""Test for depthwise_conv2d without normalization and activation."""
self.assertAllClose(
nn_blocks.depthwise_conv2d(
kernel_size=(3, 3),
strides=(1, 1),
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1),
name='DepthwiseConv')(self.input_tensor),
layers.DepthwiseConv2D(
kernel_size=(3, 3),
strides=(1, 1),
padding='same',
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1),
name='DepthwiseConv')(self.input_tensor))
def testDepthwiseConvWithNormAndActivation(self):
"""Test for depthwise_conv2d with normalization and activation."""
# DepthwiseConv2d-BN-Relu using layers.Object.
self.assertAllClose(
nn_blocks.depthwise_conv2d(
kernel_size=(3, 3),
strides=(1, 1),
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1),
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
name='DepthwiseConv'
)(self.input_tensor),
layers.Sequential([
layers.DepthwiseConv2D(
kernel_size=(3, 3),
strides=(1, 1),
padding='same',
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1)),
layers.BatchNormalization(),
layers.ReLU()
], name='DepthwiseConv')(self.input_tensor))
def testDepthwiseConv2DWithTunableKernelSize(self):
"""Test conv2d with normalization and activation."""
# Conv2d-BN-Relu using layers objects.
kernel_size = selections.select(
[(3, 3), (5, 5)], tf.constant(0, dtype=tf.int32))
self.assertAllClose(
nn_blocks.depthwise_conv2d(
kernel_size=kernel_size,
strides=(1, 1),
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1),
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
name='DepthwiseConv'
)(self.input_tensor),
layers.Switch(
candidates=[
layers.Sequential([
layers.DepthwiseConv2D(
kernel_size=(3, 3),
strides=(1, 1),
padding='same',
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1)),
layers.BatchNormalization(),
layers.ReLU()
], name='branch_0'),
layers.Sequential([
layers.DepthwiseConv2D(
kernel_size=(5, 5),
strides=(1, 1),
padding='same',
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1)),
layers.BatchNormalization(),
layers.ReLU()
], name='branch_1')],
selected_index=kernel_size.index,
name='DepthwiseConv')(self.input_tensor))
class InvertedBottleneckTest(tf.test.TestCase):
"""Tests for `nn_blocks.inverted_bottleneck`."""
def setUp(self):
super().setUp()
bsz, h, w, c = 8, 32, 32, 32
self.input_tensor = tf.random.uniform(shape=[bsz, h, w, c])
def testRegularInvertedBottleneck(self):
"""Test regular inverted bottleneck without tunable hyper-parameters."""
# Regular inverted bottleneck.
layer = nn_blocks.inverted_bottleneck(
kernel_size=(3, 3),
filters=4,
expansion_factor=2,
normalization=layers.BatchNormalization(),
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1),
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1),
post_expansion=layers.identity(),
post_depthwise=layers.identity(),
post_projection=layers.identity()) # pylint: disable=unnecessary-lambda
self.assertAllClose(
layer(self.input_tensor),
layers.Sequential([
nn_blocks.conv2d(
kernel_size=(1, 1),
filters=nn_blocks._expand_filters(expansion_factor=2), # pylint: disable=no-value-for-parameter
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
layers.identity(),
nn_blocks.depthwise_conv2d(
kernel_size=(3, 3),
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1),
use_bias=False,),
layers.identity(),
nn_blocks.conv2d(
kernel_size=(1, 1),
filters=4,
normalization=layers.BatchNormalization(),
activation=None,
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
layers.identity(),
])(self.input_tensor))
def testSeparateTowers(self):
"""Test `nn_blocks.inverted_bottleneck`."""
op_sel = tf.constant(0, dtype=tf.int32)
filters_sel = tf.constant(0, dtype=tf.int32)
self.assertAllClose(
nn_blocks.inverted_bottleneck(
kernel_size=selections.select([(3, 3), (5, 5)], op_sel),
filters=selections.select([2, 4], filters_sel),
expansion_factor=3,
normalization=layers.BatchNormalization(),
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1),
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1),
)(self.input_tensor),
layers.Switch([
layers.Sequential([
nn_blocks.conv2d(
kernel_size=(1, 1),
filters=nn_blocks._expand_filters(
expansion_factor=3), # pylint: disable=no-value-for-parameter
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
nn_blocks.depthwise_conv2d(
kernel_size=(3, 3),
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
use_bias=False,
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1)),
nn_blocks.conv2d(
kernel_size=(1, 1),
filters=selections.select([2, 4], filters_sel),
normalization=layers.BatchNormalization(),
activation=None,
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
], name='branch0'),
layers.Sequential([
nn_blocks.conv2d(
kernel_size=(1, 1),
filters=nn_blocks._expand_filters(
expansion_factor=3), # pylint: disable=no-value-for-parameter
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
nn_blocks.depthwise_conv2d(
kernel_size=(5, 5),
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
use_bias=False,
depthwise_initializer=keras.initializers.ones(),
depthwise_regularizer=keras.regularizers.l2(0.1)),
nn_blocks.conv2d(
kernel_size=(1, 1),
filters=selections.select([2, 4], filters_sel),
normalization=layers.BatchNormalization(),
activation=None,
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
], name='branch1')
], selected_index=op_sel)(self.input_tensor))
class SqueezeAndExciteTest(tf.test.TestCase):
"""Tests for `nn_blocks.SqueezeExcitation`."""
def testFixedRatio(self):
"""Test fixed ratio."""
xlayer = nn_blocks.SqueezeExcitation(0.25)
inputs = tf.ones(shape=(1, 2, 2, 3))
xlayer(inputs)
kernels = xlayer.trainable_variables
self.assertEqual(kernels[0].shape.as_list(), [1, 1, 3, 8])
self.assertEqual(kernels[1].shape.as_list(), [8])
self.assertEqual(kernels[2].shape.as_list(), [1, 1, 8, 3])
self.assertEqual(kernels[3].shape.as_list(), [3])
class FusedInvertedBottleneckTest(tf.test.TestCase):
"""Tests for `nn_blocks.fused_inverted_bottleneck`."""
def setUp(self):
super().setUp()
bsz, h, w, c = 8, 32, 32, 32
self.input_tensor = tf.random.uniform(shape=[bsz, h, w, c])
def testRegularFusedInvertedBottleneck(self):
"""Test regular inverted bottleneck without tunable hyper-parameters."""
# Regular inverted bottleneck.
layer = nn_blocks.fused_inverted_bottleneck(
kernel_size=(3, 3),
filters=4,
expansion_factor=2,
normalization=layers.BatchNormalization(),
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1),
post_fusion=layers.identity(),
post_projection=layers.identity()) # pylint: disable=unnecessary-lambda
self.assertAllClose(
layer(self.input_tensor),
layers.Sequential([
nn_blocks.conv2d(
kernel_size=(3, 3),
filters=nn_blocks._expand_filters(expansion_factor=2), # pylint: disable=no-value-for-parameter
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
layers.identity(),
nn_blocks.conv2d(
kernel_size=(1, 1),
filters=4,
normalization=layers.BatchNormalization(),
activation=None,
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
layers.identity(),
])(self.input_tensor))
def testSeparateTowers(self):
"""Test `nn_blocks.inverted_bottleneck`."""
op_sel = tf.constant(0, dtype=tf.int32)
filters_sel = tf.constant(0, dtype=tf.int32)
self.assertAllClose(
nn_blocks.fused_inverted_bottleneck(
kernel_size=selections.select([(3, 3), (5, 5)], op_sel),
filters=selections.select([2, 4], filters_sel),
expansion_factor=3,
normalization=layers.BatchNormalization(),
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1),
)(self.input_tensor),
layers.Switch([
layers.Sequential([
nn_blocks.conv2d(
kernel_size=(3, 3),
filters=nn_blocks._expand_filters(
expansion_factor=3), # pylint: disable=no-value-for-parameter
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
nn_blocks.conv2d(
kernel_size=(1, 1),
filters=selections.select([2, 4], filters_sel),
normalization=layers.BatchNormalization(),
activation=None,
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
], name='branch0'),
layers.Sequential([
nn_blocks.conv2d(
kernel_size=(5, 5),
filters=nn_blocks._expand_filters(
expansion_factor=3), # pylint: disable=no-value-for-parameter
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
nn_blocks.conv2d(
kernel_size=(1, 1),
filters=selections.select([2, 4], filters_sel),
normalization=layers.BatchNormalization(),
activation=None,
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
], name='branch1')
], selected_index=op_sel)(self.input_tensor))
class TuckerBottleneckTest(tf.test.TestCase):
"""Tests for `nn_blocks.inverted_bottleneck`."""
def setUp(self):
super().setUp()
bsz, h, w, c = 8, 32, 32, 32
self.input_tensor = tf.random.uniform(shape=[bsz, h, w, c])
def testRegularTuckerBottleneck(self):
"""Test regular inverted bottleneck without tunable hyper-parameters."""
# Regular inverted bottleneck.
layer = nn_blocks.tucker_bottleneck(
kernel_size=(3, 3),
filters=4,
input_scale_ratio=2.0,
output_scale_ratio=4.0,
activation=layers.ReLU(),
normalization=layers.BatchNormalization(),
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)) # pylint: disable=unnecessary-lambda
self.assertAllClose(
layer(self.input_tensor),
layers.Sequential([
nn_blocks.conv2d(
kernel_size=(1, 1),
filters=nn_blocks._scale_filters(ratio=2.0, base=8), # pylint: disable=no-value-for-parameter
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
nn_blocks.conv2d(
kernel_size=(3, 3),
filters=modeling_utils.scale_filters(4, 4.0, 8), # pylint: disable=no-value-for-parameter
normalization=layers.BatchNormalization(),
activation=layers.ReLU(),
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
nn_blocks.conv2d(
kernel_size=(1, 1),
filters=4,
normalization=layers.BatchNormalization(),
activation=None,
use_bias=False,
kernel_initializer=keras.initializers.ones(),
kernel_regularizer=keras.regularizers.l2(0.1)),
])(self.input_tensor))
if __name__ == '__main__':
tf.test.main()
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Mobile model builder."""
from typing import List, Optional, Sequence, Text
import pyglove as pg
from pyglove.tensorflow import keras
from pyglove.tensorflow import selections
from official.projects.tunas.modeling.layers import nn_blocks
class _MobileModel(keras.Model):
"""Mobile model."""
def __init__(
self,
stem_conv_filters: nn_blocks.Filters,
blocks: Sequence[keras.layers.Layer],
feature_conv_filters: nn_blocks.Filters,
kernel_initializer=keras.initializers.he_normal(),
dense_initializer=keras.initializers.random_normal(stddev=0.01),
# NOTE(daiyip): Keras L2 implementation is 2x of
# tf.contrib.keras.layers.l2_regularizer.
kernel_regularizer=keras.regularizers.l2(4e-5 * 0.5),
normalization=keras.layers.BatchNormalization(
momentum=0.99, epsilon=0.001),
activation=keras.layers.ReLU(),
dropout_rate: float = 0.,
num_classes: Optional[int] = 1001,
name: Optional[Text] = None):
"""Mobile model.
Args:
stem_conv_filters: Filter size for the stem conv unit.
blocks: A list of layers as residual blocks after the stem layer.
feature_conv_filters: Number of penultimate features.
kernel_initializer: Kernel initializer used for the stem and featurizer.
dense_initializer: Kernel initializer used for the classification layer.
kernel_regularizer: Regularizer for the layers in the network.
normalization: Normalization layer used in the network.
activation: Activation layer used in the network.
dropout_rate: Dropout rate for the penultimate features, applicable only
when `num_classes` is not None.
num_classes: Number of classes for the classification model. If None,
the classification layer will be excluded.
name: Name of the model.
Returns:
A list of tensors as model outputs.
If `num_classes` is not None, the list is [logits, penultimate_features]
plus lower-level features.
Otherwise the list is [penultimate_features] plus lower-level features.
"""
super().__init__(name=name)
self._stem = nn_blocks.conv2d(
filters=stem_conv_filters,
kernel_size=(3, 3),
strides=(2, 2),
padding='same',
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=False,
normalization=normalization,
activation=activation,
name='stem')
# An ugly hack to track each layer from the `blocks`, since Keras does not
# track `tf.keras.layers.Layer` objects from container-type members.
for i, block in enumerate(blocks):
setattr(self, '_blocks_{:d}'.format(i), block)
self._blocks = blocks
self._featurizer = nn_blocks.conv2d(
filters=feature_conv_filters,
kernel_size=(1, 1),
padding='same',
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
use_bias=False,
normalization=normalization,
activation=activation,
name='features')
self._global_pooling = keras.layers.GlobalAveragePooling2D()
if num_classes is not None:
self._dropout = keras.layers.Dropout(dropout_rate)
self._classification_head = keras.layers.Dense(
num_classes,
kernel_initializer=dense_initializer,
use_bias=True,
name='classification_head')
self.num_classes = num_classes
def call(self, inputs):
x = self._stem(inputs)
lower_level_features = []
for block in self._blocks:
x = block(x)
if isinstance(x, (list, tuple)):
x, features = x[0], x[1:]
lower_level_features.extend(list(features))
x = self._featurizer(x)
x = self._global_pooling(x)
penultimate_features = x
if self.num_classes is None:
return [penultimate_features] + lower_level_features
else:
x = self._dropout(x)
x = self._classification_head(x)
return [x, penultimate_features] + lower_level_features
MobileModel = pg.symbolize(_MobileModel, class_name='MobileModel')
class _MobileBlock(keras.Model):
"""Mobile block."""
def __init__(
self,
# We use List instead of Sequence here since Tuple cannot be
# modified using rebind.
sublayers: List[keras.layers.Layer],
filters: nn_blocks.Filters,
name: Optional[Text] = None):
"""Mobile block.
Args:
sublayers: Sublayers for the block.
filters: Number of filters for the block. All sublayers will be using
this filters.
name: Name of the block.
Returns:
A tuple of 2 tensors (block output, features-before-downsampling)
"""
super().__init__(name=name)
self.sublayers = [s.clone(override={'filters': filters}) for s in sublayers]
self.filters = filters
def call(self, inputs):
get_image_size = lambda x: (int(x.shape[1]), int(x.shape[2]))
x = inputs
image_size = get_image_size(x)
features = []
for layer in self.sublayers:
x = layer(x)
new_image_size = get_image_size(x)
if new_image_size != image_size:
features.append(x)
image_size = new_image_size
return tuple([x] + features)
MobileBlock = pg.symbolize(_MobileBlock, class_name='MobileBlock')
def search_model_v2(
init_filters: Sequence[int],
filters_multipliers: Optional[Sequence[float]] = None,
filters_scale_factor: float = 1.0,
filters_base: int = 8,
se_ratios: Optional[List[float]] = None,
num_classes: Optional[int] = 1001,
normalization=keras.layers.BatchNormalization(momentum=0.0, epsilon=0.001),
activation=keras.layers.ReLU(),
dropout_rate: float = 0.,
kernel_initializer=keras.initializers.he_normal(),
depthwise_initializer=keras.initializers.depthwise_he_normal(),
dense_initializer=keras.initializers.random_normal(stddev=0.01),
# NOTE(daiyip): Keras L2 implementation is 2x of
# tf.contrib.keras.layers.l2_regularizer.
kernel_regularizer=keras.regularizers.l2(4e-5 * 0.5),
name: Optional[Text] = 'search_mobile_model_v2'):
"""A searchable model derived from MobileNetV2.
Args:
init_filters: A list of integers (size=9) as the initial filter size of
each mobile block.
filters_multipliers: An optional list of floats as multipliers for the
filters. If the list size is larger than 1, it is a search space including
searching the filter sizes per block.
filters_scale_factor: Additional scaling factor on top of
filters_multipliers, this is to align with existing TuNAS codebase.
filters_base: An integer as base to compute multiplied filters.
Please see `layers.scale_filters` for details.
se_ratios: Squeeze-and-excite ratios. If empty, SE is not used.
num_classes: Number of classes for the classification model. If None,
the classification layer will be excluded.
normalization: Normalization layer used in the model.
activation: Activation layer used in the model.
dropout_rate: Dropout rate for the penultimate features, applicable only
when `num_classes` is not None.
kernel_initializer: Kernel initializer used for the Conv2D units
in the model.
depthwise_initializer: Kernel initializer used for DepthwiseConv2D units
in the model.
dense_initializer: Kernel initializer used for the classification layer.
kernel_regularizer: Regularizer for the layers in the network.
name: Name of the model, which will be used as the top name scope.
Returns:
A `MobileModel` object (a tf.keras.Model subclass) as the search model.
"""
if not isinstance(init_filters, (tuple, list)) or len(init_filters) != 9:
raise ValueError(
'`init_filters` must be a sequence of 9. '
'Encountered: %r.' % init_filters)
se_ratios = [None] + (se_ratios if se_ratios else [])
def _filters(x):
filters = keras.layers.get_filters(x, filters_multipliers, filters_base)
if filters_scale_factor != 1.0:
# Up to now, filters will contain non-duplicated values. We will then
# apply an additional filters scaling based on the candidates.
# Please be aware of that this round of rescaling may result in duplicated
# entries. We do not dedup these values to be compatible with original
# TuNAS implementation.
filters = keras.layers.maybe_oneof([
keras.layers.get_filters(x, [filters_scale_factor], filters_base)
for x in filters.candidates
], choice_type=keras.layers.ChoiceType.FILTERS)
return filters
def _mobile_layer(layer_index, strides, kernel_size,
expansion_factor, skippable=True):
# Note(luoshixin): collapsed search space is not supported currently.
candidates = []
for i, (sr, f, k) in enumerate(
selections.map_candidates([se_ratios, expansion_factor, kernel_size])):
candidates.append(nn_blocks.inverted_bottleneck_with_se(
# Placeholder, which will be modified at mobile_block level.
filters=1,
strides=strides,
kernel_size=k,
expansion_factor=f,
se_ratio=sr,
normalization=normalization,
activation=activation,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
depthwise_initializer=depthwise_initializer,
depthwise_regularizer=kernel_regularizer,
name='inverted_bottleneck%d' % (i + 1)))
if skippable:
candidates.append(keras.layers.zeros())
op = keras.layers.maybe_oneof(
candidates,
name=('switch' if skippable else ('switch%d' % (layer_index + 1))))
if skippable:
op = keras.layers.Residual(op, name='residual%d' % (layer_index + 1))
return op
# pylint: disable=unexpected-keyword-arg
blocks = [
MobileBlock([
_mobile_layer(0, (1, 1), [(3, 3), (5, 5), (7, 7)], [1], False),
], _filters(init_filters[1]), name='block1'),
MobileBlock([
_mobile_layer(0, (2, 2), [(3, 3), (5, 5), (7, 7)], [3, 6], False),
_mobile_layer(1, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
_mobile_layer(2, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
_mobile_layer(3, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
], _filters(init_filters[2]), name='block2'),
MobileBlock([
_mobile_layer(0, (2, 2), [(3, 3), (5, 5), (7, 7)], [3, 6], False),
_mobile_layer(1, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
_mobile_layer(2, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
_mobile_layer(3, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
], _filters(init_filters[3]), name='block3'),
MobileBlock([
_mobile_layer(0, (2, 2), [(3, 3), (5, 5), (7, 7)], [3, 6], False),
_mobile_layer(1, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
_mobile_layer(2, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
_mobile_layer(3, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
], _filters(init_filters[4]), name='block4'),
MobileBlock([
_mobile_layer(0, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6], False),
_mobile_layer(1, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
_mobile_layer(2, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
_mobile_layer(3, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
], _filters(init_filters[5]), name='block5'),
MobileBlock([
_mobile_layer(0, (2, 2), [(3, 3), (5, 5), (7, 7)], [3, 6], False),
_mobile_layer(1, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
_mobile_layer(2, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
_mobile_layer(3, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6]),
], _filters(init_filters[6]), name='block6'),
MobileBlock([
_mobile_layer(0, (1, 1), [(3, 3), (5, 5), (7, 7)], [3, 6], False),
], _filters(init_filters[7]), name='block7'),
]
return MobileModel( # pylint: disable=unexpected-keyword-arg
stem_conv_filters=_filters(init_filters[0]),
blocks=blocks,
feature_conv_filters=_filters(init_filters[8]),
kernel_initializer=kernel_initializer,
dense_initializer=dense_initializer,
kernel_regularizer=kernel_regularizer,
normalization=normalization,
activation=activation,
dropout_rate=dropout_rate,
num_classes=num_classes,
name=name)
def static_model(
search_model: MobileModel,
dna: pg.DNA,
use_stateful_batch_norm: bool = True) -> MobileModel:
"""Returns a static model from a search model and a DNA."""
model = pg.template(search_model).decode(dna)
return pg.patch_on_member(
model,
keras.layers.BatchNormalization, 'momentum',
0.99 if use_stateful_batch_norm else 0.0)
def static_mobile_model(
op_indices: Sequence[int],
init_filters: Sequence[int],
num_classes: int,
weight_decay: float,
dropout: float = 0.0,
filters_multiplier: float = 1.0,
name: Optional[Text] = None) -> MobileModel:
"""Create static mobile model."""
assert len(op_indices) == 22
# NOTE(daiyip): Regularizer value of Keras L2 implementation is 2x of
# tf.contrib.keras.layers.l2_regularizer.
search_model = search_model_v2(
init_filters,
num_classes=num_classes,
filters_multipliers=[filters_multiplier],
dropout_rate=dropout,
kernel_regularizer=keras.regularizers.l2(weight_decay * 0.5),
name=name)
return static_model(search_model, pg.DNA.parse(list(op_indices)))
MOBILENET_V2_FILTERS = (32, 16, 24, 32, 64, 96, 160, 320, 1280)
MNASNET_FILTERS = (32, 16, 24, 40, 80, 96, 192, 320, 1280)
PROXYLESSNAS_GPU_FILTERS = (40, 24, 32, 56, 112, 128, 256, 432, 1728)
PROXYLESSNAS_CPU_FILTERS = (40, 24, 32, 48, 88, 104, 216, 360, 1432)
PROXYLESSNAS_MOBILE_FILTERS = (32, 16, 32, 40, 80, 96, 192, 320, 1280)
MOBILEDET_EDGE_TPU_FILTERS = (32, 16, 32, 48, 96, 96, 160, 192, 192)
MOBILENET_V2_OPERATIONS = (0, 3, 3, 6, 6, 3, 3, 3, 6, 3, 3,
3, 3, 3, 3, 3, 6, 3, 3, 3, 6, 3)
MNASNET_OPERATIONS = (0, 0, 0, 0, 6, 1, 1, 1, 6, 4, 4,
4, 6, 3, 3, 6, 6, 4, 4, 4, 4, 3)
PROXYLESSNAS_GPU_OPERATIONS = (0, 1, 6, 6, 6, 2, 6, 6, 0, 5, 6,
6, 1, 4, 6, 0, 1, 5, 5, 5, 4, 5)
PROXYLESSNAS_CPU_OPERATIONS = (0, 3, 0, 0, 0, 3, 0, 0, 1, 3, 6,
6, 0, 4, 0, 0, 0, 4, 1, 1, 0, 4)
PROXYLESSNAS_MOBILE_OPERATIONS = (0, 1, 0, 6, 6, 2, 0, 1, 1, 5, 1,
1, 1, 4, 1, 1, 1, 5, 5, 2, 2, 5)
MOBILE_DEFAULT_FILTER_MULTIPLIERS = (0.5, 0.625, 0.75, 1.0, 1.25, 1.5, 2.0)
MOBILEDET_EDGE_TPU_FILTER_MULTIPLIERS = MOBILE_DEFAULT_FILTER_MULTIPLIERS
def mobilenet_v2(num_classes: Optional[int] = 1001,
weight_decay: float = 4e-5,
dropout: float = 0.0) -> MobileModel:
"""MobileNet v2."""
return static_mobile_model(
op_indices=MOBILENET_V2_OPERATIONS,
init_filters=MOBILENET_V2_FILTERS,
num_classes=num_classes,
weight_decay=weight_decay,
dropout=dropout,
name='mobilenet_v2')
def mnasnet(num_classes: Optional[int] = 1001,
weight_decay: float = 4e-5,
dropout: float = 0.0) -> MobileModel:
"""MNASNet."""
return static_mobile_model(
op_indices=MNASNET_OPERATIONS,
init_filters=MNASNET_FILTERS,
num_classes=num_classes,
weight_decay=weight_decay,
dropout=dropout,
name='mnasnet')
def proxyless_nas_gpu(num_classes: Optional[int] = 1001,
weight_decay: float = 4e-5,
dropout: float = 0.0) -> MobileModel:
"""ProxylessNAS searched for GPU."""
return static_mobile_model(
op_indices=PROXYLESSNAS_GPU_OPERATIONS,
init_filters=PROXYLESSNAS_GPU_FILTERS,
num_classes=num_classes,
weight_decay=weight_decay,
dropout=dropout,
name='proxyless_nas_gpu')
def proxyless_nas_cpu(num_classes: Optional[int] = 1001,
weight_decay: float = 4e-5,
dropout: float = 0.0) -> MobileModel:
"""ProxylessNAS searched for CPU."""
return static_mobile_model(
op_indices=PROXYLESSNAS_CPU_OPERATIONS,
init_filters=PROXYLESSNAS_CPU_FILTERS,
num_classes=num_classes,
weight_decay=weight_decay,
dropout=dropout,
name='proxyless_nas_cpu')
def proxyless_nas_mobile(num_classes: Optional[int] = 1001,
weight_decay: float = 4e-5,
dropout: float = 0.0) -> MobileModel:
"""ProxylessNAS searched for mobile device."""
return static_mobile_model(
op_indices=PROXYLESSNAS_MOBILE_OPERATIONS,
init_filters=PROXYLESSNAS_MOBILE_FILTERS,
num_classes=num_classes,
weight_decay=weight_decay,
dropout=dropout,
name='proxyless_nas_mobile')
# Search spaces from the TuNAS paper:
# pylint:disable=line-too-long
# Reference:
# Gabriel Bender & Hanxiao Liu, et al. Can Weight Sharing Outperform Random Architecture Search? An Investigation With TuNAS
# https://openaccess.thecvf.com/content_CVPR_2020/html/Bender_Can_Weight_Sharing_Outperform_Random_Architecture_Search_An_Investigation_With_CVPR_2020_paper.html
# pylint:enable=line-too-long
def proxylessnas_search(
num_classes: Optional[int] = 1001) -> MobileModel:
"""Original Proxyless NAS search space."""
return search_model_v2(
init_filters=PROXYLESSNAS_MOBILE_FILTERS,
num_classes=num_classes,
name='proxylessnas_search')
def proxylessnas_with_filters_doubled_every_block(
num_classes: Optional[int] = 1001) -> MobileModel:
"""A variant search space of `proxylessnas_search`.
In this search space the number of filters is doubled in each consecutive
block. This search space is a baseline in the Tunas paper to evaluate the
effect of searching over filter sizes compared to traditional heuristics.
Args:
num_classes: Number of classes for the classification model. If None,
the classification layer will be excluded.
Returns:
A `MobileModel` object (a tf.keras.Model subclass) as the search model.
"""
return search_model_v2(
init_filters=(16, 16, 16, 32, 64, 128, 256, 512, 1024),
num_classes=num_classes,
name='proxylessnas_with_filters_doubled_every_block_search')
def proxylessnas_with_filters_doubled_every_stride2(
num_classes: Optional[int] = 1001) -> MobileModel:
"""A variant search space of `proxylessnas_search`.
This search space is an extension of the ProxylessNas search space where it is
made possible to search over the output filter sizes.
Args:
num_classes: Number of classes for the classification model. If None,
the classification layer will be excluded.
Returns:
A `MobileModel` object (a tf.keras.Model subclass) as the search model.
"""
return search_model_v2(
init_filters=(16, 16, 32, 64, 128, 128, 256, 256, 512),
num_classes=num_classes,
name='proxylessnas_with_filters_doubled_every_stride2_search')
def proxylessnas_outfilters_search(
num_classes: Optional[int] = 1001) -> MobileModel:
"""A variant search space of `proxylessnas_search`.
This search space is an extension of the ProxylessNas search space where it is
made possible to search over the output filter sizes.
Args:
num_classes: Number of classes for the classification model. If None,
the classification layer will be excluded.
Returns:
A `MobileModel` object (a tf.keras.Model subclass) as the search model.
"""
return search_model_v2(
init_filters=(16, 16, 16, 32, 64, 128, 256, 512, 1024),
filters_multipliers=(0.5, 0.625, 0.75, 1.0, 1.25, 1.5, 2.0),
num_classes=num_classes,
name='proxylessnas_outfilters_search')
def proxylessnas_with_mobilenet_v2_filters_search(
num_classes: Optional[int] = 1001) -> MobileModel:
"""Original Proxyless NAS search space."""
return search_model_v2(
init_filters=MOBILENET_V2_FILTERS,
num_classes=num_classes,
name='proxylessnas_with_mobilenet_v2_filters_search')
def mobilenet_v2_filters_search(
num_classes: Optional[int] = 1001) -> MobileModel:
"""MobileNetV2 filters search."""
search_model = search_model_v2(
init_filters=MOBILENET_V2_FILTERS,
filters_multipliers=MOBILE_DEFAULT_FILTER_MULTIPLIERS,
num_classes=num_classes,
name='mobilenet_v2_filters_search')
def select_ops(x):
return keras.layers.get_choice_type(x) == keras.layers.ChoiceType.OP
return pg.materialize(
search_model,
pg.DNA.parse(list(MOBILENET_V2_OPERATIONS)),
where=select_ops)
def tunas_search_model(ssd: Text) -> MobileModel: # pytype: disable=invalid-annotation
"""Get TuNAS search model by search space name."""
# Note(luoshixin): collapsed search space is not supported, and hence
# not migrated currently.
ssd_map = {
'proxylessnas_search': proxylessnas_search,
'proxylessnas_with_filters_doubled_every_block':
proxylessnas_with_filters_doubled_every_block,
'proxylessnas_with_filters_doubled_every_stride2':
proxylessnas_with_filters_doubled_every_stride2,
'proxylessnas_outfilters_search': proxylessnas_outfilters_search,
}
if ssd not in ssd_map:
raise ValueError('Unsupported TuNAS search space %r.' % ssd)
return ssd_map[ssd]()
def _swap_op_choices(
model,
initial_op_choices,
body_op_choices):
"""Helper method to swap op choices in a MobileModel."""
context = dict(initial_op=True)
def swap_ops(key_path: pg.KeyPath, value, parent):
del key_path
# Skip static values and non-operation choices.
if (not isinstance(value, pg.hyper.OneOf)
or keras.layers.get_choice_type(value) != keras.layers.ChoiceType.OP):
return value
sample_ibn = value.candidates[0]
assert isinstance(sample_ibn, nn_blocks.inverted_bottleneck), sample_ibn
if context['initial_op']:
candidates = pg.clone(initial_op_choices, deep=True)
context['initial_op'] = False
else:
candidates = pg.clone(body_op_choices, deep=True)
for c in candidates:
keras.layers.inherit_hyperparameters_from(c, sample_ibn, [
'filters', 'strides', 'kernel_initializer', 'kernel_regularizer',
'depthwise_initializer', 'depthwise_regularizer',
'normalization', 'activation'
])
if isinstance(parent, keras.layers.Residual):
candidates.append(keras.layers.zeros())
return keras.layers.maybe_oneof(candidates)
return model.rebind(swap_ops)
def mobiledet_edge_tpu_search(num_classes: Optional[int] = 1001,
weight_decay: float = 4e-5,
dropout: float = 0.0,
filters_scale_factor: float = 1.0,
filters_base: int = 8,
filters_multipliers: Sequence[float] = (
MOBILEDET_EDGE_TPU_FILTER_MULTIPLIERS),
expansion_multipliers: Sequence[int] = (4, 8),
name: Text = 'mobiledet_edge_tpu_search'):
"""Return search model for MOBILEDET_EDGE_TPU search space from TuNAS."""
def _op_choices(
kernel_sizes,
expansion_factors,
tucker_kernel_sizes,
tucker_input_ratios,
tucker_output_ratios):
ops = []
# Add choices from regular inverted bottleneck.
for i, (ef, ks) in enumerate(
selections.map_candidates([expansion_factors, kernel_sizes])):
ops.append(nn_blocks.inverted_bottleneck.partial(
kernel_size=ks, expansion_factor=ef,
name='inverted_bottleneck%d' % i))
# Add choices from fused inverted bottleneck.
for i, (ef, ks) in enumerate(selections.map_candidates(
[expansion_factors, kernel_sizes])):
ops.append(nn_blocks.fused_inverted_bottleneck.partial(
kernel_size=ks, expansion_factor=ef,
name='fused_inverted_bottleneck%d' % i))
# Add choices from tucker bottleneck.
for i, (iratio, ks, oratio) in enumerate(selections.map_candidates([
tucker_input_ratios, tucker_kernel_sizes, tucker_output_ratios])):
ops.append(nn_blocks.tucker_bottleneck.partial(
kernel_size=ks, input_scale_ratio=iratio,
output_scale_ratio=oratio, name='tucker_bottleneck%d' % i))
return ops
initial_op_choices = _op_choices(
kernel_sizes=[(3, 3), (5, 5)],
expansion_factors=[1],
tucker_kernel_sizes=[(3, 3)],
tucker_input_ratios=[0.25, 0.75],
tucker_output_ratios=[0.25, 0.75])
body_op_choices = _op_choices(
kernel_sizes=[(3, 3), (5, 5)],
expansion_factors=expansion_multipliers,
tucker_kernel_sizes=[(3, 3)],
tucker_input_ratios=[0.25, 0.75],
tucker_output_ratios=[0.25, 0.75])
search_model = search_model_v2(
init_filters=MOBILEDET_EDGE_TPU_FILTERS,
filters_multipliers=filters_multipliers,
filters_scale_factor=filters_scale_factor,
filters_base=filters_base,
num_classes=num_classes,
dropout_rate=dropout,
kernel_regularizer=keras.regularizers.l2(weight_decay * 0.5),
name=name)
return _swap_op_choices(search_model, initial_op_choices, body_op_choices)
# This arch string is copied from: tunas/detection_search_space.py
DEFAULT_MOBILEDET_EDGE_TPU_ARCH_STRING = (
'2:5:1:6:4:6:4:0:7:4:4:4:2:2:2:4:4:2:3:3:2:2:3:3:2:1:2:2:3:6:5')
def mobiledet_edge_tpu(
arch_string: Text = DEFAULT_MOBILEDET_EDGE_TPU_ARCH_STRING,
num_classes: Optional[int] = 1001,
filters_scale_factor: float = 1.0,
filters_base: int = 8,
weight_decay: float = 4e-5,
dropout: float = 0.0,
filters_multipliers: Sequence[float] = (
MOBILEDET_EDGE_TPU_FILTER_MULTIPLIERS),
expansion_multipliers: Sequence[int] = (4, 8),
name: Text = 'mobiledet_edge_tpu') -> MobileModel:
"""Static model from MobileDet Edge TPU search space."""
dna = pg.DNA.parse([int(v) for v in arch_string.split(':')])
search_model = mobiledet_edge_tpu_search(
num_classes=num_classes,
weight_decay=weight_decay,
dropout=dropout,
filters_scale_factor=filters_scale_factor,
filters_base=filters_base,
filters_multipliers=filters_multipliers,
expansion_multipliers=expansion_multipliers,
name=name)
return static_model(search_model, dna)
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for mobile_models."""
from absl.testing import parameterized
import numpy as np
import pyglove as pg
from pyglove.tensorflow.keras import layers
import tensorflow as tf
from official.projects.tunas.modeling import mobile_models
class StaticModelTest(tf.test.TestCase, parameterized.TestCase):
"""Tests for `mobile_models.static_model`."""
def testStaticModel(self):
"""Test static model creation."""
with tf.compat.v1.Graph().as_default():
tf.compat.v1.set_random_seed(0)
model = mobile_models.mobilenet_v2()
inputs1 = tf.ones([1, 224, 224, 3])
inputs2 = tf.zeros([1, 224, 224, 3])
outputs1 = model(inputs1)
tf.print(model.summary())
print(model.summary())
print(model.layers)
print(isinstance(model.layers[0],
pg.tensorflow.keras.layers.CompoundLayer))
self.assertLen(model.trainable_variables, 158)
num_trainable_params = np.sum([
np.prod(var.get_shape().as_list())
for var in model.trainable_variables
])
self.assertEqual(num_trainable_params, 3506153)
self.assertLen(model.get_updates_for(inputs1), 104)
outputs2 = model(inputs2)
self.assertLen(model.trainable_variables, 158)
self.assertLen(model.get_updates_for(inputs2), 104)
self.evaluate(tf.compat.v1.global_variables_initializer())
self.assertAllClose(
self.evaluate(tf.reduce_sum(model.losses)), 0.68539262)
self.evaluate(outputs1)
self.evaluate(outputs2)
def testMobileDetEdgeTPU(self):
"""Test MobileDet edge TPU static model."""
with tf.compat.v1.Graph().as_default():
tf.compat.v1.set_random_seed(0)
model = mobile_models.mobiledet_edge_tpu()
inputs = tf.ones([1, 224, 224, 3])
outputs = model(inputs)
self.assertLen(model.trainable_variables, 176)
num_trainable_params = np.sum([
np.prod(var.get_shape().as_list())
for var in model.trainable_variables
])
self.assertEqual(num_trainable_params, 3177497)
self.assertLen(model.get_updates_for(inputs), 116)
self.evaluate(tf.compat.v1.global_variables_initializer())
self.assertAllClose(self.evaluate(tf.reduce_sum(model.losses)), 0.78207)
self.evaluate(outputs)
@parameterized.parameters([
(mobile_models.mnasnet, 158, 4384593, 104),
(mobile_models.proxyless_nas_mobile, 185, 4081793, 122),
])
def testTunasStaticModel(self,
model_builder,
num_trainable_variables,
num_params,
num_updates):
"""Test MNASNet static model."""
with tf.compat.v1.Graph().as_default():
tf.compat.v1.set_random_seed(0)
model = model_builder()
inputs = tf.ones([1, 224, 224, 3])
outputs = model(inputs)
self.assertLen(model.trainable_variables, num_trainable_variables)
num_trainable_params = np.sum([
np.prod(var.get_shape().as_list())
for var in model.trainable_variables
])
self.assertEqual(num_trainable_params, num_params)
self.assertLen(model.get_updates_for(inputs), num_updates)
self.evaluate(tf.compat.v1.global_variables_initializer())
self.evaluate(outputs)
def testMobileDetEdgeTPUMultipliers(self):
"""Test MobileDet edge TPU static model with multiplier arguments."""
with tf.compat.v1.Graph().as_default():
tf.compat.v1.set_random_seed(0)
model = mobile_models.mobiledet_edge_tpu(
filters_multipliers=(0.5, 0.625, 0.75, 1.0, 2.0, 3.0, 4.0),
expansion_multipliers=(6, 8, 10))
inputs = tf.ones([1, 224, 224, 3])
outputs = model(inputs)
self.assertLen(model.trainable_variables, 197)
num_trainable_params = np.sum([
np.prod(var.get_shape().as_list())
for var in model.trainable_variables
])
self.assertEqual(num_trainable_params, 3930105)
self.assertLen(model.get_updates_for(inputs), 130)
self.evaluate(tf.compat.v1.global_variables_initializer())
self.assertAllClose(self.evaluate(tf.reduce_sum(model.losses)), 1.014057)
self.evaluate(outputs)
@parameterized.parameters([
mobile_models.mobilenet_v2,
mobile_models.mobiledet_edge_tpu,
mobile_models.mnasnet,
mobile_models.proxyless_nas_mobile,
mobile_models.proxyless_nas_cpu,
mobile_models.proxyless_nas_gpu
])
def testLayerNamesAreTheSame(self, model_builder):
"""Test variable names are the same with multiple calls."""
def get_layer_names(model):
def _is_layer_name(k, v, p):
del v
return isinstance(p, tf.keras.layers.Layer) and k.key == 'name'
return pg.query(model, custom_selector=_is_layer_name)
self.assertEqual(
get_layer_names(model_builder()),
get_layer_names(model_builder()))
class SearchModelTest(tf.test.TestCase, parameterized.TestCase):
"""Tests for `mobile_models.search_model`."""
def testSearchModel(self):
"""Test search model."""
search_model = mobile_models.mobilenet_v2_filters_search()
dna_spec = pg.dna_spec(search_model)
# The search space only contains 9 filters (2 conv + 7 blocks)
self.assertLen(dna_spec.elements, 9)
# Make sure MobileNetV2 is one point in the search space.
# To do so, we first modify the search space by using the same momentum
# for BatchNormalization, and remove the name for MobileNetV2.
pg.patch_on_member(
search_model, layers.BatchNormalization, 'momentum', 0.99)
mobilenetv2 = mobile_models.mobilenet_v2()
dna = pg.template(search_model).encode(
mobilenetv2.rebind(name='mobilenet_v2_filters_search'))
self.assertEqual(dna, pg.DNA.parse([2, 1, 1, 2, 3, 3, 3, 3, 3]))
def testProxylessSearchModel(self):
"""Test proxyless search model."""
search_model = mobile_models.proxylessnas_search()
dna_spec = pg.dna_spec(search_model)
# The search space only contains 9 filters (2 conv + 7 blocks)
self.assertLen(dna_spec.elements, 22)
# Make sure ProxylessNASMobile is one point in the search space.
# To do so, we first modify the search space by using the same momentum
# for BatchNormalization, and remove the name for ProxylessNASMobile.
pg.patch_on_member(
search_model, layers.BatchNormalization, 'momentum', 0.99)
proxyless_nas_mobile = mobile_models.proxyless_nas_mobile()
dna = pg.template(search_model).encode(
proxyless_nas_mobile.rebind(name='proxylessnas_search'))
self.assertEqual(
dna,
pg.DNA.parse(list(mobile_models.PROXYLESSNAS_MOBILE_OPERATIONS)))
if __name__ == '__main__':
tf.test.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment