"tests/others/test_config.py" did not exist on "577a6a65d619d5667b982298895034dd496775ad"
Commit cc748b2a authored by Abdullah Rashwan's avatar Abdullah Rashwan Committed by A. Unique TensorFlower
Browse files

Internal change

PiperOrigin-RevId: 329754787
parent 2f788e1d
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Contains definitions of Residual Networks.
Residual networks (ResNets) were proposed in:
[1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
Deep Residual Learning for Image Recognition. arXiv:1512.03385
"""
# Import libraries
import tensorflow as tf
from official.modeling import tf_utils
from official.vision.beta.modeling.layers import nn_blocks
layers = tf.keras.layers
# Specifications for different ResNet variants.
# Each entry specifies block configurations of the particular ResNet variant.
# Each element in the block configuration is in the following format:
# (block_fn, num_filters, block_repeats)
RESNET_SPECS = {
18: [
('residual', 64, 2),
('residual', 128, 2),
('residual', 256, 2),
('residual', 512, 2),
],
34: [
('residual', 64, 3),
('residual', 128, 4),
('residual', 256, 6),
('residual', 512, 3),
],
50: [
('bottleneck', 64, 3),
('bottleneck', 128, 4),
('bottleneck', 256, 6),
('bottleneck', 512, 3),
],
101: [
('bottleneck', 64, 3),
('bottleneck', 128, 4),
('bottleneck', 256, 23),
('bottleneck', 512, 3),
],
152: [
('bottleneck', 64, 3),
('bottleneck', 128, 8),
('bottleneck', 256, 36),
('bottleneck', 512, 3),
],
200: [
('bottleneck', 64, 3),
('bottleneck', 128, 24),
('bottleneck', 256, 36),
('bottleneck', 512, 3),
],
}
@tf.keras.utils.register_keras_serializable(package='Vision')
class ResNet(tf.keras.Model):
"""Class to build ResNet family model."""
def __init__(self,
model_id,
input_specs=layers.InputSpec(shape=[None, None, None, 3]),
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='VarianceScaling',
kernel_regularizer=None,
bias_regularizer=None,
**kwargs):
"""ResNet initialization function.
Args:
model_id: `int` depth of ResNet backbone model.
input_specs: `tf.keras.layers.InputSpec` specs of the input tensor.
activation: `str` name of the activation function.
use_sync_bn: if True, use synchronized batch normalization.
norm_momentum: `float` normalization omentum for the moving average.
norm_epsilon: `float` small float added to variance to avoid dividing by
zero.
kernel_initializer: kernel_initializer for convolutional layers.
kernel_regularizer: tf.keras.regularizers.Regularizer object for Conv2D.
Default to None.
bias_regularizer: tf.keras.regularizers.Regularizer object for Conv2d.
Default to None.
**kwargs: keyword arguments to be passed.
"""
self._model_id = model_id
self._input_specs = input_specs
self._use_sync_bn = use_sync_bn
self._activation = activation
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
if use_sync_bn:
self._norm = layers.experimental.SyncBatchNormalization
else:
self._norm = layers.BatchNormalization
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
if tf.keras.backend.image_data_format() == 'channels_last':
bn_axis = -1
else:
bn_axis = 1
# Build ResNet.
inputs = tf.keras.Input(shape=input_specs.shape[1:])
x = layers.Conv2D(
filters=64, kernel_size=7, strides=2, use_bias=False, padding='same',
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)(
inputs)
x = self._norm(
axis=bn_axis, momentum=norm_momentum, epsilon=norm_epsilon)(
x)
x = tf_utils.get_activation(activation)(x)
x = layers.MaxPool2D(pool_size=3, strides=2, padding='same')(x)
# TODO(xianzhi): keep a list of blocks to make blocks accessible.
endpoints = {}
for i, spec in enumerate(RESNET_SPECS[model_id]):
if spec[0] == 'residual':
block_fn = nn_blocks.ResidualBlock
elif spec[0] == 'bottleneck':
block_fn = nn_blocks.BottleneckBlock
else:
raise ValueError('Block fn `{}` is not supported.'.format(spec[0]))
x = self._block_group(
inputs=x,
filters=spec[1],
strides=(1 if i == 0 else 2),
block_fn=block_fn,
block_repeats=spec[2],
name='block_group_l{}'.format(i + 2))
endpoints[i + 2] = x
self._output_specs = {l: endpoints[l].get_shape() for l in endpoints}
super(ResNet, self).__init__(inputs=inputs, outputs=endpoints, **kwargs)
def _block_group(self,
inputs,
filters,
strides,
block_fn,
block_repeats=1,
name='block_group'):
"""Creates one group of blocks for the ResNet model.
Args:
inputs: `Tensor` of size `[batch, channels, height, width]`.
filters: `int` number of filters for the first convolution of the layer.
strides: `int` stride to use for the first convolution of the layer. If
greater than 1, this layer will downsample the input.
block_fn: Either `nn_blocks.ResidualBlock` or `nn_blocks.BottleneckBlock`.
block_repeats: `int` number of blocks contained in the layer.
name: `str`name for the block.
Returns:
The output `Tensor` of the block layer.
"""
x = block_fn(
filters=filters,
strides=strides,
use_projection=True,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
activation=self._activation,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon)(
inputs)
for _ in range(1, block_repeats):
x = block_fn(
filters=filters,
strides=1,
use_projection=False,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
activation=self._activation,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon)(
x)
return tf.identity(x, name=name)
def get_config(self):
config_dict = {
'model_id': self._model_id,
'activation': self._activation,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'bias_regularizer': self._bias_regularizer,
}
return config_dict
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
@property
def output_specs(self):
"""A dict of {level: TensorShape} pairs for the model output."""
return self._output_specs
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Contains definitions of 3D Residual Networks."""
from typing import List, Tuple
# Import libraries
import tensorflow as tf
from official.modeling import tf_utils
from official.vision.beta.modeling.layers import nn_blocks_3d
layers = tf.keras.layers
RESNET_SPECS = {
50: [
('bottleneck3d', 64, 3),
('bottleneck3d', 128, 4),
('bottleneck3d', 256, 6),
('bottleneck3d', 512, 3),
],
101: [
('bottleneck3d', 64, 3),
('bottleneck3d', 128, 4),
('bottleneck3d', 256, 23),
('bottleneck3d', 512, 3),
],
}
@tf.keras.utils.register_keras_serializable(package='Vision')
class ResNet3D(tf.keras.Model):
"""Class to build 3D ResNet family model."""
def __init__(self,
model_id: int,
temporal_strides: List[int],
temporal_kernel_sizes: List[Tuple[int]],
use_self_gating: List[int] = None,
input_specs=layers.InputSpec(shape=[None, None, None, None, 3]),
stem_conv_temporal_stride=2,
stem_pool_temporal_stride=2,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='VarianceScaling',
kernel_regularizer=None,
bias_regularizer=None,
**kwargs):
"""ResNet3D initialization function.
Args:
model_id: `int` depth of ResNet backbone model.
temporal_strides: a list of integers that specifies the temporal strides
for all 3d blocks.
temporal_kernel_sizes: a list of tuples that specifies the temporal kernel
sizes for all 3d blocks in different block groups.
use_self_gating: a list of booleans to specify applying self-gating module
or not in each block group. If None, self-gating is not applied.
input_specs: `tf.keras.layers.InputSpec` specs of the input tensor.
stem_conv_temporal_stride: `int` temporal stride for the first conv layer.
stem_pool_temporal_stride: `int` temporal stride for the first pool layer.
activation: `str` name of the activation function.
use_sync_bn: if True, use synchronized batch normalization.
norm_momentum: `float` normalization omentum for the moving average.
norm_epsilon: `float` small float added to variance to avoid dividing by
zero.
kernel_initializer: kernel_initializer for convolutional layers.
kernel_regularizer: tf.keras.regularizers.Regularizer object for Conv2D.
Default to None.
bias_regularizer: tf.keras.regularizers.Regularizer object for Conv2d.
Default to None.
**kwargs: keyword arguments to be passed.
"""
self._model_id = model_id
self._temporal_strides = temporal_strides
self._temporal_kernel_sizes = temporal_kernel_sizes
self._input_specs = input_specs
self._stem_conv_temporal_stride = stem_conv_temporal_stride
self._stem_pool_temporal_stride = stem_pool_temporal_stride
self._use_self_gating = use_self_gating
self._use_sync_bn = use_sync_bn
self._activation = activation
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
if use_sync_bn:
self._norm = layers.experimental.SyncBatchNormalization
else:
self._norm = layers.BatchNormalization
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
if tf.keras.backend.image_data_format() == 'channels_last':
bn_axis = -1
else:
bn_axis = 1
# Build ResNet3D backbone.
inputs = tf.keras.Input(shape=input_specs.shape[1:])
# Build stem.
x = layers.Conv3D(
filters=64,
kernel_size=[5, 7, 7],
strides=[stem_conv_temporal_stride, 2, 2],
use_bias=False,
padding='same',
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)(
inputs)
x = self._norm(
axis=bn_axis, momentum=norm_momentum, epsilon=norm_epsilon)(
x)
x = tf_utils.get_activation(activation)(x)
temporal_kernel_size = 1 if stem_pool_temporal_stride == 1 else 3
x = layers.MaxPool3D(
pool_size=[temporal_kernel_size, 3, 3],
strides=[stem_pool_temporal_stride, 2, 2],
padding='same')(
x)
# Build intermediate blocks and endpoints.
resnet_specs = RESNET_SPECS[model_id]
if len(temporal_strides) != len(resnet_specs) or len(
temporal_kernel_sizes) != len(resnet_specs):
raise ValueError(
'Number of blocks in temporal specs should equal to resnet_specs.')
endpoints = {}
for i, resnet_spec in enumerate(resnet_specs):
if resnet_spec[0] == 'bottleneck3d':
block_fn = nn_blocks_3d.BottleneckBlock3D
else:
raise ValueError('Block fn `{}` is not supported.'.format(
resnet_spec[0]))
x = self._block_group(
inputs=x,
filters=resnet_spec[1],
temporal_kernel_sizes=temporal_kernel_sizes[i],
temporal_strides=temporal_strides[i],
spatial_strides=(1 if i == 0 else 2),
block_fn=block_fn,
block_repeats=resnet_spec[2],
use_self_gating=use_self_gating[i] if use_self_gating else False,
name='block_group_l{}'.format(i + 2))
endpoints[i + 2] = x
self._output_specs = {l: endpoints[l].get_shape() for l in endpoints}
super(ResNet3D, self).__init__(inputs=inputs, outputs=endpoints, **kwargs)
def _block_group(self,
inputs,
filters,
temporal_kernel_sizes,
temporal_strides,
spatial_strides,
block_fn=nn_blocks_3d.BottleneckBlock3D,
block_repeats=1,
use_self_gating=False,
name='block_group'):
"""Creates one group of blocks for the ResNet3D model.
Args:
inputs: `Tensor` of size `[batch, channels, height, width]`.
filters: `int` number of filters for the first convolution of the layer.
temporal_kernel_sizes: a tuple that specifies the temporal kernel sizes
for each block in the current group.
temporal_strides: `int` temporal strides for the first convolution in this
group.
spatial_strides: `int` stride to use for the first convolution of the
layer. If greater than 1, this layer will downsample the input.
block_fn: Either `nn_blocks.ResidualBlock` or `nn_blocks.BottleneckBlock`.
block_repeats: `int` number of blocks contained in the layer.
use_self_gating: `bool` apply self-gating module or not.
name: `str`name for the block.
Returns:
The output `Tensor` of the block layer.
"""
if len(temporal_kernel_sizes) != block_repeats:
raise ValueError(
'Number of elements in `temporal_kernel_sizes` must equal to `block_repeats`.'
)
# Only apply self-gating module in the last block.
use_self_gating_list = [False] * (block_repeats - 1) + [use_self_gating]
x = block_fn(
filters=filters,
temporal_kernel_size=temporal_kernel_sizes[0],
temporal_strides=temporal_strides,
spatial_strides=spatial_strides,
use_self_gating=use_self_gating_list[0],
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
activation=self._activation,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon)(
inputs)
for i in range(1, block_repeats):
x = block_fn(
filters=filters,
temporal_kernel_size=temporal_kernel_sizes[i],
temporal_strides=1,
spatial_strides=1,
use_self_gating=use_self_gating_list[i],
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
activation=self._activation,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon)(
x)
return tf.identity(x, name=name)
def get_config(self):
config_dict = {
'model_id': self._model_id,
'temporal_strides': self._temporal_strides,
'temporal_kernel_sizes': self._temporal_kernel_sizes,
'stem_conv_temporal_stride': self._stem_conv_temporal_stride,
'stem_pool_temporal_stride': self._stem_pool_temporal_stride,
'use_self_gating': self._use_self_gating,
'activation': self._activation,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'bias_regularizer': self._bias_regularizer,
}
return config_dict
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
@property
def output_specs(self):
"""A dict of {level: TensorShape} pairs for the model output."""
return self._output_specs
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for resnet."""
# Import libraries
from absl.testing import parameterized
import tensorflow as tf
from official.vision.beta.modeling.backbones import resnet_3d
class ResNet3DTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(128, 50, 4),
)
def test_network_creation(self, input_size, model_id,
endpoint_filter_scale):
"""Test creation of ResNet3D family models."""
tf.keras.backend.set_image_data_format('channels_last')
temporal_strides = [1, 1, 1, 1]
temporal_kernel_sizes = [(3, 3, 3), (3, 1, 3, 1), (3, 1, 3, 1, 3, 1),
(1, 3, 1)]
use_self_gating = [True, False, True, False]
network = resnet_3d.ResNet3D(
model_id=model_id,
temporal_strides=temporal_strides,
temporal_kernel_sizes=temporal_kernel_sizes,
use_self_gating=use_self_gating,
)
inputs = tf.keras.Input(shape=(8, input_size, input_size, 3), batch_size=1)
endpoints = network(inputs)
self.assertAllEqual([
1, 2, input_size / 2**2, input_size / 2**2, 64 * endpoint_filter_scale
], endpoints[2].shape.as_list())
self.assertAllEqual([
1, 2, input_size / 2**3, input_size / 2**3, 128 * endpoint_filter_scale
], endpoints[3].shape.as_list())
self.assertAllEqual([
1, 2, input_size / 2**4, input_size / 2**4, 256 * endpoint_filter_scale
], endpoints[4].shape.as_list())
self.assertAllEqual([
1, 2, input_size / 2**5, input_size / 2**5, 512 * endpoint_filter_scale
], endpoints[5].shape.as_list())
def test_serialize_deserialize(self):
# Create a network object that sets all of its config options.
kwargs = dict(
model_id=50,
temporal_strides=[1, 1, 1, 1],
temporal_kernel_sizes=[(3, 3, 3), (3, 1, 3, 1), (3, 1, 3, 1, 3, 1),
(1, 3, 1)],
stem_conv_temporal_stride=2,
stem_pool_temporal_stride=2,
use_self_gating=None,
use_sync_bn=False,
activation='relu',
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='VarianceScaling',
kernel_regularizer=None,
bias_regularizer=None,
)
network = resnet_3d.ResNet3D(**kwargs)
expected_config = dict(kwargs)
self.assertEqual(network.get_config(), expected_config)
# Create another network object from the first object's config.
new_network = resnet_3d.ResNet3D.from_config(network.get_config())
# Validate that the config can be forced to JSON.
_ = new_network.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(network.get_config(), new_network.get_config())
if __name__ == '__main__':
tf.test.main()
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for resnet."""
# Import libraries
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import strategy_combinations
from official.vision.beta.modeling.backbones import resnet
class ResNetTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(128, 18, 1),
(128, 34, 1),
(128, 50, 4),
(128, 101, 4),
(128, 152, 4),
)
def test_network_creation(self, input_size, model_id,
endpoint_filter_scale):
"""Test creation of ResNet family models."""
resnet_params = {
18: 11190464,
34: 21306048,
50: 23561152,
101: 42605504,
152: 58295232,
}
tf.keras.backend.set_image_data_format('channels_last')
network = resnet.ResNet(model_id=model_id)
self.assertEqual(network.count_params(), resnet_params[model_id])
inputs = tf.keras.Input(shape=(input_size, input_size, 3), batch_size=1)
endpoints = network(inputs)
self.assertAllEqual(
[1, input_size / 2**2, input_size / 2**2, 64 * endpoint_filter_scale],
endpoints[2].shape.as_list())
self.assertAllEqual(
[1, input_size / 2**3, input_size / 2**3, 128 * endpoint_filter_scale],
endpoints[3].shape.as_list())
self.assertAllEqual(
[1, input_size / 2**4, input_size / 2**4, 256 * endpoint_filter_scale],
endpoints[4].shape.as_list())
self.assertAllEqual(
[1, input_size / 2**5, input_size / 2**5, 512 * endpoint_filter_scale],
endpoints[5].shape.as_list())
@combinations.generate(
combinations.combine(
strategy=[
strategy_combinations.tpu_strategy,
strategy_combinations.one_device_strategy_gpu,
],
use_sync_bn=[False, True],
))
def test_sync_bn_multiple_devices(self, strategy, use_sync_bn):
"""Test for sync bn on TPU and GPU devices."""
inputs = np.random.rand(64, 128, 128, 3)
tf.keras.backend.set_image_data_format('channels_last')
with strategy.scope():
network = resnet.ResNet(model_id=50, use_sync_bn=use_sync_bn)
_ = network(inputs)
@parameterized.parameters(1, 3, 4)
def test_input_specs(self, input_dim):
"""Test different input feature dimensions."""
tf.keras.backend.set_image_data_format('channels_last')
input_specs = tf.keras.layers.InputSpec(shape=[None, None, None, input_dim])
network = resnet.ResNet(model_id=50, input_specs=input_specs)
inputs = tf.keras.Input(shape=(128, 128, input_dim), batch_size=1)
_ = network(inputs)
def test_serialize_deserialize(self):
# Create a network object that sets all of its config options.
kwargs = dict(
model_id=50,
use_sync_bn=False,
activation='relu',
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='VarianceScaling',
kernel_regularizer=None,
bias_regularizer=None,
)
network = resnet.ResNet(**kwargs)
expected_config = dict(kwargs)
self.assertEqual(network.get_config(), expected_config)
# Create another network object from the first object's config.
new_network = resnet.ResNet.from_config(network.get_config())
# Validate that the config can be forced to JSON.
_ = new_network.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(network.get_config(), new_network.get_config())
if __name__ == '__main__':
tf.test.main()
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================="""
"""RevNet Implementation.
[1] Aidan N. Gomez, Mengye Ren, Raquel Urtasun, Roger B. Grosse
The Reversible Residual Network: Backpropagation Without Storing Activations
https://arxiv.org/pdf/1707.04585.pdf
"""
from typing import Any, Callable, Dict, Optional
# Import libraries
import tensorflow as tf
from official.modeling import tf_utils
from official.vision.beta.modeling.layers import nn_blocks
# Specifications for different RevNet variants.
# Each entry specifies block configurations of the particular RevNet variant.
# Each element in the block configuration is in the following format:
# (block_fn, num_filters, block_repeats)
REVNET_SPECS = {
38: [
('residual', 32, 3),
('residual', 64, 3),
('residual', 112, 3),
],
56: [
('bottleneck', 128, 2),
('bottleneck', 256, 2),
('bottleneck', 512, 3),
('bottleneck', 832, 2),
],
104: [
('bottleneck', 128, 2),
('bottleneck', 256, 2),
('bottleneck', 512, 11),
('bottleneck', 832, 2),
],
}
@tf.keras.utils.register_keras_serializable(package='Vision')
class RevNet(tf.keras.Model):
"""Reversible ResNet, RevNet implementation."""
def __init__(self,
model_id: int,
input_specs: tf.keras.layers.InputSpec
= tf.keras.layers.InputSpec(shape=[None, None, None, 3]),
activation: str = 'relu',
use_sync_bn: bool = False,
norm_momentum: float = 0.99,
norm_epsilon: float = 0.001,
kernel_initializer: str = 'VarianceScaling',
kernel_regularizer: tf.keras.regularizers.Regularizer = None,
**kwargs):
"""RevNet initialization function.
Args:
model_id: `int` depth/id of ResNet backbone model.
input_specs: `tf.keras.layers.InputSpec` specs of the input tensor.
activation: `str` name of the activation function.
use_sync_bn: `bool` if True, use synchronized batch normalization.
norm_momentum: `float` normalization omentum for the moving average.
norm_epsilon: `float` small float added to variance to avoid dividing by
zero.
kernel_initializer: `str` kernel_initializer for convolutional layers.
kernel_regularizer: `tf.keras.regularizers.Regularizer` for Conv2D.
**kwargs: additional keyword arguments to be passed.
"""
self._model_id = model_id
self._input_specs = input_specs
self._use_sync_bn = use_sync_bn
self._activation = activation
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
if use_sync_bn:
self._norm = tf.keras.layers.experimental.SyncBatchNormalization
else:
self._norm = tf.keras.layers.BatchNormalization
axis = -1 if tf.keras.backend.image_data_format() == 'channels_last' else 1
# Build RevNet.
inputs = tf.keras.Input(shape=input_specs.shape[1:])
x = tf.keras.layers.Conv2D(
filters=REVNET_SPECS[model_id][0][1],
kernel_size=7, strides=2, use_bias=False, padding='same',
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer)(inputs)
x = self._norm(
axis=axis, momentum=norm_momentum, epsilon=norm_epsilon)(x)
x = tf_utils.get_activation(activation)(x)
x = tf.keras.layers.MaxPool2D(pool_size=3, strides=2, padding='same')(x)
endpoints = {}
for i, spec in enumerate(REVNET_SPECS[model_id]):
if spec[0] == 'residual':
inner_block_fn = nn_blocks.ResidualInner
elif spec[0] == 'bottleneck':
inner_block_fn = nn_blocks.BottleneckResidualInner
else:
raise ValueError('Block fn `{}` is not supported.'.format(spec[0]))
if spec[1] % 2 != 0:
raise ValueError('Number of output filters must be even to ensure '
'splitting in channel dimension for reversible blocks')
x = self._block_group(
inputs=x,
filters=spec[1],
strides=(1 if i == 0 else 2),
inner_block_fn=inner_block_fn,
block_repeats=spec[2],
batch_norm_first=(i != 0), # Only skip on first block
name='revblock_group_{}'.format(i + 2))
endpoints[i + 2] = x
self._output_specs = {l: endpoints[l].get_shape() for l in endpoints}
super(RevNet, self).__init__(inputs=inputs, outputs=endpoints, **kwargs)
def _block_group(self,
inputs: tf.Tensor,
filters: int,
strides: int,
inner_block_fn: Callable[..., tf.keras.layers.Layer],
block_repeats: int,
batch_norm_first: bool,
name: str = 'revblock_group') -> tf.Tensor:
"""Creates one reversible block for RevNet model.
Args:
inputs: `Tensor` of size `[batch, channels, height, width]`.
filters: `int` number of filters for the first convolution of the layer.
strides: `int` stride to use for the first convolution of the layer. If
greater than 1, this block group will downsample the input.
inner_block_fn: Either `nn_blocks.ResidualInner` or
`nn_blocks.BottleneckResidualInner`.
block_repeats: `int` number of blocks contained in this block group.
batch_norm_first: `bool` whether to apply BatchNormalization and
activation layer before feeding into convolution layers.
name: `str`name for the block.
Returns:
The output `Tensor` of the block layer.
"""
x = inputs
for i in range(block_repeats):
is_first_block = i == 0
# Only first residual layer in block gets downsampled
curr_strides = strides if is_first_block else 1
f = inner_block_fn(
filters=filters // 2,
strides=curr_strides,
batch_norm_first=batch_norm_first and is_first_block,
kernel_regularizer=self._kernel_regularizer)
g = inner_block_fn(
filters=filters // 2,
strides=1,
batch_norm_first=batch_norm_first and is_first_block,
kernel_regularizer=self._kernel_regularizer)
x = nn_blocks.ReversibleLayer(f, g)(x)
return tf.identity(x, name=name)
def get_config(self) -> Dict[str, Any]:
config_dict = {
'model_id': self._model_id,
'activation': self._activation,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
}
return config_dict
@classmethod
def from_config(cls,
config: Dict[str, Any],
custom_objects: Optional[Any] = None) -> tf.keras.Model:
return cls(**config)
@property
def output_specs(self) -> Dict[int, tf.TensorShape]:
"""A dict of {level: TensorShape} pairs for the model output."""
return self._output_specs
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for RevNet."""
# Import libraries
from absl.testing import parameterized
import tensorflow as tf
from official.vision.beta.modeling.backbones import revnet
class RevNetTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(128, 56, 4),
(128, 104, 4),
)
def test_network_creation(self, input_size, model_id,
endpoint_filter_scale):
"""Test creation of RevNet family models."""
tf.keras.backend.set_image_data_format('channels_last')
network = revnet.RevNet(model_id=model_id)
inputs = tf.keras.Input(shape=(input_size, input_size, 3), batch_size=1)
endpoints = network(inputs)
network.summary()
self.assertAllEqual(
[1, input_size / 2**2, input_size / 2**2, 128 * endpoint_filter_scale],
endpoints[2].shape.as_list())
self.assertAllEqual(
[1, input_size / 2**3, input_size / 2**3, 256 * endpoint_filter_scale],
endpoints[3].shape.as_list())
self.assertAllEqual(
[1, input_size / 2**4, input_size / 2**4, 512 * endpoint_filter_scale],
endpoints[4].shape.as_list())
self.assertAllEqual(
[1, input_size / 2**5, input_size / 2**5, 832 * endpoint_filter_scale],
endpoints[5].shape.as_list())
@parameterized.parameters(1, 3, 4)
def test_input_specs(self, input_dim):
"""Test different input feature dimensions."""
tf.keras.backend.set_image_data_format('channels_last')
input_specs = tf.keras.layers.InputSpec(shape=[None, None, None, input_dim])
network = revnet.RevNet(model_id=56, input_specs=input_specs)
inputs = tf.keras.Input(shape=(128, 128, input_dim), batch_size=1)
_ = network(inputs)
def test_serialize_deserialize(self):
# Create a network object that sets all of its config options.
kwargs = dict(
model_id=56,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='VarianceScaling',
kernel_regularizer=None,
)
network = revnet.RevNet(**kwargs)
expected_config = dict(kwargs)
self.assertEqual(network.get_config(), expected_config)
# Create another network object from the first object's config.
new_network = revnet.RevNet.from_config(network.get_config())
# Validate that the config can be forced to JSON.
_ = new_network.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(network.get_config(), new_network.get_config())
if __name__ == '__main__':
tf.test.main()
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Implementation of SpineNet model.
X. Du, T-Y. Lin, P. Jin, G. Ghiasi, M. Tan, Y. Cui, Q. V. Le, X. Song
SpineNet: Learning Scale-Permuted Backbone for Recognition and Localization
https://arxiv.org/abs/1912.05027
"""
import math
# Import libraries
from absl import logging
import tensorflow as tf
from official.modeling import tf_utils
from official.vision.beta.modeling.layers import nn_blocks
from official.vision.beta.ops import spatial_transform_ops
layers = tf.keras.layers
FILTER_SIZE_MAP = {
1: 32,
2: 64,
3: 128,
4: 256,
5: 256,
6: 256,
7: 256,
}
# The fixed SpineNet architecture discovered by NAS.
# Each element represents a specification of a building block:
# (block_level, block_fn, (input_offset0, input_offset1), is_output).
SPINENET_BLOCK_SPECS = [
(2, 'bottleneck', (0, 1), False),
(4, 'residual', (0, 1), False),
(3, 'bottleneck', (2, 3), False),
(4, 'bottleneck', (2, 4), False),
(6, 'residual', (3, 5), False),
(4, 'bottleneck', (3, 5), False),
(5, 'residual', (6, 7), False),
(7, 'residual', (6, 8), False),
(5, 'bottleneck', (8, 9), False),
(5, 'bottleneck', (8, 10), False),
(4, 'bottleneck', (5, 10), True),
(3, 'bottleneck', (4, 10), True),
(5, 'bottleneck', (7, 12), True),
(7, 'bottleneck', (5, 14), True),
(6, 'bottleneck', (12, 14), True),
]
SCALING_MAP = {
'49S': {
'endpoints_num_filters': 128,
'filter_size_scale': 0.65,
'resample_alpha': 0.5,
'block_repeats': 1,
},
'49': {
'endpoints_num_filters': 256,
'filter_size_scale': 1.0,
'resample_alpha': 0.5,
'block_repeats': 1,
},
'96': {
'endpoints_num_filters': 256,
'filter_size_scale': 1.0,
'resample_alpha': 0.5,
'block_repeats': 2,
},
'143': {
'endpoints_num_filters': 256,
'filter_size_scale': 1.0,
'resample_alpha': 1.0,
'block_repeats': 3,
},
'190': {
'endpoints_num_filters': 512,
'filter_size_scale': 1.3,
'resample_alpha': 1.0,
'block_repeats': 4,
},
}
class BlockSpec(object):
"""A container class that specifies the block configuration for SpineNet."""
def __init__(self, level, block_fn, input_offsets, is_output):
self.level = level
self.block_fn = block_fn
self.input_offsets = input_offsets
self.is_output = is_output
def build_block_specs(block_specs=None):
"""Builds the list of BlockSpec objects for SpineNet."""
if not block_specs:
block_specs = SPINENET_BLOCK_SPECS
logging.info('Building SpineNet block specs: %s', block_specs)
return [BlockSpec(*b) for b in block_specs]
@tf.keras.utils.register_keras_serializable(package='Vision')
class SpineNet(tf.keras.Model):
"""Class to build SpineNet models."""
def __init__(self,
input_specs=tf.keras.layers.InputSpec(shape=[None, 640, 640, 3]),
min_level=3,
max_level=7,
block_specs=build_block_specs(),
endpoints_num_filters=256,
resample_alpha=0.5,
block_repeats=1,
filter_size_scale=1.0,
kernel_initializer='VarianceScaling',
kernel_regularizer=None,
bias_regularizer=None,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
**kwargs):
"""SpineNet model."""
self._input_specs = input_specs
self._min_level = min_level
self._max_level = max_level
self._block_specs = block_specs
self._endpoints_num_filters = endpoints_num_filters
self._resample_alpha = resample_alpha
self._block_repeats = block_repeats
self._filter_size_scale = filter_size_scale
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
self._activation = activation
self._use_sync_bn = use_sync_bn
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
if activation == 'relu':
self._activation_fn = tf.nn.relu
elif activation == 'swish':
self._activation_fn = tf.nn.swish
else:
raise ValueError('Activation {} not implemented.'.format(activation))
self._init_block_fn = 'bottleneck'
self._num_init_blocks = 2
if use_sync_bn:
self._norm = layers.experimental.SyncBatchNormalization
else:
self._norm = layers.BatchNormalization
if tf.keras.backend.image_data_format() == 'channels_last':
self._bn_axis = -1
else:
self._bn_axis = 1
# Build SpineNet.
inputs = tf.keras.Input(shape=input_specs.shape[1:])
net = self._build_stem(inputs=inputs)
net = self._build_scale_permuted_network(
net=net, input_width=input_specs.shape[1])
endpoints = self._build_endpoints(net=net)
self._output_specs = {l: endpoints[l].get_shape() for l in endpoints}
super(SpineNet, self).__init__(inputs=inputs, outputs=endpoints)
def _block_group(self,
inputs,
filters,
strides,
block_fn_cand,
block_repeats=1,
name='block_group'):
"""Creates one group of blocks for the SpineNet model."""
block_fn_candidates = {
'bottleneck': nn_blocks.BottleneckBlock,
'residual': nn_blocks.ResidualBlock,
}
block_fn = block_fn_candidates[block_fn_cand]
_, _, _, num_filters = inputs.get_shape().as_list()
if block_fn_cand == 'bottleneck':
use_projection = not (num_filters == (filters * 4) and strides == 1)
else:
use_projection = not (num_filters == filters and strides == 1)
x = block_fn(
filters=filters,
strides=strides,
use_projection=use_projection,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
activation=self._activation,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon)(
inputs)
for _ in range(1, block_repeats):
x = block_fn(
filters=filters,
strides=1,
use_projection=False,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
activation=self._activation,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon)(
x)
return tf.identity(x, name=name)
def _build_stem(self, inputs):
"""Build SpineNet stem."""
x = layers.Conv2D(
filters=64,
kernel_size=7,
strides=2,
use_bias=False,
padding='same',
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)(
inputs)
x = self._norm(
axis=self._bn_axis,
momentum=self._norm_momentum,
epsilon=self._norm_epsilon)(
x)
x = tf_utils.get_activation(self._activation_fn)(x)
x = layers.MaxPool2D(pool_size=3, strides=2, padding='same')(x)
net = []
# Build the initial level 2 blocks.
for i in range(self._num_init_blocks):
x = self._block_group(
inputs=x,
filters=int(FILTER_SIZE_MAP[2] * self._filter_size_scale),
strides=1,
block_fn_cand=self._init_block_fn,
block_repeats=self._block_repeats,
name='stem_block_{}'.format(i + 1))
net.append(x)
return net
def _build_scale_permuted_network(self,
net,
input_width,
weighted_fusion=False):
"""Build scale-permuted network."""
net_sizes = [int(math.ceil(input_width / 2**2))] * len(net)
net_block_fns = [self._init_block_fn] * len(net)
num_outgoing_connections = [0] * len(net)
endpoints = {}
for i, block_spec in enumerate(self._block_specs):
# Find out specs for the target block.
target_width = int(math.ceil(input_width / 2**block_spec.level))
target_num_filters = int(FILTER_SIZE_MAP[block_spec.level] *
self._filter_size_scale)
target_block_fn = block_spec.block_fn
# Resample then merge input0 and input1.
parents = []
input0 = block_spec.input_offsets[0]
input1 = block_spec.input_offsets[1]
x0 = self._resample_with_alpha(
inputs=net[input0],
input_width=net_sizes[input0],
input_block_fn=net_block_fns[input0],
target_width=target_width,
target_num_filters=target_num_filters,
target_block_fn=target_block_fn,
alpha=self._resample_alpha)
parents.append(x0)
num_outgoing_connections[input0] += 1
x1 = self._resample_with_alpha(
inputs=net[input1],
input_width=net_sizes[input1],
input_block_fn=net_block_fns[input1],
target_width=target_width,
target_num_filters=target_num_filters,
target_block_fn=target_block_fn,
alpha=self._resample_alpha)
parents.append(x1)
num_outgoing_connections[input1] += 1
# Merge 0 outdegree blocks to the output block.
if block_spec.is_output:
for j, (j_feat,
j_connections) in enumerate(zip(net, num_outgoing_connections)):
if j_connections == 0 and (j_feat.shape[2] == target_width and
j_feat.shape[3] == x0.shape[3]):
parents.append(j_feat)
num_outgoing_connections[j] += 1
# pylint: disable=g-direct-tensorflow-import
if weighted_fusion:
dtype = parents[0].dtype
parent_weights = [
tf.nn.relu(tf.cast(tf.Variable(1.0, name='block{}_fusion{}'.format(
i, j)), dtype=dtype)) for j in range(len(parents))]
weights_sum = tf.add_n(parent_weights)
parents = [
parents[i] * parent_weights[i] / (weights_sum + 0.0001)
for i in range(len(parents))
]
# Fuse all parent nodes then build a new block.
x = tf_utils.get_activation(self._activation_fn)(tf.add_n(parents))
x = self._block_group(
inputs=x,
filters=target_num_filters,
strides=1,
block_fn_cand=target_block_fn,
block_repeats=self._block_repeats,
name='scale_permuted_block_{}'.format(i + 1))
net.append(x)
net_sizes.append(target_width)
net_block_fns.append(target_block_fn)
num_outgoing_connections.append(0)
# Save output feats.
if block_spec.is_output:
if block_spec.level in endpoints:
raise ValueError('Duplicate feats found for output level {}.'.format(
block_spec.level))
if (block_spec.level < self._min_level or
block_spec.level > self._max_level):
raise ValueError('Output level is out of range [{}, {}]'.format(
self._min_level, self._max_level))
endpoints[block_spec.level] = x
return endpoints
def _build_endpoints(self, net):
"""Match filter size for endpoints before sharing conv layers."""
endpoints = {}
for level in range(self._min_level, self._max_level + 1):
x = layers.Conv2D(
filters=self._endpoints_num_filters,
kernel_size=1,
strides=1,
use_bias=False,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)(
net[level])
x = self._norm(
axis=self._bn_axis,
momentum=self._norm_momentum,
epsilon=self._norm_epsilon)(
x)
x = tf_utils.get_activation(self._activation_fn)(x)
endpoints[level] = x
return endpoints
def _resample_with_alpha(self,
inputs,
input_width,
input_block_fn,
target_width,
target_num_filters,
target_block_fn,
alpha=0.5):
"""Match resolution and feature dimension."""
_, _, _, input_num_filters = inputs.get_shape().as_list()
if input_block_fn == 'bottleneck':
input_num_filters /= 4
new_num_filters = int(input_num_filters * alpha)
x = layers.Conv2D(
filters=new_num_filters,
kernel_size=1,
strides=1,
use_bias=False,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)(
inputs)
x = self._norm(
axis=self._bn_axis,
momentum=self._norm_momentum,
epsilon=self._norm_epsilon)(
x)
x = tf_utils.get_activation(self._activation_fn)(x)
# Spatial resampling.
if input_width > target_width:
x = layers.Conv2D(
filters=new_num_filters,
kernel_size=3,
strides=2,
padding='SAME',
use_bias=False,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)(
x)
x = self._norm(
axis=self._bn_axis,
momentum=self._norm_momentum,
epsilon=self._norm_epsilon)(
x)
x = tf_utils.get_activation(self._activation_fn)(x)
input_width /= 2
while input_width > target_width:
x = layers.MaxPool2D(pool_size=3, strides=2, padding='SAME')(x)
input_width /= 2
elif input_width < target_width:
scale = target_width // input_width
x = spatial_transform_ops.nearest_upsampling(x, scale=scale)
# Last 1x1 conv to match filter size.
if target_block_fn == 'bottleneck':
target_num_filters *= 4
x = layers.Conv2D(
filters=target_num_filters,
kernel_size=1,
strides=1,
use_bias=False,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)(
x)
x = self._norm(
axis=self._bn_axis,
momentum=self._norm_momentum,
epsilon=self._norm_epsilon)(
x)
return x
def get_config(self):
config_dict = {
'min_level': self._min_level,
'max_level': self._max_level,
'endpoints_num_filters': self._endpoints_num_filters,
'resample_alpha': self._resample_alpha,
'block_repeats': self._block_repeats,
'filter_size_scale': self._filter_size_scale,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'bias_regularizer': self._bias_regularizer,
'activation': self._activation,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon
}
return config_dict
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
@property
def output_specs(self):
"""A dict of {level: TensorShape} pairs for the model output."""
return self._output_specs
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for SpineNet."""
# Import libraries
from absl.testing import parameterized
import tensorflow as tf
from official.vision.beta.modeling.backbones import spinenet
class SpineNetTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(128, 0.65, 1, 0.5, 128),
(256, 1.0, 1, 0.5, 256),
(384, 1.0, 2, 0.5, 256),
(512, 1.0, 3, 1.0, 256),
(640, 1.3, 4, 1.0, 384),
)
def test_network_creation(self, input_size, filter_size_scale, block_repeats,
resample_alpha, endpoints_num_filters):
"""Test creation of SpineNet models."""
min_level = 3
max_level = 7
tf.keras.backend.set_image_data_format('channels_last')
input_specs = tf.keras.layers.InputSpec(
shape=[None, input_size, input_size, 3])
model = spinenet.SpineNet(
input_specs=input_specs,
min_level=min_level,
max_level=max_level,
endpoints_num_filters=endpoints_num_filters,
resample_alpha=resample_alpha,
block_repeats=block_repeats,
filter_size_scale=filter_size_scale,
)
inputs = tf.keras.Input(shape=(input_size, input_size, 3), batch_size=1)
endpoints = model(inputs)
for l in range(min_level, max_level + 1):
self.assertIn(l, endpoints.keys())
self.assertAllEqual(
[1, input_size / 2**l, input_size / 2**l, endpoints_num_filters],
endpoints[l].shape.as_list())
def test_serialize_deserialize(self):
# Create a network object that sets all of its config options.
kwargs = dict(
min_level=3,
max_level=7,
endpoints_num_filters=256,
resample_alpha=0.5,
block_repeats=1,
filter_size_scale=1.0,
use_sync_bn=False,
activation='relu',
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='VarianceScaling',
kernel_regularizer=None,
bias_regularizer=None,
)
network = spinenet.SpineNet(**kwargs)
expected_config = dict(kwargs)
self.assertEqual(network.get_config(), expected_config)
# Create another network object from the first object's config.
new_network = spinenet.SpineNet.from_config(network.get_config())
# Validate that the config can be forced to JSON.
_ = new_network.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(network.get_config(), new_network.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Build classification models."""
# Import libraries
import tensorflow as tf
layers = tf.keras.layers
@tf.keras.utils.register_keras_serializable(package='Vision')
class ClassificationModel(tf.keras.Model):
"""A classification class builder."""
def __init__(self,
backbone,
num_classes,
input_specs=layers.InputSpec(shape=[None, None, None, 3]),
dropout_rate=0.0,
kernel_initializer='random_uniform',
kernel_regularizer=None,
bias_regularizer=None,
add_head_batch_norm=False,
use_sync_bn: bool = False,
norm_momentum: float = 0.99,
norm_epsilon: float = 0.001,
**kwargs):
"""Classification initialization function.
Args:
backbone: a backbone network.
num_classes: `int` number of classes in classification task.
input_specs: `tf.keras.layers.InputSpec` specs of the input tensor.
dropout_rate: `float` rate for dropout regularization.
kernel_initializer: kernel initializer for the dense layer.
kernel_regularizer: tf.keras.regularizers.Regularizer object. Default to
None.
bias_regularizer: tf.keras.regularizers.Regularizer object. Default to
None.
add_head_batch_norm: `bool` whether to add a batch normalization layer
before pool.
use_sync_bn: `bool` if True, use synchronized batch normalization.
norm_momentum: `float` normalization momentum for the moving average.
norm_epsilon: `float` small float added to variance to avoid dividing by
zero.
**kwargs: keyword arguments to be passed.
"""
self._self_setattr_tracking = False
self._config_dict = {
'backbone': backbone,
'num_classes': num_classes,
'input_specs': input_specs,
'dropout_rate': dropout_rate,
'kernel_initializer': kernel_initializer,
'kernel_regularizer': kernel_regularizer,
'bias_regularizer': bias_regularizer,
'add_head_batch_norm': add_head_batch_norm,
'use_sync_bn': use_sync_bn,
'norm_momentum': norm_momentum,
'norm_epsilon': norm_epsilon,
}
self._input_specs = input_specs
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
self._backbone = backbone
if use_sync_bn:
self._norm = tf.keras.layers.experimental.SyncBatchNormalization
else:
self._norm = tf.keras.layers.BatchNormalization
axis = -1 if tf.keras.backend.image_data_format() == 'channels_last' else 1
inputs = tf.keras.Input(shape=input_specs.shape[1:])
endpoints = backbone(inputs)
x = endpoints[max(endpoints.keys())]
if add_head_batch_norm:
x = self._norm(axis=axis, momentum=norm_momentum, epsilon=norm_epsilon)(x)
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dropout(dropout_rate)(x)
x = tf.keras.layers.Dense(
num_classes, kernel_initializer=kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)(
x)
super(ClassificationModel, self).__init__(
inputs=inputs, outputs=x, **kwargs)
@property
def checkpoint_items(self):
"""Returns a dictionary of items to be additionally checkpointed."""
return dict(backbone=self.backbone)
@property
def backbone(self):
return self._backbone
def get_config(self):
return self._config_dict
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for classification network."""
# Import libraries
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import strategy_combinations
from official.vision.beta.modeling import backbones
from official.vision.beta.modeling import classification_model
class ClassificationNetworkTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(128, 50, 'relu'),
(128, 50, 'relu'),
(128, 50, 'swish'),
)
def test_resnet_network_creation(
self, input_size, resnet_model_id, activation):
"""Test for creation of a ResNet-50 classifier."""
inputs = np.random.rand(2, input_size, input_size, 3)
tf.keras.backend.set_image_data_format('channels_last')
backbone = backbones.ResNet(
model_id=resnet_model_id, activation=activation)
self.assertEqual(backbone.count_params(), 23561152)
num_classes = 1000
model = classification_model.ClassificationModel(
backbone=backbone,
num_classes=num_classes,
dropout_rate=0.2,
)
self.assertEqual(model.count_params(), 25610152)
logits = model(inputs)
self.assertAllEqual([2, num_classes], logits.numpy().shape)
def test_revnet_network_creation(self):
"""Test for creation of a RevNet-56 classifier."""
revnet_model_id = 56
inputs = np.random.rand(2, 224, 224, 3)
tf.keras.backend.set_image_data_format('channels_last')
backbone = backbones.RevNet(model_id=revnet_model_id)
self.assertEqual(backbone.count_params(), 19473792)
num_classes = 1000
model = classification_model.ClassificationModel(
backbone=backbone,
num_classes=num_classes,
dropout_rate=0.2,
add_head_batch_norm=True,
)
self.assertEqual(model.count_params(), 22816104)
logits = model(inputs)
self.assertAllEqual([2, num_classes], logits.numpy().shape)
@combinations.generate(
combinations.combine(
strategy=[
strategy_combinations.tpu_strategy,
strategy_combinations.one_device_strategy_gpu,
],
use_sync_bn=[False, True],
))
def test_sync_bn_multiple_devices(self, strategy, use_sync_bn):
"""Test for sync bn on TPU and GPU devices."""
inputs = np.random.rand(64, 128, 128, 3)
tf.keras.backend.set_image_data_format('channels_last')
with strategy.scope():
backbone = backbones.ResNet(model_id=50, use_sync_bn=use_sync_bn)
model = classification_model.ClassificationModel(
backbone=backbone,
num_classes=1000,
dropout_rate=0.2,
)
_ = model(inputs)
@combinations.generate(
combinations.combine(
strategy=[
strategy_combinations.one_device_strategy_gpu,
],
data_format=['channels_last', 'channels_first'],
input_dim=[1, 3, 4]))
def test_data_format_gpu(self, strategy, data_format, input_dim):
"""Test for different data formats on GPU devices."""
if data_format == 'channels_last':
inputs = np.random.rand(2, 128, 128, input_dim)
else:
inputs = np.random.rand(2, input_dim, 128, 128)
input_specs = tf.keras.layers.InputSpec(shape=inputs.shape)
tf.keras.backend.set_image_data_format(data_format)
with strategy.scope():
backbone = backbones.ResNet(model_id=50, input_specs=input_specs)
model = classification_model.ClassificationModel(
backbone=backbone,
num_classes=1000,
input_specs=input_specs,
)
_ = model(inputs)
def test_serialize_deserialize(self):
"""Validate the classification network can be serialized and deserialized."""
tf.keras.backend.set_image_data_format('channels_last')
backbone = backbones.ResNet(model_id=50)
model = classification_model.ClassificationModel(
backbone=backbone, num_classes=1000)
config = model.get_config()
new_model = classification_model.ClassificationModel.from_config(config)
# Validate that the config can be forced to JSON.
_ = new_model.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(model.get_config(), new_model.get_config())
if __name__ == '__main__':
tf.test.main()
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Decoders package definition."""
from official.vision.beta.modeling.decoders.fpn import FPN
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""factory method."""
# Import libraries
import tensorflow as tf
from official.vision.beta.modeling import decoders
def build_decoder(input_specs,
model_config,
l2_regularizer: tf.keras.regularizers.Regularizer = None):
"""Builds decoder from a config.
Args:
input_specs: `dict` input specifications. A dictionary consists of
{level: TensorShape} from a backbone.
model_config: A OneOfConfig. Model config.
l2_regularizer: tf.keras.regularizers.Regularizer instance. Default to None.
Returns:
tf.keras.Model instance of the decoder.
"""
decoder_type = model_config.decoder.type
decoder_cfg = model_config.decoder.get()
norm_activation_config = model_config.norm_activation
if decoder_type == 'identity':
decoder = None
elif decoder_type == 'fpn':
decoder = decoders.FPN(
input_specs=input_specs,
min_level=model_config.min_level,
max_level=model_config.max_level,
num_filters=decoder_cfg.num_filters,
use_separable_conv=decoder_cfg.use_separable_conv,
activation=norm_activation_config.activation,
use_sync_bn=norm_activation_config.use_sync_bn,
norm_momentum=norm_activation_config.norm_momentum,
norm_epsilon=norm_activation_config.norm_epsilon,
kernel_regularizer=l2_regularizer)
else:
raise ValueError('Decoder {!r} not implement'.format(decoder_type))
return decoder
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Feature Pyramid Networks.
Feature Pyramid Networks were proposed in:
[1] Tsung-Yi Lin, Piotr Dollar, Ross Girshick, Kaiming He, Bharath Hariharan,
, and Serge Belongie
Feature Pyramid Networks for Object Detection. CVPR 2017.
"""
# Import libraries
import tensorflow as tf
from official.modeling import tf_utils
from official.vision.beta.ops import spatial_transform_ops
@tf.keras.utils.register_keras_serializable(package='Vision')
class FPN(tf.keras.Model):
"""Feature pyramid network."""
def __init__(self,
input_specs,
min_level=3,
max_level=7,
num_filters=256,
use_separable_conv=False,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='VarianceScaling',
kernel_regularizer=None,
bias_regularizer=None,
**kwargs):
"""FPN initialization function.
Args:
input_specs: `dict` input specifications. A dictionary consists of
{level: TensorShape} from a backbone.
min_level: `int` minimum level in FPN output feature maps.
max_level: `int` maximum level in FPN output feature maps.
num_filters: `int` number of filters in FPN layers.
use_separable_conv: `bool`, if True use separable convolution for
convolution in FPN layers.
activation: `str` name of the activation function.
use_sync_bn: if True, use synchronized batch normalization.
norm_momentum: `float` normalization omentum for the moving average.
norm_epsilon: `float` small float added to variance to avoid dividing by
zero.
kernel_initializer: kernel_initializer for convolutional layers.
kernel_regularizer: tf.keras.regularizers.Regularizer object for Conv2D.
bias_regularizer: tf.keras.regularizers.Regularizer object for Conv2d.
**kwargs: keyword arguments to be passed.
"""
self._config_dict = {
'input_specs': input_specs,
'min_level': min_level,
'max_level': max_level,
'num_filters': num_filters,
'use_separable_conv': use_separable_conv,
'activation': activation,
'use_sync_bn': use_sync_bn,
'norm_momentum': norm_momentum,
'norm_epsilon': norm_epsilon,
'kernel_initializer': kernel_initializer,
'kernel_regularizer': kernel_regularizer,
'bias_regularizer': bias_regularizer,
}
if use_separable_conv:
conv2d = tf.keras.layers.SeparableConv2D
else:
conv2d = tf.keras.layers.Conv2D
if use_sync_bn:
norm = tf.keras.layers.experimental.SyncBatchNormalization
else:
norm = tf.keras.layers.BatchNormalization
activation_fn = tf.keras.layers.Activation(
tf_utils.get_activation(activation))
# Build input feature pyramid.
if tf.keras.backend.image_data_format() == 'channels_last':
bn_axis = -1
else:
bn_axis = 1
# Get input feature pyramid from backbone.
inputs = self._build_input_pyramid(input_specs, min_level)
backbone_max_level = min(max(inputs.keys()), max_level)
# Build lateral connections.
feats_lateral = {}
for level in range(min_level, backbone_max_level + 1):
feats_lateral[level] = conv2d(
filters=num_filters,
kernel_size=1,
padding='same',
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
bias_regularizer=bias_regularizer)(
inputs[level])
# Build top-down path.
feats = {backbone_max_level: feats_lateral[backbone_max_level]}
for level in range(backbone_max_level - 1, min_level - 1, -1):
feats[level] = spatial_transform_ops.nearest_upsampling(
feats[level + 1], 2) + feats_lateral[level]
# TODO(xianzhi): consider to remove bias in conv2d.
# Build post-hoc 3x3 convolution kernel.
for level in range(min_level, backbone_max_level + 1):
feats[level] = conv2d(
filters=num_filters,
strides=1,
kernel_size=3,
padding='same',
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
bias_regularizer=bias_regularizer)(
feats[level])
# TODO(xianzhi): consider to remove bias in conv2d.
# Build coarser FPN levels introduced for RetinaNet.
for level in range(backbone_max_level + 1, max_level + 1):
feats_in = feats[level - 1]
if level > backbone_max_level + 1:
feats_in = activation_fn(feats_in)
feats[level] = conv2d(
filters=num_filters,
strides=2,
kernel_size=3,
padding='same',
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
bias_regularizer=bias_regularizer)(
feats_in)
# Apply batch norm layers.
for level in range(min_level, max_level + 1):
feats[level] = norm(
axis=bn_axis, momentum=norm_momentum, epsilon=norm_epsilon)(
feats[level])
self._output_specs = {
level: feats[level].get_shape()
for level in range(min_level, max_level + 1)
}
super(FPN, self).__init__(inputs=inputs, outputs=feats, **kwargs)
def _build_input_pyramid(self, input_specs, min_level):
assert isinstance(input_specs, dict)
if min(input_specs.keys()) > min_level:
raise ValueError(
'Backbone min level should be less or equal to FPN min level')
inputs = {}
for level, spec in input_specs.items():
inputs[level] = tf.keras.Input(shape=spec[1:])
return inputs
def get_config(self):
return self._config_dict
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
@property
def output_specs(self):
"""A dict of {level: TensorShape} pairs for the model output."""
return self._output_specs
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for resnet."""
# Import libraries
from absl.testing import parameterized
import tensorflow as tf
from official.vision.beta.modeling.backbones import resnet
from official.vision.beta.modeling.decoders import fpn
class FPNTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(256, 3, 7, False),
(256, 3, 7, True),
)
def test_network_creation(self, input_size, min_level, max_level,
use_separable_conv):
"""Test creation of FPN."""
tf.keras.backend.set_image_data_format('channels_last')
inputs = tf.keras.Input(shape=(input_size, input_size, 3), batch_size=1)
backbone = resnet.ResNet(model_id=50)
network = fpn.FPN(
input_specs=backbone.output_specs,
min_level=min_level,
max_level=max_level,
use_separable_conv=use_separable_conv)
endpoints = backbone(inputs)
feats = network(endpoints)
for level in range(min_level, max_level + 1):
self.assertIn(level, feats)
self.assertAllEqual(
[1, input_size // 2**level, input_size // 2**level, 256],
feats[level].shape.as_list())
def test_serialize_deserialize(self):
# Create a network object that sets all of its config options.
kwargs = dict(
input_specs=resnet.ResNet(model_id=50).output_specs,
min_level=3,
max_level=7,
num_filters=256,
use_separable_conv=False,
use_sync_bn=False,
activation='relu',
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='VarianceScaling',
kernel_regularizer=None,
bias_regularizer=None,
)
network = fpn.FPN(**kwargs)
expected_config = dict(kwargs)
self.assertEqual(network.get_config(), expected_config)
# Create another network object from the first object's config.
new_network = fpn.FPN.from_config(network.get_config())
# Validate that the config can be forced to JSON.
_ = new_network.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(network.get_config(), new_network.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Factory methods to build models."""
# Import libraries
import tensorflow as tf
from official.vision.beta.configs import image_classification as classification_cfg
from official.vision.beta.configs import maskrcnn as maskrcnn_cfg
from official.vision.beta.configs import retinanet as retinanet_cfg
from official.vision.beta.configs import video_classification as video_classification_cfg
from official.vision.beta.modeling import classification_model
from official.vision.beta.modeling import maskrcnn_model
from official.vision.beta.modeling import retinanet_model
from official.vision.beta.modeling import video_classification_model
from official.vision.beta.modeling.backbones import factory as backbone_factory
from official.vision.beta.modeling.decoders import factory as decoder_factory
from official.vision.beta.modeling.heads import dense_prediction_heads
from official.vision.beta.modeling.heads import instance_heads
from official.vision.beta.modeling.layers import detection_generator
from official.vision.beta.modeling.layers import mask_sampler
from official.vision.beta.modeling.layers import roi_aligner
from official.vision.beta.modeling.layers import roi_generator
from official.vision.beta.modeling.layers import roi_sampler
def build_classification_model(
input_specs: tf.keras.layers.InputSpec,
model_config: classification_cfg.ImageClassificationModel,
l2_regularizer: tf.keras.regularizers.Regularizer = None):
"""Builds the classification model."""
backbone = backbone_factory.build_backbone(
input_specs=input_specs,
model_config=model_config,
l2_regularizer=l2_regularizer)
norm_activation_config = model_config.norm_activation
model = classification_model.ClassificationModel(
backbone=backbone,
num_classes=model_config.num_classes,
input_specs=input_specs,
dropout_rate=model_config.dropout_rate,
kernel_regularizer=l2_regularizer,
add_head_batch_norm=model_config.add_head_batch_norm,
use_sync_bn=norm_activation_config.use_sync_bn,
norm_momentum=norm_activation_config.norm_momentum,
norm_epsilon=norm_activation_config.norm_epsilon)
return model
def build_maskrcnn(input_specs: tf.keras.layers.InputSpec,
model_config: maskrcnn_cfg.MaskRCNN,
l2_regularizer: tf.keras.regularizers.Regularizer = None):
"""Builds Mask R-CNN model."""
backbone = backbone_factory.build_backbone(
input_specs=input_specs,
model_config=model_config,
l2_regularizer=l2_regularizer)
decoder = decoder_factory.build_decoder(
input_specs=backbone.output_specs,
model_config=model_config,
l2_regularizer=l2_regularizer)
rpn_head_config = model_config.rpn_head
roi_generator_config = model_config.roi_generator
roi_sampler_config = model_config.roi_sampler
roi_aligner_config = model_config.roi_aligner
detection_head_config = model_config.detection_head
generator_config = model_config.detection_generator
norm_activation_config = model_config.norm_activation
num_anchors_per_location = (
len(model_config.anchor.aspect_ratios) * model_config.anchor.num_scales)
rpn_head = dense_prediction_heads.RPNHead(
min_level=model_config.min_level,
max_level=model_config.max_level,
num_anchors_per_location=num_anchors_per_location,
num_convs=rpn_head_config.num_convs,
num_filters=rpn_head_config.num_filters,
use_separable_conv=rpn_head_config.use_separable_conv,
activation=norm_activation_config.activation,
use_sync_bn=norm_activation_config.use_sync_bn,
norm_momentum=norm_activation_config.norm_momentum,
norm_epsilon=norm_activation_config.norm_epsilon,
kernel_regularizer=l2_regularizer)
detection_head = instance_heads.DetectionHead(
num_classes=model_config.num_classes,
num_convs=detection_head_config.num_convs,
num_filters=detection_head_config.num_filters,
use_separable_conv=detection_head_config.use_separable_conv,
num_fcs=detection_head_config.num_fcs,
fc_dims=detection_head_config.fc_dims,
activation=norm_activation_config.activation,
use_sync_bn=norm_activation_config.use_sync_bn,
norm_momentum=norm_activation_config.norm_momentum,
norm_epsilon=norm_activation_config.norm_epsilon,
kernel_regularizer=l2_regularizer)
roi_generator_obj = roi_generator.MultilevelROIGenerator(
pre_nms_top_k=roi_generator_config.pre_nms_top_k,
pre_nms_score_threshold=roi_generator_config.pre_nms_score_threshold,
pre_nms_min_size_threshold=(
roi_generator_config.pre_nms_min_size_threshold),
nms_iou_threshold=roi_generator_config.nms_iou_threshold,
num_proposals=roi_generator_config.num_proposals,
test_pre_nms_top_k=roi_generator_config.test_pre_nms_top_k,
test_pre_nms_score_threshold=(
roi_generator_config.test_pre_nms_score_threshold),
test_pre_nms_min_size_threshold=(
roi_generator_config.test_pre_nms_min_size_threshold),
test_nms_iou_threshold=roi_generator_config.test_nms_iou_threshold,
test_num_proposals=roi_generator_config.test_num_proposals,
use_batched_nms=roi_generator_config.use_batched_nms)
roi_sampler_obj = roi_sampler.ROISampler(
mix_gt_boxes=roi_sampler_config.mix_gt_boxes,
num_sampled_rois=roi_sampler_config.num_sampled_rois,
foreground_fraction=roi_sampler_config.foreground_fraction,
foreground_iou_threshold=roi_sampler_config.foreground_iou_threshold,
background_iou_high_threshold=(
roi_sampler_config.background_iou_high_threshold),
background_iou_low_threshold=(
roi_sampler_config.background_iou_low_threshold))
roi_aligner_obj = roi_aligner.MultilevelROIAligner(
crop_size=roi_aligner_config.crop_size,
sample_offset=roi_aligner_config.sample_offset)
detection_generator_obj = detection_generator.DetectionGenerator(
apply_nms=True,
pre_nms_top_k=generator_config.pre_nms_top_k,
pre_nms_score_threshold=generator_config.pre_nms_score_threshold,
nms_iou_threshold=generator_config.nms_iou_threshold,
max_num_detections=generator_config.max_num_detections,
use_batched_nms=generator_config.use_batched_nms)
if model_config.include_mask:
mask_head = instance_heads.MaskHead(
num_classes=model_config.num_classes,
upsample_factor=model_config.mask_head.upsample_factor,
num_convs=model_config.mask_head.num_convs,
num_filters=model_config.mask_head.num_filters,
use_separable_conv=model_config.mask_head.use_separable_conv,
activation=model_config.norm_activation.activation,
norm_momentum=model_config.norm_activation.norm_momentum,
norm_epsilon=model_config.norm_activation.norm_epsilon,
kernel_regularizer=l2_regularizer)
mask_sampler_obj = mask_sampler.MaskSampler(
mask_target_size=(
model_config.mask_roi_aligner.crop_size *
model_config.mask_head.upsample_factor),
num_sampled_masks=model_config.mask_sampler.num_sampled_masks)
mask_roi_aligner_obj = roi_aligner.MultilevelROIAligner(
crop_size=model_config.mask_roi_aligner.crop_size,
sample_offset=model_config.mask_roi_aligner.sample_offset)
else:
mask_head = None
mask_sampler_obj = None
mask_roi_aligner_obj = None
model = maskrcnn_model.MaskRCNNModel(
backbone=backbone,
decoder=decoder,
rpn_head=rpn_head,
detection_head=detection_head,
roi_generator=roi_generator_obj,
roi_sampler=roi_sampler_obj,
roi_aligner=roi_aligner_obj,
detection_generator=detection_generator_obj,
mask_head=mask_head,
mask_sampler=mask_sampler_obj,
mask_roi_aligner=mask_roi_aligner_obj)
return model
def build_retinanet(input_specs: tf.keras.layers.InputSpec,
model_config: retinanet_cfg.RetinaNet,
l2_regularizer: tf.keras.regularizers.Regularizer = None):
"""Builds RetinaNet model."""
backbone = backbone_factory.build_backbone(
input_specs=input_specs,
model_config=model_config,
l2_regularizer=l2_regularizer)
decoder = decoder_factory.build_decoder(
input_specs=backbone.output_specs,
model_config=model_config,
l2_regularizer=l2_regularizer)
head_config = model_config.head
generator_config = model_config.detection_generator
norm_activation_config = model_config.norm_activation
num_anchors_per_location = (
len(model_config.anchor.aspect_ratios) * model_config.anchor.num_scales)
head = dense_prediction_heads.RetinaNetHead(
min_level=model_config.min_level,
max_level=model_config.max_level,
num_classes=model_config.num_classes,
num_anchors_per_location=num_anchors_per_location,
num_convs=head_config.num_convs,
num_filters=head_config.num_filters,
use_separable_conv=head_config.use_separable_conv,
activation=norm_activation_config.activation,
use_sync_bn=norm_activation_config.use_sync_bn,
norm_momentum=norm_activation_config.norm_momentum,
norm_epsilon=norm_activation_config.norm_epsilon,
kernel_regularizer=l2_regularizer)
detection_generator_obj = detection_generator.MultilevelDetectionGenerator(
apply_nms=True,
pre_nms_top_k=generator_config.pre_nms_top_k,
pre_nms_score_threshold=generator_config.pre_nms_score_threshold,
nms_iou_threshold=generator_config.nms_iou_threshold,
max_num_detections=generator_config.max_num_detections,
use_batched_nms=generator_config.use_batched_nms)
model = retinanet_model.RetinaNetModel(
backbone, decoder, head, detection_generator_obj)
return model
def build_video_classification_model(
input_specs: tf.keras.layers.InputSpec,
model_config: video_classification_cfg.VideoClassificationModel,
num_classes: int,
l2_regularizer: tf.keras.regularizers.Regularizer = None):
"""Builds the video classification model."""
backbone = backbone_factory.build_backbone_3d(
input_specs=input_specs,
model_config=model_config,
l2_regularizer=l2_regularizer)
norm_activation_config = model_config.norm_activation
model = video_classification_model.VideoClassificationModel(
backbone=backbone,
num_classes=num_classes,
input_specs=input_specs,
dropout_rate=model_config.dropout_rate,
kernel_regularizer=l2_regularizer,
add_head_batch_norm=model_config.add_head_batch_norm,
use_sync_bn=norm_activation_config.use_sync_bn,
norm_momentum=norm_activation_config.norm_momentum,
norm_epsilon=norm_activation_config.norm_epsilon)
return model
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for factory.py."""
# Import libraries
from absl.testing import parameterized
import tensorflow as tf
from official.vision.beta.configs import backbones
from official.vision.beta.configs import backbones_3d
from official.vision.beta.configs import image_classification as classification_cfg
from official.vision.beta.configs import maskrcnn as maskrcnn_cfg
from official.vision.beta.configs import retinanet as retinanet_cfg
from official.vision.beta.configs import video_classification as video_classification_cfg
from official.vision.beta.modeling import factory
class ClassificationModelBuilderTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
('resnet', (224, 224), 5e-5),
('resnet', (224, 224), None),
('resnet', (None, None), 5e-5),
('resnet', (None, None), None),
)
def test_builder(self, backbone_type, input_size, weight_decay):
num_classes = 2
input_specs = tf.keras.layers.InputSpec(
shape=[None, input_size[0], input_size[1], 3])
model_config = classification_cfg.ImageClassificationModel(
num_classes=num_classes,
backbone=backbones.Backbone(type=backbone_type))
l2_regularizer = (
tf.keras.regularizers.l2(weight_decay) if weight_decay else None)
_ = factory.build_classification_model(
input_specs=input_specs,
model_config=model_config,
l2_regularizer=l2_regularizer)
class MaskRCNNBuilderTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
('resnet', (640, 640)),
('resnet', (None, None)),
)
def test_builder(self, backbone_type, input_size):
num_classes = 2
input_specs = tf.keras.layers.InputSpec(
shape=[None, input_size[0], input_size[1], 3])
model_config = maskrcnn_cfg.MaskRCNN(
num_classes=num_classes,
backbone=backbones.Backbone(type=backbone_type))
l2_regularizer = tf.keras.regularizers.l2(5e-5)
_ = factory.build_maskrcnn(
input_specs=input_specs,
model_config=model_config,
l2_regularizer=l2_regularizer)
class RetinaNetBuilderTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
('resnet', (640, 640)),
('resnet', (None, None)),
)
def test_builder(self, backbone_type, input_size):
num_classes = 2
input_specs = tf.keras.layers.InputSpec(
shape=[None, input_size[0], input_size[1], 3])
model_config = retinanet_cfg.RetinaNet(
num_classes=num_classes,
backbone=backbones.Backbone(type=backbone_type))
l2_regularizer = tf.keras.regularizers.l2(5e-5)
_ = factory.build_retinanet(
input_specs=input_specs,
model_config=model_config,
l2_regularizer=l2_regularizer)
class VideoClassificationModelBuilderTest(parameterized.TestCase,
tf.test.TestCase):
@parameterized.parameters(
('resnet_3d', (8, 224, 224), 5e-5),
('resnet_3d', (None, None, None), 5e-5),
)
def test_builder(self, backbone_type, input_size, weight_decay):
input_specs = tf.keras.layers.InputSpec(
shape=[None, input_size[0], input_size[1], input_size[2], 3])
model_config = video_classification_cfg.VideoClassificationModel(
backbone=backbones_3d.Backbone3D(type=backbone_type))
l2_regularizer = (
tf.keras.regularizers.l2(weight_decay) if weight_decay else None)
_ = factory.build_video_classification_model(
input_specs=input_specs,
model_config=model_config,
num_classes=2,
l2_regularizer=l2_regularizer)
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Dense prediction heads."""
# Import libraries
import numpy as np
import tensorflow as tf
from official.modeling import tf_utils
@tf.keras.utils.register_keras_serializable(package='Vision')
class RetinaNetHead(tf.keras.layers.Layer):
"""RetinaNet head."""
def __init__(self,
min_level,
max_level,
num_classes,
num_anchors_per_location,
num_convs=4,
num_filters=256,
use_separable_conv=False,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
**kwargs):
"""Initialize params to build RetinaNet head.
Args:
min_level: `int` number of minimum feature level.
max_level: `int` number of maximum feature level.
num_classes: `int` number of classes to predict.
num_anchors_per_location: `int` number of number of anchors per pixel
location.
num_convs: `int` number that represents the number of the intermediate
conv layers before the prediction.
num_filters: `int` number that represents the number of filters of the
intermediate conv layers.
use_separable_conv: `bool`, indicating whether the separable conv layers
is used.
activation: `string`, indicating which activation is used, e.g. 'relu',
'swish', etc.
use_sync_bn: `bool`, whether to use synchronized batch normalization
across different replicas.
norm_momentum: `float`, the momentum parameter of the normalization
layers.
norm_epsilon: `float`, the epsilon parameter of the normalization layers.
kernel_regularizer: `tf.keras.regularizers.Regularizer` object for layer
kernal.
bias_regularizer: `tf.keras.regularizers.Regularizer` object for bias.
**kwargs: other keyword arguments passed to Layer.
"""
super(RetinaNetHead, self).__init__(**kwargs)
self._config_dict = {
'min_level': min_level,
'max_level': max_level,
'num_classes': num_classes,
'num_anchors_per_location': num_anchors_per_location,
'num_convs': num_convs,
'num_filters': num_filters,
'use_separable_conv': use_separable_conv,
'activation': activation,
'use_sync_bn': use_sync_bn,
'norm_momentum': norm_momentum,
'norm_epsilon': norm_epsilon,
'kernel_regularizer': kernel_regularizer,
'bias_regularizer': bias_regularizer,
}
if tf.keras.backend.image_data_format() == 'channels_last':
self._bn_axis = -1
else:
self._bn_axis = 1
self._activation = tf_utils.get_activation(activation)
def build(self, input_shape):
"""Creates the variables of the head."""
conv_op = (tf.keras.layers.SeparableConv2D
if self._config_dict['use_separable_conv']
else tf.keras.layers.Conv2D)
conv_kwargs = {
'filters': self._config_dict['num_filters'],
'kernel_size': 3,
'padding': 'same',
'bias_initializer': tf.zeros_initializer(),
'bias_regularizer': self._config_dict['bias_regularizer'],
}
if self._config_dict['use_separable_conv']:
conv_kwargs.update({
'depthwise_initializer': tf.keras.initializers.RandomNormal(
stddev=0.01),
'pointwise_initializer': tf.keras.initializers.RandomNormal(
stddev=0.01),
'depthwise_regularizer': self._config_dict['kernel_regularizer'],
'pointwise_regularizer': self._config_dict['kernel_regularizer'],
})
else:
conv_kwargs.update({
'kernel_initializer': tf.keras.initializers.RandomNormal(
stddev=0.01),
'kernel_regularizer': self._config_dict['kernel_regularizer'],
})
bn_op = (tf.keras.layers.experimental.SyncBatchNormalization
if self._config_dict['use_sync_bn']
else tf.keras.layers.BatchNormalization)
bn_kwargs = {
'axis': self._bn_axis,
'momentum': self._config_dict['norm_momentum'],
'epsilon': self._config_dict['norm_epsilon'],
}
# Class net.
self._cls_convs = []
self._cls_norms = []
for level in range(
self._config_dict['min_level'], self._config_dict['max_level'] + 1):
this_level_cls_norms = []
for i in range(self._config_dict['num_convs']):
if level == self._config_dict['min_level']:
cls_conv_name = 'classnet-conv_{}'.format(i)
self._cls_convs.append(conv_op(name=cls_conv_name, **conv_kwargs))
cls_norm_name = 'classnet-conv-norm_{}_{}'.format(level, i)
this_level_cls_norms.append(bn_op(name=cls_norm_name, **bn_kwargs))
self._cls_norms.append(this_level_cls_norms)
classifier_kwargs = {
'filters': (
self._config_dict['num_classes'] *
self._config_dict['num_anchors_per_location']),
'kernel_size': 3,
'padding': 'same',
'bias_initializer': tf.constant_initializer(-np.log((1 - 0.01) / 0.01)),
'bias_regularizer': self._config_dict['bias_regularizer'],
}
if self._config_dict['use_separable_conv']:
classifier_kwargs.update({
'depthwise_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'pointwise_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'depthwise_regularizer': self._config_dict['kernel_regularizer'],
'pointwise_regularizer': self._config_dict['kernel_regularizer'],
})
else:
classifier_kwargs.update({
'kernel_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'kernel_regularizer': self._config_dict['kernel_regularizer'],
})
self._classifier = conv_op(name='scores', **classifier_kwargs)
# Box net.
self._box_convs = []
self._box_norms = []
for level in range(
self._config_dict['min_level'], self._config_dict['max_level'] + 1):
this_level_box_norms = []
for i in range(self._config_dict['num_convs']):
if level == self._config_dict['min_level']:
box_conv_name = 'boxnet-conv_{}'.format(i)
self._box_convs.append(conv_op(name=box_conv_name, **conv_kwargs))
box_norm_name = 'boxnet-conv-norm_{}_{}'.format(level, i)
this_level_box_norms.append(bn_op(name=box_norm_name, **bn_kwargs))
self._box_norms.append(this_level_box_norms)
box_regressor_kwargs = {
'filters': 4 * self._config_dict['num_anchors_per_location'],
'kernel_size': 3,
'padding': 'same',
'bias_initializer': tf.zeros_initializer(),
'bias_regularizer': self._config_dict['bias_regularizer'],
}
if self._config_dict['use_separable_conv']:
box_regressor_kwargs.update({
'depthwise_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'pointwise_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'depthwise_regularizer': self._config_dict['kernel_regularizer'],
'pointwise_regularizer': self._config_dict['kernel_regularizer'],
})
else:
box_regressor_kwargs.update({
'kernel_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'kernel_regularizer': self._config_dict['kernel_regularizer'],
})
self._box_regressor = conv_op(name='boxes', **box_regressor_kwargs)
super(RetinaNetHead, self).build(input_shape)
def call(self, features):
"""Forward pass of the RetinaNet head.
Args:
features: a dict of tensors
- key: `int`, the level of the multilevel features.
- values: `Tensor`, the feature map tensors, whose shape is
[batch, height_l, width_l, channels].
Returns:
scores: a dict of tensors which includes scores of the predictions.
- key: `int`, the level of the multilevel predictions.
- values: `Tensor`, the box scores predicted from a particular feature
level, whose shape is
[batch, height_l, width_l, num_classes * num_anchors_per_location].
boxes: a dict of tensors which includes coordinates of the predictions.
- key: `int`, the level of the multilevel predictions.
- values: `Tensor`, the box scores predicted from a particular feature
level, whose shape is
[batch, height_l, width_l, 4 * num_anchors_per_location].
"""
scores = {}
boxes = {}
for i, level in enumerate(
range(self._config_dict['min_level'],
self._config_dict['max_level'] + 1)):
this_level_features = features[level]
# class net.
x = this_level_features
for conv, norm in zip(self._cls_convs, self._cls_norms[i]):
x = conv(x)
x = norm(x)
x = self._activation(x)
scores[level] = self._classifier(x)
# box net.
x = this_level_features
for conv, norm in zip(self._box_convs, self._box_norms[i]):
x = conv(x)
x = norm(x)
x = self._activation(x)
boxes[level] = self._box_regressor(x)
return scores, boxes
def get_config(self):
return self._config_dict
@classmethod
def from_config(cls, config):
return cls(**config)
@tf.keras.utils.register_keras_serializable(package='Vision')
class RPNHead(tf.keras.layers.Layer):
"""Region Proposal Network head."""
def __init__(self,
min_level,
max_level,
num_anchors_per_location,
num_convs=1,
num_filters=256,
use_separable_conv=False,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
**kwargs):
"""Initialize params to build Region Proposal Network head.
Args:
min_level: `int` number of minimum feature level.
max_level: `int` number of maximum feature level.
num_anchors_per_location: `int` number of number of anchors per pixel
location.
num_convs: `int` number that represents the number of the intermediate
conv layers before the prediction.
num_filters: `int` number that represents the number of filters of the
intermediate conv layers.
use_separable_conv: `bool`, indicating whether the separable conv layers
is used.
activation: `string`, indicating which activation is used, e.g. 'relu',
'swish', etc.
use_sync_bn: `bool`, whether to use synchronized batch normalization
across different replicas.
norm_momentum: `float`, the momentum parameter of the normalizaton layers.
norm_epsilon: `float`, the epsilon parameter of the normalization layers.
kernel_regularizer: `tf.keras.regularizers.Regularizer` object for layer
kernel.
bias_regularizer: `tf.keras.regularizers.Regularizer` object for bias.
**kwargs: other keyword arguments passed to Layer.
"""
super(RPNHead, self).__init__(**kwargs)
self._config_dict = {
'min_level': min_level,
'max_level': max_level,
'num_anchors_per_location': num_anchors_per_location,
'num_convs': num_convs,
'num_filters': num_filters,
'use_separable_conv': use_separable_conv,
'activation': activation,
'use_sync_bn': use_sync_bn,
'norm_momentum': norm_momentum,
'norm_epsilon': norm_epsilon,
'kernel_regularizer': kernel_regularizer,
'bias_regularizer': bias_regularizer,
}
if tf.keras.backend.image_data_format() == 'channels_last':
self._bn_axis = -1
else:
self._bn_axis = 1
self._activation = tf_utils.get_activation(activation)
def build(self, input_shape):
"""Creates the variables of the head."""
conv_op = (tf.keras.layers.SeparableConv2D
if self._config_dict['use_separable_conv']
else tf.keras.layers.Conv2D)
conv_kwargs = {
'filters': self._config_dict['num_filters'],
'kernel_size': 3,
'padding': 'same',
}
if self._config_dict['use_separable_conv']:
conv_kwargs.update({
'depthwise_initializer': tf.keras.initializers.RandomNormal(
stddev=0.01),
'pointwise_initializer': tf.keras.initializers.RandomNormal(
stddev=0.01),
'bias_initializer': tf.zeros_initializer(),
'depthwise_regularizer': self._config_dict['kernel_regularizer'],
'pointwise_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
else:
conv_kwargs.update({
'kernel_initializer': tf.keras.initializers.RandomNormal(
stddev=0.01),
'bias_initializer': tf.zeros_initializer(),
'kernel_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
bn_op = (tf.keras.layers.experimental.SyncBatchNormalization
if self._config_dict['use_sync_bn']
else tf.keras.layers.BatchNormalization)
bn_kwargs = {
'axis': self._bn_axis,
'momentum': self._config_dict['norm_momentum'],
'epsilon': self._config_dict['norm_epsilon'],
}
self._convs = []
self._norms = []
for level in range(
self._config_dict['min_level'], self._config_dict['max_level'] + 1):
this_level_norms = []
for i in range(self._config_dict['num_convs']):
if level == self._config_dict['min_level']:
conv_name = 'rpn-conv_{}'.format(i)
self._convs.append(conv_op(name=conv_name, **conv_kwargs))
norm_name = 'rpn-conv-norm_{}_{}'.format(level, i)
this_level_norms.append(bn_op(name=norm_name, **bn_kwargs))
self._norms.append(this_level_norms)
classifier_kwargs = {
'filters': self._config_dict['num_anchors_per_location'],
'kernel_size': 1,
'padding': 'valid',
}
if self._config_dict['use_separable_conv']:
classifier_kwargs.update({
'depthwise_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'pointwise_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'bias_initializer': tf.zeros_initializer(),
'depthwise_regularizer': self._config_dict['kernel_regularizer'],
'pointwise_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
else:
classifier_kwargs.update({
'kernel_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'bias_initializer': tf.zeros_initializer(),
'kernel_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
self._classifier = conv_op(name='rpn-scores', **classifier_kwargs)
box_regressor_kwargs = {
'filters': 4 * self._config_dict['num_anchors_per_location'],
'kernel_size': 1,
'padding': 'valid',
}
if self._config_dict['use_separable_conv']:
box_regressor_kwargs.update({
'depthwise_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'pointwise_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'bias_initializer': tf.zeros_initializer(),
'depthwise_regularizer': self._config_dict['kernel_regularizer'],
'pointwise_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
else:
box_regressor_kwargs.update({
'kernel_initializer': tf.keras.initializers.RandomNormal(
stddev=1e-5),
'bias_initializer': tf.zeros_initializer(),
'kernel_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
self._box_regressor = conv_op(name='rpn-boxes', **box_regressor_kwargs)
super(RPNHead, self).build(input_shape)
def call(self, features):
scores = {}
boxes = {}
for i, level in enumerate(
range(self._config_dict['min_level'],
self._config_dict['max_level'] + 1)):
x = features[level]
for conv, norm in zip(self._convs, self._norms[i]):
x = conv(x)
x = norm(x)
x = self._activation(x)
scores[level] = self._classifier(x)
boxes[level] = self._box_regressor(x)
return scores, boxes
def get_config(self):
return self._config_dict
@classmethod
def from_config(cls, config):
return cls(**config)
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for dense_prediction_heads.py."""
# Import libraries
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.vision.beta.modeling.heads import dense_prediction_heads
class RetinaNetHeadTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(False, False),
(False, True),
(True, False),
(True, True),
)
def test_forward(self, use_separable_conv, use_sync_bn):
retinanet_head = dense_prediction_heads.RetinaNetHead(
min_level=3,
max_level=4,
num_classes=3,
num_anchors_per_location=3,
num_convs=2,
num_filters=256,
use_separable_conv=use_separable_conv,
activation='relu',
use_sync_bn=use_sync_bn,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
)
features = {
3: np.random.rand(2, 128, 128, 16),
4: np.random.rand(2, 64, 64, 16),
}
scores, boxes = retinanet_head(features)
self.assertAllEqual(scores[3].numpy().shape, [2, 128, 128, 9])
self.assertAllEqual(scores[4].numpy().shape, [2, 64, 64, 9])
self.assertAllEqual(boxes[3].numpy().shape, [2, 128, 128, 12])
self.assertAllEqual(boxes[4].numpy().shape, [2, 64, 64, 12])
def test_serialize_deserialize(self):
retinanet_head = dense_prediction_heads.RetinaNetHead(
min_level=3,
max_level=7,
num_classes=3,
num_anchors_per_location=9,
num_convs=2,
num_filters=16,
use_separable_conv=False,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
)
config = retinanet_head.get_config()
new_retinanet_head = (
dense_prediction_heads.RetinaNetHead.from_config(config))
self.assertAllEqual(
retinanet_head.get_config(), new_retinanet_head.get_config())
class RpnHeadTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(False, False),
(False, True),
(True, False),
(True, True),
)
def test_forward(self, use_separable_conv, use_sync_bn):
rpn_head = dense_prediction_heads.RPNHead(
min_level=3,
max_level=4,
num_anchors_per_location=3,
num_convs=2,
num_filters=256,
use_separable_conv=use_separable_conv,
activation='relu',
use_sync_bn=use_sync_bn,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
)
features = {
3: np.random.rand(2, 128, 128, 16),
4: np.random.rand(2, 64, 64, 16),
}
scores, boxes = rpn_head(features)
self.assertAllEqual(scores[3].numpy().shape, [2, 128, 128, 3])
self.assertAllEqual(scores[4].numpy().shape, [2, 64, 64, 3])
self.assertAllEqual(boxes[3].numpy().shape, [2, 128, 128, 12])
self.assertAllEqual(boxes[4].numpy().shape, [2, 64, 64, 12])
def test_serialize_deserialize(self):
rpn_head = dense_prediction_heads.RPNHead(
min_level=3,
max_level=7,
num_anchors_per_location=9,
num_convs=2,
num_filters=16,
use_separable_conv=False,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
)
config = rpn_head.get_config()
new_rpn_head = dense_prediction_heads.RPNHead.from_config(config)
self.assertAllEqual(rpn_head.get_config(), new_rpn_head.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Instance prediction heads."""
# Import libraries
import tensorflow as tf
from official.modeling import tf_utils
@tf.keras.utils.register_keras_serializable(package='Vision')
class DetectionHead(tf.keras.layers.Layer):
"""Detection head."""
def __init__(self,
num_classes,
num_convs=0,
num_filters=256,
use_separable_conv=False,
num_fcs=2,
fc_dims=1024,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
**kwargs):
"""Initialize params to build the detection head.
Args:
num_classes: a integer for the number of classes.
num_convs: `int` number that represents the number of the intermediate
conv layers before the FC layers.
num_filters: `int` number that represents the number of filters of the
intermediate conv layers.
use_separable_conv: `bool`, indicating whether the separable conv layers
is used.
num_fcs: `int` number that represents the number of FC layers before the
predictions.
fc_dims: `int` number that represents the number of dimension of the FC
layers.
activation: `string`, indicating which activation is used, e.g. 'relu',
'swish', etc.
use_sync_bn: `bool`, whether to use synchronized batch normalization
across different replicas.
norm_momentum: `float`, the momentum parameter of the normalization
layers.
norm_epsilon: `float`, the epsilon parameter of the normalization layers.
kernel_regularizer: `tf.keras.regularizers.Regularizer` object for layer
kernel.
bias_regularizer: `tf.keras.regularizers.Regularizer` object for bias.
**kwargs: other keyword arguments passed to Layer.
"""
super(DetectionHead, self).__init__(**kwargs)
self._config_dict = {
'num_classes': num_classes,
'num_convs': num_convs,
'num_filters': num_filters,
'use_separable_conv': use_separable_conv,
'num_fcs': num_fcs,
'fc_dims': fc_dims,
'activation': activation,
'use_sync_bn': use_sync_bn,
'norm_momentum': norm_momentum,
'norm_epsilon': norm_epsilon,
'kernel_regularizer': kernel_regularizer,
'bias_regularizer': bias_regularizer,
}
if tf.keras.backend.image_data_format() == 'channels_last':
self._bn_axis = -1
else:
self._bn_axis = 1
self._activation = tf_utils.get_activation(activation)
def build(self, input_shape):
"""Creates the variables of the head."""
conv_op = (tf.keras.layers.SeparableConv2D
if self._config_dict['use_separable_conv']
else tf.keras.layers.Conv2D)
conv_kwargs = {
'filters': self._config_dict['num_filters'],
'kernel_size': 3,
'padding': 'same',
}
if self._config_dict['use_separable_conv']:
conv_kwargs.update({
'depthwise_initializer': tf.keras.initializers.VarianceScaling(
scale=2, mode='fan_out', distribution='untruncated_normal'),
'pointwise_initializer': tf.keras.initializers.VarianceScaling(
scale=2, mode='fan_out', distribution='untruncated_normal'),
'bias_initializer': tf.zeros_initializer(),
'depthwise_regularizer': self._config_dict['kernel_regularizer'],
'pointwise_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
else:
conv_kwargs.update({
'kernel_initializer': tf.keras.initializers.VarianceScaling(
scale=2, mode='fan_out', distribution='untruncated_normal'),
'bias_initializer': tf.zeros_initializer(),
'kernel_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
bn_op = (tf.keras.layers.experimental.SyncBatchNormalization
if self._config_dict['use_sync_bn']
else tf.keras.layers.BatchNormalization)
bn_kwargs = {
'axis': self._bn_axis,
'momentum': self._config_dict['norm_momentum'],
'epsilon': self._config_dict['norm_epsilon'],
}
self._convs = []
self._conv_norms = []
for i in range(self._config_dict['num_convs']):
conv_name = 'detection-conv_{}'.format(i)
self._convs.append(conv_op(name=conv_name, **conv_kwargs))
bn_name = 'detection-conv-bn_{}'.format(i)
self._conv_norms.append(bn_op(name=bn_name, **bn_kwargs))
self._fcs = []
self._fc_norms = []
for i in range(self._config_dict['num_fcs']):
fc_name = 'detection-fc_{}'.format(i)
self._fcs.append(
tf.keras.layers.Dense(
units=self._config_dict['fc_dims'],
kernel_initializer=tf.keras.initializers.VarianceScaling(
scale=1 / 3.0, mode='fan_out', distribution='uniform'),
kernel_regularizer=self._config_dict['kernel_regularizer'],
bias_regularizer=self._config_dict['bias_regularizer'],
name=fc_name))
bn_name = 'detection-fc-bn_{}'.format(i)
self._fc_norms.append(bn_op(name=bn_name, **bn_kwargs))
self._classifier = tf.keras.layers.Dense(
units=self._config_dict['num_classes'],
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.01),
bias_initializer=tf.zeros_initializer(),
kernel_regularizer=self._config_dict['kernel_regularizer'],
bias_regularizer=self._config_dict['bias_regularizer'],
name='detection-scores')
self._box_regressor = tf.keras.layers.Dense(
units=self._config_dict['num_classes'] * 4,
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.001),
bias_initializer=tf.zeros_initializer(),
kernel_regularizer=self._config_dict['kernel_regularizer'],
bias_regularizer=self._config_dict['bias_regularizer'],
name='detection-boxes')
super(DetectionHead, self).build(input_shape)
def call(self, inputs, training=None):
"""Box and class branches for the Mask-RCNN model.
Args:
inputs: ROI features, a tensor of shape
[batch_size, num_instances, roi_height, roi_width, roi_channels],
representing the ROI features.
training: a boolean indicating whether it is in `training` mode.
Returns:
class_outputs: a tensor with a shape of
[batch_size, num_rois, num_classes], representing the class predictions.
box_outputs: a tensor with a shape of
[batch_size, num_rois, num_classes * 4], representing the box
predictions.
"""
roi_features = inputs
_, num_rois, height, width, filters = roi_features.get_shape().as_list()
x = tf.reshape(roi_features, [-1, height, width, filters])
for conv, bn in zip(self._convs, self._conv_norms):
x = conv(x)
x = bn(x)
x = self._activation(x)
_, _, _, filters = x.get_shape().as_list()
x = tf.reshape(x, [-1, num_rois, height * width * filters])
for fc, bn in zip(self._fcs, self._fc_norms):
x = fc(x)
x = bn(x)
x = self._activation(x)
classes = self._classifier(x)
boxes = self._box_regressor(x)
return classes, boxes
def get_config(self):
return self._config_dict
@classmethod
def from_config(cls, config):
return cls(**config)
@tf.keras.utils.register_keras_serializable(package='Vision')
class MaskHead(tf.keras.layers.Layer):
"""Mask head."""
def __init__(self,
num_classes,
upsample_factor=2,
num_convs=4,
num_filters=256,
use_separable_conv=False,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
**kwargs):
"""Initialize params to build the mask head.
Args:
num_classes: `int`, the number of classes.
upsample_factor: `int`, >= 1, the upsample factor to generate the
final predicted masks.
num_convs: `int` number that represents the number of the intermediate
conv layers before the mask prediction layers.
num_filters: `int` number that represents the number of filters of the
intermediate conv layers.
use_separable_conv: `bool`, indicating whether the separable conv layers
is used.
activation: `string`, indicating which activation is used, e.g. 'relu',
'swish', etc.
use_sync_bn: `bool`, whether to use synchronized batch normalization
across different replicas.
norm_momentum: `float`, the momentum parameter of the normalization
layers.
norm_epsilon: `float`, the epsilon parameter of the normalization layers.
kernel_regularizer: `tf.keras.regularizers.Regularizer` object for layer
kernel.
bias_regularizer: `tf.keras.regularizers.Regularizer` object for bias.
**kwargs: other keyword arguments passed to Layer.
"""
super(MaskHead, self).__init__(**kwargs)
self._config_dict = {
'num_classes': num_classes,
'upsample_factor': upsample_factor,
'num_convs': num_convs,
'num_filters': num_filters,
'use_separable_conv': use_separable_conv,
'activation': activation,
'use_sync_bn': use_sync_bn,
'norm_momentum': norm_momentum,
'norm_epsilon': norm_epsilon,
'kernel_regularizer': kernel_regularizer,
'bias_regularizer': bias_regularizer,
}
if tf.keras.backend.image_data_format() == 'channels_last':
self._bn_axis = -1
else:
self._bn_axis = 1
self._activation = tf_utils.get_activation(activation)
def build(self, input_shape):
"""Creates the variables of the head."""
conv_op = (tf.keras.layers.SeparableConv2D
if self._config_dict['use_separable_conv']
else tf.keras.layers.Conv2D)
conv_kwargs = {
'filters': self._config_dict['num_filters'],
'kernel_size': 3,
'padding': 'same',
}
if self._config_dict['use_separable_conv']:
conv_kwargs.update({
'depthwise_initializer': tf.keras.initializers.VarianceScaling(
scale=2, mode='fan_out', distribution='untruncated_normal'),
'pointwise_initializer': tf.keras.initializers.VarianceScaling(
scale=2, mode='fan_out', distribution='untruncated_normal'),
'bias_initializer': tf.zeros_initializer(),
'depthwise_regularizer': self._config_dict['kernel_regularizer'],
'pointwise_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
else:
conv_kwargs.update({
'kernel_initializer': tf.keras.initializers.VarianceScaling(
scale=2, mode='fan_out', distribution='untruncated_normal'),
'bias_initializer': tf.zeros_initializer(),
'kernel_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
bn_op = (tf.keras.layers.experimental.SyncBatchNormalization
if self._config_dict['use_sync_bn']
else tf.keras.layers.BatchNormalization)
bn_kwargs = {
'axis': self._bn_axis,
'momentum': self._config_dict['norm_momentum'],
'epsilon': self._config_dict['norm_epsilon'],
}
self._convs = []
self._conv_norms = []
for i in range(self._config_dict['num_convs']):
conv_name = 'mask-conv_{}'.format(i)
self._convs.append(conv_op(name=conv_name, **conv_kwargs))
bn_name = 'mask-conv-bn_{}'.format(i)
self._conv_norms.append(bn_op(name=bn_name, **bn_kwargs))
self._deconv = tf.keras.layers.Conv2DTranspose(
filters=self._config_dict['num_filters'],
kernel_size=self._config_dict['upsample_factor'],
strides=self._config_dict['upsample_factor'],
padding='valid',
kernel_initializer=tf.keras.initializers.VarianceScaling(
scale=2, mode='fan_out', distribution='untruncated_normal'),
bias_initializer=tf.zeros_initializer(),
kernel_regularizer=self._config_dict['kernel_regularizer'],
bias_regularizer=self._config_dict['bias_regularizer'],
name='mask-upsampling')
self._deconv_bn = bn_op(name='mask-deconv-bn', **bn_kwargs)
conv_kwargs = {
'filters': self._config_dict['num_classes'],
'kernel_size': 1,
'padding': 'valid',
}
if self._config_dict['use_separable_conv']:
conv_kwargs.update({
'depthwise_initializer': tf.keras.initializers.VarianceScaling(
scale=2, mode='fan_out', distribution='untruncated_normal'),
'pointwise_initializer': tf.keras.initializers.VarianceScaling(
scale=2, mode='fan_out', distribution='untruncated_normal'),
'bias_initializer': tf.zeros_initializer(),
'depthwise_regularizer': self._config_dict['kernel_regularizer'],
'pointwise_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
else:
conv_kwargs.update({
'kernel_initializer': tf.keras.initializers.VarianceScaling(
scale=2, mode='fan_out', distribution='untruncated_normal'),
'bias_initializer': tf.zeros_initializer(),
'kernel_regularizer': self._config_dict['kernel_regularizer'],
'bias_regularizer': self._config_dict['bias_regularizer'],
})
self._mask_regressor = conv_op(name='mask-logits', **conv_kwargs)
super(MaskHead, self).build(input_shape)
def call(self, inputs, training=None):
"""Mask branch for the Mask-RCNN model.
Args:
inputs: a list of two tensors
inputs[0]: ROI features, a tensor of shape
[batch_size, num_instances, roi_height, roi_width, roi_channels],
representing the ROI features.
inputs[1]: ROI classes, a tensor of shape
[batch_size, num_instances], representing the classes of the ROIs.
training: a boolean indicating whether it is in `training` mode.
Returns:
mask_outputs: a tensor of shape
[batch_size, num_instances, roi_height * upsample_factor,
roi_width * upsample_factor], representing the mask predictions.
"""
roi_features, roi_classes = inputs
batch_size, num_rois, height, width, filters = (
roi_features.get_shape().as_list())
if batch_size is None:
batch_size = tf.shape(roi_features)[0]
x = tf.reshape(roi_features, [-1, height, width, filters])
for conv, bn in zip(self._convs, self._conv_norms):
x = conv(x)
x = bn(x)
x = self._activation(x)
x = self._deconv(x)
x = self._deconv_bn(x)
x = self._activation(x)
logits = self._mask_regressor(x)
mask_height = height * self._config_dict['upsample_factor']
mask_width = width * self._config_dict['upsample_factor']
logits = tf.reshape(
logits,
[-1, num_rois, mask_height, mask_width,
self._config_dict['num_classes']])
batch_indices = tf.tile(
tf.expand_dims(tf.range(batch_size), axis=1), [1, num_rois])
mask_indices = tf.tile(
tf.expand_dims(tf.range(num_rois), axis=0), [batch_size, 1])
gather_indices = tf.stack(
[batch_indices, mask_indices, tf.cast(roi_classes, dtype=tf.int32)],
axis=2)
mask_outputs = tf.gather_nd(
tf.transpose(logits, [0, 1, 4, 2, 3]), gather_indices)
return mask_outputs
def get_config(self):
return self._config_dict
@classmethod
def from_config(cls, config):
return cls(**config)
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for instance_heads.py."""
# Import libraries
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.vision.beta.modeling.heads import instance_heads
class DetectionHeadTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(0, 0, False, False),
(0, 1, False, False),
(1, 0, False, False),
(1, 1, False, False),
)
def test_forward(self, num_convs, num_fcs, use_separable_conv, use_sync_bn):
detection_head = instance_heads.DetectionHead(
num_classes=3,
num_convs=num_convs,
num_filters=16,
use_separable_conv=use_separable_conv,
num_fcs=num_fcs,
fc_dims=4,
activation='relu',
use_sync_bn=use_sync_bn,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
)
roi_features = np.random.rand(2, 10, 128, 128, 16)
scores, boxes = detection_head(roi_features)
self.assertAllEqual(scores.numpy().shape, [2, 10, 3])
self.assertAllEqual(boxes.numpy().shape, [2, 10, 12])
def test_serialize_deserialize(self):
detection_head = instance_heads.DetectionHead(
num_classes=91,
num_convs=0,
num_filters=256,
use_separable_conv=False,
num_fcs=2,
fc_dims=1024,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
)
config = detection_head.get_config()
new_detection_head = instance_heads.DetectionHead.from_config(config)
self.assertAllEqual(
detection_head.get_config(), new_detection_head.get_config())
class MaskHeadTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(1, 1, False),
(1, 2, False),
(2, 1, False),
(2, 2, False),
)
def test_forward(self, upsample_factor, num_convs, use_sync_bn):
mask_head = instance_heads.MaskHead(
num_classes=3,
upsample_factor=upsample_factor,
num_convs=num_convs,
num_filters=16,
use_separable_conv=False,
activation='relu',
use_sync_bn=use_sync_bn,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
)
roi_features = np.random.rand(2, 10, 14, 14, 16)
roi_classes = np.zeros((2, 10))
masks = mask_head([roi_features, roi_classes])
self.assertAllEqual(
masks.numpy().shape,
[2, 10, 14 * upsample_factor, 14 * upsample_factor])
def test_serialize_deserialize(self):
mask_head = instance_heads.MaskHead(
num_classes=3,
upsample_factor=2,
num_convs=1,
num_filters=256,
use_separable_conv=False,
activation='relu',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_regularizer=None,
bias_regularizer=None,
)
config = mask_head.get_config()
new_mask_head = instance_heads.MaskHead.from_config(config)
self.assertAllEqual(
mask_head.get_config(), new_mask_head.get_config())
if __name__ == '__main__':
tf.test.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment