"docs/backend/structured_outputs_for_reasoning_models.ipynb" did not exist on "55de40f782d1949740aec74e88ae7cce00d59582"
Commit bcb231f0 authored by Yeqing Li's avatar Yeqing Li Committed by A. Unique TensorFlower
Browse files

Move retinanet keras model to tensorflow_models/official

PiperOrigin-RevId: 274010788
parent 04ce9636
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Model architecture factory."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from official.vision.detection.modeling.architecture import fpn
from official.vision.detection.modeling.architecture import heads
from official.vision.detection.modeling.architecture import nn_ops
from official.vision.detection.modeling.architecture import resnet
def batch_norm_relu_generator(params):
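"""Returns a BatchNormRelu constructor bound to the given batch-norm params."""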
def _batch_norm_op(**kwargs):
return nn_ops.BatchNormRelu(
momentum=params.batch_norm_momentum,
epsilon=params.batch_norm_epsilon,
trainable=params.batch_norm_trainable,
**kwargs)
return _batch_norm_op
def backbone_generator(params):
"""Generator function for various backbone models."""
if params.architecture.backbone == 'resnet':
resnet_params = params.resnet
backbone_fn = resnet.Resnet(
resnet_depth=resnet_params.resnet_depth,
dropblock_keep_prob=resnet_params.dropblock.dropblock_keep_prob,
dropblock_size=resnet_params.dropblock.dropblock_size,
batch_norm_relu=batch_norm_relu_generator(resnet_params.batch_norm))
else:
raise ValueError('Backbone model %s is not supported.' %
params.architecture.backbone)
return backbone_fn
def multilevel_features_generator(params):
"""Generator function for various FPN models."""
if params.architecture.multilevel_features == 'fpn':
fpn_params = params.fpn
fpn_fn = fpn.Fpn(
min_level=fpn_params.min_level,
max_level=fpn_params.max_level,
fpn_feat_dims=fpn_params.fpn_feat_dims,
batch_norm_relu=batch_norm_relu_generator(fpn_params.batch_norm))
else:
raise ValueError('The multi-level feature model %s is not supported.'
% params.architecture.multilevel_features)
return fpn_fn
def retinanet_head_generator(params):
"""Generator function for RetinaNet head architecture."""
return heads.RetinanetHead(
params.min_level,
params.max_level,
params.num_classes,
params.anchors_per_location,
params.retinanet_head_num_convs,
params.retinanet_head_num_filters,
batch_norm_relu=batch_norm_relu_generator(params.batch_norm))
def rpn_head_generator(params):
"""Generator function for RPN head architecture."""
return heads.RpnHead(params.min_level,
params.max_level,
params.anchors_per_location,
batch_norm_relu=batch_norm_relu_generator(
params.batch_norm))
def fast_rcnn_head_generator(params):
"""Generator function for Fast R-CNN head architecture."""
return heads.FastrcnnHead(params.num_classes,
params.fast_rcnn_mlp_head_dim,
batch_norm_relu=batch_norm_relu_generator(
params.batch_norm))
def mask_rcnn_head_generator(params):
"""Generator function for Mask R-CNN head architecture."""
return heads.MaskrcnnHead(params.num_classes,
params.mrcnn_resolution,
batch_norm_relu=batch_norm_relu_generator(
params.batch_norm))
def shapeprior_head_generator(params):
"""Generator function for RetinaNet head architecture."""
return heads.ShapemaskPriorHead(
params.num_classes,
params.num_downsample_channels,
params.mask_crop_size,
params.use_category_for_mask,
params.num_of_instances,
params.min_mask_level,
params.max_mask_level,
params.num_clusters,
params.temperature,
params.shape_prior_path)
def coarsemask_head_generator(params):
"""Generator function for RetinaNet head architecture."""
return heads.ShapemaskCoarsemaskHead(
params.num_classes,
params.num_downsample_channels,
params.mask_crop_size,
params.use_category_for_mask,
params.num_convs)
def finemask_head_generator(params):
"""Generator function for RetinaNet head architecture."""
return heads.ShapemaskFinemaskHead(
params.num_classes,
params.num_downsample_channels,
params.mask_crop_size,
params.num_convs,
params.coarse_mask_thr,
params.gt_upsample_scale)
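
# --- Illustrative usage (editor's sketch, not part of the original file) ---
# How a config object might be wired into the factory above. The real configs
# come from the project's params machinery; this SimpleNamespace layout is a
# hypothetical stand-in covering only the fields backbone_generator reads.
from types import SimpleNamespace

_example_batch_norm = SimpleNamespace(
    batch_norm_momentum=0.997,
    batch_norm_epsilon=1e-4,
    batch_norm_trainable=True)
_example_params = SimpleNamespace(
    architecture=SimpleNamespace(backbone='resnet'),
    resnet=SimpleNamespace(
        resnet_depth=50,
        dropblock=SimpleNamespace(
            dropblock_keep_prob=None, dropblock_size=None),
        batch_norm=_example_batch_norm))
# backbone_fn = backbone_generator(_example_params)  # -> a resnet.Resnet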
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Feature Pyramid Networks.
Feature Pyramid Networks were proposed in:
[1] Tsung-Yi Lin, Piotr Dollar, Ross Girshick, Kaiming He, Bharath Hariharan,
and Serge Belongie
Feature Pyramid Networks for Object Detection. CVPR 2017.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow.compat.v2 as tf
from tensorflow.python.keras import backend
from official.vision.detection.modeling.architecture import nn_ops
from official.vision.detection.utils import spatial_transform
class Fpn(object):
"""Feature pyramid networks."""
def __init__(self,
min_level=3,
max_level=7,
fpn_feat_dims=256,
batch_norm_relu=nn_ops.BatchNormRelu):
"""FPN initialization function.
Args:
min_level: `int` minimum level in FPN output feature maps.
max_level: `int` maximum level in FPN output feature maps.
fpn_feat_dims: `int` number of filters in FPN layers.
batch_norm_relu: an operation that includes a batch normalization layer
followed by an optional ReLU layer.
"""
self._min_level = min_level
self._max_level = max_level
self._fpn_feat_dims = fpn_feat_dims
self._batch_norm_relus = {}
for level in range(self._min_level, self._max_level + 1):
self._batch_norm_relus[level] = batch_norm_relu(
relu=False, name='p%d-bn' % level)
def __call__(self, multilevel_features, is_training=None):
"""Returns the FPN features for a given multilevel features.
Args:
multilevel_features: a `dict` containing `int` keys for continuous feature
levels, e.g., [2, 3, 4, 5]. The values are corresponding features with
shape [batch_size, height_l, width_l, num_filters].
is_training: `bool` if True, the model is in training mode.
Returns:
a `dict` containing `int` keys for continuous feature levels
[min_level, min_level + 1, ..., max_level]. The values are corresponding
FPN features with shape [batch_size, height_l, width_l, fpn_feat_dims].
"""
input_levels = multilevel_features.keys()
if min(input_levels) > self._min_level:
raise ValueError(
'The minimum backbone level %d should be ' % (min(input_levels)) +
'less than or equal to the FPN minimum level %d.' % (self._min_level))
backbone_max_level = min(max(input_levels), self._max_level)
with backend.get_graph().as_default(), tf.name_scope('fpn'):
# Adds lateral connections.
feats_lateral = {}
for level in range(self._min_level, backbone_max_level + 1):
feats_lateral[level] = tf.keras.layers.Conv2D(
filters=self._fpn_feat_dims,
kernel_size=(1, 1),
padding='same',
name='l%d' % level)(
multilevel_features[level])
# Adds top-down path.
feats = {backbone_max_level: feats_lateral[backbone_max_level]}
for level in range(backbone_max_level - 1, self._min_level - 1, -1):
feats[level] = spatial_transform.nearest_upsampling(
feats[level + 1], 2) + feats_lateral[level]
# Adds post-hoc 3x3 convolution kernel.
for level in range(self._min_level, backbone_max_level + 1):
feats[level] = tf.keras.layers.Conv2D(
filters=self._fpn_feat_dims,
strides=(1, 1),
kernel_size=(3, 3),
padding='same',
name='post_hoc_d%d' % level)(
feats[level])
# Adds coarser FPN levels introduced for RetinaNet.
for level in range(backbone_max_level + 1, self._max_level + 1):
feats_in = feats[level - 1]
if level > backbone_max_level + 1:
feats_in = tf.nn.relu(feats_in)
feats[level] = tf.keras.layers.Conv2D(
filters=self._fpn_feat_dims,
strides=(2, 2),
kernel_size=(3, 3),
padding='same',
name='p%d' % level)(
feats_in)
# Adds batch_norm layer.
for level in range(self._min_level, self._max_level + 1):
feats[level] = self._batch_norm_relus[level](
feats[level], is_training=is_training)
return feats
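
# --- Illustrative usage (editor's sketch, not part of the original file) ---
# A minimal shape check for the Fpn block above. It assumes the Keras
# functional-graph context these classes are written for, so symbolic
# tf.keras.Input tensors stand in for backbone outputs at levels 2..5.
def _fpn_example():
  backbone_feats = {
      level: tf.keras.Input([256 // 2**level, 256 // 2**level, 64])
      for level in range(2, 6)
  }
  fpn_fn = Fpn(min_level=3, max_level=7, fpn_feat_dims=256)
  feats = fpn_fn(backbone_feats, is_training=False)
  # Levels 3..5 come from lateral + top-down fusion; levels 6..7 are the
  # extra strided convolutions added for RetinaNet. All carry 256 channels.
  assert sorted(feats.keys()) == [3, 4, 5, 6, 7]
  assert all(f.shape[-1] == 256 for f in feats.values())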
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Classes to build various prediction heads in all supported models."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pickle
from absl import logging
import numpy as np
import tensorflow.compat.v2 as tf
from tensorflow.python.keras import backend
from official.vision.detection.modeling.architecture import nn_ops
from official.vision.detection.utils import spatial_transform
class RpnHead(object):
"""Region Proposal Network head."""
def __init__(self,
min_level,
max_level,
anchors_per_location,
batch_norm_relu=nn_ops.BatchNormRelu):
"""Initialize params to build Region Proposal Network head.
Args:
min_level: `int` number of minimum feature level.
max_level: `int` number of maximum feature level.
anchors_per_location: `int` number of anchors per pixel
location.
batch_norm_relu: an operation that includes a batch normalization layer
followed by an optional ReLU layer.
"""
self._min_level = min_level
self._max_level = max_level
self._anchors_per_location = anchors_per_location
self._rpn_conv = tf.keras.layers.Conv2D(
256,
kernel_size=(3, 3),
strides=(1, 1),
activation=None,
bias_initializer=tf.zeros_initializer(),
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.01),
padding='same',
name='rpn')
self._rpn_class_conv = tf.keras.layers.Conv2D(
anchors_per_location,
kernel_size=(1, 1),
strides=(1, 1),
bias_initializer=tf.zeros_initializer(),
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.01),
padding='valid',
name='rpn-class')
self._rpn_box_conv = tf.keras.layers.Conv2D(
4 * anchors_per_location,
kernel_size=(1, 1),
strides=(1, 1),
bias_initializer=tf.zeros_initializer(),
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.01),
padding='valid',
name='rpn-box')
self._batch_norm_relus = {}
for level in range(self._min_level, self._max_level + 1):
self._batch_norm_relus[level] = batch_norm_relu(name='rpn%d-bn' % level)
def _shared_rpn_heads(self, features, anchors_per_location, level,
is_training):
"""Shared RPN heads."""
# TODO(chiachenc): check the channel depth of the first convolution.
features = self._rpn_conv(features)
# The batch normalization layers are not shared between levels.
features = self._batch_norm_relus[level](features, is_training=is_training)
# Proposal classification scores
scores = self._rpn_class_conv(features)
# Proposal bbox regression deltas
bboxes = self._rpn_box_conv(features)
return scores, bboxes
def __call__(self, features, is_training=None):
scores_outputs = {}
box_outputs = {}
with backend.get_graph().as_default(), tf.name_scope('rpn_head'):
for level in range(self._min_level, self._max_level + 1):
scores_output, box_output = self._shared_rpn_heads(
features[level], self._anchors_per_location, level, is_training)
scores_outputs[level] = scores_output
box_outputs[level] = box_output
return scores_outputs, box_outputs
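
# --- Illustrative usage (editor's sketch, not part of the original file) ---
# Output-shape sketch for the RPN head above, under the same Keras
# functional-graph assumption. With A anchors per location, each level's
# scores carry A channels and its box deltas carry 4 * A.
def _rpn_head_example():
  fpn_feats = {
      level: tf.keras.Input([64 // 2**(level - 2), 64 // 2**(level - 2), 256])
      for level in range(2, 7)
  }
  rpn_head = RpnHead(min_level=2, max_level=6, anchors_per_location=3)
  scores, boxes = rpn_head(fpn_feats, is_training=False)
  assert scores[2].shape[-1] == 3 and boxes[2].shape[-1] == 12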
class FastrcnnHead(object):
"""Fast R-CNN box head."""
def __init__(self,
num_classes,
mlp_head_dim,
batch_norm_relu=nn_ops.BatchNormRelu):
"""Initialize params to build Fast R-CNN box head.
Args:
num_classes: an integer for the number of classes.
mlp_head_dim: an integer that is the hidden dimension in the
fully-connected layers.
batch_norm_relu: an operation that includes a batch normalization layer
followed by an optional ReLU layer.
"""
self._num_classes = num_classes
self._mlp_head_dim = mlp_head_dim
self._batch_norm_relu = batch_norm_relu()
def __call__(self, roi_features, is_training=None):
"""Box and class branches for the Mask-RCNN model.
Args:
roi_features: A ROI feature tensor of shape
[batch_size, num_rois, height_l, width_l, num_filters].
is_training: `bool`, if True, the model is in training mode.
Returns:
class_outputs: a tensor with a shape of
[batch_size, num_rois, num_classes], representing the class predictions.
box_outputs: a tensor with a shape of
[batch_size, num_rois, num_classes * 4], representing the box
predictions.
"""
with backend.get_graph().as_default(), tf.name_scope('fast_rcnn_head'):
# Reshape inputs before the FC layers.
_, num_rois, height, width, filters = roi_features.get_shape().as_list()
roi_features = tf.reshape(roi_features,
[-1, num_rois, height * width * filters])
net = tf.keras.layers.Dense(
units=self._mlp_head_dim, activation=None, name='fc6')(
roi_features)
net = self._batch_norm_relu(net, is_training=is_training)
net = tf.keras.layers.Dense(
units=self._mlp_head_dim, activation=None, name='fc7')(
net)
net = self._batch_norm_relu(net, is_training=is_training)
class_outputs = tf.keras.layers.Dense(
self._num_classes,
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.01),
bias_initializer=tf.zeros_initializer(),
name='class-predict')(
net)
box_outputs = tf.keras.layers.Dense(
self._num_classes * 4,
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.001),
bias_initializer=tf.zeros_initializer(),
name='box-predict')(
net)
return class_outputs, box_outputs
class MaskrcnnHead(object):
"""Mask R-CNN head."""
def __init__(self,
num_classes,
mrcnn_resolution,
batch_norm_relu=nn_ops.BatchNormRelu):
"""Initialize params to build Fast R-CNN head.
Args:
num_classes: a integer for the number of classes.
mrcnn_resolution: a integer that is the resolution of masks.
batch_norm_relu: an operation that includes a batch normalization layer
followed by a relu layer(optional).
"""
self._num_classes = num_classes
self._mrcnn_resolution = mrcnn_resolution
self._batch_norm_relu = batch_norm_relu()
def __call__(self, roi_features, class_indices, is_training=None):
"""Mask branch for the Mask-RCNN model.
Args:
roi_features: A ROI feature tensor of shape
[batch_size, num_rois, height_l, width_l, num_filters].
class_indices: a Tensor of shape [batch_size, num_rois], indicating
which class the ROI is.
is_training: `bool`, if True, the model is in training mode.
Returns:
mask_outputs: a tensor with a shape of
[batch_size, num_masks, mask_height, mask_width, num_classes],
representing the mask predictions.
fg_gather_indices: a tensor with a shape of [batch_size, num_masks, 2],
representing the fg mask targets.
Raises:
ValueError: If boxes is not a rank-3 tensor or the last dimension of
boxes is not 4.
"""
def _get_stddev_equivalent_to_msra_fill(kernel_size, fan_out):
"""Returns the stddev of random normal initialization as MSRAFill."""
# Reference: https://github.com/pytorch/pytorch/blob/master/caffe2/operators/filler_op.h#L445-L463 # pylint: disable=line-too-long
# For example, kernel size is (3, 3) and fan out is 256, stddev is 0.029.
# stddev = (2/(3*3*256))^0.5 = 0.029
return (2 / (kernel_size[0] * kernel_size[1] * fan_out)) ** 0.5
with backend.get_graph().as_default():
with tf.name_scope('mask_head'):
_, num_rois, height, width, filters = roi_features.get_shape().as_list()
net = tf.reshape(roi_features, [-1, height, width, filters])
for i in range(4):
kernel_size = (3, 3)
fan_out = 256
init_stddev = _get_stddev_equivalent_to_msra_fill(
kernel_size, fan_out)
net = tf.keras.layers.Conv2D(
fan_out,
kernel_size=kernel_size,
strides=(1, 1),
padding='same',
dilation_rate=(1, 1),
activation=None,
kernel_initializer=tf.keras.initializers.RandomNormal(
stddev=init_stddev),
bias_initializer=tf.zeros_initializer(),
name='mask-conv-l%d' % i)(
net)
net = self._batch_norm_relu(net, is_training=is_training)
kernel_size = (2, 2)
fan_out = 256
init_stddev = _get_stddev_equivalent_to_msra_fill(kernel_size, fan_out)
net = tf.keras.layers.Conv2DTranspose(
fan_out,
kernel_size=kernel_size,
strides=(2, 2),
padding='valid',
activation=None,
kernel_initializer=tf.keras.initializers.RandomNormal(
stddev=init_stddev),
bias_initializer=tf.zeros_initializer(),
name='conv5-mask')(
net)
net = self._batch_norm_relu(net, is_training=is_training)
kernel_size = (1, 1)
fan_out = self._num_classes
init_stddev = _get_stddev_equivalent_to_msra_fill(kernel_size, fan_out)
mask_outputs = tf.keras.layers.Conv2D(
fan_out,
kernel_size=kernel_size,
strides=(1, 1),
padding='valid',
kernel_initializer=tf.keras.initializers.RandomNormal(
stddev=init_stddev),
bias_initializer=tf.zeros_initializer(),
name='mask_fcn_logits')(
net)
mask_outputs = tf.reshape(mask_outputs, [
-1, num_rois, self._mrcnn_resolution, self._mrcnn_resolution,
self._num_classes
])
with tf.name_scope('masks_post_processing'):
# TODO(pengchong): Figure out the way not to use the static inferred
# batch size.
batch_size, num_masks = class_indices.get_shape().as_list()
mask_outputs = tf.transpose(a=mask_outputs, perm=[0, 1, 4, 2, 3])
# Constructs indices for gather.
batch_indices = tf.tile(
tf.expand_dims(tf.range(batch_size), axis=1), [1, num_masks])
mask_indices = tf.tile(
tf.expand_dims(tf.range(num_masks), axis=0), [batch_size, 1])
gather_indices = tf.stack(
[batch_indices, mask_indices, class_indices], axis=2)
mask_outputs = tf.gather_nd(mask_outputs, gather_indices)
return mask_outputs
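
# --- Editor's note (illustrative sketch, not part of the original file) ---
# The masks_post_processing block above picks, for every ROI, the mask slice
# belonging to its predicted class. The same gather_nd pattern, isolated on
# toy shapes so it can be run eagerly:
def _select_class_masks(mask_outputs, class_indices):
  """mask_outputs: [B, N, H, W, C]; class_indices: [B, N] -> [B, N, H, W]."""
  batch_size, num_masks = class_indices.get_shape().as_list()
  per_class_first = tf.transpose(a=mask_outputs, perm=[0, 1, 4, 2, 3])
  batch_indices = tf.tile(
      tf.expand_dims(tf.range(batch_size), axis=1), [1, num_masks])
  mask_indices = tf.tile(
      tf.expand_dims(tf.range(num_masks), axis=0), [batch_size, 1])
  gather_indices = tf.stack(
      [batch_indices, mask_indices, class_indices], axis=2)
  return tf.gather_nd(per_class_first, gather_indices)
# e.g. _select_class_masks(tf.zeros([2, 8, 28, 28, 91]),
#                          tf.zeros([2, 8], tf.int32)) -> shape [2, 8, 28, 28]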
class RetinanetHead(object):
"""RetinaNet head."""
def __init__(self,
min_level,
max_level,
num_classes,
anchors_per_location,
num_convs=4,
num_filters=256,
batch_norm_relu=nn_ops.BatchNormRelu):
"""Initialize params to build RetinaNet head.
Args:
min_level: `int` number of minimum feature level.
max_level: `int` number of maximum feature level.
num_classes: `int` number of classification categories.
anchors_per_location: `int` number of anchors per pixel location.
num_convs: `int` number of stacked convolution before the last prediction
layer.
num_filters: `int` number of filters used in the head architecture.
batch_norm_relu: an operation that includes a batch normalization layer
followed by an optional ReLU layer.
"""
self._min_level = min_level
self._max_level = max_level
self._num_classes = num_classes
self._anchors_per_location = anchors_per_location
self._num_convs = num_convs
self._num_filters = num_filters
with tf.name_scope('class_net') as scope_name:
self._class_name_scope = tf.name_scope(scope_name)
with tf.name_scope('box_net') as scope_name:
self._box_name_scope = tf.name_scope(scope_name)
self._build_class_net_layers(batch_norm_relu)
self._build_box_net_layers(batch_norm_relu)
def _class_net_batch_norm_name(self, i, level):
return 'class-%d-%d' % (i, level)
def _box_net_batch_norm_name(self, i, level):
return 'box-%d-%d' % (i, level)
def _build_class_net_layers(self, batch_norm_relu):
"""Build re-usable layers for class prediction network."""
self._class_predict = tf.keras.layers.Conv2D(
self._num_classes * self._anchors_per_location,
kernel_size=(3, 3),
bias_initializer=tf.constant_initializer(-np.log((1 - 0.01) / 0.01)),
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=1e-5),
padding='same',
name='class-predict')
self._class_conv = []
self._class_batch_norm_relu = {}
for i in range(self._num_convs):
self._class_conv.append(
tf.keras.layers.Conv2D(
self._num_filters,
kernel_size=(3, 3),
bias_initializer=tf.zeros_initializer(),
kernel_initializer=tf.keras.initializers.RandomNormal(
stddev=0.01),
activation=None,
padding='same',
name='class-' + str(i)))
for level in range(self._min_level, self._max_level + 1):
name = self._class_net_batch_norm_name(i, level)
self._class_batch_norm_relu[name] = batch_norm_relu(name=name)
def _build_box_net_layers(self, batch_norm_relu):
"""Build re-usable layers for box prediction network."""
self._box_predict = tf.keras.layers.Conv2D(
4 * self._anchors_per_location,
kernel_size=(3, 3),
bias_initializer=tf.zeros_initializer(),
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=1e-5),
padding='same',
name='box-predict')
self._box_conv = []
self._box_batch_norm_relu = {}
for i in range(self._num_convs):
self._box_conv.append(
tf.keras.layers.Conv2D(
self._num_filters,
kernel_size=(3, 3),
activation=None,
bias_initializer=tf.zeros_initializer(),
kernel_initializer=tf.keras.initializers.RandomNormal(
stddev=0.01),
padding='same',
name='box-' + str(i)))
for level in range(self._min_level, self._max_level + 1):
name = self._box_net_batch_norm_name(i, level)
self._box_batch_norm_relu[name] = batch_norm_relu(name=name)
def __call__(self, fpn_features, is_training=None):
"""Returns outputs of RetinaNet head."""
class_outputs = {}
box_outputs = {}
with backend.get_graph().as_default(), tf.name_scope('retinanet'):
for level in range(self._min_level, self._max_level + 1):
features = fpn_features[level]
class_outputs[level] = self.class_net(
features, level, is_training=is_training)
box_outputs[level] = self.box_net(
features, level, is_training=is_training)
return class_outputs, box_outputs
def class_net(self, features, level, is_training):
"""Class prediction network for RetinaNet."""
with self._class_name_scope:
for i in range(self._num_convs):
features = self._class_conv[i](features)
# The convolution layers in the class net are shared among all levels, but
# each level has its own batch normalization to capture the statistical
# differences among levels.
name = self._class_net_batch_norm_name(i, level)
features = self._class_batch_norm_relu[name](
features, is_training=is_training)
classes = self._class_predict(features)
return classes
def box_net(self, features, level, is_training=None):
"""Box regression network for RetinaNet."""
with self._box_name_scope:
for i in range(self._num_convs):
features = self._box_conv[i](features)
# The convolution layers in the box net are shared among all levels, but
# each level has its own batch normalization to capture the statistical
# differences among levels.
name = self._box_net_batch_norm_name(i, level)
features = self._box_batch_norm_relu[name](
features, is_training=is_training)
boxes = self._box_predict(features)
return boxes
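
# --- Illustrative usage (editor's sketch, not part of the original file) ---
# Output-shape sketch for the RetinaNet head with typical settings (levels
# 3..7, 9 anchors per location): class outputs carry num_classes * 9
# channels per level and box outputs carry 4 * 9 = 36.
def _retinanet_head_example(num_classes=80):
  fpn_feats = {
      level: tf.keras.Input([64 // 2**(level - 3), 64 // 2**(level - 3), 256])
      for level in range(3, 8)
  }
  head = RetinanetHead(min_level=3, max_level=7, num_classes=num_classes,
                       anchors_per_location=9)
  class_outputs, box_outputs = head(fpn_feats, is_training=False)
  assert class_outputs[3].shape[-1] == num_classes * 9
  assert box_outputs[3].shape[-1] == 36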
# TODO(yeqing): Refactor this class when it is ready for var_scope reuse.
class ShapemaskPriorHead(object):
"""ShapeMask Prior head."""
def __init__(self,
num_classes,
num_downsample_channels,
mask_crop_size,
use_category_for_mask,
num_of_instances,
min_mask_level,
max_mask_level,
num_clusters,
temperature,
shape_prior_path=None):
"""Initialize params to build RetinaNet head.
Args:
num_classes: Number of output classes.
num_downsample_channels: number of channels in mask branch.
mask_crop_size: feature crop size.
use_category_for_mask: use class information in mask branch.
num_of_instances: number of instances to sample in training time.
min_mask_level: minimum FPN level to crop mask feature from.
max_mask_level: maximum FPN level to crop mask feature from.
num_clusters: number of clusters to use in K-Means.
temperature: the temperature for shape prior learning.
shape_prior_path: the path to load shape priors.
"""
self._mask_num_classes = num_classes
self._num_downsample_channels = num_downsample_channels
self._mask_crop_size = mask_crop_size
self._use_category_for_mask = use_category_for_mask
self._num_of_instances = num_of_instances
self._min_mask_level = min_mask_level
self._max_mask_level = max_mask_level
self._num_clusters = num_clusters
self._temperature = temperature
self._shape_prior_path = shape_prior_path
def __call__(self,
fpn_features,
boxes,
outer_boxes,
classes,
is_training=None):
"""Generate the detection priors from the box detections and FPN features.
This corresponds to the Fig. 4 of the ShapeMask paper at
https://arxiv.org/pdf/1904.03239.pdf
Args:
fpn_features: a dictionary of FPN features.
boxes: a float tensor of shape [batch_size, num_instances, 4]
representing the tight gt boxes from dataloader/detection.
outer_boxes: a float tensor of shape [batch_size, num_instances, 4]
representing the loose gt boxes from dataloader/detection.
classes: an int Tensor of shape [batch_size, num_instances]
of instance classes.
is_training: training mode or not.
Returns:
crop_features: a float Tensor of shape [batch_size * num_instances,
mask_crop_size, mask_crop_size, num_downsample_channels]. This is the
instance feature crop.
detection_priors: A float Tensor of shape [batch_size * num_instances,
mask_size, mask_size, 1].
"""
with backend.get_graph().as_default():
# Loads class-specific or class-agnostic shape priors.
if self._shape_prior_path:
if self._use_category_for_mask:
fid = tf.io.gfile.GFile(self._shape_prior_path, 'rb')
class_tups = pickle.load(fid)
max_class_id = class_tups[-1][0] + 1
class_masks = np.zeros((max_class_id, self._num_clusters,
self._mask_crop_size, self._mask_crop_size),
dtype=np.float32)
for cls_id, _, cls_mask in class_tups:
assert cls_mask.shape == (self._num_clusters,
self._mask_crop_size**2)
class_masks[cls_id] = cls_mask.reshape(self._num_clusters,
self._mask_crop_size,
self._mask_crop_size)
self.class_priors = tf.convert_to_tensor(
value=class_masks, dtype=tf.float32)
else:
npy_path = tf.io.gfile.GFile(self._shape_prior_path)
class_np_masks = np.load(npy_path)
assert class_np_masks.shape == (
self._num_clusters, self._mask_crop_size,
self._mask_crop_size), 'Invalid priors!!!'
self.class_priors = tf.convert_to_tensor(
value=class_np_masks, dtype=tf.float32)
else:
self.class_priors = tf.zeros(
[self._num_clusters, self._mask_crop_size, self._mask_crop_size],
tf.float32)
batch_size = boxes.get_shape()[0]
min_level_shape = fpn_features[self._min_mask_level].get_shape().as_list()
self._max_feature_size = min_level_shape[1]
detection_prior_levels = self._compute_box_levels(boxes)
level_outer_boxes = outer_boxes / tf.pow(
2., tf.expand_dims(detection_prior_levels, -1))
detection_prior_levels = tf.cast(detection_prior_levels, tf.int32)
uniform_priors = spatial_transform.crop_mask_in_target_box(
tf.ones([
batch_size, self._num_of_instances, self._mask_crop_size,
self._mask_crop_size
], tf.float32), boxes, outer_boxes, self._mask_crop_size)
# Prepare crop features.
multi_level_features = self._get_multilevel_features(fpn_features)
crop_features = spatial_transform.single_level_feature_crop(
multi_level_features, level_outer_boxes, detection_prior_levels,
self._min_mask_level, self._mask_crop_size)
# Predict and fuse shape priors.
shape_weights = self._classify_and_fuse_detection_priors(
uniform_priors, classes, crop_features)
fused_shape_priors = self._fuse_priors(shape_weights, classes)
fused_shape_priors = tf.reshape(fused_shape_priors, [
batch_size, self._num_of_instances, self._mask_crop_size,
self._mask_crop_size
])
predicted_detection_priors = spatial_transform.crop_mask_in_target_box(
fused_shape_priors, boxes, outer_boxes, self._mask_crop_size)
predicted_detection_priors = tf.reshape(
predicted_detection_priors,
[-1, self._mask_crop_size, self._mask_crop_size, 1])
return crop_features, predicted_detection_priors
def _get_multilevel_features(self, fpn_features):
"""Get multilevel features from FPN feature dictionary into one tensor.
Args:
fpn_features: a dictionary of FPN features.
Returns:
features: a float tensor of shape [batch_size, num_levels,
max_feature_size, max_feature_size, num_downsample_channels].
"""
# TODO(yeqing): Recover reuse=tf.AUTO_REUSE logic.
with tf.name_scope('masknet'):
mask_feats = {}
# Reduce the feature dimension at each FPN level by convolution.
for feat_level in range(self._min_mask_level, self._max_mask_level + 1):
mask_feats[feat_level] = tf.keras.layers.Conv2D(
self._num_downsample_channels,
kernel_size=(1, 1),
bias_initializer=tf.zeros_initializer(),
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.01),
padding='same',
name='mask-downsample')(
fpn_features[feat_level])
# Concat features through padding to the max size.
features = [mask_feats[self._min_mask_level]]
for feat_level in range(self._min_mask_level + 1,
self._max_mask_level + 1):
features.append(tf.image.pad_to_bounding_box(
mask_feats[feat_level], 0, 0,
self._max_feature_size, self._max_feature_size))
features = tf.stack(features, axis=1)
return features
def _compute_box_levels(self, boxes):
"""Compute the box FPN levels.
Args:
boxes: a float tensor of shape [batch_size, num_instances, 4].
Returns:
levels: an int tensor of shape [batch_size, num_instances].
"""
object_sizes = tf.stack([
boxes[:, :, 2] - boxes[:, :, 0],
boxes[:, :, 3] - boxes[:, :, 1],
], axis=2)
object_sizes = tf.reduce_max(input_tensor=object_sizes, axis=2)
ratios = object_sizes / self._mask_crop_size
levels = tf.math.ceil(tf.math.log(ratios) / tf.math.log(2.))
levels = tf.maximum(tf.minimum(levels, self._max_mask_level),
self._min_mask_level)
return levels
def _classify_and_fuse_detection_priors(self, uniform_priors,
detection_prior_classes,
crop_features):
"""Classify the uniform prior by predicting the shape modes.
Classify the object crop features into K modes of the clusters for each
category.
Args:
uniform_priors: A float Tensor of shape [batch_size, num_instances,
mask_size, mask_size] representing the uniform detection priors.
detection_prior_classes: An int Tensor of shape [batch_size, num_instances]
of detection class ids.
crop_features: A float Tensor of shape [batch_size * num_instances,
mask_size, mask_size, num_channels].
Returns:
shape_weights: A float Tensor of shape
[batch_size * num_instances, num_clusters] representing the classifier
output probability over all possible shapes.
"""
location_detection_priors = tf.reshape(
uniform_priors, [-1, self._mask_crop_size, self._mask_crop_size, 1])
# Generate image embedding to shape.
fused_shape_features = crop_features * location_detection_priors
shape_embedding = tf.reduce_mean(
input_tensor=fused_shape_features, axis=(1, 2))
if not self._use_category_for_mask:
# TODO(weicheng) use custom op for performance
shape_logits = tf.keras.layers.Dense(
self._num_clusters,
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.01))(
shape_embedding)
shape_logits = tf.reshape(shape_logits,
[-1, self._num_clusters]) / self._temperature
shape_weights = tf.nn.softmax(shape_logits, name='shape_prior_weights')
else:
shape_logits = tf.keras.layers.Dense(
self._mask_num_classes * self._num_clusters,
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.01))(
shape_embedding)
shape_logits = tf.reshape(
shape_logits, [-1, self._mask_num_classes, self._num_clusters])
training_classes = tf.reshape(detection_prior_classes, [-1])
class_idx = tf.stack(
[tf.range(tf.size(input=training_classes)), training_classes - 1],
axis=1)
shape_logits = tf.gather_nd(shape_logits, class_idx) / self._temperature
shape_weights = tf.nn.softmax(shape_logits, name='shape_prior_weights')
return shape_weights
def _fuse_priors(self, shape_weights, detection_prior_classes):
"""Fuse shape priors by the predicted shape probability.
Args:
shape_weights: A float Tensor of shape [batch_size * num_instances,
num_clusters] of predicted shape probability distribution.
detection_prior_classes: An int Tensor of shape [batch_size, num_instances]
of detection class ids.
Returns:
detection_priors: A float Tensor of shape [batch_size * num_instances,
mask_size, mask_size, 1].
"""
if self._use_category_for_mask:
object_class_priors = tf.gather(
self.class_priors, detection_prior_classes)
else:
num_batch_instances = shape_weights.get_shape()[0]
object_class_priors = tf.tile(
tf.expand_dims(self.class_priors, 0),
[num_batch_instances, 1, 1, 1])
vector_class_priors = tf.reshape(
object_class_priors,
[-1, self._num_clusters,
self._mask_crop_size * self._mask_crop_size])
detection_priors = tf.matmul(
tf.expand_dims(shape_weights, 1), vector_class_priors)[:, 0, :]
detection_priors = tf.reshape(
detection_priors, [-1, self._mask_crop_size, self._mask_crop_size, 1])
return detection_priors
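
# --- Editor's note (illustrative sketch, not part of the original file) ---
# The matmul in _fuse_priors forms, per instance, a convex combination of
# the K cluster priors weighted by the predicted shape distribution. The
# same arithmetic on toy numpy arrays, with hypothetical sizes:
def _fuse_priors_numpy_demo():
  n, k, s = 2, 4, 8                          # instances, clusters, crop size
  weights = np.full((n, k), 1.0 / k)         # uniform shape distribution
  priors = np.random.rand(n, k, s * s)       # flattened cluster priors
  fused = np.matmul(weights[:, None, :], priors)[:, 0, :]
  assert fused.shape == (n, s * s)           # one fused prior per instance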
class ShapemaskCoarsemaskHead(object):
"""ShapemaskCoarsemaskHead head."""
def __init__(self,
num_classes,
num_downsample_channels,
mask_crop_size,
use_category_for_mask,
num_convs):
"""Initialize params to build ShapeMask coarse and fine prediction head.
Args:
num_classes: `int` number of mask classification categories.
num_downsample_channels: `int` number of filters at mask head.
mask_crop_size: feature crop size.
use_category_for_mask: use class information in mask branch.
num_convs: `int` number of stacked convolution before the last prediction
layer.
"""
self._mask_num_classes = num_classes
self._num_downsample_channels = num_downsample_channels
self._mask_crop_size = mask_crop_size
self._use_category_for_mask = use_category_for_mask
self._num_convs = num_convs
if not use_category_for_mask:
assert num_classes == 1
def __call__(self,
crop_features,
detection_priors,
inst_classes,
is_training=None):
"""Generate instance masks from FPN features and detection priors.
This corresponds to the Fig. 5-6 of the ShapeMask paper at
https://arxiv.org/pdf/1904.03239.pdf
Args:
crop_features: a float Tensor of shape [batch_size * num_instances,
mask_crop_size, mask_crop_size, num_downsample_channels]. This is the
instance feature crop.
detection_priors: a float Tensor of shape [batch_size * num_instances,
mask_crop_size, mask_crop_size, 1]. This is the detection prior for
the instance.
inst_classes: an int Tensor of shape [batch_size, num_instances]
of instance classes.
is_training: a bool indicating whether in training mode.
Returns:
mask_outputs: instance mask prediction as a float Tensor of shape
[batch_size * num_instances, mask_size, mask_size, num_classes].
"""
# Embed the anchor map into some feature space for anchor conditioning.
detection_prior_features = tf.keras.layers.Conv2D(
self._num_downsample_channels,
kernel_size=(1, 1),
bias_initializer=tf.zeros_initializer(),
kernel_initializer=tf.keras.initializers.RandomNormal(
mean=0., stddev=0.01),
padding='same',
name='anchor-conv')(
detection_priors)
prior_conditioned_features = crop_features + detection_prior_features
coarse_output_features = self.coarsemask_decoder_net(
prior_conditioned_features, is_training)
coarse_mask_classes = tf.keras.layers.Conv2D(
self._mask_num_classes,
kernel_size=(1, 1),
# Focal loss bias initialization to have foreground 0.01 probability.
bias_initializer=tf.constant_initializer(-np.log((1 - 0.01) / 0.01)),
kernel_initializer=tf.keras.initializers.RandomNormal(
mean=0, stddev=0.01),
padding='same',
name='class-predict')(
coarse_output_features)
if self._use_category_for_mask:
inst_classes = tf.cast(tf.reshape(inst_classes, [-1]), tf.int32)
coarse_mask_classes_t = tf.transpose(
a=coarse_mask_classes, perm=(0, 3, 1, 2))
# pylint: disable=g-long-lambda
coarse_mask_logits = tf.cond(
pred=tf.size(input=inst_classes) > 0,
true_fn=lambda: tf.gather_nd(
coarse_mask_classes_t,
tf.stack(
[tf.range(tf.size(input=inst_classes)), inst_classes - 1],
axis=1)),
false_fn=lambda: coarse_mask_classes_t[:, 0, :, :])
# pylint: enable=g-long-lambda
coarse_mask_logits = tf.expand_dims(coarse_mask_logits, -1)
else:
coarse_mask_logits = coarse_mask_classes
coarse_class_probs = tf.nn.sigmoid(coarse_mask_logits)
class_probs = tf.cast(coarse_class_probs, prior_conditioned_features.dtype)
return coarse_mask_classes, class_probs, prior_conditioned_features
def coarsemask_decoder_net(self,
images,
is_training=None,
batch_norm_relu=nn_ops.BatchNormRelu):
"""Coarse mask decoder network architecture.
Args:
images: A tensor of size [batch, height_in, width_in, channels_in].
is_training: Whether batch_norm layers are in training mode.
batch_norm_relu: an operation that includes a batch normalization layer
followed by an optional ReLU layer.
Returns:
images: A feature tensor of size [batch, output_size, output_size,
num_channels]
"""
for i in range(self._num_convs):
images = tf.keras.layers.Conv2D(
self._num_downsample_channels,
kernel_size=(3, 3),
bias_initializer=tf.zeros_initializer(),
kernel_initializer=tf.keras.initializers.RandomNormal(stddev=0.01),
activation=None,
padding='same',
name='coarse-class-%d' % i)(
images)
images = batch_norm_relu(name='coarse-class-%d-bn' % i)(
images, is_training=is_training)
return images
class ShapemaskFinemaskHead(object):
"""ShapemaskFinemaskHead head."""
def __init__(self,
num_classes,
num_downsample_channels,
mask_crop_size,
num_convs,
coarse_mask_thr,
gt_upsample_scale,
batch_norm_relu=nn_ops.BatchNormRelu):
"""Initialize params to build ShapeMask coarse and fine prediction head.
Args:
num_classes: `int` number of mask classification categories.
num_downsample_channels: `int` number of filters at mask head.
mask_crop_size: feature crop size.
num_convs: `int` number of stacked convolution before the last prediction
layer.
coarse_mask_thr: the threshold for suppressing noisy coarse prediction.
gt_upsample_scale: scale for upsampling groundtruths.
batch_norm_relu: an operation that includes a batch normalization layer
followed by an optional ReLU layer.
"""
self._mask_num_classes = num_classes
self._num_downsample_channels = num_downsample_channels
self._mask_crop_size = mask_crop_size
self._num_convs = num_convs
self._coarse_mask_thr = coarse_mask_thr
self._gt_upsample_scale = gt_upsample_scale
self._class_predict_conv = tf.keras.layers.Conv2D(
self._mask_num_classes,
kernel_size=(1, 1),
# Focal loss bias initialization to have foreground 0.01 probability.
bias_initializer=tf.constant_initializer(-np.log((1 - 0.01) / 0.01)),
kernel_initializer=tf.keras.initializers.RandomNormal(
mean=0, stddev=0.01),
padding='same',
name='affinity-class-predict')
self._upsample_conv = tf.keras.layers.Conv2DTranspose(
self._num_downsample_channels // 2,
(self._gt_upsample_scale, self._gt_upsample_scale),
(self._gt_upsample_scale, self._gt_upsample_scale))
self._fine_class_conv = []
self._fine_class_bn = []
for i in range(self._num_convs):
self._fine_class_conv.append(
tf.keras.layers.Conv2D(
self._num_downsample_channels,
kernel_size=(3, 3),
bias_initializer=tf.zeros_initializer(),
kernel_initializer=tf.keras.initializers.RandomNormal(
stddev=0.01),
activation=None,
padding='same',
name='fine-class-%d' % i))
self._fine_class_bn.append(batch_norm_relu(name='fine-class-%d-bn' % i))
def __call__(self, prior_conditioned_features, class_probs, is_training=None):
"""Generate instance masks from FPN features and detection priors.
This corresponds to the Fig. 5-6 of the ShapeMask paper at
https://arxiv.org/pdf/1904.03239.pdf
Args:
prior_conditioned_features: a float Tensor of shape [batch_size *
num_instances, mask_crop_size, mask_crop_size, num_downsample_channels].
This is the instance feature crop.
class_probs: a float Tensor of shape [batch_size * num_instances,
mask_crop_size, mask_crop_size, 1]. This is the class probability of
instance segmentation.
is_training: a bool indicating whether in training mode.
Returns:
mask_outputs: instance mask prediction as a float Tensor of shape
[batch_size * num_instances, mask_size, mask_size, num_classes].
"""
with backend.get_graph().as_default(), tf.name_scope('affinity-masknet'):
# Extract the foreground mean features
point_samp_prob_thr = 1. / (1. + tf.exp(-self._coarse_mask_thr))
point_samp_prob_thr = tf.cast(point_samp_prob_thr, class_probs.dtype)
class_probs = tf.where(
tf.greater(class_probs, point_samp_prob_thr), class_probs,
tf.zeros_like(class_probs))
weighted_features = class_probs * prior_conditioned_features
sum_class_vector = tf.reduce_sum(
input_tensor=class_probs, axis=(1, 2)) + tf.constant(
1e-20, class_probs.dtype)
instance_embedding = tf.reduce_sum(
input_tensor=weighted_features, axis=(1, 2)) / sum_class_vector
# Take the difference between crop features and mean instance features.
instance_features = prior_conditioned_features - tf.reshape(
instance_embedding, (-1, 1, 1, self._num_downsample_channels))
# Decoder to generate upsampled segmentation mask.
affinity_output_features = self.finemask_decoder_net(
instance_features, is_training)
# Predict per-class instance masks.
affinity_mask_classes = self._class_predict_conv(affinity_output_features)
return affinity_mask_classes
def finemask_decoder_net(self, images, is_training=None):
"""Fine mask decoder network architecture.
Args:
images: A tensor of size [batch, height_in, width_in, channels_in].
is_training: Whether batch_norm layers are in training mode.
Returns:
images: A feature tensor of size [batch, output_size, output_size,
num_channels], where output size is self._gt_upsample_scale times
that of input.
"""
for i in range(self._num_convs):
images = self._fine_class_conv[i](images)
images = self._fine_class_bn[i](images, is_training=is_training)
if self._gt_upsample_scale > 1:
images = self._upsample_conv(images)
return images
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Neural network operations commonly shared by the architectures."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow.compat.v2 as tf
from tensorflow.python.keras import backend
class BatchNormRelu(tf.keras.layers.Layer):
"""Combined Batch Normalization and ReLU layers."""
def __init__(self,
momentum=0.997,
epsilon=1e-4,
trainable=True,
relu=True,
init_zero=False,
name=None):
"""A class to construct layers for a batch normalization followed by a ReLU.
Args:
momentum: momentum for the moving average.
epsilon: small float added to variance to avoid dividing by zero.
trainable: `boolean`, if True also add variables to the graph collection
GraphKeys.TRAINABLE_VARIABLES. If False, freeze batch normalization
layer.
relu: `bool` if False, omits the ReLU operation.
init_zero: `bool` if True, initializes scale parameter of batch
normalization with 0. If False, initialize it with 1.
name: `str` name for the operation.
"""
self._use_relu = relu
self._trainable = trainable
if init_zero:
gamma_initializer = tf.keras.initializers.Zeros()
else:
gamma_initializer = tf.keras.initializers.Ones()
# TODO(yeqing): Check if we can change the fused=True again.
self._batch_norm_op = tf.keras.layers.BatchNormalization(
momentum=momentum,
epsilon=epsilon,
center=True,
scale=True,
trainable=trainable,
fused=False,
gamma_initializer=gamma_initializer,
name=name)
def __call__(self, inputs, is_training=None):
"""Builds layers for a batch normalization followed by a ReLU.
Args:
inputs: `Tensor` of shape `[batch, channels, ...]`.
is_training: `bool`, if True, the model is in training mode.
Returns:
A normalized `Tensor` with the same `data_format`.
"""
# Keep is_training=None by default so that the mode can be inherited from
# the enclosing keras.Model's training flag.
if is_training and self._trainable:
is_training = True
inputs = self._batch_norm_op(inputs, training=is_training)
if self._use_relu:
inputs = tf.nn.relu(inputs)
return inputs
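
# --- Illustrative usage (editor's sketch, not part of the original file) ---
# A minimal smoke use, assuming the surrounding TF 2.x compat setup:
# relu=False gives a bare batch norm (as the FPN output layers use), and
# is_training=False keeps the layer in inference mode on moving statistics.
def _batch_norm_relu_example():
  layer = BatchNormRelu(momentum=0.997, epsilon=1e-4, name='example-bn')
  outputs = layer(tf.zeros([2, 8, 8, 16]), is_training=False)
  assert outputs.shape == (2, 8, 8, 16)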
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Contains definitions for the post-activation form of Residual Networks.
Residual networks (ResNets) were proposed in:
[1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
Deep Residual Learning for Image Recognition. arXiv:1512.03385
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from absl import logging
import tensorflow.compat.v2 as tf
from tensorflow.python.keras import backend
from official.vision.detection.modeling.architecture import nn_ops
# TODO(b/140112644): Refactor the code with Keras style, i.e. build and call.
class Resnet(object):
"""Class to build ResNet family model."""
def __init__(self,
resnet_depth,
dropblock_keep_prob=None,
dropblock_size=None,
batch_norm_relu=nn_ops.BatchNormRelu,
data_format='channels_last'):
"""ResNet initialization function.
Args:
resnet_depth: `int` depth of ResNet backbone model.
dropblock_keep_prob: `float` or `Tensor` keep_prob parameter of DropBlock.
"None" means no DropBlock.
dropblock_size: `int` size parameter of DropBlock.
batch_norm_relu: an operation that includes a batch normalization layer
followed by an optional ReLU layer.
data_format: `str` either "channels_first" for `[batch, channels, height,
width]` or "channels_last" for `[batch, height, width, channels]`.
"""
self._resnet_depth = resnet_depth
self._dropblock_keep_prob = dropblock_keep_prob
self._dropblock_size = dropblock_size
self._batch_norm_relu = batch_norm_relu
self._data_format = data_format
model_params = {
18: {'block': self.residual_block, 'layers': [2, 2, 2, 2]},
34: {'block': self.residual_block, 'layers': [3, 4, 6, 3]},
50: {'block': self.bottleneck_block, 'layers': [3, 4, 6, 3]},
101: {'block': self.bottleneck_block, 'layers': [3, 4, 23, 3]},
152: {'block': self.bottleneck_block, 'layers': [3, 8, 36, 3]},
200: {'block': self.bottleneck_block, 'layers': [3, 24, 36, 3]}
}
if resnet_depth not in model_params:
valid_resnet_depths = ', '.join(
[str(depth) for depth in sorted(model_params.keys())])
raise ValueError(
'The resnet_depth should be one of [%s]; got %s.' % (
valid_resnet_depths, resnet_depth))
params = model_params[resnet_depth]
self._resnet_fn = self.resnet_v1_generator(
params['block'], params['layers'])
def __call__(self, inputs, is_training=None):
"""Returns the ResNet model for a given size and number of output classes.
Args:
inputs: a `Tensor` with shape [batch_size, height, width, 3] representing
a batch of images.
is_training: `bool` if True, the model is in training mode.
Returns:
a `dict` containing `int` keys for continuous feature levels [2, 3, 4, 5].
The values are corresponding feature hierarchy in ResNet with shape
[batch_size, height_l, width_l, num_filters].
"""
with backend.get_graph().as_default():
with tf.name_scope('resnet%s' % self._resnet_depth):
return self._resnet_fn(inputs, is_training)
def dropblock(self, net, is_training=None):
"""DropBlock: a regularization method for convolutional neural networks.
DropBlock is a form of structured dropout, where units in a contiguous
region of a feature map are dropped together. DropBlock works better than
dropout on convolutional layers due to the fact that activation units in
convolutional layers are spatially correlated.
See https://arxiv.org/pdf/1810.12890.pdf for details.
Args:
net: `Tensor` input tensor.
is_training: `bool` if True, the model is in training mode.
Returns:
A version of input tensor with DropBlock applied.
Raises:
ValueError: if width and height of the input tensor are not equal.
"""
if not is_training or self._dropblock_keep_prob is None:
return net
logging.info('Applying DropBlock: dropblock_size {}, net.shape {}'.format(
self._dropblock_size, net.shape))
if self._data_format == 'channels_last':
_, width, height, _ = net.get_shape().as_list()
else:
_, _, width, height = net.get_shape().as_list()
total_size = width * height
dropblock_size = min(self._dropblock_size, min(width, height))
# Seed_drop_rate is the gamma parameter of DropBlock.
seed_drop_rate = (
1.0 - self._dropblock_keep_prob) * total_size / dropblock_size**2 / (
(width - self._dropblock_size + 1) *
(height - self._dropblock_size + 1))
# Forces the block to be inside the feature map.
w_i, h_i = tf.meshgrid(tf.range(width), tf.range(height))
valid_block = tf.logical_and(
tf.logical_and(w_i >= int(dropblock_size // 2),
w_i < width - (dropblock_size - 1) // 2),
tf.logical_and(h_i >= int(dropblock_size // 2),
h_i < height - (dropblock_size - 1) // 2))
if self._data_format == 'channels_last':
valid_block = tf.reshape(valid_block, [1, height, width, 1])
else:
valid_block = tf.reshape(valid_block, [1, 1, height, width])
randnoise = tf.random.uniform(net.shape, dtype=tf.float32)
valid_block = tf.cast(valid_block, dtype=tf.float32)
seed_keep_rate = tf.cast(1 - seed_drop_rate, dtype=tf.float32)
block_pattern = (1 - valid_block + seed_keep_rate + randnoise) >= 1
block_pattern = tf.cast(block_pattern, dtype=tf.float32)
if dropblock_size == min(width, height):
block_pattern = tf.reduce_min(
input_tensor=block_pattern,
axis=[1, 2] if self._data_format == 'channels_last' else [2, 3],
keepdims=True)
else:
block_pattern = -tf.keras.layers.MaxPool2D(
pool_size=self._dropblock_size,
strides=1,
padding='SAME',
data_format=self._data_format)(-block_pattern)
percent_ones = tf.cast(
tf.reduce_sum(input_tensor=block_pattern), tf.float32) / tf.cast(
tf.size(input=block_pattern), tf.float32)
net = net / tf.cast(percent_ones, net.dtype) * tf.cast(
block_pattern, net.dtype)
return net
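# --- Editor's note (illustrative, not part of the original file) ---
# Evaluating seed_drop_rate (the paper's gamma) for a reference setting,
# keep_prob=0.9 on a 14x14 map with dropblock_size=3:
#   gamma = 0.1 * 196 / 9 / (12 * 12) ~= 0.0151
# About 1.5% of positions seed a dropped 3x3 block; after the max-pool
# expansion below this removes roughly 10% of activations, matching
# 1 - keep_prob (ignoring block overlap).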
def fixed_padding(self, inputs, kernel_size):
"""Pads the input along the spatial dimensions independently of input size.
Args:
inputs: `Tensor` of size `[batch, channels, height, width]` or
`[batch, height, width, channels]` depending on `data_format`.
kernel_size: `int` kernel size to be used for `conv2d` or `max_pool2d`
operations. Should be a positive integer.
Returns:
A padded `Tensor` of the same `data_format` with size either intact
(if `kernel_size == 1`) or padded (if `kernel_size > 1`).
"""
pad_total = kernel_size - 1
pad_beg = pad_total // 2
pad_end = pad_total - pad_beg
if self._data_format == 'channels_first':
padded_inputs = tf.pad(
tensor=inputs,
paddings=[[0, 0], [0, 0], [pad_beg, pad_end], [pad_beg, pad_end]])
else:
padded_inputs = tf.pad(
tensor=inputs,
paddings=[[0, 0], [pad_beg, pad_end], [pad_beg, pad_end], [0, 0]])
return padded_inputs
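# --- Editor's note (illustrative, not part of the original file) ---
# The split puts the extra pixel at the end when pad_total is odd, e.g.
# kernel_size=3 -> (pad_beg, pad_end) = (1, 1); kernel_size=4 -> (1, 2);
# kernel_size=7 -> (3, 3).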
def conv2d_fixed_padding(self, inputs, filters, kernel_size, strides):
"""Strided 2-D convolution with explicit padding.
The padding is consistent and is based only on `kernel_size`, not on the
dimensions of `inputs` (as opposed to using `tf.layers.conv2d` alone).
Args:
inputs: `Tensor` of size `[batch, channels, height_in, width_in]`.
filters: `int` number of filters in the convolution.
kernel_size: `int` size of the kernel to be used in the convolution.
strides: `int` strides of the convolution.
Returns:
A `Tensor` of shape `[batch, filters, height_out, width_out]`.
"""
if strides > 1:
inputs = self.fixed_padding(inputs, kernel_size)
return tf.keras.layers.Conv2D(
filters=filters,
kernel_size=kernel_size,
strides=strides,
padding=('SAME' if strides == 1 else 'VALID'),
use_bias=False,
kernel_initializer=tf.initializers.VarianceScaling(),
data_format=self._data_format)(
inputs=inputs)
def residual_block(self,
inputs,
filters,
strides,
use_projection=False,
is_training=None):
"""Standard building block for residual networks with BN after convolutions.
Args:
inputs: `Tensor` of size `[batch, channels, height, width]`.
filters: `int` number of filters for the two convolutions in the block.
strides: `int` block stride. If greater than 1, this block will ultimately
downsample the input.
use_projection: `bool` for whether this block should use a projection
shortcut (versus the default identity shortcut). This is usually
`True` for the first block of a block group, which may change the
number of filters and the resolution.
is_training: `bool` if True, the model is in training mode.
Returns:
The output `Tensor` of the block.
"""
shortcut = inputs
if use_projection:
# Projection shortcut in first layer to match filters and strides
shortcut = self.conv2d_fixed_padding(
inputs=inputs, filters=filters, kernel_size=1, strides=strides)
shortcut = self._batch_norm_relu(relu=False)(
shortcut, is_training=is_training)
inputs = self.conv2d_fixed_padding(
inputs=inputs, filters=filters, kernel_size=3, strides=strides)
inputs = self._batch_norm_relu()(inputs, is_training=is_training)
inputs = self.conv2d_fixed_padding(
inputs=inputs, filters=filters, kernel_size=3, strides=1)
inputs = self._batch_norm_relu()(
inputs, relu=False, init_zero=True, is_training=is_training)
return tf.nn.relu(inputs + shortcut)
def bottleneck_block(self,
inputs,
filters,
strides,
use_projection=False,
is_training=None):
"""Bottleneck block variant for residual networks with BN after convolutions.
Args:
inputs: `Tensor` of size `[batch, channels, height, width]`.
filters: `int` number of filters for the first two convolutions. Note that
the third and final convolution will use 4 times as many filters.
strides: `int` block stride. If greater than 1, this block will ultimately
downsample the input.
use_projection: `bool` for whether this block should use a projection
shortcut (versus the default identity shortcut). This is usually
`True` for the first block of a block group, which may change the
number of filters and the resolution.
is_training: `bool` if True, the model is in training mode.
Returns:
The output `Tensor` of the block.
"""
shortcut = inputs
if use_projection:
# Projection shortcut only in first block within a group. Bottleneck
# blocks end with 4 times the number of filters.
filters_out = 4 * filters
shortcut = self.conv2d_fixed_padding(
inputs=inputs, filters=filters_out, kernel_size=1, strides=strides)
shortcut = self._batch_norm_relu(relu=False)(
shortcut, is_training=is_training)
shortcut = self.dropblock(shortcut, is_training=is_training)
inputs = self.conv2d_fixed_padding(
inputs=inputs, filters=filters, kernel_size=1, strides=1)
inputs = self._batch_norm_relu()(inputs, is_training=is_training)
inputs = self.dropblock(inputs, is_training=is_training)
inputs = self.conv2d_fixed_padding(
inputs=inputs, filters=filters, kernel_size=3, strides=strides)
inputs = self._batch_norm_relu()(inputs, is_training=is_training)
inputs = self.dropblock(inputs, is_training=is_training)
inputs = self.conv2d_fixed_padding(
inputs=inputs, filters=4 * filters, kernel_size=1, strides=1)
inputs = self._batch_norm_relu(
relu=False, init_zero=True)(
inputs, is_training=is_training)
inputs = self.dropblock(inputs, is_training=is_training)
return tf.nn.relu(inputs + shortcut)
def block_group(self, inputs, filters, block_fn, blocks, strides, name,
is_training):
"""Creates one group of blocks for the ResNet model.
Args:
inputs: `Tensor` of size `[batch, channels, height, width]`.
filters: `int` number of filters for the first convolution of the layer.
      block_fn: `function` for the block to use within the model.
blocks: `int` number of blocks contained in the layer.
strides: `int` stride to use for the first convolution of the layer. If
greater than 1, this layer will downsample the input.
      name: `str` name for the Tensor output of the block layer.
is_training: `bool` if True, the model is in training mode.
Returns:
The output `Tensor` of the block layer.
"""
# Only the first block per block_group uses projection shortcut and strides.
inputs = block_fn(inputs, filters, strides, use_projection=True,
is_training=is_training)
for _ in range(1, blocks):
inputs = block_fn(inputs, filters, 1, is_training=is_training)
return tf.identity(inputs, name)
def resnet_v1_generator(self, block_fn, layers):
"""Generator for ResNet v1 models.
Args:
block_fn: `function` for the block to use within the model. Either
`residual_block` or `bottleneck_block`.
layers: list of 4 `int`s denoting the number of blocks to include in each
of the 4 block groups. Each group consists of blocks that take inputs of
the same resolution.
Returns:
Model `function` that takes in `inputs` and `is_training` and returns the
output `Tensor` of the ResNet model.
"""
def model(inputs, is_training=None):
"""Creation of the model graph."""
inputs = self.conv2d_fixed_padding(
inputs=inputs, filters=64, kernel_size=7, strides=2)
inputs = tf.identity(inputs, 'initial_conv')
inputs = self._batch_norm_relu()(inputs, is_training=is_training)
inputs = tf.keras.layers.MaxPool2D(
pool_size=3, strides=2, padding='SAME',
data_format=self._data_format)(
inputs)
inputs = tf.identity(inputs, 'initial_max_pool')
c2 = self.block_group(
inputs=inputs, filters=64, block_fn=block_fn, blocks=layers[0],
strides=1, name='block_group1', is_training=is_training)
c3 = self.block_group(
inputs=c2, filters=128, block_fn=block_fn, blocks=layers[1],
strides=2, name='block_group2', is_training=is_training)
c4 = self.block_group(
inputs=c3, filters=256, block_fn=block_fn, blocks=layers[2],
strides=2, name='block_group3', is_training=is_training)
c5 = self.block_group(
inputs=c4, filters=512, block_fn=block_fn, blocks=layers[3],
strides=2, name='block_group4', is_training=is_training)
return {2: c2, 3: c3, 4: c4, 5: c5}
return model
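# Editor's note: a minimal usage sketch, not part of the original file. It
# assumes `Resnet(resnet_depth=50)` is constructible with the remaining
# constructor arguments left at their defaults; the point is only to show
# that the generated model callable returns a dict of multilevel features.
def _example_backbone_usage():
  import tensorflow.compat.v2 as tf

  backbone_fn = Resnet(resnet_depth=50)  # assumed defaults for other args
  images = tf.zeros([2, 512, 512, 3], dtype=tf.float32)
  features = backbone_fn(images, is_training=False)
  # Levels 2..5 halve the spatial resolution at each step; for a 512x512
  # input, level 2 is 128x128 and level 5 is 16x16.
  for level in sorted(features):
    print(level, features[level].shape)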
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Base Model definition."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import abc
import functools
import re
import six
from absl import logging
import tensorflow.compat.v2 as tf
from official.vision.detection.modeling import checkpoint_utils
from official.vision.detection.modeling import learning_rates
class OptimizerFactory(object):
"""Class to generate optimizer function."""
def __init__(self, params):
"""Creates optimized based on the specified flags."""
if params.type == 'momentum':
self._optimizer = functools.partial(
tf.keras.optimizers.SGD, momentum=0.9, nesterov=True)
elif params.type == 'adam':
self._optimizer = tf.keras.optimizers.Adam
elif params.type == 'adadelta':
self._optimizer = tf.keras.optimizers.Adadelta
elif params.type == 'adagrad':
self._optimizer = tf.keras.optimizers.Adagrad
elif params.type == 'rmsprop':
      self._optimizer = functools.partial(
          tf.keras.optimizers.RMSprop, momentum=params.momentum)
else:
      raise ValueError('Unsupported optimizer type %s.' % params.type)
def __call__(self, learning_rate):
return self._optimizer(learning_rate=learning_rate)
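# Editor's note: a hedged sketch of how OptimizerFactory might be used with a
# ParamsDict-style config; the parameter values are illustrative only and not
# part of the original file.
def _example_optimizer_factory():
  from official.modeling.hyperparams import params_dict

  params = params_dict.ParamsDict({'type': 'momentum'})
  optimizer_fn = OptimizerFactory(params)
  # The factory is called with a float or a LearningRateSchedule.
  optimizer = optimizer_fn(learning_rate=0.08)
  assert isinstance(optimizer, tf.keras.optimizers.SGD)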
def _make_filter_trainable_variables_fn(frozen_variable_prefix):
"""Creates a function for filtering trainable varialbes.
"""
def _filter_trainable_variables(variables):
"""Filters trainable varialbes
Args:
variables: a list of tf.Variable to be filtered.
Returns:
      filtered_variables: a list of tf.Variable with the frozen variables
        filtered out.
"""
    # frozen_variable_prefix: a regex string specifying the prefix pattern of
# the frozen variables' names.
filtered_variables = [
v for v in variables
if not re.match(frozen_variable_prefix, v.name)
]
return filtered_variables
return _filter_trainable_variables
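# Editor's note: a small sketch of the frozen-variable filter above, using a
# toy Keras model; the layer names and the prefix regex 'dense/' are
# assumptions for illustration and not part of the original file.
def _example_filter_trainable_variables():
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(4, input_shape=(8,), name='dense'),
      tf.keras.layers.Dense(2, name='head'),
  ])
  filter_fn = _make_filter_trainable_variables_fn(r'dense/')
  trainable = filter_fn(model.trainable_variables)
  # Variables whose names match the prefix (e.g. 'dense/kernel:0') are
  # dropped; only the 'head' variables remain trainable.
  assert all(not v.name.startswith('dense/') for v in trainable)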
class Model(object):
"""Base class for model function."""
__metaclass__ = abc.ABCMeta
def __init__(self, params):
self._use_bfloat16 = params.architecture.use_bfloat16
assert not self._use_bfloat16, 'bfloat16 is not supported in Keras yet.'
# Optimization.
self._optimizer_fn = OptimizerFactory(params.train.optimizer)
self._learning_rate = learning_rates.learning_rate_generator(
params.train.learning_rate)
self._frozen_variable_prefix = params.train.frozen_variable_prefix
# Checkpoint restoration.
self._checkpoint = params.train.checkpoint.as_dict()
# Summary.
self._enable_summary = params.enable_summary
self._model_dir = params.model_dir
@abc.abstractmethod
def build_outputs(self, inputs, mode):
"""Build the graph of the forward path."""
pass
@abc.abstractmethod
def build_model(self, params, mode):
"""Build the model object."""
pass
@abc.abstractmethod
def build_loss_fn(self):
"""Build the model object."""
pass
def post_processing(self, labels, outputs):
"""Post-processing function."""
return labels, outputs
def model_outputs(self, inputs, mode):
"""Build the model outputs."""
return self.build_outputs(inputs, mode)
def build_optimizer(self):
"""Returns train_op to optimize total loss."""
# Sets up the optimizer.
return self._optimizer_fn(self._learning_rate)
def make_filter_trainable_variables_fn(self):
"""Creates a function for filtering trainable varialbes.
"""
return _make_filter_trainable_variables_fn(self._frozen_variable_prefix)
  def weight_decay_loss(self, l2_weight_decay, keras_model):
    """Computes the L2 weight decay loss over non-batch-norm variables."""
    # TODO(yeqing): Correct the filter according to cr/269707763.
    return l2_weight_decay * tf.add_n([
        tf.nn.l2_loss(v)
        for v in keras_model.trainable_variables
        if 'batch_normalization' not in v.name
    ])
def make_restore_checkpoint_fn(self):
"""Returns scaffold function to restore parameters from v1 checkpoint."""
if 'skip_checkpoint_variables' in self._checkpoint:
skip_regex = self._checkpoint['skip_checkpoint_variables']
else:
skip_regex = None
return checkpoint_utils.make_restore_checkpoint_fn(
self._checkpoint['path'],
prefix=self._checkpoint['prefix'],
skip_regex=skip_regex)
def eval_metrics(self):
"""Returns tuple of metric function and its inputs for evaluation."""
raise NotImplementedError('Unimplemented eval_metrics')
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Util functions for loading checkpoints. Especially for loading Tensorflow 1.x
checkpoint to Tensorflow 2.x (keras) model.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import re
from absl import logging
import tensorflow.compat.v2 as tf
def _build_assignment_map(keras_model,
prefix='',
skip_variables_regex=None,
var_to_shape_map=None):
"""Compute an assignment mapping for loading older checkpoints into a Keras
model. Variable names are remapped from the original TPUEstimator model to
the new Keras name.
Args:
keras_model: tf.keras.Model object to provide variables to assign.
    prefix: prefix to be removed from the variable name for alignment with
      names in the checkpoint.
    skip_variables_regex: regular expression to match the names of variables
      that do not need to be assigned.
var_to_shape_map: variable name to shape mapping from the checkpoint.
Returns:
The variable assignment map.
"""
assignment_map = {}
checkpoint_names = None
if var_to_shape_map:
checkpoint_names = list(filter(
lambda x: not x.endswith('Momentum') and not x.endswith(
'global_step'), var_to_shape_map.keys()))
for var in keras_model.variables:
var_name = var.name
if skip_variables_regex and re.match(skip_variables_regex, var_name):
continue
# Trim the index of the variable.
if ':' in var_name:
var_name = var_name[:var_name.rindex(':')]
if var_name.startswith(prefix):
var_name = var_name[len(prefix):]
if not var_to_shape_map:
assignment_map[var_name] = var
continue
# Match name with variables in the checkpoint.
match_names = list(filter(lambda x: x.endswith(var_name), checkpoint_names))
try:
if match_names:
        assert len(match_names) == 1, 'more than one match for {}: {}'.format(
            var_name, match_names)
checkpoint_names.remove(match_names[0])
assignment_map[match_names[0]] = var
else:
        logging.info('Variable not found in checkpoint: %s', var_name)
except Exception as e:
logging.info('Error removing the match_name: %s', match_names)
logging.info('Exception: %s', e)
raise
  logging.info('Found %d variables in the checkpoint.', len(assignment_map))
return assignment_map
def _get_checkpoint_map(checkpoint_path):
reader = tf.train.load_checkpoint(checkpoint_path)
return reader.get_variable_to_shape_map()
def make_restore_checkpoint_fn(checkpoint_path, prefix='', skip_regex=None):
"""Returns scaffold function to restore parameters from v1 checkpoint.
Args:
checkpoint_path: path of the checkpoint folder or file.
Example 1: '/path/to/model_dir/'
Example 2: '/path/to/model.ckpt-22500'
    prefix: prefix to be removed from the variable name for alignment with
      names in the checkpoint.
    skip_regex: regular expression to match the names of variables that do
      not need to be assigned.
  Returns:
    Callable[[tf.keras.Model], None]. A function that loads a v1 checkpoint
    into a Keras model.
"""
def _restore_checkpoint_fn(keras_model):
"""Loads pretrained model through scaffold function."""
if not checkpoint_path:
logging.info('checkpoint_path is empty')
return
var_prefix = prefix
if prefix and not prefix.endswith('/'):
var_prefix += '/'
var_to_shape_map = _get_checkpoint_map(checkpoint_path)
assert var_to_shape_map, 'var_to_shape_map should not be empty'
vars_to_load = _build_assignment_map(
keras_model,
prefix=var_prefix,
skip_variables_regex=skip_regex,
var_to_shape_map=var_to_shape_map)
if not vars_to_load:
raise ValueError('Variables to load is empty.')
tf.compat.v1.train.init_from_checkpoint(checkpoint_path,
vars_to_load)
return _restore_checkpoint_fn
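# Editor's note: a hedged usage sketch, not part of the original file. The
# checkpoint path and the variable-name prefix below are placeholders; in the
# surrounding code the returned function is invoked inside a graph scope.
def _example_restore_checkpoint():
  keras_model = tf.keras.Sequential(
      [tf.keras.layers.Dense(4, input_shape=(8,))])
  restore_fn = make_restore_checkpoint_fn(
      '/path/to/model.ckpt-22500',   # placeholder checkpoint path
      prefix='resnet50/',            # assumed variable-name prefix
      skip_regex='.*global_step.*')
  # Remaps TF1 checkpoint variable names onto the Keras variables and
  # initializes them from the checkpoint.
  restore_fn(keras_model)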
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Factory to build detection model."""
from official.vision.detection.modeling import retinanet_model
def model_generator(params):
"""Model function generator."""
if params.type == 'retinanet':
model_fn = retinanet_model.RetinanetModel(params)
else:
    raise ValueError('Model %s is not supported.' % params.type)
return model_fn
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Learning rate schedule."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import functools
import numpy as np
import tensorflow.compat.v2 as tf
from official.modeling.hyperparams import params_dict
class StepLearningRateWithLinearWarmup(tf.keras.optimizers.schedules.LearningRateSchedule):
"""Class to generate learning rate tensor."""
def __init__(self, params):
"""Creates the step learning rate tensor with linear warmup."""
super(StepLearningRateWithLinearWarmup, self).__init__()
assert isinstance(params, (dict, params_dict.ParamsDict))
if isinstance(params, dict):
params = params_dict.ParamsDict(params)
self._params = params
def __call__(self, global_step):
warmup_lr = self._params.warmup_learning_rate
warmup_steps = self._params.warmup_steps
init_lr = self._params.init_learning_rate
lr_levels = self._params.learning_rate_levels
lr_steps = self._params.learning_rate_steps
linear_warmup = (
warmup_lr + tf.cast(global_step, dtype=tf.float32) / warmup_steps *
(init_lr - warmup_lr))
learning_rate = tf.where(global_step < warmup_steps, linear_warmup, init_lr)
for next_learning_rate, start_step in zip(lr_levels, lr_steps):
learning_rate = tf.where(global_step >= start_step, next_learning_rate,
learning_rate)
return learning_rate
def get_config(self):
return {'_params': self._params.as_dict()}
class CosineLearningRateWithLinearWarmup(tf.keras.optimizers.schedules.LearningRateSchedule):
"""Class to generate learning rate tensor."""
def __init__(self, params):
"""Creates the consine learning rate tensor with linear warmup."""
super(CosineLearningRateWithLinearWarmup, self).__init__()
assert isinstance(params, (dict, params_dict.ParamsDict))
if isinstance(params, dict):
params = params_dict.ParamsDict(params)
self._params = params
def __call__(self, global_step):
global_step = tf.cast(global_step, dtype=tf.float32)
warmup_lr = self._params.warmup_learning_rate
warmup_steps = self._params.warmup_steps
init_lr = self._params.init_learning_rate
total_steps = self._params.total_steps
linear_warmup = (
warmup_lr + global_step / warmup_steps * (init_lr - warmup_lr))
cosine_learning_rate = (
init_lr * (tf.cos(np.pi * (global_step - warmup_steps) /
(total_steps - warmup_steps)) + 1.0) / 2.0)
learning_rate = tf.where(global_step < warmup_steps, linear_warmup,
cosine_learning_rate)
return learning_rate
def get_config(self):
return {'_params': self._params.as_dict()}
def learning_rate_generator(params):
"""The learning rate function generator."""
if params.type == 'step':
return StepLearningRateWithLinearWarmup(params)
elif params.type == 'cosine':
return CosineLearningRateWithLinearWarmup(params)
else:
raise ValueError('Unsupported learning rate type: {}.'.format(params.type))
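# Editor's note: a hedged sketch evaluating the step schedule at a few global
# steps; all parameter values are illustrative and not part of the original
# file.
def _example_step_learning_rate():
  schedule = StepLearningRateWithLinearWarmup({
      'warmup_learning_rate': 0.0067,
      'warmup_steps': 500,
      'init_learning_rate': 0.08,
      'learning_rate_levels': [0.008, 0.0008],
      'learning_rate_steps': [15000, 20000],
  })
  # Linear ramp from 0.0067 to 0.08 over the first 500 steps, then step
  # decays to 0.008 at step 15000 and to 0.0008 at step 20000.
  for step in [0, 250, 500, 15000, 20000]:
    print(step, float(schedule(tf.constant(step))))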
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Losses used for Mask-RCNN."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow.compat.v2 as tf
def focal_loss(logits, targets, alpha, gamma, normalizer):
"""Compute the focal loss between `logits` and the golden `target` values.
Focal loss = -(1-pt)^gamma * log(pt)
where pt is the probability of being classified to the true class.
Args:
logits: A float32 tensor of size
[batch, height_in, width_in, num_predictions].
targets: A float32 tensor of size
[batch, height_in, width_in, num_predictions].
alpha: A float32 scalar multiplying alpha to the loss from positive examples
and (1-alpha) to the loss from negative examples.
gamma: A float32 scalar modulating loss from hard and easy examples.
normalizer: A float32 scalar normalizes the total loss from all examples.
Returns:
loss: A float32 Tensor of size [batch, height_in, width_in, num_predictions]
representing normalized loss on the prediction map.
"""
with tf.name_scope('focal_loss'):
positive_label_mask = tf.equal(targets, 1.0)
cross_entropy = (
tf.nn.sigmoid_cross_entropy_with_logits(labels=targets, logits=logits))
    # Below are comments/derivations for computing modulator.
    # For brevity, let x = logits, z = targets, r = gamma, and p_t = sigmoid(x)
    # for positive samples and 1 - sigmoid(x) for negative examples.
    #
    # The modulator, defined as (1 - p_t)^r, is a critical part in focal loss
    # computation. For r > 0, it puts more weights on hard examples, and less
    # weights on easier ones. However if it is directly computed as
    # (1 - p_t)^r, its back-propagation is not stable when r < 1. The
    # implementation here resolves the issue.
#
# For positive samples (labels being 1),
# (1 - p_t)^r
# = (1 - sigmoid(x))^r
# = (1 - (1 / (1 + exp(-x))))^r
# = (exp(-x) / (1 + exp(-x)))^r
# = exp(log((exp(-x) / (1 + exp(-x)))^r))
# = exp(r * log(exp(-x)) - r * log(1 + exp(-x)))
# = exp(- r * x - r * log(1 + exp(-x)))
#
# For negative samples (labels being 0),
# (1 - p_t)^r
# = (sigmoid(x))^r
# = (1 / (1 + exp(-x)))^r
# = exp(log((1 / (1 + exp(-x)))^r))
# = exp(-r * log(1 + exp(-x)))
#
# Therefore one unified form for positive (z = 1) and negative (z = 0)
# samples is:
# (1 - p_t)^r = exp(-r * z * x - r * log(1 + exp(-x))).
neg_logits = -1.0 * logits
modulator = tf.exp(gamma * targets * neg_logits -
gamma * tf.math.log1p(tf.exp(neg_logits)))
loss = modulator * cross_entropy
weighted_loss = tf.where(positive_label_mask, alpha * loss,
(1.0 - alpha) * loss)
weighted_loss /= normalizer
return weighted_loss
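# Editor's note: a numeric sanity check of the derivation in the comments
# above -- the numerically stable modulator equals (1 - p_t)^gamma computed
# directly. The values are illustrative only; not part of the original file.
def _example_focal_loss_modulator():
  logits = tf.constant([[-2.0, 0.5, 3.0]])
  targets = tf.constant([[1.0, 0.0, 1.0]])
  gamma = 2.0
  probs = tf.sigmoid(logits)
  # p_t is the probability assigned to the true class.
  p_t = tf.where(tf.equal(targets, 1.0), probs, 1.0 - probs)
  direct = tf.pow(1.0 - p_t, gamma)
  stable = tf.exp(-gamma * targets * logits -
                  gamma * tf.math.log1p(tf.exp(-logits)))
  # The two forms agree to floating-point precision.
  tf.debugging.assert_near(direct, stable, atol=1e-6)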
class RpnScoreLoss(object):
"""Region Proposal Network score loss function."""
def __init__(self, params):
raise ValueError('Not TF 2.0 ready.')
self._batch_size = params.batch_size
self._rpn_batch_size_per_im = params.rpn_batch_size_per_im
def __call__(self, score_outputs, labels):
"""Computes total RPN detection loss.
Computes total RPN detection loss including box and score from all levels.
Args:
      score_outputs: an OrderedDict with keys representing levels and values
        representing scores in [batch_size, height, width, num_anchors].
      labels: the dictionary returned from the dataloader that includes
        groundtruth targets.
Returns:
rpn_score_loss: a scalar tensor representing total score loss.
"""
with tf.name_scope('rpn_loss'):
levels = sorted(score_outputs.keys())
score_losses = []
for level in levels:
score_targets_l = labels['score_targets_%d' % level]
score_losses.append(
self._rpn_score_loss(
score_outputs[level],
score_targets_l,
normalizer=tf.cast(
self._batch_size * self._rpn_batch_size_per_im,
dtype=tf.float32)))
# Sums per level losses to total loss.
return tf.add_n(score_losses)
def _rpn_score_loss(self, score_outputs, score_targets, normalizer=1.0):
"""Computes score loss."""
# score_targets has three values:
# (1) score_targets[i]=1, the anchor is a positive sample.
# (2) score_targets[i]=0, negative.
# (3) score_targets[i]=-1, the anchor is don't care (ignore).
with tf.name_scope('rpn_score_loss'):
mask = tf.logical_or(tf.equal(score_targets, 1),
tf.equal(score_targets, 0))
score_targets = tf.maximum(score_targets, tf.zeros_like(score_targets))
# RPN score loss is sum over all except ignored samples.
score_loss = tf.compat.v1.losses.sigmoid_cross_entropy(
score_targets,
score_outputs,
weights=mask,
reduction=tf.compat.v1.losses.Reduction.SUM)
score_loss /= normalizer
return score_loss
class RpnBoxLoss(object):
"""Region Proposal Network box regression loss function."""
def __init__(self, params):
raise ValueError('Not TF 2.0 ready.')
self._delta = params.huber_loss_delta
def __call__(self, box_outputs, labels):
"""Computes total RPN detection loss.
Computes total RPN detection loss including box and score from all levels.
Args:
      box_outputs: an OrderedDict with keys representing levels and values
        representing box regression targets in
        [batch_size, height, width, num_anchors * 4].
      labels: the dictionary returned from the dataloader that includes
        groundtruth targets.
Returns:
rpn_box_loss: a scalar tensor representing total box regression loss.
"""
with tf.compat.v1.name_scope('rpn_loss'):
levels = sorted(box_outputs.keys())
box_losses = []
for level in levels:
box_targets_l = labels['box_targets_%d' % level]
box_losses.append(
self._rpn_box_loss(
box_outputs[level], box_targets_l, delta=self._delta))
# Sum per level losses to total loss.
return tf.add_n(box_losses)
def _rpn_box_loss(self, box_outputs, box_targets, normalizer=1.0, delta=1./9):
"""Computes box regression loss."""
    # The delta is typically around the mean value of the regression targets.
    # For instance, the regression targets of a 512x512 input with 6 anchors
    # on the P2-P6 pyramid are about [0.1, 0.1, 0.2, 0.2].
with tf.compat.v1.name_scope('rpn_box_loss'):
mask = tf.not_equal(box_targets, 0.0)
# The loss is normalized by the sum of non-zero weights before additional
# normalizer provided by the function caller.
box_loss = tf.compat.v1.losses.huber_loss(
box_targets,
box_outputs,
weights=mask,
delta=delta,
reduction=tf.compat.v1.losses.Reduction.SUM_BY_NONZERO_WEIGHTS)
box_loss /= normalizer
return box_loss
class FastrcnnClassLoss(object):
"""Fast R-CNN classification loss function."""
def __init__(self):
raise ValueError('Not TF 2.0 ready.')
def __call__(self, class_outputs, class_targets):
"""Computes the class loss (Fast-RCNN branch) of Mask-RCNN.
This function implements the classification loss of the Fast-RCNN.
The classification loss is softmax on all RoIs.
Reference: https://github.com/facebookresearch/Detectron/blob/master/detectron/modeling/fast_rcnn_heads.py # pylint: disable=line-too-long
Args:
class_outputs: a float tensor representing the class prediction for each box
with a shape of [batch_size, num_boxes, num_classes].
class_targets: a float tensor representing the class label for each box
with a shape of [batch_size, num_boxes].
Returns:
a scalar tensor representing total class loss.
"""
with tf.compat.v1.name_scope('fast_rcnn_loss'):
      _, _, num_classes = class_outputs.get_shape().as_list()
class_targets = tf.cast(class_targets, dtype=tf.int32)
class_targets_one_hot = tf.one_hot(class_targets, num_classes)
return self._fast_rcnn_class_loss(class_outputs, class_targets_one_hot)
def _fast_rcnn_class_loss(self, class_outputs, class_targets_one_hot,
normalizer=1.0):
"""Computes classification loss."""
with tf.compat.v1.name_scope('fast_rcnn_class_loss'):
# The loss is normalized by the sum of non-zero weights before additional
# normalizer provided by the function caller.
class_loss = tf.compat.v1.losses.softmax_cross_entropy(
class_targets_one_hot,
class_outputs,
reduction=tf.compat.v1.losses.Reduction.SUM_BY_NONZERO_WEIGHTS)
class_loss /= normalizer
return class_loss
class FastrcnnBoxLoss(object):
"""Fast R-CNN box regression loss function."""
def __init__(self, params):
raise ValueError('Not TF 2.0 ready.')
self._delta = params.huber_loss_delta
def __call__(self, box_outputs, class_targets, box_targets):
"""Computes the box loss (Fast-RCNN branch) of Mask-RCNN.
This function implements the box regression loss of the Fast-RCNN. As the
    `box_outputs` produces `num_classes` boxes for each RoI, the reference model
    expands `box_targets` to match the shape of `box_outputs` and selects only
    the target with which the RoI has the maximum overlap. (Reference: https://github.com/facebookresearch/Detectron/blob/master/detectron/roi_data/fast_rcnn.py) # pylint: disable=line-too-long
Instead, this function selects the `box_outputs` by the `class_targets` so
that it doesn't expand `box_targets`.
The box loss is smooth L1-loss on only positive samples of RoIs.
Reference: https://github.com/facebookresearch/Detectron/blob/master/detectron/modeling/fast_rcnn_heads.py # pylint: disable=line-too-long
Args:
box_outputs: a float tensor representing the box prediction for each box
with a shape of [batch_size, num_boxes, num_classes * 4].
class_targets: a float tensor representing the class label for each box
with a shape of [batch_size, num_boxes].
box_targets: a float tensor representing the box label for each box
with a shape of [batch_size, num_boxes, 4].
Returns:
box_loss: a scalar tensor representing total box regression loss.
"""
with tf.compat.v1.name_scope('fast_rcnn_loss'):
class_targets = tf.cast(class_targets, dtype=tf.int32)
# Selects the box from `box_outputs` based on `class_targets`, with which
# the box has the maximum overlap.
(batch_size, num_rois,
num_class_specific_boxes) = box_outputs.get_shape().as_list()
num_classes = num_class_specific_boxes // 4
box_outputs = tf.reshape(box_outputs,
[batch_size, num_rois, num_classes, 4])
box_indices = tf.reshape(
class_targets + tf.tile(
tf.expand_dims(
tf.range(batch_size) * num_rois * num_classes, 1),
[1, num_rois]) + tf.tile(
tf.expand_dims(tf.range(num_rois) * num_classes, 0),
[batch_size, 1]), [-1])
box_outputs = tf.matmul(
tf.one_hot(
box_indices,
batch_size * num_rois * num_classes,
dtype=box_outputs.dtype), tf.reshape(box_outputs, [-1, 4]))
box_outputs = tf.reshape(box_outputs, [batch_size, -1, 4])
return self._fast_rcnn_box_loss(box_outputs, box_targets, class_targets,
delta=self._delta)
def _fast_rcnn_box_loss(self, box_outputs, box_targets, class_targets,
normalizer=1.0, delta=1.):
"""Computes box regression loss."""
    # The delta is typically around the mean value of the regression targets.
    # For instance, the regression targets of a 512x512 input with 6 anchors
    # on the P2-P6 pyramid are about [0.1, 0.1, 0.2, 0.2].
with tf.compat.v1.name_scope('fast_rcnn_box_loss'):
mask = tf.tile(tf.expand_dims(tf.greater(class_targets, 0), axis=2),
[1, 1, 4])
# The loss is normalized by the sum of non-zero weights before additional
# normalizer provided by the function caller.
box_loss = tf.compat.v1.losses.huber_loss(
box_targets,
box_outputs,
weights=mask,
delta=delta,
reduction=tf.compat.v1.losses.Reduction.SUM_BY_NONZERO_WEIGHTS)
box_loss /= normalizer
return box_loss
class MaskrcnnLoss(object):
"""Mask R-CNN instance segmentation mask loss function."""
def __init__(self):
raise ValueError('Not TF 2.0 ready.')
def __call__(self, mask_outputs, mask_targets, select_class_targets):
"""Computes the mask loss of Mask-RCNN.
    This function implements the mask loss of Mask-RCNN. As the `mask_outputs`
    produces `num_classes` masks for each RoI, the reference model expands
    `mask_targets` to match the shape of `mask_outputs` and selects only the
    target with which the RoI has the maximum overlap. (Reference: https://github.com/facebookresearch/Detectron/blob/master/detectron/roi_data/mask_rcnn.py) # pylint: disable=line-too-long
Instead, this implementation selects the `mask_outputs` by the `class_targets`
so that it doesn't expand `mask_targets`. Note that the selection logic is
done in the post-processing of mask_rcnn_fn in mask_rcnn_architecture.py.
Args:
mask_outputs: a float tensor representing the prediction for each mask,
with a shape of
[batch_size, num_masks, mask_height, mask_width].
mask_targets: a float tensor representing the binary mask of ground truth
labels for each mask with a shape of
[batch_size, num_masks, mask_height, mask_width].
select_class_targets: a tensor with a shape of [batch_size, num_masks],
representing the foreground mask targets.
Returns:
mask_loss: a float tensor representing total mask loss.
"""
with tf.compat.v1.name_scope('mask_loss'):
(batch_size, num_masks, mask_height,
mask_width) = mask_outputs.get_shape().as_list()
weights = tf.tile(
tf.reshape(tf.greater(select_class_targets, 0),
[batch_size, num_masks, 1, 1]),
[1, 1, mask_height, mask_width])
return tf.compat.v1.losses.sigmoid_cross_entropy(
mask_targets,
mask_outputs,
weights=weights,
reduction=tf.compat.v1.losses.Reduction.SUM_BY_NONZERO_WEIGHTS)
class RetinanetClassLoss(object):
"""RetinaNet class loss."""
def __init__(self, params):
self._num_classes = params.num_classes
self._focal_loss_alpha = params.focal_loss_alpha
self._focal_loss_gamma = params.focal_loss_gamma
def __call__(self, cls_outputs, labels, num_positives):
"""Computes total detection loss.
Computes total detection loss including box and class loss from all levels.
Args:
      cls_outputs: an OrderedDict with keys representing levels and values
        representing logits in [batch_size, height, width,
        num_anchors * num_classes].
      labels: the dictionary returned from the dataloader that includes
        class groundtruth targets.
      num_positives: number of positive examples in the minibatch.
    Returns:
      a float tensor representing total class loss.
"""
# Sums all positives in a batch for normalization and avoids zero
# num_positives_sum, which would lead to inf loss during training
num_positives_sum = tf.reduce_sum(input_tensor=num_positives) + 1.0
cls_losses = []
for level in cls_outputs.keys():
cls_losses.append(self.class_loss(
cls_outputs[level], labels[level], num_positives_sum))
# Sums per level losses to total loss.
return tf.add_n(cls_losses)
def class_loss(self, cls_outputs, cls_targets, num_positives,
ignore_label=-2):
"""Computes RetinaNet classification loss."""
# Onehot encoding for classification labels.
cls_targets_one_hot = tf.one_hot(cls_targets, self._num_classes)
bs, height, width, _, _ = cls_targets_one_hot.get_shape().as_list()
cls_targets_one_hot = tf.reshape(cls_targets_one_hot,
[bs, height, width, -1])
loss = focal_loss(cls_outputs, cls_targets_one_hot,
self._focal_loss_alpha, self._focal_loss_gamma,
num_positives)
ignore_loss = tf.where(
tf.equal(cls_targets, ignore_label),
tf.zeros_like(cls_targets, dtype=tf.float32),
tf.ones_like(cls_targets, dtype=tf.float32),
)
ignore_loss = tf.expand_dims(ignore_loss, -1)
ignore_loss = tf.tile(ignore_loss, [1, 1, 1, 1, self._num_classes])
ignore_loss = tf.reshape(ignore_loss, tf.shape(input=loss))
return tf.reduce_sum(input_tensor=ignore_loss * loss)
class RetinanetBoxLoss(object):
"""RetinaNet box loss."""
def __init__(self, params):
self._huber_loss = tf.keras.losses.Huber(
delta=params.huber_loss_delta, reduction=tf.keras.losses.Reduction.SUM)
def __call__(self, box_outputs, labels, num_positives):
"""Computes box detection loss.
Computes total detection loss including box and class loss from all levels.
Args:
      box_outputs: an OrderedDict with keys representing levels and values
        representing box regression targets in [batch_size, height, width,
        num_anchors * 4].
      labels: the dictionary returned from the dataloader that includes
        box groundtruth targets.
      num_positives: number of positive examples in the minibatch.
    Returns:
      a float tensor representing total box regression loss.
"""
# Sums all positives in a batch for normalization and avoids zero
# num_positives_sum, which would lead to inf loss during training
num_positives_sum = tf.reduce_sum(input_tensor=num_positives) + 1.0
box_losses = []
for level in box_outputs.keys():
      # Gathers box regression targets for this level.
box_targets_l = labels[level]
box_losses.append(
self.box_loss(box_outputs[level], box_targets_l, num_positives_sum))
# Sums per level losses to total loss.
return tf.add_n(box_losses)
def box_loss(self, box_outputs, box_targets, num_positives):
"""Computes RetinaNet box regression loss."""
    # The delta is typically around the mean value of the regression targets.
    # For instance, the regression targets of a 512x512 input with 6 anchors
    # on the P3-P7 pyramid are about [0.1, 0.1, 0.2, 0.2].
normalizer = num_positives * 4.0
mask = tf.not_equal(box_targets, 0.0)
box_loss = self._huber_loss(box_targets, box_outputs, sample_weight=mask)
box_loss /= normalizer
return box_loss
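# Editor's note: a hedged sketch wiring RetinanetClassLoss and RetinanetBoxLoss
# to dummy single-level outputs. The params object, level, and shapes
# (9 anchors, 91 classes) are assumptions for illustration only; not part of
# the original file.
def _example_retinanet_losses():
  from official.modeling.hyperparams import params_dict

  params = params_dict.ParamsDict({
      'num_classes': 91,
      'focal_loss_alpha': 0.25,
      'focal_loss_gamma': 1.5,
      'huber_loss_delta': 0.1,
  })
  cls_loss_fn = RetinanetClassLoss(params)
  box_loss_fn = RetinanetBoxLoss(params)
  cls_outputs = {3: tf.zeros([2, 64, 64, 9 * 91])}
  box_outputs = {3: tf.zeros([2, 64, 64, 9 * 4])}
  cls_targets = {3: tf.zeros([2, 64, 64, 9], dtype=tf.int32)}
  box_targets = {3: tf.zeros([2, 64, 64, 9 * 4])}
  num_positives = tf.constant([8.0, 8.0])
  cls_loss = cls_loss_fn(cls_outputs, cls_targets, num_positives)
  box_loss = box_loss_fn(box_outputs, box_targets, num_positives)
  print(float(cls_loss), float(box_loss))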
class ShapeMaskLoss(object):
"""ShapeMask mask loss function wrapper."""
def __init__(self):
raise ValueError('Not TF 2.0 ready.')
def __call__(self, logits, scaled_labels, classes,
category_loss=True, mse_loss=False):
"""Compute instance segmentation loss.
Args:
logits: A Tensor of shape [batch_size * num_points, height, width,
num_classes]. The logits are not necessarily between 0 and 1.
scaled_labels: A float16 Tensor of shape [batch_size, num_instances,
mask_size, mask_size], where mask_size =
mask_crop_size * gt_upsample_scale for fine mask, or mask_crop_size
for coarse masks and shape priors.
classes: A int tensor of shape [batch_size, num_instances].
category_loss: use class specific mask prediction or not.
mse_loss: use mean square error for mask loss or not
Returns:
mask_loss: an float tensor representing total mask classification loss.
iou: a float tensor representing the IoU between target and prediction.
"""
classes = tf.reshape(classes, [-1])
_, _, height, width = scaled_labels.get_shape().as_list()
scaled_labels = tf.reshape(scaled_labels, [-1, height, width])
if not category_loss:
logits = logits[:, :, :, 0]
else:
logits = tf.transpose(a=logits, perm=(0, 3, 1, 2))
gather_idx = tf.stack([tf.range(tf.size(input=classes)), classes - 1],
axis=1)
logits = tf.gather_nd(logits, gather_idx)
# Ignore loss on empty mask targets.
valid_labels = tf.reduce_any(
input_tensor=tf.greater(scaled_labels, 0), axis=[1, 2])
if mse_loss:
# Logits are probabilities in the case of shape prior prediction.
logits *= tf.reshape(
tf.cast(valid_labels, logits.dtype), [-1, 1, 1])
weighted_loss = tf.nn.l2_loss(scaled_labels - logits)
probs = logits
else:
weighted_loss = tf.nn.sigmoid_cross_entropy_with_logits(
labels=scaled_labels, logits=logits)
probs = tf.sigmoid(logits)
weighted_loss *= tf.reshape(
tf.cast(valid_labels, weighted_loss.dtype), [-1, 1, 1])
iou = tf.reduce_sum(
input_tensor=tf.minimum(scaled_labels, probs)) / tf.reduce_sum(
input_tensor=tf.maximum(scaled_labels, probs))
mask_loss = tf.reduce_sum(input_tensor=weighted_loss) / tf.reduce_sum(
input_tensor=scaled_labels)
return tf.cast(mask_loss, tf.float32), tf.cast(iou, tf.float32)
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Post-processing model outputs to generate detection."""
from __future__ import absolute_import
from __future__ import division
# from __future__ import google_type_annotations
from __future__ import print_function
import functools
import tensorflow.compat.v2 as tf
from official.vision.detection.utils import box_utils
def generate_detections_factory(params):
"""Factory to select function to generate detection."""
if params.use_batched_nms:
func = functools.partial(
_generate_detections_batched,
max_total_size=params.max_total_size,
nms_iou_threshold=params.nms_iou_threshold,
score_threshold=params.score_threshold)
else:
func = functools.partial(
_generate_detections,
max_total_size=params.max_total_size,
nms_iou_threshold=params.nms_iou_threshold,
score_threshold=params.score_threshold)
return func
def _generate_detections(boxes,
scores,
max_total_size=100,
nms_iou_threshold=0.3,
score_threshold=0.05,
pre_nms_num_boxes=5000):
"""Generate the final detections given the model outputs.
This uses batch unrolling, which is TPU compatible.
Args:
    boxes: a tensor with shape [batch_size, N, num_classes, 4] or
      [batch_size, N, 1, 4], which stacks box predictions from all feature
      levels. N is the total number of anchors across all levels.
    scores: a tensor with shape [batch_size, N, num_classes], which stacks
      class probability on all feature levels. N is the total number of
      anchors across all levels, and num_classes is the number of classes
      predicted by the model. Note that the class_outputs here are the raw
      scores.
max_total_size: a scalar representing maximum number of boxes retained over
all classes.
nms_iou_threshold: a float representing the threshold for deciding whether
boxes overlap too much with respect to IOU.
score_threshold: a float representing the threshold for deciding when to
remove boxes based on score.
pre_nms_num_boxes: an int number of top candidate detections per class
before NMS.
Returns:
nms_boxes: `float` Tensor of shape [batch_size, max_total_size, 4]
representing top detected boxes in [y1, x1, y2, x2].
nms_scores: `float` Tensor of shape [batch_size, max_total_size]
representing sorted confidence scores for detected boxes. The values are
between [0, 1].
nms_classes: `int` Tensor of shape [batch_size, max_total_size] representing
classes for detected boxes.
    valid_detections: `int` Tensor of shape [batch_size]. Only the top
      `valid_detections` boxes are valid detections.
"""
with tf.name_scope('generate_detections'):
batch_size = scores.get_shape().as_list()[0]
nmsed_boxes = []
nmsed_classes = []
nmsed_scores = []
valid_detections = []
for i in range(batch_size):
(nmsed_boxes_i, nmsed_scores_i, nmsed_classes_i,
valid_detections_i) = _generate_detections_per_image(
boxes[i],
scores[i],
max_total_size,
nms_iou_threshold,
score_threshold,
pre_nms_num_boxes)
nmsed_boxes.append(nmsed_boxes_i)
nmsed_scores.append(nmsed_scores_i)
nmsed_classes.append(nmsed_classes_i)
valid_detections.append(valid_detections_i)
nmsed_boxes = tf.stack(nmsed_boxes, axis=0)
nmsed_scores = tf.stack(nmsed_scores, axis=0)
nmsed_classes = tf.stack(nmsed_classes, axis=0)
valid_detections = tf.stack(valid_detections, axis=0)
return nmsed_boxes, nmsed_scores, nmsed_classes, valid_detections
def _generate_detections_per_image(boxes,
scores,
max_total_size=100,
nms_iou_threshold=0.3,
score_threshold=0.05,
pre_nms_num_boxes=5000):
"""Generate the final detections per image given the model outputs.
Args:
    boxes: a tensor with shape [N, num_classes, 4] or [N, 1, 4], which stacks
      box predictions from all feature levels. N is the total number of
      anchors across all levels.
    scores: a tensor with shape [N, num_classes], which stacks class
      probability on all feature levels. N is the total number of anchors
      across all levels, and num_classes is the number of classes predicted
      by the model. Note that the class_outputs here are the raw scores.
max_total_size: a scalar representing maximum number of boxes retained over
all classes.
nms_iou_threshold: a float representing the threshold for deciding whether
boxes overlap too much with respect to IOU.
score_threshold: a float representing the threshold for deciding when to
remove boxes based on score.
pre_nms_num_boxes: an int number of top candidate detections per class
before NMS.
Returns:
nms_boxes: `float` Tensor of shape [max_total_size, 4] representing top
detected boxes in [y1, x1, y2, x2].
nms_scores: `float` Tensor of shape [max_total_size] representing sorted
confidence scores for detected boxes. The values are between [0, 1].
nms_classes: `int` Tensor of shape [max_total_size] representing classes for
detected boxes.
    valid_detections: `int` Tensor of shape [1]. Only the top
      `valid_detections` boxes are valid detections.
"""
nmsed_boxes = []
nmsed_scores = []
nmsed_classes = []
num_classes_for_box = boxes.get_shape().as_list()[1]
num_classes = scores.get_shape().as_list()[1]
for i in range(num_classes):
boxes_i = boxes[:, min(num_classes_for_box-1, i)]
scores_i = scores[:, i]
# Obtains pre_nms_num_boxes before running NMS.
scores_i, indices = tf.nn.top_k(
scores_i, k=tf.minimum(tf.shape(input=scores_i)[-1], pre_nms_num_boxes))
boxes_i = tf.gather(boxes_i, indices)
(nmsed_indices_i,
nmsed_num_valid_i) = tf.image.non_max_suppression_padded(
tf.cast(boxes_i, tf.float32),
tf.cast(scores_i, tf.float32),
max_total_size,
iou_threshold=nms_iou_threshold,
score_threshold=score_threshold,
pad_to_max_output_size=True,
name='nms_detections_' + str(i))
nmsed_boxes_i = tf.gather(boxes_i, nmsed_indices_i)
nmsed_scores_i = tf.gather(scores_i, nmsed_indices_i)
# Sets scores of invalid boxes to -1.
nmsed_scores_i = tf.where(
tf.less(tf.range(max_total_size), [nmsed_num_valid_i]), nmsed_scores_i,
-tf.ones_like(nmsed_scores_i))
nmsed_classes_i = tf.fill([max_total_size], i)
nmsed_boxes.append(nmsed_boxes_i)
nmsed_scores.append(nmsed_scores_i)
nmsed_classes.append(nmsed_classes_i)
  # Concatenates results from all classes and sorts them.
nmsed_boxes = tf.concat(nmsed_boxes, axis=0)
nmsed_scores = tf.concat(nmsed_scores, axis=0)
nmsed_classes = tf.concat(nmsed_classes, axis=0)
nmsed_scores, indices = tf.nn.top_k(
nmsed_scores,
k=max_total_size,
sorted=True)
nmsed_boxes = tf.gather(nmsed_boxes, indices)
nmsed_classes = tf.gather(nmsed_classes, indices)
valid_detections = tf.reduce_sum(
input_tensor=tf.cast(tf.greater(nmsed_scores, -1), tf.int32))
return nmsed_boxes, nmsed_scores, nmsed_classes, valid_detections
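# Editor's note: a tiny worked example of the per-image path above, with four
# anchors, two classes, and class-agnostic boxes; all values are illustrative
# and not part of the original file.
def _example_generate_detections_per_image():
  boxes = tf.constant([[[0.0, 0.0, 10.0, 10.0]],
                       [[0.0, 0.0, 9.0, 9.0]],
                       [[20.0, 20.0, 30.0, 30.0]],
                       [[21.0, 21.0, 31.0, 31.0]]])  # [N=4, 1, 4]
  scores = tf.constant([[0.9, 0.1],
                        [0.8, 0.2],
                        [0.3, 0.7],
                        [0.2, 0.6]])                 # [N=4, num_classes=2]
  (nmsed_boxes, nmsed_scores, nmsed_classes,
   valid_detections) = _generate_detections_per_image(
       boxes, scores, max_total_size=5, nms_iou_threshold=0.5)
  # Within each class the heavily overlapping box is suppressed, so four
  # detections survive; padded slots carry score -1.
  print(int(valid_detections), nmsed_scores.numpy())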
def _generate_detections_batched(boxes,
scores,
max_total_size,
nms_iou_threshold,
score_threshold):
"""Generates detected boxes with scores and classes for one-stage detector.
The function takes output of multi-level ConvNets and anchor boxes and
  generates detected boxes. Note that this uses batched NMS, which is
  currently not supported on TPU.
Args:
    boxes: a tensor with shape [batch_size, N, num_classes, 4] or
      [batch_size, N, 1, 4], which stacks box predictions from all feature
      levels. N is the total number of anchors across all levels.
    scores: a tensor with shape [batch_size, N, num_classes], which stacks
      class probability on all feature levels. N is the total number of
      anchors across all levels, and num_classes is the number of classes
      predicted by the model. Note that the class_outputs here are the raw
      scores.
max_total_size: a scalar representing maximum number of boxes retained over
all classes.
nms_iou_threshold: a float representing the threshold for deciding whether
boxes overlap too much with respect to IOU.
score_threshold: a float representing the threshold for deciding when to
remove boxes based on score.
Returns:
nms_boxes: `float` Tensor of shape [batch_size, max_total_size, 4]
representing top detected boxes in [y1, x1, y2, x2].
nms_scores: `float` Tensor of shape [batch_size, max_total_size]
representing sorted confidence scores for detected boxes. The values are
between [0, 1].
nms_classes: `int` Tensor of shape [batch_size, max_total_size] representing
classes for detected boxes.
    valid_detections: `int` Tensor of shape [batch_size]. Only the top
      `valid_detections` boxes are valid detections.
"""
with tf.name_scope('generate_detections'):
    # TODO(tsungyi): Removes normalization/denormalization once the
    # tf.image.combined_non_max_suppression is coordinate system agnostic.
    # Normalizes maximum box coordinates to 1.
normalizer = tf.reduce_max(input_tensor=boxes)
boxes /= normalizer
(nmsed_boxes, nmsed_scores, nmsed_classes,
valid_detections) = tf.image.combined_non_max_suppression(
boxes,
scores,
max_output_size_per_class=max_total_size,
max_total_size=max_total_size,
iou_threshold=nms_iou_threshold,
score_threshold=score_threshold,
pad_per_class=False,)
    # De-normalizes box coordinates.
nmsed_boxes *= normalizer
return nmsed_boxes, nmsed_scores, nmsed_classes, valid_detections
def _apply_score_activation(logits, num_classes, activation):
"""Applies activation to logits and removes the background class.
Note that it is assumed that the background class has index 0, which is
sliced away after the score transformation.
Args:
logits: the raw logit tensor.
num_classes: the total number of classes including one background class.
    activation: the score activation type, one of 'SIGMOID', 'SOFTMAX', or
      'IDENTITY'.
Returns:
scores: the tensor after applying score transformation and background
class removal.
"""
batch_size = tf.shape(input=logits)[0]
logits = tf.reshape(logits, [batch_size, -1, num_classes])
if activation == 'SIGMOID':
scores = tf.sigmoid(logits)
  elif activation == 'SOFTMAX':
    scores = tf.nn.softmax(logits)
  elif activation == 'IDENTITY':
    scores = logits
else:
raise ValueError(
'The score activation should be SIGMOID, SOFTMAX or IDENTITY')
scores = scores[..., 1:]
return scores
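# Editor's note: a short sketch of the score transformation above -- logits are
# reshaped to [batch, num_boxes, num_classes], activated, and the background
# column (index 0) is sliced away. Shapes are illustrative; not part of the
# original file.
def _example_apply_score_activation():
  # 3 anchors and num_classes=4 (including the background class).
  logits = tf.zeros([2, 8, 8, 3 * 4])
  scores = _apply_score_activation(logits, num_classes=4, activation='SIGMOID')
  assert scores.shape.as_list() == [2, 8 * 8 * 3, 3]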
class GenerateOneStageDetections(tf.keras.layers.Layer):
"""Generates detected boxes with scores and classes for one-stage detector."""
def __init__(self, params, **kwargs):
super(GenerateOneStageDetections, self).__init__(**kwargs)
self._generate_detections = generate_detections_factory(params)
self._min_level = params.min_level
self._max_level = params.max_level
self._num_classes = params.num_classes
self._score_activation = 'SIGMOID'
def call(self, inputs):
box_outputs, class_outputs, anchor_boxes, image_shape = inputs
# Collects outputs from all levels into a list.
boxes = []
scores = []
for i in range(self._min_level, self._max_level + 1):
batch_size = tf.shape(input=class_outputs[i])[0]
      # Applies score transformation and removes the implicit background class.
scores_i = _apply_score_activation(
class_outputs[i], self._num_classes, self._score_activation)
# Box decoding.
# The anchor boxes are shared for all data in a batch.
# One stage detector only supports class agnostic box regression.
anchor_boxes_i = tf.reshape(anchor_boxes[i], [batch_size, -1, 4])
box_outputs_i = tf.reshape(box_outputs[i], [batch_size, -1, 4])
boxes_i = box_utils.decode_boxes(box_outputs_i, anchor_boxes_i)
# Box clipping.
boxes_i = box_utils.clip_boxes(boxes_i, image_shape)
boxes.append(boxes_i)
scores.append(scores_i)
boxes = tf.concat(boxes, axis=1)
scores = tf.concat(scores, axis=1)
boxes = tf.expand_dims(boxes, axis=2)
(nmsed_boxes, nmsed_scores, nmsed_classes,
valid_detections) = self._generate_detections(boxes, scores)
# Adds 1 to offset the background class which has index 0.
nmsed_classes += 1
return nmsed_boxes, nmsed_scores, nmsed_classes, valid_detections
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Model defination for the RetinaNet Model."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import numpy as np
from absl import logging
import tensorflow.compat.v2 as tf
from tensorflow.python.keras import backend
from official.vision.detection.dataloader import mode_keys
from official.vision.detection.modeling import base_model
from official.vision.detection.modeling import losses
from official.vision.detection.modeling import postprocess
from official.vision.detection.modeling.architecture import factory
from official.vision.detection.evaluation import factory as eval_factory
class COCOMetrics(object):
  # This is only a wrapper for the COCO metric and works only on numpy arrays,
  # so it doesn't inherit from tf.keras.layers.Layer or
  # tf.keras.metrics.Metric.
def __init__(self, params):
self._evaluator = eval_factory.evaluator_generator(params.eval)
def update_state(self, y_true, y_pred):
labels, outputs = y_true, y_pred
labels = tf.nest.map_structure(lambda x: x.numpy(), labels)
outputs = tf.nest.map_structure(lambda x: x.numpy(), outputs)
groundtruths = {}
predictions = {}
for key, val in outputs.items():
if isinstance(val, tuple):
val = np.concatenate(val)
predictions[key] = val
for key, val in labels.items():
if isinstance(val, tuple):
val = np.concatenate(val)
groundtruths[key] = val
self._evaluator.update(predictions, groundtruths)
def result(self):
return self._evaluator.evaluate()
def reset_states(self):
logging.info('State is reset on calling metric.result().')
pass
class RetinanetModel(base_model.Model):
"""RetinaNet model function."""
def __init__(self, params):
super(RetinanetModel, self).__init__(params)
# For eval metrics.
self._params = params
# Architecture generators.
self._backbone_fn = factory.backbone_generator(params)
self._fpn_fn = factory.multilevel_features_generator(params)
self._head_fn = factory.retinanet_head_generator(params.retinanet_head)
# Loss function.
self._cls_loss_fn = losses.RetinanetClassLoss(params.retinanet_loss)
self._box_loss_fn = losses.RetinanetBoxLoss(params.retinanet_loss)
self._box_loss_weight = params.retinanet_loss.box_loss_weight
self._keras_model = None
# Predict function.
self._generate_detections_fn = postprocess.GenerateOneStageDetections(
params.postprocess)
self._l2_weight_decay = params.train.l2_weight_decay
self._transpose_input = params.train.transpose_input
    assert not self._transpose_input, 'Transpose input is not supported.'
# Input layer.
input_shape = (
params.retinanet_parser.output_size +
[params.retinanet_parser.num_channels])
self._input_layer = tf.keras.layers.Input(shape=input_shape, name='')
def build_outputs(self, inputs, mode):
backbone_features = self._backbone_fn(
inputs, is_training=(mode == mode_keys.TRAIN))
fpn_features = self._fpn_fn(
backbone_features, is_training=(mode == mode_keys.TRAIN))
cls_outputs, box_outputs = self._head_fn(
fpn_features, is_training=(mode == mode_keys.TRAIN))
model_outputs = {
'cls_outputs': cls_outputs,
'box_outputs': box_outputs,
}
return model_outputs
def build_loss_fn(self):
if self._keras_model is None:
raise ValueError('build_loss_fn() must be called after build_model().')
def _total_loss_fn(labels, outputs):
cls_loss = self._cls_loss_fn(outputs['cls_outputs'],
labels['cls_targets'],
labels['num_positives'])
box_loss = self._box_loss_fn(outputs['box_outputs'],
labels['box_targets'],
labels['num_positives'])
model_loss = cls_loss + self._box_loss_weight * box_loss
l2_regularization_loss = self.weight_decay_loss(self._l2_weight_decay,
self._keras_model)
total_loss = model_loss + l2_regularization_loss
return {
'total_loss': total_loss,
'cls_loss': cls_loss,
'box_loss': box_loss,
'model_loss': model_loss,
'l2_regularization_loss': l2_regularization_loss,
}
return _total_loss_fn
def build_model(self, params, mode=None):
if self._keras_model is None:
with backend.get_graph().as_default():
outputs = self.model_outputs(self._input_layer, mode)
model = tf.keras.models.Model(
inputs=self._input_layer, outputs=outputs, name='retinanet')
        assert model is not None, 'Failed to build tf.keras.Model.'
model.optimizer = self.build_optimizer()
self._keras_model = model
return self._keras_model
def post_processing(self, labels, outputs):
required_output_fields = ['cls_outputs', 'box_outputs']
for field in required_output_fields:
if field not in outputs:
raise ValueError('"%s" is missing in outputs, requried %s found %s',
field, required_output_fields, outputs.keys())
required_label_fields = ['image_info', 'groundtruths']
for field in required_label_fields:
if field not in labels:
raise ValueError('"%s" is missing in outputs, requried %s found %s',
field, required_label_fields, labels.keys())
boxes, scores, classes, valid_detections = self._generate_detections_fn(
inputs=(outputs['box_outputs'], outputs['cls_outputs'],
labels['anchor_boxes'], labels['image_info'][:, 1:2, :]))
outputs.update({
'source_id': labels['groundtruths']['source_id'],
'image_info': labels['image_info'],
'num_detections': valid_detections,
'detection_boxes': boxes,
'detection_classes': classes,
'detection_scores': scores,
})
if 'groundtruths' in labels:
labels['source_id'] = labels['groundtruths']['source_id']
labels['boxes'] = labels['groundtruths']['boxes']
labels['classes'] = labels['groundtruths']['classes']
labels['areas'] = labels['groundtruths']['areas']
labels['is_crowds'] = labels['groundtruths']['is_crowds']
return labels, outputs
def eval_metrics(self):
return COCOMetrics(self._params)
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""AutoAugment util file."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow.compat.v2 as tf
def distort_image_with_autoaugment(image, bboxes, augmentation_name):
raise NotImplementedError("Not TF 2.0 ready.")
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utility functions for bounding box processing."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow.compat.v2 as tf
EPSILON = 1e-8
BBOX_XFORM_CLIP = np.log(1000. / 16.)
def normalize_boxes(boxes, image_shape):
"""Converts boxes to the normalized coordinates.
Args:
boxes: a tensor whose last dimension is 4 representing the coordinates
of boxes in ymin, xmin, ymax, xmax order.
image_shape: a list of two integers, a two-element vector or a tensor such
that all but the last dimensions are `broadcastable` to `boxes`. The last
dimension is 2, which represents [height, width].
Returns:
normalized_boxes: a tensor whose shape is the same as `boxes` representing
the normalized boxes.
Raises:
ValueError: If the last dimension of boxes is not 4.
"""
  if boxes.shape[-1] != 4:
    raise ValueError(
        'boxes.shape[-1] is {:d}, but must be 4.'.format(boxes.shape[-1]))
with tf.name_scope('normalize_boxes'):
if isinstance(image_shape, list) or isinstance(image_shape, tuple):
height, width = image_shape
else:
image_shape = tf.cast(image_shape, dtype=boxes.dtype)
height = image_shape[..., 0:1]
width = image_shape[..., 1:2]
ymin = boxes[..., 0:1] / height
xmin = boxes[..., 1:2] / width
ymax = boxes[..., 2:3] / height
xmax = boxes[..., 3:4] / width
normalized_boxes = tf.concat([ymin, xmin, ymax, xmax], axis=-1)
return normalized_boxes
def denormalize_boxes(boxes, image_shape):
"""Converts boxes normalized by [height, width] to pixel coordinates.
Args:
boxes: a tensor whose last dimension is 4 representing the coordinates
of boxes in ymin, xmin, ymax, xmax order.
image_shape: a list of two integers, a two-element vector or a tensor such
that all but the last dimensions are `broadcastable` to `boxes`. The last
dimension is 2, which represents [height, width].
Returns:
denormalized_boxes: a tensor whose shape is the same as `boxes` representing
the denormalized boxes.
Raises:
ValueError: If the last dimension of boxes is not 4.
"""
with tf.name_scope('denormalize_boxes'):
    if isinstance(image_shape, (list, tuple)):
height, width = image_shape
else:
image_shape = tf.cast(image_shape, dtype=boxes.dtype)
height = image_shape[..., 0:1]
width = image_shape[..., 1:2]
ymin = boxes[..., 0:1] * height
xmin = boxes[..., 1:2] * width
ymax = boxes[..., 2:3] * height
xmax = boxes[..., 3:4] * width
denormalized_boxes = tf.concat([ymin, xmin, ymax, xmax], axis=-1)
return denormalized_boxes
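

# --- Added illustrative sketch (not part of the original module) -----------
# A minimal round-trip through normalize_boxes/denormalize_boxes under TF2
# eager execution; the box and image values below are hypothetical.
def _normalize_boxes_example():
  boxes = tf.constant([[10.0, 20.0, 50.0, 60.0]])  # [ymin, xmin, ymax, xmax]
  image_shape = [100, 200]  # [height, width] in pixels.
  normalized = normalize_boxes(boxes, image_shape)
  # normalized == [[0.1, 0.1, 0.5, 0.3]].
  restored = denormalize_boxes(normalized, image_shape)
  # restored == boxes, since the two transforms are exact inverses.
  return normalized, restored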
def clip_boxes(boxes, image_shape):
"""Clips boxes to image boundaries.
Args:
boxes: a tensor whose last dimension is 4 representing the coordinates
of boxes in ymin, xmin, ymax, xmax order.
image_shape: a list of two integers, a two-element vector or a tensor such
that all but the last dimensions are `broadcastable` to `boxes`. The last
dimension is 2, which represents [height, width].
Returns:
clipped_boxes: a tensor whose shape is the same as `boxes` representing the
clipped boxes.
Raises:
ValueError: If the last dimension of boxes is not 4.
"""
  if boxes.shape[-1] != 4:
    raise ValueError(
        'boxes.shape[-1] is {:d}, but must be 4.'.format(boxes.shape[-1]))
with tf.name_scope('crop_boxes'):
    if isinstance(image_shape, (list, tuple)):
height, width = image_shape
else:
image_shape = tf.cast(image_shape, dtype=boxes.dtype)
height = image_shape[..., 0:1]
width = image_shape[..., 1:2]
ymin = boxes[..., 0:1]
xmin = boxes[..., 1:2]
ymax = boxes[..., 2:3]
xmax = boxes[..., 3:4]
clipped_ymin = tf.maximum(tf.minimum(ymin, height - 1.0), 0.0)
clipped_ymax = tf.maximum(tf.minimum(ymax, height - 1.0), 0.0)
clipped_xmin = tf.maximum(tf.minimum(xmin, width - 1.0), 0.0)
clipped_xmax = tf.maximum(tf.minimum(xmax, width - 1.0), 0.0)
clipped_boxes = tf.concat(
[clipped_ymin, clipped_xmin, clipped_ymax, clipped_xmax],
axis=-1)
return clipped_boxes
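

# --- Added illustrative sketch (not part of the original module) -----------
# clip_boxes clamps coordinates to [0, height - 1] and [0, width - 1]; the
# box below deliberately extends past a hypothetical 100x100 image.
def _clip_boxes_example():
  boxes = tf.constant([[-5.0, 10.0, 120.0, 150.0]])
  clipped = clip_boxes(boxes, [100, 100])
  # clipped == [[0.0, 10.0, 99.0, 99.0]].
  return clipped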
def encode_boxes(boxes, anchors, weights=None):
"""Encode boxes to targets.
Args:
boxes: a tensor whose last dimension is 4 representing the coordinates
of boxes in ymin, xmin, ymax, xmax order.
anchors: a tensor whose shape is the same as `boxes` representing the
coordinates of anchors in ymin, xmin, ymax, xmax order.
weights: None or a list of four float numbers used to scale coordinates.
Returns:
encoded_boxes: a tensor whose shape is the same as `boxes` representing the
encoded box targets.
Raises:
ValueError: If the last dimension of boxes is not 4.
"""
  if boxes.shape[-1] != 4:
    raise ValueError(
        'boxes.shape[-1] is {:d}, but must be 4.'.format(boxes.shape[-1]))
with tf.name_scope('encode_boxes'):
boxes = tf.cast(boxes, dtype=anchors.dtype)
ymin = boxes[..., 0:1]
xmin = boxes[..., 1:2]
ymax = boxes[..., 2:3]
xmax = boxes[..., 3:4]
box_h = ymax - ymin + 1.0
box_w = xmax - xmin + 1.0
box_yc = ymin + 0.5 * box_h
box_xc = xmin + 0.5 * box_w
anchor_ymin = anchors[..., 0:1]
anchor_xmin = anchors[..., 1:2]
anchor_ymax = anchors[..., 2:3]
anchor_xmax = anchors[..., 3:4]
anchor_h = anchor_ymax - anchor_ymin + 1.0
anchor_w = anchor_xmax - anchor_xmin + 1.0
anchor_yc = anchor_ymin + 0.5 * anchor_h
anchor_xc = anchor_xmin + 0.5 * anchor_w
encoded_dy = (box_yc - anchor_yc) / anchor_h
encoded_dx = (box_xc - anchor_xc) / anchor_w
encoded_dh = tf.math.log(box_h / anchor_h)
encoded_dw = tf.math.log(box_w / anchor_w)
if weights:
encoded_dy *= weights[0]
encoded_dx *= weights[1]
encoded_dh *= weights[2]
encoded_dw *= weights[3]
encoded_boxes = tf.concat(
[encoded_dy, encoded_dx, encoded_dh, encoded_dw],
axis=-1)
return encoded_boxes
def decode_boxes(encoded_boxes, anchors, weights=None):
"""Decode boxes.
Args:
encoded_boxes: a tensor whose last dimension is 4 representing the
coordinates of encoded boxes in ymin, xmin, ymax, xmax order.
    anchors: a tensor whose shape is the same as `encoded_boxes` representing
      the coordinates of anchors in ymin, xmin, ymax, xmax order.
    weights: None or a list of four float numbers used to scale coordinates.
  Returns:
    decoded_boxes: a tensor whose shape is the same as `encoded_boxes`
      representing the decoded boxes.
  """
with tf.name_scope('decode_boxes'):
encoded_boxes = tf.cast(encoded_boxes, dtype=anchors.dtype)
dy = encoded_boxes[..., 0:1]
dx = encoded_boxes[..., 1:2]
dh = encoded_boxes[..., 2:3]
dw = encoded_boxes[..., 3:4]
if weights:
dy /= weights[0]
dx /= weights[1]
dh /= weights[2]
dw /= weights[3]
dh = tf.minimum(dh, BBOX_XFORM_CLIP)
dw = tf.minimum(dw, BBOX_XFORM_CLIP)
anchor_ymin = anchors[..., 0:1]
anchor_xmin = anchors[..., 1:2]
anchor_ymax = anchors[..., 2:3]
anchor_xmax = anchors[..., 3:4]
anchor_h = anchor_ymax - anchor_ymin + 1.0
anchor_w = anchor_xmax - anchor_xmin + 1.0
anchor_yc = anchor_ymin + 0.5 * anchor_h
anchor_xc = anchor_xmin + 0.5 * anchor_w
decoded_boxes_yc = dy * anchor_h + anchor_yc
decoded_boxes_xc = dx * anchor_w + anchor_xc
decoded_boxes_h = tf.exp(dh) * anchor_h
decoded_boxes_w = tf.exp(dw) * anchor_w
decoded_boxes_ymin = decoded_boxes_yc - 0.5 * decoded_boxes_h
decoded_boxes_xmin = decoded_boxes_xc - 0.5 * decoded_boxes_w
decoded_boxes_ymax = decoded_boxes_ymin + decoded_boxes_h - 1.0
decoded_boxes_xmax = decoded_boxes_xmin + decoded_boxes_w - 1.0
decoded_boxes = tf.concat(
[decoded_boxes_ymin, decoded_boxes_xmin,
decoded_boxes_ymax, decoded_boxes_xmax],
axis=-1)
return decoded_boxes
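

# --- Added illustrative sketch (not part of the original module) -----------
# encode_boxes and decode_boxes are exact inverses for the same anchors and
# weights, as long as the encoded dh/dw stay below BBOX_XFORM_CLIP. The
# [10, 10, 5, 5] weights are the common Faster R-CNN convention; all values
# here are hypothetical.
def _box_coder_roundtrip_example():
  boxes = tf.constant([[10.0, 10.0, 50.0, 30.0]])
  anchors = tf.constant([[0.0, 0.0, 40.0, 40.0]])
  weights = [10.0, 10.0, 5.0, 5.0]
  targets = encode_boxes(boxes, anchors, weights=weights)
  decoded = decode_boxes(targets, anchors, weights=weights)
  # decoded recovers `boxes` up to floating-point error.
  return targets, decoded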
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utility functions for input processing."""
import math
import tensorflow.compat.v2 as tf
from official.vision.detection.utils import box_utils
from official.vision.detection.utils.object_detection import preprocessor
def pad_to_fixed_size(input_tensor, size, constant_values=0):
"""Pads data to a fixed length at the first dimension.
Args:
input_tensor: `Tensor` with any dimension.
size: `int` number for the first dimension of output Tensor.
constant_values: `int` value assigned to the paddings.
Returns:
`Tensor` with the first dimension padded to `size`.
"""
input_shape = input_tensor.get_shape().as_list()
padding_shape = []
# Computes the padding length on the first dimension.
padding_length = size - tf.shape(input=input_tensor)[0]
assert_length = tf.Assert(
tf.greater_equal(padding_length, 0), [padding_length])
with tf.control_dependencies([assert_length]):
padding_shape.append(padding_length)
# Copies shapes of the rest of input shape dimensions.
for i in range(1, len(input_shape)):
padding_shape.append(tf.shape(input=input_tensor)[i])
# Pads input tensor to the fixed first dimension.
paddings = tf.cast(constant_values * tf.ones(padding_shape),
input_tensor.dtype)
padded_tensor = tf.concat([input_tensor, paddings], axis=0)
output_shape = input_shape
output_shape[0] = size
padded_tensor.set_shape(output_shape)
return padded_tensor
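

# --- Added illustrative sketch (not part of the original module) -----------
# pad_to_fixed_size is typically used to give variable-length ground-truth
# tensors a static first dimension; padding with -1 makes the padded rows
# easy to mask out later. Values here are hypothetical.
def _pad_to_fixed_size_example():
  boxes = tf.ones([3, 4])
  padded = pad_to_fixed_size(boxes, 5, constant_values=-1)
  # padded.shape == [5, 4]; rows 3 and 4 are filled with -1.
  return padded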
def normalize_image(image,
offset=(0.485, 0.456, 0.406),
scale=(0.229, 0.224, 0.225)):
"""Normalizes the image to zero mean and unit variance."""
image = tf.image.convert_image_dtype(image, dtype=tf.float32)
offset = tf.constant(offset)
offset = tf.expand_dims(offset, axis=0)
offset = tf.expand_dims(offset, axis=0)
image -= offset
scale = tf.constant(scale)
scale = tf.expand_dims(scale, axis=0)
scale = tf.expand_dims(scale, axis=0)
image /= scale
return image
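

# --- Added illustrative sketch (not part of the original module) -----------
# The default offset/scale are the commonly used per-channel ImageNet
# statistics; convert_image_dtype first maps the input into [0, 1] floats.
def _normalize_image_example():
  image = tf.zeros([480, 640, 3], tf.uint8)  # hypothetical input image.
  return normalize_image(image)  # float32, roughly zero mean per channel.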
def compute_padded_size(desired_size, stride):
"""Compute the padded size given the desired size and the stride.
  The padded size will be the smallest [height, width] such that each
  dimension is the smallest multiple of the stride that is no less than the
  desired dimension. For example, if desired_size = (100, 200) and
  stride = 32, the output padded_size = (128, 224).
Args:
desired_size: a `Tensor` or `int` list/tuple of two elements representing
[height, width] of the target output image size.
stride: an integer, the stride of the backbone network.
Returns:
padded_size: a `Tensor` or `int` list/tuple of two elements representing
[height, width] of the padded output image size.
"""
  if isinstance(desired_size, (list, tuple)):
padded_size = [int(math.ceil(d * 1.0 / stride) * stride)
for d in desired_size]
else:
padded_size = tf.cast(
tf.math.ceil(
tf.cast(desired_size, dtype=tf.float32) / stride) * stride,
tf.int32)
return padded_size
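

# --- Added illustrative sketch (not part of the original module) -----------
# Reproduces the docstring example: each side is rounded up to the next
# multiple of the stride.
def _compute_padded_size_example():
  assert compute_padded_size((100, 200), 32) == [128, 224]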
def resize_and_crop_image(image,
desired_size,
padded_size,
aug_scale_min=1.0,
aug_scale_max=1.0,
seed=1,
method=tf.image.ResizeMethod.BILINEAR):
"""Resizes the input image to output size.
Resize and pad images given the desired output size of the image and
stride size.
Here are the preprocessing steps.
1. For a given image, keep its aspect ratio and rescale the image to make it
the largest rectangle to be bounded by the rectangle specified by the
`desired_size`.
2. Pad the rescaled image to the padded_size.
Args:
image: a `Tensor` of shape [height, width, 3] representing an image.
desired_size: a `Tensor` or `int` list/tuple of two elements representing
[height, width] of the desired actual output image size.
padded_size: a `Tensor` or `int` list/tuple of two elements representing
[height, width] of the padded output image size. Padding will be applied
after scaling the image to the desired_size.
    aug_scale_min: a `float` in the range [0, 1.0] representing the minimum
      random scale applied to `desired_size` for training scale jittering.
    aug_scale_max: a `float` in the range [1.0, inf) representing the maximum
      random scale applied to `desired_size` for training scale jittering.
seed: seed for random scale jittering.
method: function to resize input image to scaled image.
Returns:
    output_image: `Tensor` of shape [height, width, 3] where [height, width]
      equals `padded_size`.
    image_info: a 2D `Tensor` that encodes the information of the image and
      the applied preprocessing. It is in the format of
      [[original_height, original_width], [scaled_height, scaled_width],
      [y_scale, x_scale], [y_offset, x_offset]], where [scaled_height,
      scaled_width] is the actual scaled image size, and [y_scale, x_scale]
      is the scaling factor, which is the ratio of
      scaled dimension / original dimension.
"""
with tf.name_scope('resize_and_crop_image'):
image_size = tf.cast(tf.shape(input=image)[0:2], tf.float32)
random_jittering = (aug_scale_min != 1.0 or aug_scale_max != 1.0)
if random_jittering:
random_scale = tf.random.uniform([],
aug_scale_min,
aug_scale_max,
seed=seed)
scaled_size = tf.round(random_scale * desired_size)
else:
scaled_size = desired_size
scale = tf.minimum(
scaled_size[0] / image_size[0], scaled_size[1] / image_size[1])
scaled_size = tf.round(image_size * scale)
# Computes 2D image_scale.
image_scale = scaled_size / image_size
# Selects non-zero random offset (x, y) if scaled image is larger than
# desired_size.
if random_jittering:
max_offset = scaled_size - desired_size
max_offset = tf.where(tf.less(max_offset, 0),
tf.zeros_like(max_offset),
max_offset)
offset = max_offset * tf.random.uniform([
2,
], 0, 1, seed=seed)
offset = tf.cast(offset, tf.int32)
else:
offset = tf.zeros((2,), tf.int32)
scaled_image = tf.image.resize(
image, tf.cast(scaled_size, tf.int32), method=method)
if random_jittering:
scaled_image = scaled_image[
offset[0]:offset[0] + desired_size[0],
offset[1]:offset[1] + desired_size[1], :]
output_image = tf.image.pad_to_bounding_box(
scaled_image, 0, 0, padded_size[0], padded_size[1])
image_info = tf.stack([
image_size,
scaled_size,
image_scale,
tf.cast(offset, tf.float32)])
return output_image, image_info
def resize_and_crop_boxes(boxes,
image_scale,
output_size,
offset):
"""Resizes boxes to output size with scale and offset.
Args:
boxes: `Tensor` of shape [N, 4] representing ground truth boxes.
image_scale: 2D float `Tensor` representing scale factors that apply to
[height, width] of input image.
output_size: 2D `Tensor` or `int` representing [height, width] of target
output image size.
offset: 2D `Tensor` representing top-left corner [y0, x0] to crop scaled
boxes.
Returns:
boxes: `Tensor` of shape [N, 4] representing the scaled boxes.
"""
# Adjusts box coordinates based on image_scale and offset.
boxes *= tf.tile(tf.expand_dims(image_scale, axis=0), [1, 2])
boxes -= tf.tile(tf.expand_dims(offset, axis=0), [1, 2])
# Clips the boxes.
boxes = box_utils.clip_boxes(boxes, output_size)
return boxes
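

# --- Added illustrative sketch (not part of the original module) -----------
# End-to-end sketch of the resize pipeline: resize/pad a hypothetical image,
# then transform its ground-truth boxes with the scale and offset recorded
# in rows 2 and 3 of `image_info`.
def _resize_pipeline_example():
  image = tf.zeros([480, 640, 3], tf.float32)
  boxes = tf.constant([[100.0, 100.0, 300.0, 400.0]])
  image, image_info = resize_and_crop_image(
      image, desired_size=[512, 512], padded_size=[512, 512])
  boxes = resize_and_crop_boxes(
      boxes, image_info[2, :], output_size=[512, 512],
      offset=image_info[3, :])
  return image, boxes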
def resize_and_crop_masks(masks,
image_scale,
output_size,
offset):
"""Resizes boxes to output size with scale and offset.
Args:
masks: `Tensor` of shape [N, H, W, 1] representing ground truth masks.
image_scale: 2D float `Tensor` representing scale factors that apply to
[height, width] of input image.
output_size: 2D `Tensor` or `int` representing [height, width] of target
output image size.
    offset: 2D `Tensor` representing top-left corner [y0, x0] to crop scaled
      masks.
Returns:
masks: `Tensor` of shape [N, H, W, 1] representing the scaled masks.
"""
mask_size = tf.shape(input=masks)[1:3]
scaled_size = tf.cast(image_scale * mask_size, tf.int32)
scaled_masks = tf.image.resize(
masks, scaled_size, method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
offset = tf.cast(offset, tf.int32)
scaled_masks = scaled_masks[:, offset[0]:offset[0] + output_size[0],
offset[1]:offset[1] + output_size[1], :]
output_masks = tf.image.pad_to_bounding_box(scaled_masks, 0, 0,
output_size[0], output_size[1])
return output_masks
def random_horizontal_flip(image, boxes=None, masks=None):
"""Randomly flips input image and bounding boxes."""
return preprocessor.random_horizontal_flip(image, boxes, masks)
def get_non_empty_box_indices(boxes):
"""Get indices for non-empty boxes."""
  # Selects indices where box height and width are both greater than 0.
height = boxes[:, 2] - boxes[:, 0]
width = boxes[:, 3] - boxes[:, 1]
indices = tf.where(tf.logical_and(tf.greater(height, 0),
tf.greater(width, 0)))
return indices[:, 0]
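

# --- Added illustrative sketch (not part of the original module) -----------
# Degenerate (zero-height or zero-width) boxes are dropped by gathering only
# the returned indices; the second box below has zero height.
def _filter_degenerate_boxes_example():
  boxes = tf.constant([[0.0, 0.0, 10.0, 10.0],
                       [5.0, 5.0, 5.0, 9.0]])
  indices = get_non_empty_box_indices(boxes)
  return tf.gather(boxes, indices)  # keeps only the first box.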
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Argmax matcher implementation.
This class takes a similarity matrix and matches columns to rows based on the
maximum value per column. One can specify matched_thresholds and
to prevent columns from matching to rows (generally resulting in a negative
training example) and unmatched_theshold to ignore the match (generally
resulting in neither a positive or negative training example).
This matcher is used in Fast(er)-RCNN.
Note: matchers are used in TargetAssigners. There is a create_target_assigner
factory function for popular implementations.
"""
import tensorflow.compat.v2 as tf
from official.vision.detection.utils.object_detection import matcher
from official.vision.detection.utils.object_detection import shape_utils
class ArgMaxMatcher(matcher.Matcher):
"""Matcher based on highest value.
This class computes matches from a similarity matrix. Each column is matched
to a single row.
To support object detection target assignment this class enables setting both
  matched_threshold (upper threshold) and unmatched_threshold (lower threshold)
defining three categories of similarity which define whether examples are
positive, negative, or ignored:
(1) similarity >= matched_threshold: Highest similarity. Matched/Positive!
(2) matched_threshold > similarity >= unmatched_threshold: Medium similarity.
Depending on negatives_lower_than_unmatched, this is either
Unmatched/Negative OR Ignore.
(3) unmatched_threshold > similarity: Lowest similarity. Depending on flag
negatives_lower_than_unmatched, either Unmatched/Negative OR Ignore.
For ignored matches this class sets the values in the Match object to -2.
"""
def __init__(self,
matched_threshold,
unmatched_threshold=None,
negatives_lower_than_unmatched=True,
force_match_for_each_row=False):
"""Construct ArgMaxMatcher.
Args:
matched_threshold: Threshold for positive matches. Positive if
sim >= matched_threshold, where sim is the maximum value of the
similarity matrix for a given column. Set to None for no threshold.
unmatched_threshold: Threshold for negative matches. Negative if
sim < unmatched_threshold. Defaults to matched_threshold
when set to None.
negatives_lower_than_unmatched: Boolean which defaults to True. If True
then negative matches are the ones below the unmatched_threshold,
        whereas ignored matches are in between the matched and unmatched
threshold. If False, then negative matches are in between the matched
and unmatched threshold, and everything lower than unmatched is ignored.
force_match_for_each_row: If True, ensures that each row is matched to
at least one column (which is not guaranteed otherwise if the
matched_threshold is high). Defaults to False. See
argmax_matcher_test.testMatcherForceMatch() for an example.
Raises:
ValueError: if unmatched_threshold is set but matched_threshold is not set
or if unmatched_threshold > matched_threshold.
"""
if (matched_threshold is None) and (unmatched_threshold is not None):
      raise ValueError('Need to also define matched_threshold when '
                       'unmatched_threshold is defined')
self._matched_threshold = matched_threshold
if unmatched_threshold is None:
self._unmatched_threshold = matched_threshold
else:
if unmatched_threshold > matched_threshold:
        raise ValueError('unmatched_threshold needs to be smaller or equal '
                         'to matched_threshold')
self._unmatched_threshold = unmatched_threshold
if not negatives_lower_than_unmatched:
if self._unmatched_threshold == self._matched_threshold:
        raise ValueError('When negatives are in between matched and '
                         'unmatched thresholds, these cannot be of equal '
                         'value. matched: %s, unmatched: %s' %
                         (self._matched_threshold, self._unmatched_threshold))
self._force_match_for_each_row = force_match_for_each_row
self._negatives_lower_than_unmatched = negatives_lower_than_unmatched
def _match(self, similarity_matrix):
"""Tries to match each column of the similarity matrix to a row.
Args:
similarity_matrix: tensor of shape [N, M] representing any similarity
metric.
Returns:
Match object with corresponding matches for each of M columns.
"""
def _match_when_rows_are_empty():
"""Performs matching when the rows of similarity matrix are empty.
When the rows are empty, all detections are false positives. So we return
a tensor of -1's to indicate that the columns do not match to any rows.
Returns:
matches: int32 tensor indicating the row each column matches to.
"""
similarity_matrix_shape = shape_utils.combined_static_and_dynamic_shape(
similarity_matrix)
return -1 * tf.ones([similarity_matrix_shape[1]], dtype=tf.int32)
def _match_when_rows_are_non_empty():
"""Performs matching when the rows of similarity matrix are non empty.
Returns:
matches: int32 tensor indicating the row each column matches to.
"""
# Matches for each column
matches = tf.argmax(input=similarity_matrix, axis=0, output_type=tf.int32)
# Deal with matched and unmatched threshold
if self._matched_threshold is not None:
        # Computes boolean indicators for the unmatched and ignored columns.
matched_vals = tf.reduce_max(input_tensor=similarity_matrix, axis=0)
below_unmatched_threshold = tf.greater(self._unmatched_threshold,
matched_vals)
between_thresholds = tf.logical_and(
tf.greater_equal(matched_vals, self._unmatched_threshold),
tf.greater(self._matched_threshold, matched_vals))
if self._negatives_lower_than_unmatched:
matches = self._set_values_using_indicator(matches,
below_unmatched_threshold,
-1)
matches = self._set_values_using_indicator(matches,
between_thresholds,
-2)
else:
matches = self._set_values_using_indicator(matches,
below_unmatched_threshold,
-2)
matches = self._set_values_using_indicator(matches,
between_thresholds,
-1)
if self._force_match_for_each_row:
similarity_matrix_shape = shape_utils.combined_static_and_dynamic_shape(
similarity_matrix)
force_match_column_ids = tf.argmax(
input=similarity_matrix, axis=1, output_type=tf.int32)
force_match_column_indicators = tf.one_hot(
force_match_column_ids, depth=similarity_matrix_shape[1])
force_match_row_ids = tf.argmax(
input=force_match_column_indicators, axis=0, output_type=tf.int32)
force_match_column_mask = tf.cast(
tf.reduce_max(input_tensor=force_match_column_indicators, axis=0),
tf.bool)
final_matches = tf.where(force_match_column_mask, force_match_row_ids,
matches)
return final_matches
else:
return matches
if similarity_matrix.shape.is_fully_defined():
if similarity_matrix.shape.dims[0].value == 0:
return _match_when_rows_are_empty()
else:
return _match_when_rows_are_non_empty()
else:
return tf.cond(
pred=tf.greater(tf.shape(input=similarity_matrix)[0], 0),
true_fn=_match_when_rows_are_non_empty,
false_fn=_match_when_rows_are_empty)
def _set_values_using_indicator(self, x, indicator, val):
"""Set the indicated fields of x to val.
Args:
x: tensor.
indicator: boolean with same shape as x.
val: scalar with value to set.
Returns:
modified tensor.
"""
indicator = tf.cast(indicator, x.dtype)
return tf.add(tf.multiply(x, 1 - indicator), val * indicator)
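

# --- Added illustrative sketch (not part of the original module) -----------
# A tiny eager-mode example of the threshold semantics, calling the private
# _match directly for illustration (the matcher.Matcher base class normally
# wraps it). The similarity values are hypothetical IoU-like scores between
# 2 ground-truth rows and 3 anchor columns.
def _argmax_matcher_example():
  similarity = tf.constant([[0.8, 0.3, 0.05],
                            [0.1, 0.6, 0.02]])
  arg_max_matcher = ArgMaxMatcher(matched_threshold=0.5,
                                  unmatched_threshold=0.4)
  # Per column: matched row index, or -1 for unmatched (negative) columns.
  return arg_max_matcher._match(similarity)  # -> [0, 1, -1]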