Commit e4be7e00 authored by Yeqing Li's avatar Yeqing Li Committed by A. Unique TensorFlower

Removes unneeded content of the beta folder.

PiperOrigin-RevId: 437276665
parent f47405b5
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of the Panoptic Quality metric.
Panoptic Quality is an instance-based metric for evaluating the task of
image parsing, aka panoptic segmentation.
Please see the paper for details:
"Panoptic Segmentation", Alexander Kirillov, Kaiming He, Ross Girshick,
Carsten Rother and Piotr Dollar. arXiv:1801.00868, 2018.
Note that this metric class is branched from
https://github.com/tensorflow/models/blob/master/research/deeplab/evaluation/panoptic_quality.py
"""
import collections
import numpy as np
_EPSILON = 1e-10
def realdiv_maybe_zero(x, y):
"""Element-wise x / y where y may contain zeros, for those returns 0 too."""
return np.where(
np.less(np.abs(y), _EPSILON), np.zeros_like(x), np.divide(x, y))
def _ids_to_counts(id_array):
"""Given a numpy array, a mapping from each unique entry to its count."""
ids, counts = np.unique(id_array, return_counts=True)
return dict(zip(ids, counts))
class PanopticQuality:
"""Metric class for Panoptic Quality.
"Panoptic Segmentation" by Alexander Kirillov, Kaiming He, Ross Girshick,
Carsten Rother, Piotr Dollar.
https://arxiv.org/abs/1801.00868
"""
def __init__(self, num_categories, ignored_label, max_instances_per_category,
offset):
"""Initialization for PanopticQualityMetric.
Args:
num_categories: The number of segmentation categories (or "classes" in the
dataset.
ignored_label: A category id that is ignored in evaluation, e.g. the void
label as defined in COCO panoptic segmentation dataset.
max_instances_per_category: The maximum number of instances for each
category. Used in ensuring unique instance labels.
offset: The maximum number of unique labels. This is used, by multiplying
the ground-truth labels, to generate unique ids for individual regions
of overlap between groundtruth and predicted segments.
"""
self.num_categories = num_categories
self.ignored_label = ignored_label
self.max_instances_per_category = max_instances_per_category
self.offset = offset
self.reset()
def _naively_combine_labels(self, category_mask, instance_mask):
"""Naively creates a combined label array from categories and instances."""
return (category_mask.astype(np.uint32) * self.max_instances_per_category +
instance_mask.astype(np.uint32))
def compare_and_accumulate(self, groundtruths, predictions):
"""Compares predicted segmentation with groundtruth, accumulates its metric.
It is not assumed that instance ids are unique across different categories.
See for example combine_semantic_and_instance_predictions.py in official
PanopticAPI evaluation code for issues to consider when fusing category
and instance labels.
    Instance ids of the ignored category are interpreted as follows: id 0 is
    "void", and the remaining ids are crowd instances.
    Args:
      groundtruths: A dictionary containing groundtruth labels. It should
        contain the following fields.
        - category_mask: A 2D numpy uint16 array of groundtruth per-pixel
          category labels.
        - instance_mask: A 2D numpy uint16 array of groundtruth instance labels.
      predictions: A dictionary containing the model outputs. It should contain
        the following fields.
        - category_mask: A 2D numpy uint16 array of predicted per-pixel
          category labels.
        - instance_mask: A 2D numpy uint16 array of predicted instance labels.
"""
groundtruth_category_mask = groundtruths['category_mask']
groundtruth_instance_mask = groundtruths['instance_mask']
predicted_category_mask = predictions['category_mask']
predicted_instance_mask = predictions['instance_mask']
# First, combine the category and instance labels so that every unique
# value for (category, instance) is assigned a unique integer label.
pred_segment_id = self._naively_combine_labels(predicted_category_mask,
predicted_instance_mask)
gt_segment_id = self._naively_combine_labels(groundtruth_category_mask,
groundtruth_instance_mask)
# Pre-calculate areas for all groundtruth and predicted segments.
gt_segment_areas = _ids_to_counts(gt_segment_id)
pred_segment_areas = _ids_to_counts(pred_segment_id)
# We assume there is only one void segment and it has instance id = 0.
void_segment_id = self.ignored_label * self.max_instances_per_category
    # There may be other ignored groundtruth segments with instance id > 0;
    # find those ids using the unique segment ids extracted with the area
    # computation above.
ignored_segment_ids = {
gt_segment_id for gt_segment_id in gt_segment_areas
if (gt_segment_id //
self.max_instances_per_category) == self.ignored_label
}
# Next, combine the groundtruth and predicted labels. Dividing up the pixels
# based on which groundtruth segment and which predicted segment they belong
# to, this will assign a different 32-bit integer label to each choice
# of (groundtruth segment, predicted segment), encoded as
# gt_segment_id * offset + pred_segment_id.
intersection_id_array = (
gt_segment_id.astype(np.uint64) * self.offset +
pred_segment_id.astype(np.uint64))
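    # Example (illustrative): with offset=16, gt_segment_id=3 and
    # pred_segment_id=5 combine to 3 * 16 + 5 = 53, and the pair is recovered
    # below via (53 // 16, 53 % 16).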
# For every combination of (groundtruth segment, predicted segment) with a
# non-empty intersection, this counts the number of pixels in that
# intersection.
intersection_areas = _ids_to_counts(intersection_id_array)
# Helper function that computes the area of the overlap between a predicted
# segment and the ground-truth void/ignored segment.
def prediction_void_overlap(pred_segment_id):
void_intersection_id = void_segment_id * self.offset + pred_segment_id
return intersection_areas.get(void_intersection_id, 0)
# Compute overall ignored overlap.
def prediction_ignored_overlap(pred_segment_id):
total_ignored_overlap = 0
for ignored_segment_id in ignored_segment_ids:
intersection_id = ignored_segment_id * self.offset + pred_segment_id
total_ignored_overlap += intersection_areas.get(intersection_id, 0)
return total_ignored_overlap
    # Sets recording which groundtruth/predicted segments have been matched
    # with an overlapping predicted/groundtruth segment, respectively.
gt_matched = set()
pred_matched = set()
# Calculate IoU per pair of intersecting segments of the same category.
for intersection_id, intersection_area in intersection_areas.items():
gt_segment_id = int(intersection_id // self.offset)
pred_segment_id = int(intersection_id % self.offset)
gt_category = int(gt_segment_id // self.max_instances_per_category)
pred_category = int(pred_segment_id // self.max_instances_per_category)
if gt_category != pred_category:
continue
# Union between the groundtruth and predicted segments being compared does
# not include the portion of the predicted segment that consists of
# groundtruth "void" pixels.
union = (
gt_segment_areas[gt_segment_id] +
pred_segment_areas[pred_segment_id] - intersection_area -
prediction_void_overlap(pred_segment_id))
iou = intersection_area / union
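      # Matching with IoU strictly greater than 0.5 guarantees that each
      # groundtruth segment matches at most one predicted segment and vice
      # versa (see the Panoptic Segmentation paper, Sec. 4).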
if iou > 0.5:
self.tp_per_class[gt_category] += 1
self.iou_per_class[gt_category] += iou
gt_matched.add(gt_segment_id)
pred_matched.add(pred_segment_id)
# Count false negatives for each category.
for gt_segment_id in gt_segment_areas:
if gt_segment_id in gt_matched:
continue
category = gt_segment_id // self.max_instances_per_category
# Failing to detect a void segment is not a false negative.
if category == self.ignored_label:
continue
self.fn_per_class[category] += 1
# Count false positives for each category.
for pred_segment_id in pred_segment_areas:
if pred_segment_id in pred_matched:
continue
      # A false positive is not penalized if it is mostly ignored in the
# groundtruth.
if (prediction_ignored_overlap(pred_segment_id) /
pred_segment_areas[pred_segment_id]) > 0.5:
continue
category = pred_segment_id // self.max_instances_per_category
self.fp_per_class[category] += 1
def _valid_categories(self):
"""Categories with a "valid" value for the metric, have > 0 instances.
We will ignore the `ignore_label` class and other classes which have
`tp + fn + fp = 0`.
Returns:
Boolean array of shape `[num_categories]`.
"""
valid_categories = np.not_equal(
self.tp_per_class + self.fn_per_class + self.fp_per_class, 0)
    if 0 <= self.ignored_label < self.num_categories:
valid_categories[self.ignored_label] = False
return valid_categories
def result_per_category(self):
"""For supported metrics, return individual per-category metric values.
Returns:
A dictionary contains all per-class metrics, each metrics is a numpy array
of shape `[self.num_categories]`, where index `i` is the metrics value
over only that category.
"""
sq_per_class = realdiv_maybe_zero(self.iou_per_class, self.tp_per_class)
rq_per_class = realdiv_maybe_zero(
self.tp_per_class,
self.tp_per_class + 0.5 * self.fn_per_class + 0.5 * self.fp_per_class)
return {
'sq_per_class': sq_per_class,
'rq_per_class': rq_per_class,
'pq_per_class': np.multiply(sq_per_class, rq_per_class)
}
def result(self, is_thing=None):
"""Computes and returns the detailed metric results over all comparisons.
Args:
is_thing: A boolean array of length `num_categories`. The entry
`is_thing[category_id]` is True iff that category is a "thing" category
instead of "stuff."
Returns:
A dictionary with a breakdown of metrics and/or metric factors by things,
stuff, and all categories.
"""
results = self.result_per_category()
valid_categories = self._valid_categories()
# If known, break down which categories are valid _and_ things/stuff.
category_sets = collections.OrderedDict()
category_sets['All'] = valid_categories
if is_thing is not None:
category_sets['Things'] = np.logical_and(valid_categories, is_thing)
category_sets['Stuff'] = np.logical_and(valid_categories,
np.logical_not(is_thing))
for category_set_name, in_category_set in category_sets.items():
if np.any(in_category_set):
results.update({
f'{category_set_name}_pq':
np.mean(results['pq_per_class'][in_category_set]),
f'{category_set_name}_sq':
np.mean(results['sq_per_class'][in_category_set]),
f'{category_set_name}_rq':
np.mean(results['rq_per_class'][in_category_set]),
# The number of categories in this subset.
f'{category_set_name}_num_categories':
np.sum(in_category_set.astype(np.int32)),
})
      else:
        results.update({
            f'{category_set_name}_pq': 0.,
            f'{category_set_name}_sq': 0.,
            f'{category_set_name}_rq': 0.,
            f'{category_set_name}_num_categories': 0
        })
return results
def reset(self):
"""Resets the accumulation to the metric class's state at initialization."""
self.iou_per_class = np.zeros(self.num_categories, dtype=np.float64)
self.tp_per_class = np.zeros(self.num_categories, dtype=np.float64)
self.fn_per_class = np.zeros(self.num_categories, dtype=np.float64)
self.fp_per_class = np.zeros(self.num_categories, dtype=np.float64)
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The panoptic quality evaluator.
The following snippet demonstrates the use of interfaces:
evaluator = PanopticQualityEvaluator(...)
for _ in range(num_evals):
for _ in range(num_batches_per_eval):
      predictions, groundtruths = predictor.predict(...)  # pop a batch.
evaluator.update_state(groundtruths, predictions)
evaluator.result() # finish one full eval and reset states.
See also: https://github.com/cocodataset/cocoapi/
"""
import numpy as np
import tensorflow as tf
from official.vision.beta.evaluation import panoptic_quality
def _crop_padding(mask, image_info):
"""Crops padded masks to match original image shape.
Args:
mask: a padded mask tensor.
image_info: a tensor that holds information about original and preprocessed
images.
Returns:
    A tf.Tensor of the mask cropped to the original image shape, with a leading
    batch dimension of size 1.
"""
image_shape = tf.cast(image_info[0, :], tf.int32)
mask = tf.image.crop_to_bounding_box(
tf.expand_dims(mask, axis=-1), 0, 0,
image_shape[0], image_shape[1])
return tf.expand_dims(mask[:, :, 0], axis=0)
class PanopticQualityEvaluator:
"""Panoptic Quality metric class."""
def __init__(self, num_categories, ignored_label, max_instances_per_category,
offset, is_thing=None, rescale_predictions=False):
"""Constructs Panoptic Quality evaluation class.
The class provides the interface to Panoptic Quality metrics_fn.
Args:
      num_categories: The number of segmentation categories (or "classes") in
        the dataset.
      ignored_label: A category id that is ignored in evaluation, e.g. the void
        label as defined in the COCO panoptic segmentation dataset.
      max_instances_per_category: The maximum number of instances for each
        category. Used to ensure unique instance labels.
      offset: The maximum number of unique labels. The groundtruth labels are
        multiplied by this value to generate unique ids for individual regions
        of overlap between groundtruth and predicted segments.
      is_thing: A boolean array of length `num_categories`. The entry
        `is_thing[category_id]` is True iff that category is a "thing" category
        instead of "stuff." Defaults to `None`, which means categories are not
        divided into these two groups.
rescale_predictions: `bool`, whether to scale back prediction to original
image sizes. If True, groundtruths['image_info'] is used to rescale
predictions.
"""
self._pq_metric_module = panoptic_quality.PanopticQuality(
num_categories, ignored_label, max_instances_per_category, offset)
self._is_thing = is_thing
self._rescale_predictions = rescale_predictions
self._required_prediction_fields = ['category_mask', 'instance_mask']
self._required_groundtruth_fields = ['category_mask', 'instance_mask']
self.reset_states()
@property
def name(self):
return 'panoptic_quality'
def reset_states(self):
"""Resets internal states for a fresh run."""
self._pq_metric_module.reset()
def result(self):
"""Evaluates detection results, and reset_states."""
results = self._pq_metric_module.result(self._is_thing)
self.reset_states()
return results
def _convert_to_numpy(self, groundtruths, predictions):
"""Converts tesnors to numpy arrays."""
if groundtruths:
labels = tf.nest.map_structure(lambda x: x.numpy(), groundtruths)
numpy_groundtruths = {}
for key, val in labels.items():
if isinstance(val, tuple):
val = np.concatenate(val)
numpy_groundtruths[key] = val
else:
numpy_groundtruths = groundtruths
if predictions:
outputs = tf.nest.map_structure(lambda x: x.numpy(), predictions)
numpy_predictions = {}
for key, val in outputs.items():
if isinstance(val, tuple):
val = np.concatenate(val)
numpy_predictions[key] = val
else:
numpy_predictions = predictions
return numpy_groundtruths, numpy_predictions
def update_state(self, groundtruths, predictions):
"""Update and aggregate detection results and groundtruth data.
Args:
      groundtruths: a dictionary of Tensors including the fields below. See also
        different parsers under `../dataloader` for more details.
        Required fields:
        - category_mask: a uint16 tensor of shape [batch_size, H, W].
        - instance_mask: a uint16 tensor of shape [batch_size, H, W].
        - image_info: [batch, 4, 2], a tensor that holds information about
          original and preprocessed images. Each entry is in the format of
          [[original_height, original_width], [input_height, input_width],
          [y_scale, x_scale], [y_offset, x_offset]], where [input_height,
          input_width] is the actual scaled image size, and [y_scale, x_scale]
          is the scaling factor, which is the ratio of scaled dimension /
          original dimension.
      predictions: a dictionary of Tensors including the fields below. See
        different parsers under `../dataloader` for more details.
        Required fields:
        - category_mask: a uint16 tensor of shape [batch_size, H, W].
        - instance_mask: a uint16 tensor of shape [batch_size, H, W].
Raises:
ValueError: if the required prediction or groundtruth fields are not
present in the incoming `predictions` or `groundtruths`.
"""
groundtruths, predictions = self._convert_to_numpy(groundtruths,
predictions)
for k in self._required_prediction_fields:
if k not in predictions:
raise ValueError(
'Missing the required key `{}` in predictions!'.format(k))
for k in self._required_groundtruth_fields:
if k not in groundtruths:
raise ValueError(
'Missing the required key `{}` in groundtruths!'.format(k))
if self._rescale_predictions:
for idx in range(len(groundtruths['category_mask'])):
image_info = groundtruths['image_info'][idx]
groundtruths_ = {
'category_mask':
_crop_padding(groundtruths['category_mask'][idx], image_info),
'instance_mask':
_crop_padding(groundtruths['instance_mask'][idx], image_info),
}
predictions_ = {
'category_mask':
_crop_padding(predictions['category_mask'][idx], image_info),
'instance_mask':
_crop_padding(predictions['instance_mask'][idx], image_info),
}
groundtruths_, predictions_ = self._convert_to_numpy(
groundtruths_, predictions_)
self._pq_metric_module.compare_and_accumulate(
groundtruths_, predictions_)
else:
self._pq_metric_module.compare_and_accumulate(groundtruths, predictions)
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for panoptic_quality_evaluator."""
import numpy as np
import tensorflow as tf
from official.vision.beta.evaluation import panoptic_quality_evaluator
class PanopticQualityEvaluatorTest(tf.test.TestCase):
def test_multiple_batches(self):
category_mask = np.zeros([6, 6], np.uint16)
groundtruth_instance_mask = np.array([
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 2, 2, 2, 1],
[1, 2, 2, 2, 2, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.uint16)
good_det_instance_mask = np.array([
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 2, 2, 2, 2, 1],
[1, 2, 2, 2, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.uint16)
groundtruths = {
'category_mask':
tf.convert_to_tensor([category_mask]),
'instance_mask':
tf.convert_to_tensor([groundtruth_instance_mask]),
'image_info':
tf.convert_to_tensor([[[6, 6], [6, 6], [1.0, 1.0], [0, 0]]],
dtype=tf.float32)
}
predictions = {
'category_mask': tf.convert_to_tensor([category_mask]),
'instance_mask': tf.convert_to_tensor([good_det_instance_mask])
}
pq_evaluator = panoptic_quality_evaluator.PanopticQualityEvaluator(
num_categories=1,
ignored_label=2,
max_instances_per_category=16,
offset=16,
rescale_predictions=True)
for _ in range(2):
pq_evaluator.update_state(groundtruths, predictions)
bad_det_instance_mask = np.array([
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 2, 2, 1],
[1, 1, 1, 2, 2, 1],
[1, 1, 1, 2, 2, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.uint16)
predictions['instance_mask'] = tf.convert_to_tensor([bad_det_instance_mask])
for _ in range(2):
pq_evaluator.update_state(groundtruths, predictions)
results = pq_evaluator.result()
np.testing.assert_array_equal(results['pq_per_class'],
[((28 / 30 + 6 / 8) + (27 / 32)) / 2 / 2])
np.testing.assert_array_equal(results['rq_per_class'], [3 / 4])
np.testing.assert_array_equal(results['sq_per_class'],
[((28 / 30 + 6 / 8) + (27 / 32)) / 3])
self.assertAlmostEqual(results['All_pq'], 0.63177083)
self.assertAlmostEqual(results['All_rq'], 0.75)
self.assertAlmostEqual(results['All_sq'], 0.84236111)
self.assertEqual(results['All_num_categories'], 1)
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for Panoptic Quality metric.
Note that this metric test class is branched from
https://github.com/tensorflow/models/blob/master/research/deeplab/evaluation/panoptic_quality_test.py
"""
from absl.testing import absltest
import numpy as np
from official.vision.beta.evaluation import panoptic_quality
class PanopticQualityTest(absltest.TestCase):
def test_perfect_match(self):
category_mask = np.zeros([6, 6], np.uint16)
instance_mask = np.array([
[1, 1, 1, 1, 1, 1],
[1, 2, 2, 2, 2, 1],
[1, 2, 2, 2, 2, 1],
[1, 2, 2, 2, 2, 1],
[1, 2, 2, 1, 1, 1],
[1, 2, 1, 1, 1, 1],
],
dtype=np.uint16)
groundtruths = {
'category_mask': category_mask,
'instance_mask': instance_mask
}
predictions = {
'category_mask': category_mask,
'instance_mask': instance_mask
}
pq_metric = panoptic_quality.PanopticQuality(
num_categories=1,
ignored_label=2,
max_instances_per_category=16,
offset=16)
pq_metric.compare_and_accumulate(groundtruths, predictions)
np.testing.assert_array_equal(pq_metric.iou_per_class, [2.0])
np.testing.assert_array_equal(pq_metric.tp_per_class, [2])
np.testing.assert_array_equal(pq_metric.fn_per_class, [0])
np.testing.assert_array_equal(pq_metric.fp_per_class, [0])
results = pq_metric.result()
np.testing.assert_array_equal(results['pq_per_class'], [1.0])
np.testing.assert_array_equal(results['rq_per_class'], [1.0])
np.testing.assert_array_equal(results['sq_per_class'], [1.0])
self.assertAlmostEqual(results['All_pq'], 1.0)
self.assertAlmostEqual(results['All_rq'], 1.0)
self.assertAlmostEqual(results['All_sq'], 1.0)
self.assertEqual(results['All_num_categories'], 1)
def test_totally_wrong(self):
category_mask = np.array([
[0, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 1, 0],
[0, 1, 1, 1, 1, 0],
[0, 1, 1, 1, 1, 0],
[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0],
],
dtype=np.uint16)
instance_mask = np.zeros([6, 6], np.uint16)
groundtruths = {
'category_mask': category_mask,
'instance_mask': instance_mask
}
predictions = {
'category_mask': 1 - category_mask,
'instance_mask': instance_mask
}
pq_metric = panoptic_quality.PanopticQuality(
num_categories=2,
ignored_label=2,
max_instances_per_category=1,
offset=16)
pq_metric.compare_and_accumulate(groundtruths, predictions)
np.testing.assert_array_equal(pq_metric.iou_per_class, [0.0, 0.0])
np.testing.assert_array_equal(pq_metric.tp_per_class, [0, 0])
np.testing.assert_array_equal(pq_metric.fn_per_class, [1, 1])
np.testing.assert_array_equal(pq_metric.fp_per_class, [1, 1])
results = pq_metric.result()
np.testing.assert_array_equal(results['pq_per_class'], [0.0, 0.0])
np.testing.assert_array_equal(results['rq_per_class'], [0.0, 0.0])
np.testing.assert_array_equal(results['sq_per_class'], [0.0, 0.0])
self.assertAlmostEqual(results['All_pq'], 0.0)
self.assertAlmostEqual(results['All_rq'], 0.0)
self.assertAlmostEqual(results['All_sq'], 0.0)
self.assertEqual(results['All_num_categories'], 2)
def test_matches_by_iou(self):
groundtruth_instance_mask = np.array(
[
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 2, 2, 2, 1],
[1, 2, 2, 2, 2, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.uint16)
good_det_instance_mask = np.array(
[
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 2, 2, 2, 2, 1],
[1, 2, 2, 2, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.uint16)
groundtruths = {
'category_mask': np.zeros_like(groundtruth_instance_mask),
'instance_mask': groundtruth_instance_mask
}
predictions = {
'category_mask': np.zeros_like(good_det_instance_mask),
'instance_mask': good_det_instance_mask
}
pq_metric = panoptic_quality.PanopticQuality(
num_categories=1,
ignored_label=2,
max_instances_per_category=16,
offset=16)
pq_metric.compare_and_accumulate(groundtruths, predictions)
# iou(1, 1) = 28/30
# iou(2, 2) = 6 / 8
np.testing.assert_array_almost_equal(pq_metric.iou_per_class,
[28 / 30 + 6 / 8])
np.testing.assert_array_equal(pq_metric.tp_per_class, [2])
np.testing.assert_array_equal(pq_metric.fn_per_class, [0])
np.testing.assert_array_equal(pq_metric.fp_per_class, [0])
results = pq_metric.result()
np.testing.assert_array_equal(results['pq_per_class'],
[(28 / 30 + 6 / 8) / 2])
np.testing.assert_array_equal(results['rq_per_class'], [1.0])
np.testing.assert_array_equal(results['sq_per_class'],
[(28 / 30 + 6 / 8) / 2])
self.assertAlmostEqual(results['All_pq'], (28 / 30 + 6 / 8) / 2)
self.assertAlmostEqual(results['All_rq'], 1.0)
self.assertAlmostEqual(results['All_sq'], (28 / 30 + 6 / 8) / 2)
self.assertEqual(results['All_num_categories'], 1)
bad_det_instance_mask = np.array(
[
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 2, 2, 1],
[1, 1, 1, 2, 2, 1],
[1, 1, 1, 2, 2, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.uint16)
predictions['instance_mask'] = bad_det_instance_mask
pq_metric.reset()
pq_metric.compare_and_accumulate(groundtruths, predictions)
# iou(1, 1) = 27/32
np.testing.assert_array_almost_equal(pq_metric.iou_per_class, [27 / 32])
np.testing.assert_array_equal(pq_metric.tp_per_class, [1])
np.testing.assert_array_equal(pq_metric.fn_per_class, [1])
np.testing.assert_array_equal(pq_metric.fp_per_class, [1])
results = pq_metric.result()
np.testing.assert_array_equal(results['pq_per_class'], [27 / 32 / 2])
np.testing.assert_array_equal(results['rq_per_class'], [0.5])
np.testing.assert_array_equal(results['sq_per_class'], [27 / 32])
self.assertAlmostEqual(results['All_pq'], 27 / 32 / 2)
self.assertAlmostEqual(results['All_rq'], 0.5)
self.assertAlmostEqual(results['All_sq'], 27 / 32)
self.assertEqual(results['All_num_categories'], 1)
def test_wrong_instances(self):
category_mask = np.array([
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 2, 2, 1, 2, 2],
[1, 2, 2, 1, 2, 2],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.uint16)
groundtruth_instance_mask = np.zeros([6, 6], dtype=np.uint16)
predicted_instance_mask = np.array([
[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 1],
[0, 0, 0, 0, 1, 1],
[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0],
],
dtype=np.uint16)
groundtruths = {
'category_mask': category_mask,
'instance_mask': groundtruth_instance_mask
}
predictions = {
'category_mask': category_mask,
'instance_mask': predicted_instance_mask
}
pq_metric = panoptic_quality.PanopticQuality(
num_categories=3,
ignored_label=0,
max_instances_per_category=10,
offset=100)
pq_metric.compare_and_accumulate(groundtruths, predictions)
np.testing.assert_array_equal(pq_metric.iou_per_class, [0.0, 1.0, 0.0])
np.testing.assert_array_equal(pq_metric.tp_per_class, [0, 1, 0])
np.testing.assert_array_equal(pq_metric.fn_per_class, [0, 0, 1])
np.testing.assert_array_equal(pq_metric.fp_per_class, [0, 0, 2])
results = pq_metric.result()
np.testing.assert_array_equal(results['pq_per_class'], [0.0, 1.0, 0.0])
np.testing.assert_array_equal(results['rq_per_class'], [0.0, 1.0, 0.0])
np.testing.assert_array_equal(results['sq_per_class'], [0.0, 1.0, 0.0])
self.assertAlmostEqual(results['All_pq'], 0.5)
self.assertAlmostEqual(results['All_rq'], 0.5)
self.assertAlmostEqual(results['All_sq'], 0.5)
self.assertEqual(results['All_num_categories'], 2)
def test_instance_order_is_arbitrary(self):
category_mask = np.array([
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
[1, 2, 2, 1, 2, 2],
[1, 2, 2, 1, 2, 2],
[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 1],
],
dtype=np.uint16)
groundtruth_instance_mask = np.array([
[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0],
[0, 1, 1, 0, 0, 0],
[0, 1, 1, 0, 0, 0],
[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0],
],
dtype=np.uint16)
predicted_instance_mask = np.array([
[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 1],
[0, 0, 0, 0, 1, 1],
[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0],
],
dtype=np.uint16)
groundtruths = {
'category_mask': category_mask,
'instance_mask': groundtruth_instance_mask
}
predictions = {
'category_mask': category_mask,
'instance_mask': predicted_instance_mask
}
pq_metric = panoptic_quality.PanopticQuality(
num_categories=3,
ignored_label=0,
max_instances_per_category=10,
offset=100)
pq_metric.compare_and_accumulate(groundtruths, predictions)
np.testing.assert_array_equal(pq_metric.iou_per_class, [0.0, 1.0, 2.0])
np.testing.assert_array_equal(pq_metric.tp_per_class, [0, 1, 2])
np.testing.assert_array_equal(pq_metric.fn_per_class, [0, 0, 0])
np.testing.assert_array_equal(pq_metric.fp_per_class, [0, 0, 0])
results = pq_metric.result()
np.testing.assert_array_equal(results['pq_per_class'], [0.0, 1.0, 1.0])
np.testing.assert_array_equal(results['rq_per_class'], [0.0, 1.0, 1.0])
np.testing.assert_array_equal(results['sq_per_class'], [0.0, 1.0, 1.0])
self.assertAlmostEqual(results['All_pq'], 1.0)
self.assertAlmostEqual(results['All_rq'], 1.0)
self.assertAlmostEqual(results['All_sq'], 1.0)
self.assertEqual(results['All_num_categories'], 2)
if __name__ == '__main__':
absltest.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Metrics for segmentation."""
import tensorflow as tf
from official.vision.beta.evaluation import iou
class MeanIoU(tf.keras.metrics.MeanIoU):
"""Mean IoU metric for semantic segmentation.
  This class utilizes tf.keras.metrics.MeanIoU to compute batched mean IoU when
  both input images and groundtruth masks are resized to the same size
  (rescale_predictions=False). It can also compute mean IoU on the original
  groundtruth sizes (rescale_predictions=True), in which case each prediction
  is rescaled back to the original image size.
"""
def __init__(
self, num_classes, rescale_predictions=False, name=None, dtype=None):
"""Constructs Segmentation evaluator class.
Args:
num_classes: `int`, number of classes.
rescale_predictions: `bool`, whether to scale back prediction to original
image sizes. If True, y_true['image_info'] is used to rescale
predictions.
      name: `str`, name of the metric instance.
dtype: data type of the metric result.
"""
self._rescale_predictions = rescale_predictions
super().__init__(num_classes=num_classes, name=name, dtype=dtype)
def update_state(self, y_true, y_pred):
"""Updates metric state.
Args:
      y_true: `dict`, dictionary with the following keys and values.
        - masks: [batch, width, height, 1], groundtruth masks.
        - valid_masks: [batch, width, height, 1], valid elements in the mask.
        - image_info: [batch, 4, 2], a tensor that holds information about
          original and preprocessed images. Each entry is in the format of
          [[original_height, original_width], [input_height, input_width],
          [y_scale, x_scale], [y_offset, x_offset]], where [input_height,
          input_width] is the actual scaled image size, and [y_scale, x_scale]
          is the scaling factor, which is the ratio of scaled dimension /
          original dimension.
      y_pred: Tensor [batch, width_p, height_p, num_classes], predicted masks.
"""
predictions = y_pred
masks = y_true['masks']
valid_masks = y_true['valid_masks']
images_info = y_true['image_info']
    if isinstance(predictions, (tuple, list)):
predictions = tf.concat(predictions, axis=0)
masks = tf.concat(masks, axis=0)
valid_masks = tf.concat(valid_masks, axis=0)
images_info = tf.concat(images_info, axis=0)
    # Ignored mask elements are set to zero for the argmax op.
masks = tf.where(valid_masks, masks, tf.zeros_like(masks))
if self._rescale_predictions:
# This part can only run on cpu/gpu due to dynamic image resizing.
for i in range(tf.shape(predictions)[0]):
mask = masks[i]
valid_mask = valid_masks[i]
predicted_mask = predictions[i]
image_info = images_info[i]
rescale_size = tf.cast(
tf.math.ceil(image_info[1, :] / image_info[2, :]), tf.int32)
image_shape = tf.cast(image_info[0, :], tf.int32)
offsets = tf.cast(image_info[3, :], tf.int32)
predicted_mask = tf.image.resize(
predicted_mask,
rescale_size,
method=tf.image.ResizeMethod.BILINEAR)
predicted_mask = tf.image.crop_to_bounding_box(predicted_mask,
offsets[0], offsets[1],
image_shape[0],
image_shape[1])
mask = tf.image.crop_to_bounding_box(mask, 0, 0, image_shape[0],
image_shape[1])
valid_mask = tf.image.crop_to_bounding_box(valid_mask, 0, 0,
image_shape[0],
image_shape[1])
predicted_mask = tf.argmax(predicted_mask, axis=2)
flatten_predictions = tf.reshape(predicted_mask, shape=[1, -1])
flatten_masks = tf.reshape(mask, shape=[1, -1])
flatten_valid_masks = tf.reshape(valid_mask, shape=[1, -1])
        super().update_state(
            flatten_masks, flatten_predictions,
            tf.cast(flatten_valid_masks, tf.float32))
else:
predictions = tf.image.resize(
predictions,
tf.shape(masks)[1:3],
method=tf.image.ResizeMethod.BILINEAR)
predictions = tf.argmax(predictions, axis=3)
flatten_predictions = tf.reshape(predictions, shape=[-1])
flatten_masks = tf.reshape(masks, shape=[-1])
flatten_valid_masks = tf.reshape(valid_masks, shape=[-1])
super().update_state(flatten_masks, flatten_predictions,
tf.cast(flatten_valid_masks, tf.float32))
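# A minimal usage sketch for MeanIoU (illustrative; see the accompanying unit
# test for a runnable example):
#
#   metric = MeanIoU(num_classes=2, rescale_predictions=False)
#   metric.update_state(
#       y_true={'masks': ..., 'valid_masks': ..., 'image_info': ...},
#       y_pred=logits)
#   miou = metric.result()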
class PerClassIoU(iou.PerClassIoU):
"""Per Class IoU metric for semantic segmentation.
  This class utilizes iou.PerClassIoU to compute batched per-class IoU when
  both input images and groundtruth masks are resized to the same size
  (rescale_predictions=False). It can also compute per-class IoU on the
  original groundtruth sizes (rescale_predictions=True), in which case each
  prediction is rescaled back to the original image size.
"""
def __init__(
self, num_classes, rescale_predictions=False, name=None, dtype=None):
"""Constructs Segmentation evaluator class.
Args:
num_classes: `int`, number of classes.
rescale_predictions: `bool`, whether to scale back prediction to original
image sizes. If True, y_true['image_info'] is used to rescale
predictions.
      name: `str`, name of the metric instance.
dtype: data type of the metric result.
"""
self._rescale_predictions = rescale_predictions
super().__init__(num_classes=num_classes, name=name, dtype=dtype)
def update_state(self, y_true, y_pred):
"""Updates metric state.
Args:
      y_true: `dict`, dictionary with the following keys and values.
        - masks: [batch, width, height, 1], groundtruth masks.
        - valid_masks: [batch, width, height, 1], valid elements in the mask.
        - image_info: [batch, 4, 2], a tensor that holds information about
          original and preprocessed images. Each entry is in the format of
          [[original_height, original_width], [input_height, input_width],
          [y_scale, x_scale], [y_offset, x_offset]], where [input_height,
          input_width] is the actual scaled image size, and [y_scale, x_scale]
          is the scaling factor, which is the ratio of scaled dimension /
          original dimension.
      y_pred: Tensor [batch, width_p, height_p, num_classes], predicted masks.
"""
predictions = y_pred
masks = y_true['masks']
valid_masks = y_true['valid_masks']
images_info = y_true['image_info']
    if isinstance(predictions, (tuple, list)):
predictions = tf.concat(predictions, axis=0)
masks = tf.concat(masks, axis=0)
valid_masks = tf.concat(valid_masks, axis=0)
images_info = tf.concat(images_info, axis=0)
    # Ignored mask elements are set to zero for the argmax op.
masks = tf.where(valid_masks, masks, tf.zeros_like(masks))
if self._rescale_predictions:
# This part can only run on cpu/gpu due to dynamic image resizing.
for i in range(tf.shape(predictions)[0]):
mask = masks[i]
valid_mask = valid_masks[i]
predicted_mask = predictions[i]
image_info = images_info[i]
rescale_size = tf.cast(
tf.math.ceil(image_info[1, :] / image_info[2, :]), tf.int32)
image_shape = tf.cast(image_info[0, :], tf.int32)
offsets = tf.cast(image_info[3, :], tf.int32)
predicted_mask = tf.image.resize(
predicted_mask,
rescale_size,
method=tf.image.ResizeMethod.BILINEAR)
predicted_mask = tf.image.crop_to_bounding_box(predicted_mask,
offsets[0], offsets[1],
image_shape[0],
image_shape[1])
mask = tf.image.crop_to_bounding_box(mask, 0, 0, image_shape[0],
image_shape[1])
valid_mask = tf.image.crop_to_bounding_box(valid_mask, 0, 0,
image_shape[0],
image_shape[1])
predicted_mask = tf.argmax(predicted_mask, axis=2)
flatten_predictions = tf.reshape(predicted_mask, shape=[1, -1])
flatten_masks = tf.reshape(mask, shape=[1, -1])
flatten_valid_masks = tf.reshape(valid_mask, shape=[1, -1])
super().update_state(flatten_masks, flatten_predictions,
tf.cast(flatten_valid_masks, tf.float32))
else:
predictions = tf.image.resize(
predictions,
tf.shape(masks)[1:3],
method=tf.image.ResizeMethod.BILINEAR)
predictions = tf.argmax(predictions, axis=3)
flatten_predictions = tf.reshape(predictions, shape=[-1])
flatten_masks = tf.reshape(masks, shape=[-1])
flatten_valid_masks = tf.reshape(valid_masks, shape=[-1])
super().update_state(flatten_masks, flatten_predictions,
tf.cast(flatten_valid_masks, tf.float32))
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for segmentation_metrics."""
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.vision.beta.evaluation import segmentation_metrics
class SegmentationMetricsTest(parameterized.TestCase, tf.test.TestCase):
def _create_test_data(self):
y_pred_cls0 = np.expand_dims(
np.array([[1, 1, 0], [1, 1, 0], [0, 0, 0]], dtype=np.uint16),
axis=(0, -1))
y_pred_cls1 = np.expand_dims(
np.array([[0, 0, 0], [0, 0, 1], [0, 0, 1]], dtype=np.uint16),
axis=(0, -1))
y_pred = np.concatenate((y_pred_cls0, y_pred_cls1), axis=-1)
y_true = {
'masks':
np.expand_dims(
np.array([[0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0], [0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1], [0, 0, 0, 1, 1, 1]],
dtype=np.uint16),
axis=(0, -1)),
'valid_masks':
np.ones([1, 6, 6, 1], dtype=np.uint16),
'image_info':
np.array([[[6, 6], [3, 3], [0.5, 0.5], [0, 0]]], dtype=np.float32)
}
return y_pred, y_true
@parameterized.parameters(True, False)
def test_mean_iou_metric(self, rescale_predictions):
tf.config.experimental_run_functions_eagerly(True)
mean_iou_metric = segmentation_metrics.MeanIoU(
num_classes=2, rescale_predictions=rescale_predictions)
y_pred, y_true = self._create_test_data()
# Disable autograph for correct coverage statistics.
update_fn = tf.autograph.experimental.do_not_convert(
mean_iou_metric.update_state)
update_fn(y_true=y_true, y_pred=y_pred)
miou = mean_iou_metric.result()
self.assertAlmostEqual(miou.numpy(), 0.762, places=3)
@parameterized.parameters(True, False)
def test_per_class_mean_iou_metric(self, rescale_predictions):
per_class_iou_metric = segmentation_metrics.PerClassIoU(
num_classes=2, rescale_predictions=rescale_predictions)
y_pred, y_true = self._create_test_data()
# Disable autograph for correct coverage statistics.
update_fn = tf.autograph.experimental.do_not_convert(
per_class_iou_metric.update_state)
update_fn(y_true=y_true, y_pred=y_pred)
per_class_miou = per_class_iou_metric.result()
self.assertAllClose(per_class_miou.numpy(), [0.857, 0.667], atol=1e-3)
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""2D detection evaluator for the Waymo Open Dataset."""
import pprint
from absl import logging
import tensorflow as tf
from official.vision.beta.ops import box_ops
from waymo_open_dataset import label_pb2
from waymo_open_dataset.metrics.python import wod_detection_evaluator
from waymo_open_dataset.protos import breakdown_pb2
from waymo_open_dataset.protos import metrics_pb2
def get_2d_detection_default_config():
"""Returns the config proto for WOD 2D detection Evaluation."""
config = metrics_pb2.Config()
config.breakdown_generator_ids.append(breakdown_pb2.Breakdown.OBJECT_TYPE)
difficulty = config.difficulties.add()
difficulty.levels.append(label_pb2.Label.LEVEL_1)
difficulty.levels.append(label_pb2.Label.LEVEL_2)
config.breakdown_generator_ids.append(breakdown_pb2.Breakdown.ALL_BUT_SIGN)
difficulty = config.difficulties.add()
difficulty.levels.append(label_pb2.Label.LEVEL_1)
difficulty.levels.append(label_pb2.Label.LEVEL_2)
config.matcher_type = metrics_pb2.MatcherProto.TYPE_HUNGARIAN
config.iou_thresholds.append(0.0)
config.iou_thresholds.append(0.7)
config.iou_thresholds.append(0.5)
config.iou_thresholds.append(0.5)
config.iou_thresholds.append(0.5)
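  # Note: iou_thresholds are indexed by label type, which in the WOD convention
  # is [unknown, vehicle, pedestrian, sign, cyclist]; i.e. 0.7 for vehicles and
  # 0.5 for the remaining types.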
config.box_type = label_pb2.Label.Box.TYPE_2D
for i in range(100):
config.score_cutoffs.append(i * 0.01)
config.score_cutoffs.append(1.0)
return config
class WOD2dDetectionEvaluator(wod_detection_evaluator.WODDetectionEvaluator):
"""WOD 2D detection evaluation metric class."""
def __init__(self, config=None):
if config is None:
config = get_2d_detection_default_config()
super().__init__(config=config)
def _remove_padding(self, tensor_dict, num_valid):
"""Remove the paddings of the prediction/groundtruth data."""
result_tensor_dict = {}
gather_indices = tf.range(num_valid)
for k, v in tensor_dict.items():
if 'frame_id' in k:
result_tensor_dict[k] = tf.tile([v], [num_valid])
else:
result_tensor_dict[k] = tf.gather(v, gather_indices)
return result_tensor_dict
def update_state(self, groundtruths, predictions):
"""Update the metrics state with prediction and groundtruth data.
Args:
groundtruths: a dictionary of Tensors including the fields below.
Required fields:
- source_id: a numpy array of int or string of shape [batch_size].
- num_detections: a numpy array of int of shape [batch_size].
- boxes: a numpy array of float of shape [batch_size, K, 4].
- classes: a numpy array of int of shape [batch_size, K].
- difficulties: a numpy array of int of shape [batch_size, K].
predictions: a dictionary of tensors including the fields below.
Required fields:
- source_id: a numpy array of int or string of shape [batch_size].
- image_info: a numpy array of float of shape [batch_size, 4, 2].
- num_detections: a numpy array of int of shape [batch_size].
- detection_boxes: a numpy array of float of shape [batch_size, K, 4].
- detection_classes: a numpy array of int of shape [batch_size, K].
- detection_scores: a numpy array of float of shape [batch_size, K].
"""
# Preprocess potentially aggregated tensors.
for k, v in groundtruths.items():
if isinstance(v, tuple):
groundtruths[k] = tf.concat(v, axis=0)
for k, v in predictions.items():
if isinstance(v, tuple):
predictions[k] = tf.concat(v, axis=0)
# Change cyclists' type id from 3 to 4, where 3 is reserved for sign.
groundtruth_type = tf.cast(groundtruths['classes'], tf.uint8)
groundtruth_type = tf.where(
tf.equal(groundtruth_type, 3),
tf.ones_like(groundtruth_type) * 4, groundtruth_type)
prediction_type = tf.cast(predictions['detection_classes'], tf.uint8)
prediction_type = tf.where(
tf.equal(prediction_type, 3),
tf.ones_like(prediction_type) * 4, prediction_type)
# Rescale the detection boxes back to original scale.
image_scale = tf.tile(predictions['image_info'][:, 2:3, :], (1, 1, 2))
prediction_bbox = predictions['detection_boxes'] / image_scale
batch_size = tf.shape(groundtruths['source_id'])[0]
for i in tf.range(batch_size):
frame_groundtruths = {
'ground_truth_frame_id':
groundtruths['source_id'][i],
'ground_truth_bbox':
box_ops.yxyx_to_cycxhw(
tf.cast(groundtruths['boxes'][i], tf.float32)),
'ground_truth_type':
groundtruth_type[i],
'ground_truth_difficulty':
tf.cast(groundtruths['difficulties'][i], tf.uint8),
}
frame_groundtruths = self._remove_padding(
frame_groundtruths, groundtruths['num_detections'][i])
frame_predictions = {
'prediction_frame_id':
groundtruths['source_id'][i],
'prediction_bbox':
box_ops.yxyx_to_cycxhw(
tf.cast(prediction_bbox[i], tf.float32)),
'prediction_type':
prediction_type[i],
'prediction_score':
tf.cast(predictions['detection_scores'][i], tf.float32),
'prediction_overlap_nlz':
tf.zeros_like(predictions['detection_scores'][i], dtype=tf.bool)
}
frame_predictions = self._remove_padding(frame_predictions,
predictions['num_detections'][i])
super().update_state(frame_groundtruths, frame_predictions)
def evaluate(self):
"""Compute the final metrics."""
ap, _, _, _, _ = super().evaluate()
metric_dict = {}
for i, name in enumerate(self._breakdown_names):
# Skip sign metrics in 2d detection task.
if 'SIGN' in name:
continue
metric_dict['WOD metrics/{}/AP'.format(name)] = ap[i]
pp = pprint.PrettyPrinter()
logging.info('WOD Detection Metrics: \n %s', pp.pformat(metric_dict))
return metric_dict
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Losses used for detection models."""
import tensorflow as tf
class FocalLoss(tf.keras.losses.Loss):
"""Implements a Focal loss for classification problems.
Reference:
[Focal Loss for Dense Object Detection](https://arxiv.org/abs/1708.02002).
"""
def __init__(self,
alpha,
gamma,
reduction=tf.keras.losses.Reduction.AUTO,
name=None):
"""Initializes `FocalLoss`.
Args:
alpha: The `alpha` weight factor for binary class imbalance.
gamma: The `gamma` focusing parameter to re-weight loss.
reduction: (Optional) Type of `tf.keras.losses.Reduction` to apply to
loss. Default value is `AUTO`. `AUTO` indicates that the reduction
option will be determined by the usage context. For almost all cases
this defaults to `SUM_OVER_BATCH_SIZE`. When used with
`tf.distribute.Strategy`, outside of built-in training loops such as
`tf.keras` `compile` and `fit`, using `AUTO` or `SUM_OVER_BATCH_SIZE`
will raise an error. Please see this custom training [tutorial](
https://www.tensorflow.org/tutorials/distribute/custom_training) for
more details.
      name: Optional name for the op. Defaults to `None`.
"""
self._alpha = alpha
self._gamma = gamma
super(FocalLoss, self).__init__(reduction=reduction, name=name)
def call(self, y_true, y_pred):
"""Invokes the `FocalLoss`.
Args:
y_true: A tensor of size [batch, num_anchors, num_classes]
y_pred: A tensor of size [batch, num_anchors, num_classes]
Returns:
Summed loss float `Tensor`.
"""
with tf.name_scope('focal_loss'):
y_true = tf.cast(y_true, dtype=tf.float32)
y_pred = tf.cast(y_pred, dtype=tf.float32)
positive_label_mask = tf.equal(y_true, 1.0)
cross_entropy = (
tf.nn.sigmoid_cross_entropy_with_logits(labels=y_true, logits=y_pred))
probs = tf.sigmoid(y_pred)
probs_gt = tf.where(positive_label_mask, probs, 1.0 - probs)
# With small gamma, the implementation could produce NaN during back prop.
modulator = tf.pow(1.0 - probs_gt, self._gamma)
loss = modulator * cross_entropy
weighted_loss = tf.where(positive_label_mask, self._alpha * loss,
(1.0 - self._alpha) * loss)
return weighted_loss
def get_config(self):
config = {
'alpha': self._alpha,
'gamma': self._gamma,
}
base_config = super(FocalLoss, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
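# A minimal usage sketch for FocalLoss (shapes and hyperparameter values below
# are illustrative, not prescribed by this file):
#
#   loss_fn = FocalLoss(alpha=0.25, gamma=1.5,
#                       reduction=tf.keras.losses.Reduction.SUM)
#   y_true = tf.constant([[[1.0], [0.0]]])   # [batch, num_anchors, num_classes]
#   y_pred = tf.constant([[[2.0], [-1.0]]])  # logits
#   loss = loss_fn(y_true, y_pred)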
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Losses utilities for detection models."""
import tensorflow as tf
def multi_level_flatten(multi_level_inputs, last_dim=None):
"""Flattens a multi-level input.
Args:
multi_level_inputs: Ordered Dict with level to [batch, d1, ..., dm].
    last_dim: If not `None`, the output shape is [batch_size, None, last_dim];
      otherwise it is [batch_size, None]. Defaults to `None`.
  Returns:
    Concatenated output of shape [batch_size, None] or
    [batch_size, None, last_dim].
"""
flattened_inputs = []
batch_size = None
for level in multi_level_inputs.keys():
single_input = multi_level_inputs[level]
if batch_size is None:
batch_size = single_input.shape[0] or tf.shape(single_input)[0]
if last_dim is not None:
flattened_input = tf.reshape(single_input, [batch_size, -1, last_dim])
else:
flattened_input = tf.reshape(single_input, [batch_size, -1])
flattened_inputs.append(flattened_input)
return tf.concat(flattened_inputs, axis=1)
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Losses for maskrcn model."""
# Import libraries
import tensorflow as tf
class RpnScoreLoss(object):
"""Region Proposal Network score loss function."""
def __init__(self, rpn_batch_size_per_im):
self._rpn_batch_size_per_im = rpn_batch_size_per_im
self._binary_crossentropy = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.SUM, from_logits=True)
def __call__(self, score_outputs, labels):
"""Computes total RPN detection loss.
Computes total RPN detection loss including box and score from all levels.
Args:
      score_outputs: an OrderedDict with keys representing levels and values
        representing scores in [batch_size, height, width, num_anchors].
      labels: the dictionary returned from the dataloader that includes
        groundtruth targets.
Returns:
rpn_score_loss: a scalar tensor representing total score loss.
"""
with tf.name_scope('rpn_loss'):
levels = sorted(score_outputs.keys())
score_losses = []
for level in levels:
score_losses.append(
self._rpn_score_loss(
score_outputs[level],
labels[level],
normalizer=tf.cast(
tf.shape(score_outputs[level])[0] *
self._rpn_batch_size_per_im,
dtype=tf.float32)))
# Sums per level losses to total loss.
return tf.math.add_n(score_losses)
def _rpn_score_loss(self, score_outputs, score_targets, normalizer=1.0):
"""Computes score loss."""
# score_targets has three values:
# (1) score_targets[i]=1, the anchor is a positive sample.
# (2) score_targets[i]=0, negative.
# (3) score_targets[i]=-1, the anchor is don't care (ignore).
with tf.name_scope('rpn_score_loss'):
mask = tf.math.logical_or(tf.math.equal(score_targets, 1),
tf.math.equal(score_targets, 0))
score_targets = tf.math.maximum(score_targets,
tf.zeros_like(score_targets))
score_targets = tf.expand_dims(score_targets, axis=-1)
score_outputs = tf.expand_dims(score_outputs, axis=-1)
score_loss = self._binary_crossentropy(
score_targets, score_outputs, sample_weight=mask)
score_loss /= normalizer
return score_loss
class RpnBoxLoss(object):
"""Region Proposal Network box regression loss function."""
def __init__(self, huber_loss_delta: float):
    # The delta is typically around the mean value of the regression targets.
    # For instance, the regression targets of a 512x512 input with 6 anchors on
    # the P2-P6 pyramid are about [0.1, 0.1, 0.2, 0.2].
self._huber_loss = tf.keras.losses.Huber(
delta=huber_loss_delta, reduction=tf.keras.losses.Reduction.SUM)
def __call__(self, box_outputs, labels):
"""Computes total RPN detection loss.
Computes total RPN detection loss including box and score from all levels.
Args:
      box_outputs: an OrderedDict with keys representing levels and values
        representing box regression targets in
        [batch_size, height, width, num_anchors * 4].
      labels: the dictionary returned from the dataloader that includes
        groundtruth targets.
Returns:
rpn_box_loss: a scalar tensor representing total box regression loss.
"""
with tf.name_scope('rpn_loss'):
levels = sorted(box_outputs.keys())
box_losses = []
for level in levels:
box_losses.append(self._rpn_box_loss(box_outputs[level], labels[level]))
# Sum per level losses to total loss.
return tf.add_n(box_losses)
def _rpn_box_loss(self, box_outputs, box_targets, normalizer=1.0):
"""Computes box regression loss."""
with tf.name_scope('rpn_box_loss'):
mask = tf.cast(tf.not_equal(box_targets, 0.0), dtype=tf.float32)
box_targets = tf.expand_dims(box_targets, axis=-1)
box_outputs = tf.expand_dims(box_outputs, axis=-1)
box_loss = self._huber_loss(box_targets, box_outputs, sample_weight=mask)
# The loss is normalized by the sum of non-zero weights and additional
# normalizer provided by the function caller. Using + 0.01 here to avoid
# division by zero.
box_loss /= normalizer * (tf.reduce_sum(mask) + 0.01)
return box_loss
class FastrcnnClassLoss(object):
"""Fast R-CNN classification loss function."""
def __init__(self):
self._categorical_crossentropy = tf.keras.losses.CategoricalCrossentropy(
reduction=tf.keras.losses.Reduction.SUM, from_logits=True)
def __call__(self, class_outputs, class_targets):
"""Computes the class loss (Fast-RCNN branch) of Mask-RCNN.
This function implements the classification loss of the Fast-RCNN.
The classification loss is softmax on all RoIs.
Reference: https://github.com/facebookresearch/Detectron/blob/master/detectron/modeling/fast_rcnn_heads.py # pylint: disable=line-too-long
Args:
class_outputs: a float tensor representing the class prediction for each box
with a shape of [batch_size, num_boxes, num_classes].
class_targets: a float tensor representing the class label for each box
with a shape of [batch_size, num_boxes].
Returns:
a scalar tensor representing total class loss.
"""
with tf.name_scope('fast_rcnn_loss'):
batch_size, num_boxes, num_classes = class_outputs.get_shape().as_list()
class_targets = tf.cast(class_targets, dtype=tf.int32)
class_targets_one_hot = tf.one_hot(class_targets, num_classes)
return self._fast_rcnn_class_loss(class_outputs, class_targets_one_hot,
normalizer=batch_size * num_boxes)
def _fast_rcnn_class_loss(self, class_outputs, class_targets_one_hot,
normalizer=1.0):
"""Computes classification loss."""
with tf.name_scope('fast_rcnn_class_loss'):
class_loss = self._categorical_crossentropy(class_targets_one_hot,
class_outputs)
class_loss /= normalizer
return class_loss
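# Usage sketch (batch size, RoI count, and class count are illustrative
# assumptions): the loss expects logits per RoI and per-RoI class targets.
def _example_fast_rcnn_class_loss():
  """Illustrative only; softmax cross-entropy averaged over all RoIs."""
  loss_fn = FastrcnnClassLoss()
  class_outputs = tf.random.normal([2, 8, 91])  # [batch, num_boxes, classes]
  class_targets = tf.random.uniform([2, 8], maxval=91, dtype=tf.int32)
  return loss_fn(class_outputs, tf.cast(class_targets, tf.float32))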
class FastrcnnBoxLoss(object):
"""Fast R-CNN box regression loss function."""
def __init__(self,
huber_loss_delta: float,
class_agnostic_bbox_pred: bool = False):
"""Initiate Faster RCNN box loss.
Args:
huber_loss_delta: the delta is typically around the mean value of
regression target. for instances, the regression targets of 512x512
input with 6 anchors on P2-P6 pyramid is about [0.1, 0.1, 0.2, 0.2].
class_agnostic_bbox_pred: if True, class agnostic bounding box prediction
is performed.
"""
self._huber_loss = tf.keras.losses.Huber(
delta=huber_loss_delta, reduction=tf.keras.losses.Reduction.SUM)
self._class_agnostic_bbox_pred = class_agnostic_bbox_pred
def __call__(self, box_outputs, class_targets, box_targets):
"""Computes the box loss (Fast-RCNN branch) of Mask-RCNN.
    This function implements the box regression loss of the Fast-RCNN branch.
    As `box_outputs` produces `num_classes` boxes for each RoI, the reference
    model expands `box_targets` to match the shape of `box_outputs` and selects
    only the target with which the RoI has the maximum overlap. (Reference: https://github.com/facebookresearch/Detectron/blob/master/detectron/roi_data/fast_rcnn.py) # pylint: disable=line-too-long
    Instead, this function selects `box_outputs` by `class_targets` so that it
    doesn't need to expand `box_targets`.
    The box loss is a smooth L1 loss computed only on positive RoI samples.
Reference: https://github.com/facebookresearch/Detectron/blob/master/detectron/modeling/fast_rcnn_heads.py # pylint: disable=line-too-long
Args:
box_outputs: a float tensor representing the box prediction for each box
with a shape of [batch_size, num_boxes, num_classes * 4].
class_targets: a float tensor representing the class label for each box
with a shape of [batch_size, num_boxes].
box_targets: a float tensor representing the box label for each box
with a shape of [batch_size, num_boxes, 4].
Returns:
box_loss: a scalar tensor representing total box regression loss.
"""
with tf.name_scope('fast_rcnn_loss'):
class_targets = tf.cast(class_targets, dtype=tf.int32)
if not self._class_agnostic_bbox_pred:
box_outputs = self._assign_class_targets(box_outputs, class_targets)
return self._fast_rcnn_box_loss(box_outputs, box_targets, class_targets)
def _assign_class_targets(self, box_outputs, class_targets):
"""Selects the box from `box_outputs` based on `class_targets`, with which the box has the maximum overlap."""
(batch_size, num_rois,
num_class_specific_boxes) = box_outputs.get_shape().as_list()
num_classes = num_class_specific_boxes // 4
box_outputs = tf.reshape(box_outputs,
[batch_size, num_rois, num_classes, 4])
box_indices = tf.reshape(
class_targets + tf.tile(
tf.expand_dims(tf.range(batch_size) * num_rois * num_classes, 1),
[1, num_rois]) + tf.tile(
tf.expand_dims(tf.range(num_rois) * num_classes, 0),
[batch_size, 1]), [-1])
box_outputs = tf.matmul(
tf.one_hot(
box_indices,
batch_size * num_rois * num_classes,
dtype=box_outputs.dtype), tf.reshape(box_outputs, [-1, 4]))
box_outputs = tf.reshape(box_outputs, [batch_size, -1, 4])
return box_outputs
def _fast_rcnn_box_loss(self, box_outputs, box_targets, class_targets,
normalizer=1.0):
"""Computes box regression loss."""
with tf.name_scope('fast_rcnn_box_loss'):
mask = tf.tile(tf.expand_dims(tf.greater(class_targets, 0), axis=2),
[1, 1, 4])
mask = tf.cast(mask, dtype=tf.float32)
box_targets = tf.expand_dims(box_targets, axis=-1)
box_outputs = tf.expand_dims(box_outputs, axis=-1)
box_loss = self._huber_loss(box_targets, box_outputs, sample_weight=mask)
      # The loss is normalized by the number of ones in mask and the
      # additional normalizer provided by the caller. The + 0.01 avoids
      # division by zero.
box_loss /= normalizer * (tf.reduce_sum(mask) + 0.01)
return box_loss
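# Sketch of the selection performed by _assign_class_targets above: the
# one-hot matmul amounts to gathering one class-specific box per RoI. The
# tf.gather call below is an equivalent formulation (shapes illustrative).
def _example_class_specific_box_gather():
  """Illustrative only; selects the box of each RoI's target class."""
  batch_size, num_rois, num_classes = 2, 3, 4
  boxes = tf.random.normal([batch_size, num_rois, num_classes, 4])
  class_targets = tf.random.uniform(
      [batch_size, num_rois], maxval=num_classes, dtype=tf.int32)
  # One box of shape [4] per (batch, roi) pair -> [batch_size, num_rois, 4].
  return tf.gather(boxes, class_targets, axis=2, batch_dims=2)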
class MaskrcnnLoss(object):
"""Mask R-CNN instance segmentation mask loss function."""
def __init__(self):
self._binary_crossentropy = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.SUM, from_logits=True)
def __call__(self, mask_outputs, mask_targets, select_class_targets):
"""Computes the mask loss of Mask-RCNN.
    This function implements the mask loss of Mask-RCNN. As `mask_outputs`
    produces `num_classes` masks for each RoI, the reference model expands
    `mask_targets` to match the shape of `mask_outputs` and selects only the
    target with which the RoI has the maximum overlap. (Reference: https://github.com/facebookresearch/Detectron/blob/master/detectron/roi_data/mask_rcnn.py) # pylint: disable=line-too-long
    Instead, this implementation selects `mask_outputs` by `class_targets` so
    that it doesn't need to expand `mask_targets`. Note that the selection
    logic is done in the post-processing of mask_rcnn_fn in
    mask_rcnn_architecture.py.
Args:
mask_outputs: a float tensor representing the prediction for each mask,
with a shape of
[batch_size, num_masks, mask_height, mask_width].
mask_targets: a float tensor representing the binary mask of ground truth
labels for each mask with a shape of
[batch_size, num_masks, mask_height, mask_width].
select_class_targets: a tensor with a shape of [batch_size, num_masks],
representing the foreground mask targets.
Returns:
mask_loss: a float tensor representing total mask loss.
"""
with tf.name_scope('mask_rcnn_loss'):
(batch_size, num_masks, mask_height,
mask_width) = mask_outputs.get_shape().as_list()
weights = tf.tile(
tf.reshape(tf.greater(select_class_targets, 0),
[batch_size, num_masks, 1, 1]),
[1, 1, mask_height, mask_width])
weights = tf.cast(weights, dtype=tf.float32)
mask_targets = tf.expand_dims(mask_targets, axis=-1)
mask_outputs = tf.expand_dims(mask_outputs, axis=-1)
mask_loss = self._binary_crossentropy(mask_targets, mask_outputs,
sample_weight=weights)
# The loss is normalized by the number of 1's in weights and
# + 0.01 is used to avoid division by zero.
return mask_loss / (tf.reduce_sum(weights) + 0.01)
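# Usage sketch (mask size, RoI count, and class count are illustrative
# assumptions): targets are binary masks; select_class_targets marks
# foreground RoIs, so background masks get zero weight.
def _example_mask_rcnn_loss():
  """Illustrative only; per-pixel BCE averaged over foreground masks."""
  loss_fn = MaskrcnnLoss()
  mask_outputs = tf.random.normal([2, 8, 28, 28])
  mask_targets = tf.cast(
      tf.random.uniform([2, 8, 28, 28], maxval=2, dtype=tf.int32), tf.float32)
  select_class_targets = tf.random.uniform([2, 8], maxval=91, dtype=tf.int32)
  return loss_fn(mask_outputs, mask_targets, select_class_targets)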
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Losses used for detection models."""
# Import libraries
import tensorflow as tf
def focal_loss(logits, targets, alpha, gamma):
"""Compute the focal loss between `logits` and the golden `target` values.
Focal loss = -(1-pt)^gamma * log(pt)
where pt is the probability of being classified to the true class.
Args:
logits: A float32 tensor of size
[batch, d_1, ..., d_k, n_classes].
targets: A float32 tensor of size
[batch, d_1, ..., d_k, n_classes].
    alpha: A float32 scalar that weights the loss from positive examples by
      alpha and the loss from negative examples by (1 - alpha).
gamma: A float32 scalar modulating loss from hard and easy examples.
Returns:
loss: A float32 Tensor of size
[batch, d_1, ..., d_k, n_classes] representing
normalized loss on the prediction map.
"""
with tf.name_scope('focal_loss'):
positive_label_mask = tf.equal(targets, 1.0)
cross_entropy = (
tf.nn.sigmoid_cross_entropy_with_logits(labels=targets, logits=logits))
probs = tf.sigmoid(logits)
probs_gt = tf.where(positive_label_mask, probs, 1.0 - probs)
# With small gamma, the implementation could produce NaN during back prop.
modulator = tf.pow(1.0 - probs_gt, gamma)
loss = modulator * cross_entropy
weighted_loss = tf.where(positive_label_mask, alpha * loss,
(1.0 - alpha) * loss)
return weighted_loss
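# Worked sketch of the modulation (logits are illustrative): with gamma=1.5,
# a confident positive (logit 3.0, pt ~ 0.95) is scaled by (1 - pt)^gamma and
# contributes far less than an uncertain positive (logit 0.0, pt = 0.5).
def _example_focal_loss_modulation():
  """Illustrative only; loss[0] is much smaller than loss[1]."""
  logits = tf.constant([[3.0], [0.0]])
  targets = tf.constant([[1.0], [1.0]])
  return focal_loss(logits, targets, alpha=0.25, gamma=1.5)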
class FocalLoss(tf.keras.losses.Loss):
"""Implements a Focal loss for classification problems.
Reference:
[Focal Loss for Dense Object Detection](https://arxiv.org/abs/1708.02002).
"""
def __init__(self,
alpha,
gamma,
num_classes,
reduction=tf.keras.losses.Reduction.AUTO,
name=None):
"""Initializes `FocalLoss`.
Args:
alpha: The `alpha` weight factor for binary class imbalance.
gamma: The `gamma` focusing parameter to re-weight loss.
num_classes: Number of foreground classes.
reduction: (Optional) Type of `tf.keras.losses.Reduction` to apply to
loss. Default value is `AUTO`. `AUTO` indicates that the reduction
option will be determined by the usage context. For almost all cases
this defaults to `SUM_OVER_BATCH_SIZE`. When used with
`tf.distribute.Strategy`, outside of built-in training loops such as
`tf.keras` `compile` and `fit`, using `AUTO` or `SUM_OVER_BATCH_SIZE`
will raise an error. Please see this custom training [tutorial](
https://www.tensorflow.org/tutorials/distribute/custom_training) for
more details.
      name: Optional name for the op.
"""
self._num_classes = num_classes
self._alpha = alpha
self._gamma = gamma
super(FocalLoss, self).__init__(reduction=reduction, name=name)
def call(self, y_true, y_pred):
"""Invokes the `FocalLoss`.
Args:
      y_true: Ordered Dict with level to [batch, height, width, num_anchors].
        for example,
        {3: tf.Tensor(shape=[32, 512, 512, 9], dtype=tf.int64),
         4: tf.Tensor(shape=[32, 256, 256, 9], dtype=tf.int64)}
      y_pred: Ordered Dict with level to [batch, height, width, num_anchors *
        num_classes]. for example,
        {3: tf.Tensor(shape=[32, 512, 512, 9 * 21], dtype=tf.float32),
         4: tf.Tensor(shape=[32, 256, 256, 9 * 21], dtype=tf.float32)}
Returns:
Summed loss float `Tensor`.
"""
flattened_cls_outputs = []
flattened_labels = []
batch_size = None
for level in y_pred.keys():
cls_output = y_pred[level]
label = y_true[level]
if batch_size is None:
batch_size = cls_output.shape[0] or tf.shape(cls_output)[0]
flattened_cls_outputs.append(
tf.reshape(cls_output, [batch_size, -1, self._num_classes]))
flattened_labels.append(tf.reshape(label, [batch_size, -1]))
cls_outputs = tf.concat(flattened_cls_outputs, axis=1)
labels = tf.concat(flattened_labels, axis=1)
cls_targets_one_hot = tf.one_hot(labels, self._num_classes)
return focal_loss(
tf.cast(cls_outputs, dtype=tf.float32),
tf.cast(cls_targets_one_hot, dtype=tf.float32), self._alpha,
self._gamma)
def get_config(self):
config = {
'alpha': self._alpha,
'gamma': self._gamma,
'num_classes': self._num_classes,
}
base_config = super(FocalLoss, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
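# Usage sketch (levels, shapes, and the 21-class count are illustrative
# assumptions): y_true holds per-anchor class indices, y_pred holds logits.
def _example_focal_loss_class():
  """Illustrative only; summed focal loss over a two-level pyramid."""
  loss_fn = FocalLoss(alpha=0.25, gamma=1.5, num_classes=21,
                      reduction=tf.keras.losses.Reduction.SUM)
  y_true = {3: tf.zeros([2, 8, 8, 9], tf.int32),
            4: tf.zeros([2, 4, 4, 9], tf.int32)}
  y_pred = {3: tf.random.normal([2, 8, 8, 9 * 21]),
            4: tf.random.normal([2, 4, 4, 9 * 21])}
  return loss_fn(y_true, y_pred)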
class RetinanetBoxLoss(tf.keras.losses.Loss):
"""RetinaNet box Huber loss."""
def __init__(self,
delta,
reduction=tf.keras.losses.Reduction.AUTO,
name=None):
"""Initializes `RetinanetBoxLoss`.
Args:
delta: A float, the point where the Huber loss function changes from a
quadratic to linear.
reduction: (Optional) Type of `tf.keras.losses.Reduction` to apply to
loss. Default value is `AUTO`. `AUTO` indicates that the reduction
option will be determined by the usage context. For almost all cases
this defaults to `SUM_OVER_BATCH_SIZE`. When used with
`tf.distribute.Strategy`, outside of built-in training loops such as
`tf.keras` `compile` and `fit`, using `AUTO` or `SUM_OVER_BATCH_SIZE`
will raise an error. Please see this custom training [tutorial](
https://www.tensorflow.org/tutorials/distribute/custom_training) for
more details.
      name: Optional name for the op.
"""
self._huber_loss = tf.keras.losses.Huber(
delta=delta, reduction=tf.keras.losses.Reduction.NONE)
self._delta = delta
super(RetinanetBoxLoss, self).__init__(reduction=reduction, name=name)
def call(self, y_true, y_pred):
"""Computes box detection loss.
Computes total detection loss including box and class loss from all levels.
Args:
      y_true: Ordered Dict with level to [batch, height, width,
        num_anchors * 4]. for example,
        {3: tf.Tensor(shape=[32, 512, 512, 9 * 4], dtype=tf.float32),
         4: tf.Tensor(shape=[32, 256, 256, 9 * 4], dtype=tf.float32)}
      y_pred: Ordered Dict with level to [batch, height, width,
        num_anchors * 4]. for example,
        {3: tf.Tensor(shape=[32, 512, 512, 9 * 4], dtype=tf.float32),
         4: tf.Tensor(shape=[32, 256, 256, 9 * 4], dtype=tf.float32)}
    Returns:
      a float tensor representing the box regression loss.
"""
    # Flattens the per-level box outputs and targets and concatenates them so
    # that the element-wise Huber loss can be computed over all anchors at
    # once.
flattened_box_outputs = []
flattened_labels = []
batch_size = None
for level in y_pred.keys():
box_output = y_pred[level]
label = y_true[level]
if batch_size is None:
batch_size = box_output.shape[0] or tf.shape(box_output)[0]
flattened_box_outputs.append(tf.reshape(box_output, [batch_size, -1, 4]))
flattened_labels.append(tf.reshape(label, [batch_size, -1, 4]))
box_outputs = tf.concat(flattened_box_outputs, axis=1)
labels = tf.concat(flattened_labels, axis=1)
loss = self._huber_loss(labels, box_outputs)
return loss
def get_config(self):
config = {
'delta': self._delta,
}
base_config = super(RetinanetBoxLoss, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
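# Usage sketch (levels, shapes, and delta are illustrative assumptions):
# both dicts map pyramid level to [batch, height, width, num_anchors * 4].
def _example_retinanet_box_loss():
  """Illustrative only; summed Huber loss over a two-level pyramid."""
  loss_fn = RetinanetBoxLoss(delta=0.1,
                             reduction=tf.keras.losses.Reduction.SUM)
  y_true = {3: tf.random.normal([2, 8, 8, 9 * 4]),
            4: tf.random.normal([2, 4, 4, 9 * 4])}
  y_pred = {3: tf.random.normal([2, 8, 8, 9 * 4]),
            4: tf.random.normal([2, 4, 4, 9 * 4])}
  return loss_fn(y_true, y_pred)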
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Losses used for segmentation models."""
# Import libraries
import tensorflow as tf
from official.modeling import tf_utils
EPSILON = 1e-5
class SegmentationLoss:
"""Semantic segmentation loss."""
def __init__(self, label_smoothing, class_weights, ignore_label,
use_groundtruth_dimension, top_k_percent_pixels=1.0):
self._top_k_percent_pixels = top_k_percent_pixels
self._class_weights = class_weights
self._ignore_label = ignore_label
self._use_groundtruth_dimension = use_groundtruth_dimension
self._label_smoothing = label_smoothing
def __call__(self, logits, labels):
_, height, width, num_classes = logits.get_shape().as_list()
if self._use_groundtruth_dimension:
# TODO(arashwan): Test using align corners to match deeplab alignment.
logits = tf.image.resize(
logits, tf.shape(labels)[1:3],
method=tf.image.ResizeMethod.BILINEAR)
else:
labels = tf.image.resize(
labels, (height, width),
method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
valid_mask = tf.not_equal(labels, self._ignore_label)
normalizer = tf.reduce_sum(tf.cast(valid_mask, tf.float32)) + EPSILON
    # Assign pixels with the ignore label to class 0 (background); the loss on
    # these pixels is masked out below.
labels = tf.where(valid_mask, labels, tf.zeros_like(labels))
labels = tf.squeeze(tf.cast(labels, tf.int32), axis=3)
valid_mask = tf.squeeze(tf.cast(valid_mask, tf.float32), axis=3)
onehot_labels = tf.one_hot(labels, num_classes)
onehot_labels = onehot_labels * (
1 - self._label_smoothing) + self._label_smoothing / num_classes
cross_entropy_loss = tf.nn.softmax_cross_entropy_with_logits(
labels=onehot_labels, logits=logits)
if not self._class_weights:
class_weights = [1] * num_classes
else:
class_weights = self._class_weights
if num_classes != len(class_weights):
raise ValueError(
'Length of class_weights should be {}'.format(num_classes))
weight_mask = tf.einsum('...y,y->...',
tf.one_hot(labels, num_classes, dtype=tf.float32),
tf.constant(class_weights, tf.float32))
valid_mask *= weight_mask
cross_entropy_loss *= tf.cast(valid_mask, tf.float32)
if self._top_k_percent_pixels >= 1.0:
loss = tf.reduce_sum(cross_entropy_loss) / normalizer
else:
cross_entropy_loss = tf.reshape(cross_entropy_loss, shape=[-1])
top_k_pixels = tf.cast(
self._top_k_percent_pixels *
tf.cast(tf.size(cross_entropy_loss), tf.float32), tf.int32)
top_k_losses, _ = tf.math.top_k(
cross_entropy_loss, k=top_k_pixels, sorted=True)
normalizer = tf.reduce_sum(
tf.cast(tf.not_equal(top_k_losses, 0.0), tf.float32)) + EPSILON
loss = tf.reduce_sum(top_k_losses) / normalizer
return loss
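# Usage sketch (class count, ignore label, and shapes are illustrative
# assumptions): labels carry a trailing channel dim and may contain the
# ignore label, which is masked out of the normalized cross-entropy.
def _example_segmentation_loss():
  """Illustrative only; per-pixel cross-entropy with ignore masking."""
  loss_fn = SegmentationLoss(label_smoothing=0.0, class_weights=[],
                             ignore_label=255, use_groundtruth_dimension=True)
  logits = tf.random.normal([2, 32, 32, 21])
  labels = tf.random.uniform([2, 32, 32, 1], maxval=21, dtype=tf.int32)
  return loss_fn(logits, labels)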
def get_actual_mask_scores(logits, labels, ignore_label):
"""Gets actual mask scores."""
_, height, width, num_classes = logits.get_shape().as_list()
batch_size = tf.shape(logits)[0]
logits = tf.stop_gradient(logits)
labels = tf.image.resize(
labels, (height, width),
method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
predicted_labels = tf.argmax(logits, -1, output_type=tf.int32)
flat_predictions = tf.reshape(predicted_labels, [batch_size, -1])
flat_labels = tf.cast(tf.reshape(labels, [batch_size, -1]), tf.int32)
one_hot_predictions = tf.one_hot(
flat_predictions, num_classes, on_value=True, off_value=False)
one_hot_labels = tf.one_hot(
flat_labels, num_classes, on_value=True, off_value=False)
keep_mask = tf.not_equal(flat_labels, ignore_label)
keep_mask = tf.expand_dims(keep_mask, 2)
overlap = tf.logical_and(one_hot_predictions, one_hot_labels)
overlap = tf.logical_and(overlap, keep_mask)
overlap = tf.reduce_sum(tf.cast(overlap, tf.float32), axis=1)
union = tf.logical_or(one_hot_predictions, one_hot_labels)
union = tf.logical_and(union, keep_mask)
union = tf.reduce_sum(tf.cast(union, tf.float32), axis=1)
actual_scores = tf.divide(overlap, tf.maximum(union, EPSILON))
return actual_scores
class MaskScoringLoss:
"""Mask Scoring loss."""
def __init__(self, ignore_label):
self._ignore_label = ignore_label
self._mse_loss = tf.keras.losses.MeanSquaredError(
reduction=tf.keras.losses.Reduction.NONE)
def __call__(self, predicted_scores, logits, labels):
actual_scores = get_actual_mask_scores(logits, labels, self._ignore_label)
loss = tf_utils.safe_mean(self._mse_loss(actual_scores, predicted_scores))
return loss
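# Usage sketch (shapes and class count are illustrative assumptions): the
# predicted per-class scores are regressed toward the actual per-class IoU
# computed by get_actual_mask_scores above.
def _example_mask_scoring_loss():
  """Illustrative only; MSE between predicted and actual mask scores."""
  loss_fn = MaskScoringLoss(ignore_label=255)
  logits = tf.random.normal([2, 16, 16, 21])
  labels = tf.cast(
      tf.random.uniform([2, 16, 16, 1], maxval=21, dtype=tf.int32),
      tf.float32)
  predicted_scores = tf.random.uniform([2, 21])
  return loss_fn(predicted_scores, logits, labels)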
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Modeling package definition."""
from official.vision.beta.modeling import backbones
from official.vision.beta.modeling import decoders
from official.vision.beta.modeling import heads
from official.vision.beta.modeling import layers
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Backbones package definition."""
from official.vision.beta.modeling.backbones.efficientnet import EfficientNet
from official.vision.beta.modeling.backbones.mobiledet import MobileDet
from official.vision.beta.modeling.backbones.mobilenet import MobileNet
from official.vision.beta.modeling.backbones.resnet import ResNet
from official.vision.beta.modeling.backbones.resnet_3d import ResNet3D
from official.vision.beta.modeling.backbones.resnet_deeplab import DilatedResNet
from official.vision.beta.modeling.backbones.revnet import RevNet
from official.vision.beta.modeling.backbones.spinenet import SpineNet
from official.vision.beta.modeling.backbones.spinenet_mobile import SpineNetMobile
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains definitions of EfficientNet Networks."""
import math
from typing import Any, List, Tuple
# Import libraries
import tensorflow as tf
from official.modeling import hyperparams
from official.modeling import tf_utils
from official.vision.beta.modeling.backbones import factory
from official.vision.beta.modeling.layers import nn_blocks
from official.vision.beta.modeling.layers import nn_layers
layers = tf.keras.layers
# The fixed EfficientNet-B0 architecture discovered by NAS.
# Each element represents a specification of a building block:
# (block_fn, block_repeats, kernel_size, strides, expand_ratio, in_filters,
# out_filters, is_output)
EN_B0_BLOCK_SPECS = [
('mbconv', 1, 3, 1, 1, 32, 16, False),
('mbconv', 2, 3, 2, 6, 16, 24, True),
('mbconv', 2, 5, 2, 6, 24, 40, True),
('mbconv', 3, 3, 2, 6, 40, 80, False),
('mbconv', 3, 5, 1, 6, 80, 112, True),
('mbconv', 4, 5, 2, 6, 112, 192, False),
('mbconv', 1, 3, 1, 6, 192, 320, True),
]
SCALING_MAP = {
'b0': dict(width_scale=1.0, depth_scale=1.0),
'b1': dict(width_scale=1.0, depth_scale=1.1),
'b2': dict(width_scale=1.1, depth_scale=1.2),
'b3': dict(width_scale=1.2, depth_scale=1.4),
'b4': dict(width_scale=1.4, depth_scale=1.8),
'b5': dict(width_scale=1.6, depth_scale=2.2),
'b6': dict(width_scale=1.8, depth_scale=2.6),
'b7': dict(width_scale=2.0, depth_scale=3.1),
}
class BlockSpec():
"""A container class that specifies the block configuration for MnasNet."""
def __init__(self, block_fn: str, block_repeats: int, kernel_size: int,
strides: int, expand_ratio: float, in_filters: int,
out_filters: int, is_output: bool, width_scale: float,
depth_scale: float):
self.block_fn = block_fn
self.block_repeats = round_repeats(block_repeats, depth_scale)
self.kernel_size = kernel_size
self.strides = strides
self.expand_ratio = expand_ratio
self.in_filters = nn_layers.round_filters(in_filters, width_scale)
self.out_filters = nn_layers.round_filters(out_filters, width_scale)
self.is_output = is_output
def round_repeats(repeats: int, multiplier: float, skip: bool = False) -> int:
"""Returns rounded number of filters based on depth multiplier."""
if skip or not multiplier:
return repeats
return int(math.ceil(multiplier * repeats))
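# Worked example of the depth scaling (multiplier taken from SCALING_MAP
# above; the 3-repeat stage is illustrative): with the 'b3' depth multiplier
# of 1.4, a stage with 3 repeats grows to ceil(3 * 1.4) = 5 blocks.
def _example_round_repeats():
  """Illustrative only; depth scaling for EfficientNet-B3."""
  return round_repeats(3, SCALING_MAP['b3']['depth_scale'])  # == 5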
def block_spec_decoder(specs: List[Tuple[Any, ...]], width_scale: float,
depth_scale: float) -> List[BlockSpec]:
"""Decodes and returns specs for a block."""
decoded_specs = []
for s in specs:
s = s + (
width_scale,
depth_scale,
)
decoded_specs.append(BlockSpec(*s))
return decoded_specs
@tf.keras.utils.register_keras_serializable(package='Beta')
class EfficientNet(tf.keras.Model):
"""Creates an EfficientNet family model.
This implements the EfficientNet model from:
Mingxing Tan, Quoc V. Le.
EfficientNet: Rethinking Model Scaling for Convolutional Neural Networks.
(https://arxiv.org/pdf/1905.11946)
"""
def __init__(self,
model_id: str,
input_specs: tf.keras.layers.InputSpec = layers.InputSpec(
shape=[None, None, None, 3]),
se_ratio: float = 0.0,
stochastic_depth_drop_rate: float = 0.0,
kernel_initializer: str = 'VarianceScaling',
kernel_regularizer: tf.keras.regularizers.Regularizer = None,
bias_regularizer: tf.keras.regularizers.Regularizer = None,
activation: str = 'relu',
use_sync_bn: bool = False,
norm_momentum: float = 0.99,
norm_epsilon: float = 0.001, # pytype: disable=annotation-type-mismatch # typed-keras
**kwargs):
"""Initializes an EfficientNet model.
Args:
model_id: A `str` of model ID of EfficientNet.
input_specs: A `tf.keras.layers.InputSpec` of the input tensor.
se_ratio: A `float` of squeeze and excitation ratio for inverted
bottleneck blocks.
stochastic_depth_drop_rate: A `float` of drop rate for drop connect layer.
kernel_initializer: A `str` for kernel initializer of convolutional
layers.
kernel_regularizer: A `tf.keras.regularizers.Regularizer` object for
Conv2D. Default to None.
bias_regularizer: A `tf.keras.regularizers.Regularizer` object for Conv2D.
Default to None.
activation: A `str` of name of the activation function.
use_sync_bn: If True, use synchronized batch normalization.
norm_momentum: A `float` of normalization momentum for the moving average.
norm_epsilon: A `float` added to variance to avoid dividing by zero.
**kwargs: Additional keyword arguments to be passed.
"""
self._model_id = model_id
self._input_specs = input_specs
self._se_ratio = se_ratio
self._stochastic_depth_drop_rate = stochastic_depth_drop_rate
self._use_sync_bn = use_sync_bn
self._activation = activation
self._kernel_initializer = kernel_initializer
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
if use_sync_bn:
self._norm = layers.experimental.SyncBatchNormalization
else:
self._norm = layers.BatchNormalization
if tf.keras.backend.image_data_format() == 'channels_last':
bn_axis = -1
else:
bn_axis = 1
# Build EfficientNet.
inputs = tf.keras.Input(shape=input_specs.shape[1:])
width_scale = SCALING_MAP[model_id]['width_scale']
depth_scale = SCALING_MAP[model_id]['depth_scale']
# Build stem.
x = layers.Conv2D(
filters=nn_layers.round_filters(32, width_scale),
kernel_size=3,
strides=2,
use_bias=False,
padding='same',
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)(
inputs)
x = self._norm(
axis=bn_axis, momentum=norm_momentum, epsilon=norm_epsilon)(
x)
x = tf_utils.get_activation(activation)(x)
# Build intermediate blocks.
endpoints = {}
endpoint_level = 2
decoded_specs = block_spec_decoder(EN_B0_BLOCK_SPECS, width_scale,
depth_scale)
for i, specs in enumerate(decoded_specs):
x = self._block_group(
inputs=x, specs=specs, name='block_group_{}'.format(i))
if specs.is_output:
endpoints[str(endpoint_level)] = x
endpoint_level += 1
# Build output specs for downstream tasks.
self._output_specs = {l: endpoints[l].get_shape() for l in endpoints}
# Build the final conv for classification.
x = layers.Conv2D(
filters=nn_layers.round_filters(1280, width_scale),
kernel_size=1,
strides=1,
use_bias=False,
padding='same',
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)(
x)
x = self._norm(
axis=bn_axis, momentum=norm_momentum, epsilon=norm_epsilon)(
x)
endpoints[str(endpoint_level)] = tf_utils.get_activation(activation)(x)
super(EfficientNet, self).__init__(
inputs=inputs, outputs=endpoints, **kwargs)
def _block_group(self,
inputs: tf.Tensor,
specs: BlockSpec,
name: str = 'block_group'):
"""Creates one group of blocks for the EfficientNet model.
Args:
      inputs: A `tf.Tensor` of size `[batch, height, width, channels]`.
specs: The specifications for one inverted bottleneck block group.
name: A `str` name for the block.
Returns:
The output `tf.Tensor` of the block layer.
"""
if specs.block_fn == 'mbconv':
block_fn = nn_blocks.InvertedBottleneckBlock
else:
raise ValueError('Block func {} not supported.'.format(specs.block_fn))
x = block_fn(
in_filters=specs.in_filters,
out_filters=specs.out_filters,
expand_ratio=specs.expand_ratio,
strides=specs.strides,
kernel_size=specs.kernel_size,
se_ratio=self._se_ratio,
stochastic_depth_drop_rate=self._stochastic_depth_drop_rate,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
activation=self._activation,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon)(
inputs)
for _ in range(1, specs.block_repeats):
x = block_fn(
in_filters=specs.out_filters, # Set 'in_filters' to 'out_filters'.
out_filters=specs.out_filters,
expand_ratio=specs.expand_ratio,
strides=1, # Fix strides to 1.
kernel_size=specs.kernel_size,
se_ratio=self._se_ratio,
stochastic_depth_drop_rate=self._stochastic_depth_drop_rate,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
activation=self._activation,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon)(
x)
return tf.identity(x, name=name)
def get_config(self):
config_dict = {
'model_id': self._model_id,
'se_ratio': self._se_ratio,
'stochastic_depth_drop_rate': self._stochastic_depth_drop_rate,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'bias_regularizer': self._bias_regularizer,
'activation': self._activation,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon
}
return config_dict
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
@property
def output_specs(self):
"""A dict of {level: TensorShape} pairs for the model output."""
return self._output_specs
@factory.register_backbone_builder('efficientnet')
def build_efficientnet(
input_specs: tf.keras.layers.InputSpec,
backbone_config: hyperparams.Config,
norm_activation_config: hyperparams.Config,
l2_regularizer: tf.keras.regularizers.Regularizer = None) -> tf.keras.Model: # pytype: disable=annotation-type-mismatch # typed-keras
"""Builds EfficientNet backbone from a config."""
backbone_type = backbone_config.type
backbone_cfg = backbone_config.get()
assert backbone_type == 'efficientnet', (f'Inconsistent backbone type '
f'{backbone_type}')
return EfficientNet(
model_id=backbone_cfg.model_id,
input_specs=input_specs,
stochastic_depth_drop_rate=backbone_cfg.stochastic_depth_drop_rate,
se_ratio=backbone_cfg.se_ratio,
activation=norm_activation_config.activation,
use_sync_bn=norm_activation_config.use_sync_bn,
norm_momentum=norm_activation_config.norm_momentum,
norm_epsilon=norm_activation_config.norm_epsilon,
kernel_regularizer=l2_regularizer)
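# Usage sketch (the 224x224 input size is an illustrative assumption): build
# EfficientNet-B0 directly and read its multi-scale endpoints, keyed by
# feature level.
def _example_efficientnet_backbone():
  """Illustrative only; returns the {level: TensorShape} output specs."""
  backbone = EfficientNet(model_id='b0')
  _ = backbone(tf.keras.Input(shape=(224, 224, 3), batch_size=1))
  return backbone.output_specs  # Feature levels '2'..'5'.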
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for EfficientNet."""
# Import libraries
from absl.testing import parameterized
import tensorflow as tf
from official.vision.beta.modeling.backbones import efficientnet
class EfficientNetTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(32, 224)
def test_network_creation(self, input_size):
"""Test creation of EfficientNet family models."""
tf.keras.backend.set_image_data_format('channels_last')
network = efficientnet.EfficientNet(model_id='b0')
inputs = tf.keras.Input(shape=(input_size, input_size, 3), batch_size=1)
endpoints = network(inputs)
self.assertAllEqual([1, input_size / 2**2, input_size / 2**2, 24],
endpoints['2'].shape.as_list())
self.assertAllEqual([1, input_size / 2**3, input_size / 2**3, 40],
endpoints['3'].shape.as_list())
self.assertAllEqual([1, input_size / 2**4, input_size / 2**4, 112],
endpoints['4'].shape.as_list())
self.assertAllEqual([1, input_size / 2**5, input_size / 2**5, 320],
endpoints['5'].shape.as_list())
@parameterized.parameters('b0', 'b3', 'b6')
def test_network_scaling(self, model_id):
"""Test compound scaling."""
efficientnet_params = {
'b0': 4049564,
'b3': 10783528,
'b6': 40960136,
}
tf.keras.backend.set_image_data_format('channels_last')
input_size = 32
network = efficientnet.EfficientNet(model_id=model_id, se_ratio=0.25)
self.assertEqual(network.count_params(), efficientnet_params[model_id])
inputs = tf.keras.Input(shape=(input_size, input_size, 3), batch_size=1)
_ = network(inputs)
@parameterized.parameters(1, 3)
def test_input_specs(self, input_dim):
"""Test different input feature dimensions."""
tf.keras.backend.set_image_data_format('channels_last')
input_specs = tf.keras.layers.InputSpec(shape=[None, None, None, input_dim])
network = efficientnet.EfficientNet(model_id='b0', input_specs=input_specs)
inputs = tf.keras.Input(shape=(128, 128, input_dim), batch_size=1)
_ = network(inputs)
def test_serialize_deserialize(self):
# Create a network object that sets all of its config options.
kwargs = dict(
model_id='b0',
se_ratio=0.25,
stochastic_depth_drop_rate=None,
use_sync_bn=False,
kernel_initializer='VarianceScaling',
kernel_regularizer=None,
bias_regularizer=None,
activation='relu',
norm_momentum=0.99,
norm_epsilon=0.001,
)
network = efficientnet.EfficientNet(**kwargs)
expected_config = dict(kwargs)
self.assertEqual(network.get_config(), expected_config)
# Create another network object from the first object's config.
new_network = efficientnet.EfficientNet.from_config(network.get_config())
# Validate that the config can be forced to JSON.
_ = new_network.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(network.get_config(), new_network.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Backbone registers and factory method.
One can regitered a new backbone model by the following two steps:
1 Import the factory and register the build in the backbone file.
2 Import the backbone class and add a build in __init__.py.
```
# my_backbone.py
from modeling.backbones import factory
class MyBackbone():
...
@factory.register_backbone_builder('my_backbone')
def build_my_backbone():
return MyBackbone()
# backbones/__init__.py adds import
from modeling.backbones.my_backbone import MyBackbone
```
If one wants the MyBackbone class to be used only by a specific binary, then
don't import the backbone module in backbones/__init__.py, but import it in
the place that uses it.
"""
from typing import Sequence, Union
# Import libraries
import tensorflow as tf
from official.core import registry
from official.modeling import hyperparams
_REGISTERED_BACKBONE_CLS = {}
def register_backbone_builder(key: str):
"""Decorates a builder of backbone class.
The builder should be a Callable (a class or a function).
This decorator supports registration of backbone builder as follows:
```
class MyBackbone(tf.keras.Model):
pass
@register_backbone_builder('mybackbone')
def builder(input_specs, config, l2_reg):
return MyBackbone(...)
# Builds a MyBackbone object.
  my_backbone = build_backbone(input_specs, config, l2_reg)
```
Args:
key: A `str` of key to look up the builder.
  Returns:
    A callable that can be used as a decorator to register the decorated
    builder under the given key.
"""
return registry.register(_REGISTERED_BACKBONE_CLS, key)
def build_backbone(input_specs: Union[tf.keras.layers.InputSpec,
Sequence[tf.keras.layers.InputSpec]],
backbone_config: hyperparams.Config,
norm_activation_config: hyperparams.Config,
l2_regularizer: tf.keras.regularizers.Regularizer = None,
**kwargs) -> tf.keras.Model: # pytype: disable=annotation-type-mismatch # typed-keras
"""Builds backbone from a config.
Args:
input_specs: A (sequence of) `tf.keras.layers.InputSpec` of input.
backbone_config: A `OneOfConfig` of backbone config.
norm_activation_config: A config for normalization/activation layer.
l2_regularizer: A `tf.keras.regularizers.Regularizer` object. Default to
None.
**kwargs: Additional keyword args to be passed to backbone builder.
Returns:
A `tf.keras.Model` instance of the backbone.
"""
backbone_builder = registry.lookup(_REGISTERED_BACKBONE_CLS,
backbone_config.type)
return backbone_builder(
input_specs=input_specs,
backbone_config=backbone_config,
norm_activation_config=norm_activation_config,
l2_regularizer=l2_regularizer,
**kwargs)
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for factory functions."""
# Import libraries
from absl.testing import parameterized
import tensorflow as tf
from tensorflow.python.distribute import combinations
from official.vision.beta.configs import backbones as backbones_cfg
from official.vision.beta.configs import backbones_3d as backbones_3d_cfg
from official.vision.beta.configs import common as common_cfg
from official.vision.beta.modeling import backbones
from official.vision.beta.modeling.backbones import factory
class FactoryTest(tf.test.TestCase, parameterized.TestCase):
@combinations.generate(
combinations.combine(model_id=[18, 34, 50, 101, 152],))
def test_resnet_creation(self, model_id):
"""Test creation of ResNet models."""
network = backbones.ResNet(
model_id=model_id, se_ratio=0.0, norm_momentum=0.99, norm_epsilon=1e-5)
backbone_config = backbones_cfg.Backbone(
type='resnet',
resnet=backbones_cfg.ResNet(model_id=model_id, se_ratio=0.0))
norm_activation_config = common_cfg.NormActivation(
norm_momentum=0.99, norm_epsilon=1e-5, use_sync_bn=False)
factory_network = factory.build_backbone(
input_specs=tf.keras.layers.InputSpec(shape=[None, None, None, 3]),
backbone_config=backbone_config,
norm_activation_config=norm_activation_config)
network_config = network.get_config()
factory_network_config = factory_network.get_config()
self.assertEqual(network_config, factory_network_config)
@combinations.generate(
combinations.combine(
model_id=['b0', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'b7'],
se_ratio=[0.0, 0.25],
))
def test_efficientnet_creation(self, model_id, se_ratio):
"""Test creation of EfficientNet models."""
network = backbones.EfficientNet(
model_id=model_id,
se_ratio=se_ratio,
norm_momentum=0.99,
norm_epsilon=1e-5)
backbone_config = backbones_cfg.Backbone(
type='efficientnet',
efficientnet=backbones_cfg.EfficientNet(
model_id=model_id, se_ratio=se_ratio))
norm_activation_config = common_cfg.NormActivation(
norm_momentum=0.99, norm_epsilon=1e-5, use_sync_bn=False)
factory_network = factory.build_backbone(
input_specs=tf.keras.layers.InputSpec(shape=[None, None, None, 3]),
backbone_config=backbone_config,
norm_activation_config=norm_activation_config)
network_config = network.get_config()
factory_network_config = factory_network.get_config()
self.assertEqual(network_config, factory_network_config)
@combinations.generate(
combinations.combine(
model_id=['MobileNetV1', 'MobileNetV2',
'MobileNetV3Large', 'MobileNetV3Small',
'MobileNetV3EdgeTPU'],
filter_size_scale=[1.0, 0.75],
))
def test_mobilenet_creation(self, model_id, filter_size_scale):
"""Test creation of Mobilenet models."""
network = backbones.MobileNet(
model_id=model_id,
filter_size_scale=filter_size_scale,
norm_momentum=0.99,
norm_epsilon=1e-5)
backbone_config = backbones_cfg.Backbone(
type='mobilenet',
mobilenet=backbones_cfg.MobileNet(
model_id=model_id, filter_size_scale=filter_size_scale))
norm_activation_config = common_cfg.NormActivation(
norm_momentum=0.99, norm_epsilon=1e-5, use_sync_bn=False)
factory_network = factory.build_backbone(
input_specs=tf.keras.layers.InputSpec(shape=[None, None, None, 3]),
backbone_config=backbone_config,
norm_activation_config=norm_activation_config)
network_config = network.get_config()
factory_network_config = factory_network.get_config()
self.assertEqual(network_config, factory_network_config)
@combinations.generate(combinations.combine(model_id=['49'],))
def test_spinenet_creation(self, model_id):
"""Test creation of SpineNet models."""
input_size = 128
min_level = 3
max_level = 7
input_specs = tf.keras.layers.InputSpec(
shape=[None, input_size, input_size, 3])
network = backbones.SpineNet(
input_specs=input_specs,
min_level=min_level,
max_level=max_level,
norm_momentum=0.99,
norm_epsilon=1e-5)
backbone_config = backbones_cfg.Backbone(
type='spinenet',
spinenet=backbones_cfg.SpineNet(model_id=model_id))
norm_activation_config = common_cfg.NormActivation(
norm_momentum=0.99, norm_epsilon=1e-5, use_sync_bn=False)
factory_network = factory.build_backbone(
input_specs=tf.keras.layers.InputSpec(
shape=[None, input_size, input_size, 3]),
backbone_config=backbone_config,
norm_activation_config=norm_activation_config)
network_config = network.get_config()
factory_network_config = factory_network.get_config()
self.assertEqual(network_config, factory_network_config)
@combinations.generate(
combinations.combine(model_id=[38, 56, 104],))
def test_revnet_creation(self, model_id):
"""Test creation of RevNet models."""
network = backbones.RevNet(
model_id=model_id, norm_momentum=0.99, norm_epsilon=1e-5)
backbone_config = backbones_cfg.Backbone(
type='revnet',
revnet=backbones_cfg.RevNet(model_id=model_id))
norm_activation_config = common_cfg.NormActivation(
norm_momentum=0.99, norm_epsilon=1e-5, use_sync_bn=False)
factory_network = factory.build_backbone(
input_specs=tf.keras.layers.InputSpec(shape=[None, None, None, 3]),
backbone_config=backbone_config,
norm_activation_config=norm_activation_config)
network_config = network.get_config()
factory_network_config = factory_network.get_config()
self.assertEqual(network_config, factory_network_config)
@combinations.generate(combinations.combine(model_type=['resnet_3d'],))
def test_resnet_3d_creation(self, model_type):
"""Test creation of ResNet 3D models."""
backbone_cfg = backbones_3d_cfg.Backbone3D(type=model_type).get()
temporal_strides = []
temporal_kernel_sizes = []
for block_spec in backbone_cfg.block_specs:
temporal_strides.append(block_spec.temporal_strides)
temporal_kernel_sizes.append(block_spec.temporal_kernel_sizes)
_ = backbones.ResNet3D(
model_id=backbone_cfg.model_id,
temporal_strides=temporal_strides,
temporal_kernel_sizes=temporal_kernel_sizes,
norm_momentum=0.99,
norm_epsilon=1e-5)
@combinations.generate(
combinations.combine(
model_id=[
'MobileDetCPU',
'MobileDetDSP',
'MobileDetEdgeTPU',
'MobileDetGPU'],
filter_size_scale=[1.0, 0.75],
))
def test_mobiledet_creation(self, model_id, filter_size_scale):
"""Test creation of Mobiledet models."""
network = backbones.MobileDet(
model_id=model_id,
filter_size_scale=filter_size_scale,
norm_momentum=0.99,
norm_epsilon=1e-5)
backbone_config = backbones_cfg.Backbone(
type='mobiledet',
mobiledet=backbones_cfg.MobileDet(
model_id=model_id, filter_size_scale=filter_size_scale))
norm_activation_config = common_cfg.NormActivation(
norm_momentum=0.99, norm_epsilon=1e-5, use_sync_bn=False)
factory_network = factory.build_backbone(
input_specs=tf.keras.layers.InputSpec(shape=[None, None, None, 3]),
backbone_config=backbone_config,
norm_activation_config=norm_activation_config)
network_config = network.get_config()
factory_network_config = factory_network.get_config()
self.assertEqual(network_config, factory_network_config)
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Definitions of MobileDet Networks."""
import dataclasses
from typing import Any, Dict, Optional, Tuple, List
import tensorflow as tf
from official.modeling import hyperparams
from official.vision.beta.modeling.backbones import factory
from official.vision.beta.modeling.backbones import mobilenet
from official.vision.beta.modeling.layers import nn_blocks
from official.vision.beta.modeling.layers import nn_layers
layers = tf.keras.layers
# pylint: disable=pointless-string-statement
"""
Architecture: https://arxiv.org/abs/2004.14525.
"MobileDets: Searching for Object Detection Architectures for
Mobile Accelerators" Yunyang Xiong, Hanxiao Liu, Suyog Gupta, Berkin Akin,
Gabriel Bender, Yongzhe Wang, Pieter-Jan Kindermans, Mingxing Tan, Vikas Singh,
Bo Chen
Note that the `round_down_protect` flag should be set to False when scaling
the network.
"""
MD_CPU_BLOCK_SPECS = {
'spec_name': 'MobileDetCPU',
# [expand_ratio] is set to 1 and [use_residual] is set to false
# for inverted_bottleneck_no_expansion
# [se_ratio] is set to 0.25 for all inverted_bottleneck layers
# [activation] is set to 'hard_swish' for all applicable layers
'block_spec_schema': ['block_fn', 'kernel_size', 'strides', 'filters',
'activation', 'se_ratio', 'expand_ratio',
'use_residual', 'is_output'],
'block_specs': [
('convbn', 3, 2, 16, 'hard_swish', None, None, None, False),
# inverted_bottleneck_no_expansion
('invertedbottleneck', 3, 1, 8, 'hard_swish', 0.25, 1., False, True),
('invertedbottleneck', 3, 2, 16, 'hard_swish', 0.25, 4., False, True),
('invertedbottleneck', 3, 2, 32, 'hard_swish', 0.25, 8., False, False),
('invertedbottleneck', 3, 1, 32, 'hard_swish', 0.25, 4., True, False),
('invertedbottleneck', 3, 1, 32, 'hard_swish', 0.25, 4., True, False),
('invertedbottleneck', 3, 1, 32, 'hard_swish', 0.25, 4., True, True),
('invertedbottleneck', 5, 2, 72, 'hard_swish', 0.25, 8., False, False),
('invertedbottleneck', 3, 1, 72, 'hard_swish', 0.25, 8., True, False),
('invertedbottleneck', 5, 1, 72, 'hard_swish', 0.25, 4., True, False),
('invertedbottleneck', 3, 1, 72, 'hard_swish', 0.25, 4., True, False),
('invertedbottleneck', 3, 1, 72, 'hard_swish', 0.25, 8., False, False),
('invertedbottleneck', 3, 1, 72, 'hard_swish', 0.25, 8., True, False),
('invertedbottleneck', 3, 1, 72, 'hard_swish', 0.25, 8., True, False),
('invertedbottleneck', 3, 1, 72, 'hard_swish', 0.25, 8., True, True),
('invertedbottleneck', 5, 2, 104, 'hard_swish', 0.25, 8., False, False),
('invertedbottleneck', 5, 1, 104, 'hard_swish', 0.25, 4., True, False),
('invertedbottleneck', 5, 1, 104, 'hard_swish', 0.25, 4., True, False),
('invertedbottleneck', 3, 1, 104, 'hard_swish', 0.25, 4., True, False),
('invertedbottleneck', 3, 1, 144, 'hard_swish', 0.25, 8., False, True),
]
}
MD_DSP_BLOCK_SPECS = {
'spec_name': 'MobileDetDSP',
# [expand_ratio] is set to 1 and [use_residual] is set to false
# for inverted_bottleneck_no_expansion
# [use_depthwise] is set to False for fused_conv
# [se_ratio] is set to None for all inverted_bottleneck layers
# [activation] is set to 'relu6' for all applicable layers
'block_spec_schema': ['block_fn', 'kernel_size', 'strides', 'filters',
'activation', 'se_ratio', 'expand_ratio',
'input_compression_ratio', 'output_compression_ratio',
'use_depthwise', 'use_residual', 'is_output'],
'block_specs': [
('convbn', 3, 2, 32, 'relu6',
None, None, None, None, None, None, False),
# inverted_bottleneck_no_expansion
('invertedbottleneck', 3, 1, 24, 'relu6',
None, 1., None, None, True, False, True),
('invertedbottleneck', 3, 2, 32, 'relu6',
None, 4., None, None, False, False, False), # fused_conv
('invertedbottleneck', 3, 1, 32, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 32, 'relu6',
None, 4., None, None, True, True, False),
('tucker', 3, 1, 32, 'relu6',
None, None, 0.25, 0.75, None, True, True),
('invertedbottleneck', 3, 2, 64, 'relu6',
None, 8., None, None, False, False, False), # fused_conv
('invertedbottleneck', 3, 1, 64, 'relu6',
None, 4., None, None, True, True, False),
('invertedbottleneck', 3, 1, 64, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 64, 'relu6',
None, 4., None, None, False, True, True), # fused_conv
('invertedbottleneck', 3, 2, 120, 'relu6',
None, 8., None, None, False, False, False), # fused_conv
('invertedbottleneck', 3, 1, 120, 'relu6',
None, 4., None, None, True, True, False),
('invertedbottleneck', 3, 1, 120, 'relu6',
None, 8, None, None, True, True, False),
('invertedbottleneck', 3, 1, 120, 'relu6',
None, 8., None, None, True, True, False),
('invertedbottleneck', 3, 1, 144, 'relu6',
None, 8., None, None, False, False, False), # fused_conv
('invertedbottleneck', 3, 1, 144, 'relu6',
None, 8., None, None, True, True, False),
('invertedbottleneck', 3, 1, 144, 'relu6',
None, 8, None, None, True, True, False),
('invertedbottleneck', 3, 1, 144, 'relu6',
None, 8., None, None, True, True, True),
('invertedbottleneck', 3, 2, 160, 'relu6',
None, 4, None, None, True, False, False),
('invertedbottleneck', 3, 1, 160, 'relu6',
None, 4, None, None, True, True, False),
('invertedbottleneck', 3, 1, 160, 'relu6',
None, 4., None, None, False, False, False), # fused_conv
('tucker', 3, 1, 160, 'relu6',
None, None, 0.75, 0.75, None, True, False),
('invertedbottleneck', 3, 1, 240, 'relu6',
None, 8, None, None, True, False, True),
]
}
MD_EdgeTPU_BLOCK_SPECS = {
'spec_name': 'MobileDetEdgeTPU',
# [use_depthwise] is set to False for fused_conv
# [se_ratio] is set to None for all inverted_bottleneck layers
# [activation] is set to 'relu6' for all applicable layers
'block_spec_schema': ['block_fn', 'kernel_size', 'strides', 'filters',
'activation', 'se_ratio', 'expand_ratio',
'input_compression_ratio', 'output_compression_ratio',
'use_depthwise', 'use_residual', 'is_output'],
'block_specs': [
('convbn', 3, 2, 32, 'relu6',
None, None, None, None, None, None, False),
('tucker', 3, 1, 16, 'relu6',
None, None, 0.25, 0.75, None, False, True),
('invertedbottleneck', 3, 2, 16, 'relu6',
None, 8., None, None, False, False, False), # fused_conv
('invertedbottleneck', 3, 1, 16, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 16, 'relu6',
None, 8., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 16, 'relu6',
None, 4., None, None, False, True, True), # fused_conv
('invertedbottleneck', 5, 2, 40, 'relu6',
None, 8., None, None, False, False, False), # fused_conv
('invertedbottleneck', 3, 1, 40, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 40, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 40, 'relu6',
None, 4., None, None, False, True, True), # fused_conv
('invertedbottleneck', 3, 2, 72, 'relu6',
None, 8, None, None, True, False, False),
('invertedbottleneck', 3, 1, 72, 'relu6',
None, 8, None, None, True, True, False),
('invertedbottleneck', 3, 1, 72, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 72, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
('invertedbottleneck', 5, 1, 96, 'relu6',
None, 8, None, None, True, False, False),
('invertedbottleneck', 5, 1, 96, 'relu6',
None, 8, None, None, True, True, False),
('invertedbottleneck', 3, 1, 96, 'relu6',
None, 8, None, None, True, True, False),
('invertedbottleneck', 3, 1, 96, 'relu6',
None, 8, None, None, True, True, True),
('invertedbottleneck', 5, 2, 120, 'relu6',
None, 8, None, None, True, False, False),
('invertedbottleneck', 3, 1, 120, 'relu6',
None, 8, None, None, True, True, False),
('invertedbottleneck', 5, 1, 120, 'relu6',
None, 4, None, None, True, True, False),
('invertedbottleneck', 3, 1, 120, 'relu6',
None, 8, None, None, True, True, False),
('invertedbottleneck', 5, 1, 384, 'relu6',
None, 8, None, None, True, False, True),
]
}
MD_GPU_BLOCK_SPECS = {
'spec_name': 'MobileDetGPU',
# [use_depthwise] is set to False for fused_conv
# [se_ratio] is set to None for all inverted_bottleneck layers
# [activation] is set to 'relu6' for all applicable layers
'block_spec_schema': ['block_fn', 'kernel_size', 'strides', 'filters',
'activation', 'se_ratio', 'expand_ratio',
'input_compression_ratio', 'output_compression_ratio',
'use_depthwise', 'use_residual', 'is_output'],
'block_specs': [
# block 0
('convbn', 3, 2, 32, 'relu6',
None, None, None, None, None, None, False),
# block 1
('tucker', 3, 1, 16, 'relu6',
None, None, 0.25, 0.25, None, False, True),
# block 2
('invertedbottleneck', 3, 2, 32, 'relu6',
None, 8., None, None, False, False, False), # fused_conv
('tucker', 3, 1, 32, 'relu6',
None, None, 0.25, 0.25, None, True, False),
('tucker', 3, 1, 32, 'relu6',
None, None, 0.25, 0.25, None, True, False),
('tucker', 3, 1, 32, 'relu6',
None, None, 0.25, 0.25, None, True, True),
# block 3
('invertedbottleneck', 3, 2, 64, 'relu6',
None, 8., None, None, False, False, False), # fused_conv
('invertedbottleneck', 3, 1, 64, 'relu6',
None, 8., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 64, 'relu6',
None, 8., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 64, 'relu6',
None, 4., None, None, False, True, True), # fused_conv
# block 4
('invertedbottleneck', 3, 2, 128, 'relu6',
None, 8., None, None, False, False, False), # fused_conv
('invertedbottleneck', 3, 1, 128, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 128, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 128, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
# block 5
('invertedbottleneck', 3, 1, 128, 'relu6',
None, 8., None, None, False, False, False), # fused_conv
('invertedbottleneck', 3, 1, 128, 'relu6',
None, 8., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 128, 'relu6',
None, 8., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 128, 'relu6',
None, 8., None, None, False, True, True), # fused_conv
# block 6
('invertedbottleneck', 3, 2, 128, 'relu6',
None, 4., None, None, False, False, False), # fused_conv
('invertedbottleneck', 3, 1, 128, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 128, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
('invertedbottleneck', 3, 1, 128, 'relu6',
None, 4., None, None, False, True, False), # fused_conv
# block 7
('invertedbottleneck', 3, 1, 384, 'relu6',
None, 8, None, None, True, False, True),
]
}
SUPPORTED_SPECS_MAP = {
'MobileDetCPU': MD_CPU_BLOCK_SPECS,
'MobileDetDSP': MD_DSP_BLOCK_SPECS,
'MobileDetEdgeTPU': MD_EdgeTPU_BLOCK_SPECS,
'MobileDetGPU': MD_GPU_BLOCK_SPECS,
}
@dataclasses.dataclass
class BlockSpec(hyperparams.Config):
"""A container class that specifies the block configuration for MobileDet."""
block_fn: str = 'convbn'
kernel_size: int = 3
strides: int = 1
filters: int = 32
use_bias: bool = False
use_normalization: bool = True
activation: str = 'relu6'
is_output: bool = True
# Used for block type InvertedResConv and TuckerConvBlock.
use_residual: bool = True
# Used for block type InvertedResConv only.
use_depthwise: bool = True
expand_ratio: Optional[float] = 8.
se_ratio: Optional[float] = None
# Used for block type TuckerConvBlock only.
input_compression_ratio: Optional[float] = None
output_compression_ratio: Optional[float] = None
def block_spec_decoder(
specs: Dict[Any, Any],
filter_size_scale: float,
divisible_by: int = 8) -> List[BlockSpec]:
"""Decodes specs for a block.
Args:
specs: A `dict` specification of block specs of a mobiledet version.
filter_size_scale: A `float` multiplier for the filter size for all
convolution ops. The value must be greater than zero. Typical usage will
be to set this value in (0, 1) to reduce the number of parameters or
computation cost of the model.
divisible_by: An `int` that ensures all inner dimensions are divisible by
this number.
Returns:
A list of `BlockSpec` that defines the structure of the base network.
"""
spec_name = specs['spec_name']
block_spec_schema = specs['block_spec_schema']
block_specs = specs['block_specs']
if not block_specs:
raise ValueError(
'The block spec cannot be empty for {}!'.format(spec_name))
if len(block_specs[0]) != len(block_spec_schema):
raise ValueError('The block spec values {} do not match with '
'the schema {}'.format(block_specs[0], block_spec_schema))
decoded_specs = []
for s in block_specs:
kw_s = dict(zip(block_spec_schema, s))
decoded_specs.append(BlockSpec(**kw_s))
for ds in decoded_specs:
if ds.filters:
ds.filters = nn_layers.round_filters(filters=ds.filters,
multiplier=filter_size_scale,
divisor=divisible_by,
round_down_protect=False,
min_depth=8)
return decoded_specs
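
# Example (illustrative only): decoding the GPU specs at half width. The
# exact filter counts depend on `nn_layers.round_filters`, so this sketch
# shows the call pattern rather than verified output.
#
#   decoded = block_spec_decoder(MD_GPU_BLOCK_SPECS, filter_size_scale=0.5)
#   for spec in decoded:
#     print(spec.block_fn, spec.filters, spec.is_output)
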
@tf.keras.utils.register_keras_serializable(package='Beta')
class MobileDet(tf.keras.Model):
"""Creates a MobileDet family model."""
def __init__(
self,
model_id: str = 'MobileDetCPU',
filter_size_scale: float = 1.0,
input_specs: tf.keras.layers.InputSpec = layers.InputSpec(
shape=[None, None, None, 3]),
# The following are for hyper-parameter tuning.
norm_momentum: float = 0.99,
norm_epsilon: float = 0.001,
kernel_initializer: str = 'VarianceScaling',
kernel_regularizer: Optional[tf.keras.regularizers.Regularizer] = None,
bias_regularizer: Optional[tf.keras.regularizers.Regularizer] = None,
# The following should be kept the same most of the time.
min_depth: int = 8,
divisible_by: int = 8,
regularize_depthwise: bool = False,
use_sync_bn: bool = False,
**kwargs):
"""Initializes a MobileDet model.
Args:
model_id: A `str` of MobileDet version. The supported values are
`MobileDetCPU`, `MobileDetDSP`, `MobileDetEdgeTPU`, `MobileDetGPU`.
filter_size_scale: A `float` multiplier for the filters (number of
channels) for all convolution ops. The value must be greater than zero.
Typical usage will be to set this value in (0, 1) to reduce the number
of parameters or computation cost of the model.
input_specs: A `tf.keras.layers.InputSpec` of specs of the input tensor.
norm_momentum: A `float` of normalization momentum for the moving average.
norm_epsilon: A `float` added to variance to avoid dividing by zero.
kernel_initializer: A `str` for kernel initializer of convolutional
layers.
kernel_regularizer: A `tf.keras.regularizers.Regularizer` object for
Conv2D. Defaults to None.
bias_regularizer: A `tf.keras.regularizers.Regularizer` object for Conv2D.
Defaults to None.
min_depth: An `int` of minimum depth (number of channels) for all
convolution ops. Enforced when filter_size_scale < 1, and not an active
constraint when filter_size_scale >= 1.
divisible_by: An `int` that ensures all inner dimensions are divisible by
this number.
regularize_depthwise: If True, apply regularization on depthwise
convolutions.
use_sync_bn: If True, use synchronized batch normalization.
**kwargs: Additional keyword arguments to be passed.
"""
if model_id not in SUPPORTED_SPECS_MAP:
raise ValueError('The MobileDet version {} '
'is not supported'.format(model_id))
if filter_size_scale <= 0:
raise ValueError('filter_size_scale must be greater than zero.')
self._model_id = model_id
self._input_specs = input_specs
self._filter_size_scale = filter_size_scale
self._min_depth = min_depth
self._divisible_by = divisible_by
self._regularize_depthwise = regularize_depthwise
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
self._use_sync_bn = use_sync_bn
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
inputs = tf.keras.Input(shape=input_specs.shape[1:])
block_specs = SUPPORTED_SPECS_MAP.get(model_id)
self._decoded_specs = block_spec_decoder(
specs=block_specs,
filter_size_scale=self._filter_size_scale,
divisible_by=self._get_divisible_by())
_, endpoints, _ = self._mobiledet_base(inputs=inputs)
self._output_specs = {l: endpoints[l].get_shape() for l in endpoints}
super(MobileDet, self).__init__(
inputs=inputs, outputs=endpoints, **kwargs)
def _get_divisible_by(self):
return self._divisible_by
def _mobiledet_base(self,
inputs: tf.Tensor
) -> Tuple[tf.Tensor, Dict[str, tf.Tensor], int]:
"""Builds the base MobileDet architecture.
Args:
inputs: A `tf.Tensor` of shape `[batch_size, height, width, channels]`.
Returns:
A tuple of the output `tf.Tensor`, a dictionary that collects endpoints,
and the next endpoint level.
"""
input_shape = inputs.get_shape().as_list()
if len(input_shape) != 4:
raise ValueError('Expected rank 4 input, was: %d' % len(input_shape))
net = inputs
endpoints = {}
endpoint_level = 1
for i, block_def in enumerate(self._decoded_specs):
block_name = 'block_group_{}_{}'.format(block_def.block_fn, i)
if block_def.block_fn == 'convbn':
net = mobilenet.Conv2DBNBlock(
filters=block_def.filters,
kernel_size=block_def.kernel_size,
strides=block_def.strides,
activation=block_def.activation,
use_bias=block_def.use_bias,
use_normalization=block_def.use_normalization,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon
)(net)
elif block_def.block_fn == 'invertedbottleneck':
in_filters = net.shape.as_list()[-1]
net = nn_blocks.InvertedBottleneckBlock(
in_filters=in_filters,
out_filters=block_def.filters,
kernel_size=block_def.kernel_size,
strides=block_def.strides,
expand_ratio=block_def.expand_ratio,
se_ratio=block_def.se_ratio,
se_inner_activation=block_def.activation,
se_gating_activation='sigmoid',
se_round_down_protect=False,
expand_se_in_filters=True,
activation=block_def.activation,
use_depthwise=block_def.use_depthwise,
use_residual=block_def.use_residual,
regularize_depthwise=self._regularize_depthwise,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon,
divisible_by=self._get_divisible_by()
)(net)
elif block_def.block_fn == 'tucker':
in_filters = net.shape.as_list()[-1]
net = nn_blocks.TuckerConvBlock(
in_filters=in_filters,
out_filters=block_def.filters,
kernel_size=block_def.kernel_size,
strides=block_def.strides,
input_compression_ratio=block_def.input_compression_ratio,
output_compression_ratio=block_def.output_compression_ratio,
activation=block_def.activation,
use_residual=block_def.use_residual,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon,
divisible_by=self._get_divisible_by()
)(net)
else:
raise ValueError('Unknown block type {} for layer {}'.format(
block_def.block_fn, i))
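# Identity activation, used only to attach a readable name to the block.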
net = tf.keras.layers.Activation('linear', name=block_name)(net)
if block_def.is_output:
endpoints[str(endpoint_level)] = net
endpoint_level += 1
return net, endpoints, endpoint_level
def get_config(self):
config_dict = {
'model_id': self._model_id,
'filter_size_scale': self._filter_size_scale,
'min_depth': self._min_depth,
'divisible_by': self._divisible_by,
'regularize_depthwise': self._regularize_depthwise,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'bias_regularizer': self._bias_regularizer,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
}
return config_dict
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
@property
def output_specs(self):
"""A dict of {level: TensorShape} pairs for the model output."""
return self._output_specs
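
# Example (illustrative only): building a MobileDet-GPU backbone and reading
# its endpoints. The input size is an arbitrary choice for the sketch.
#
#   backbone = MobileDet(model_id='MobileDetGPU', filter_size_scale=1.0)
#   feats = backbone(tf.ones([1, 320, 320, 3]))  # dict: level -> tf.Tensor
#   print(backbone.output_specs)  # {level: TensorShape}, one per `is_output`
#
# Since `get_config`/`from_config` are implemented above, the model can also
# be round-tripped through its config:
#
#   clone = MobileDet.from_config(backbone.get_config())
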
@factory.register_backbone_builder('mobiledet')
def build_mobiledet(
input_specs: tf.keras.layers.InputSpec,
backbone_config: hyperparams.Config,
norm_activation_config: hyperparams.Config,
l2_regularizer: Optional[tf.keras.regularizers.Regularizer] = None
) -> tf.keras.Model:
"""Builds MobileDet backbone from a config."""
backbone_type = backbone_config.type
backbone_cfg = backbone_config.get()
assert backbone_type == 'mobiledet', (f'Inconsistent backbone type '
f'{backbone_type}')
return MobileDet(
model_id=backbone_cfg.model_id,
filter_size_scale=backbone_cfg.filter_size_scale,
input_specs=input_specs,
use_sync_bn=norm_activation_config.use_sync_bn,
norm_momentum=norm_activation_config.norm_momentum,
norm_epsilon=norm_activation_config.norm_epsilon,
kernel_regularizer=l2_regularizer)
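
# Example (illustrative only): `build_mobiledet` is normally reached through
# the backbone factory with config objects. The config classes named here
# (`backbones.Backbone`, `common.NormActivation`) live in other model garden
# modules and are assumptions of this sketch, not imports of this file:
#
#   backbone_config = backbones.Backbone(
#       type='mobiledet',
#       mobiledet=backbones.MobileDet(model_id='MobileDetCPU'))
#   model = factory.build_backbone(
#       input_specs=tf.keras.layers.InputSpec(shape=[None, 320, 320, 3]),
#       backbone_config=backbone_config,
#       norm_activation_config=common.NormActivation())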