Commit 0016b0a7 authored by sunxx1

Merge branch 'dtk22.04' into 'main'

See merge request dcutoolkit/deeplearing/dlexamples_new!49
parents 17bc28d5 7a382d5d
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv.layers import NonMaxSuppression
class NonMaxSuppressionTest(tf.test.TestCase):
def test_return_shapes(self):
layer = NonMaxSuppression(classes=4, bounding_box_format="xyWH")
images = tf.ones((3, 480, 480, 3))
boxes = tf.cast(tf.random.uniform((3, 5, 4), 0, 480, tf.int32), tf.float32)
classes = tf.cast(tf.random.uniform((3, 5, 1), 0, 4, tf.int32), tf.float32)
scores = tf.random.uniform((3, 5, 1), 0, 1, tf.float32)
predictions = tf.concat([boxes, classes, scores], axis=-1)
boxes = layer(predictions, images)
self.assertEqual(boxes.shape, [3, None, 6])
def test_non_square_images(self):
layer = NonMaxSuppression(classes=4, bounding_box_format="xyxy")
boxes = tf.cast(tf.random.uniform((2, 5, 4), 0, 480, tf.int32), tf.float32)
classes = tf.cast(tf.random.uniform((2, 5, 1), 0, 4, tf.int32), tf.float32)
scores = tf.random.uniform((2, 5, 1), 0, 1, tf.float32)
predictions = tf.concat([boxes, classes, scores], axis=-1)
# RGB
images = tf.ones((2, 256, 512, 3))
boxes = layer(predictions, images)
self.assertEqual(boxes.shape, [2, None, 6])
# greyscale
images = tf.ones((2, 256, 512, 1))
boxes = layer(predictions, images)
self.assertEqual(boxes.shape, [2, None, 6])
def test_different_channels(self):
layer = NonMaxSuppression(classes=4, bounding_box_format="xyWH")
images = tf.ones((3, 480, 480, 5))
boxes = tf.cast(tf.random.uniform((3, 5, 4), 0, 480, tf.int32), tf.float32)
classes = tf.cast(tf.random.uniform((3, 5, 1), 0, 4, tf.int32), tf.float32)
scores = tf.random.uniform((3, 5, 1), 0, 1, tf.float32)
predictions = tf.concat([boxes, classes, scores], axis=-1)
boxes = layer(predictions, images)
self.assertEqual(boxes.shape, [3, None, 6])
def test_in_a_model(self):
input1 = tf.keras.layers.Input([5, 6])
input2 = tf.keras.layers.Input([480, 480, 3])
layer = NonMaxSuppression(classes=4, bounding_box_format="xyWH")
outputs = layer(input1, input2)
model = tf.keras.models.Model(inputs=[input1, input2], outputs=outputs)
images = tf.ones((3, 480, 480, 3))
boxes = tf.cast(tf.random.uniform((3, 5, 4), 0, 480, tf.int32), tf.float32)
classes = tf.cast(tf.random.uniform((3, 5, 1), 0, 4, tf.int32), tf.float32)
scores = tf.random.uniform((3, 5, 1), 0, 1, tf.float32)
predictions = tf.concat([boxes, classes, scores], axis=-1)
boxes = model([predictions, images])
self.assertEqual(boxes.shape, [3, None, 6])
def test_without_images(self):
layer = NonMaxSuppression(classes=4, bounding_box_format="xyWH")
boxes = tf.cast(tf.random.uniform((3, 5, 4), 0, 480, tf.int32), tf.float32)
classes = tf.cast(tf.random.uniform((3, 5, 1), 0, 4, tf.int32), tf.float32)
scores = tf.random.uniform((3, 5, 1), 0, 1, tf.float32)
predictions = tf.concat([boxes, classes, scores], axis=-1)
boxes = layer(predictions)
self.assertEqual(boxes.shape, [3, None, 6])
def test_ragged_output_with_differing_shapes(self):
layer = NonMaxSuppression(8, "xywh", iou_threshold=0.1)
images = tf.ones((2, 480, 480, 3))
predictions = tf.convert_to_tensor(
[
[
[0, 0, 1, 1, 4, 0.9],
[0, 0, 2, 3, 4, 0.76],
[4, 5, 3, 6, 3, 0.89],
[2, 2, 3, 3, 6, 0.04],
],
[
[0, 0, 5, 6, 4, 0.9],
[0, 0, 7, 3, 1, 0.76],
[4, 5, 5, 6, 4, 0.04],
[2, 1, 3, 3, 7, 0.48],
],
],
dtype=tf.float32,
)
boxes = layer(predictions, images)
self.assertEqual(boxes[0].shape, [2, 6])
self.assertEqual(boxes[1].shape, [3, 6])
def test_ragged_output_with_zero_boxes(self):
layer = NonMaxSuppression(8, "xywh", confidence_threshold=0.1)
images = tf.ones((2, 480, 480, 3))
boxes = tf.cast(tf.random.uniform((3, 5, 4), 0, 480, tf.int32), tf.float32)
classes = tf.cast(tf.random.uniform((3, 5, 1), 0, 4, tf.int32), tf.float32)
scores = tf.random.uniform((3, 5, 1), 0, 0.1, tf.float32)
predictions = tf.concat([boxes, classes, scores], axis=-1)
boxes = layer(predictions, images)
self.assertEqual(boxes[0].shape, [0, 6])
self.assertEqual(boxes[1].shape, [0, 6])
def test_input_box_shape(self):
layer = NonMaxSuppression(8, "xywh", confidence_threshold=0.1)
images = tf.ones((2, 480, 480, 3))
boxes = tf.cast(tf.random.uniform((3, 5, 5), 0, 480, tf.int32), tf.float32)
classes = tf.cast(tf.random.uniform((3, 5, 1), 0, 4, tf.int32), tf.float32)
scores = tf.random.uniform((3, 5, 1), 0, 0.1, tf.float32)
predictions = tf.concat([boxes, classes, scores], axis=-1)
with self.assertRaises(ValueError):
boxes = layer(predictions, images)
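# A minimal usage sketch distilled from the tests above. It reuses only the
# constructor arguments and call signatures already exercised in this file;
# inputs are random and purely illustrative.
if __name__ == "__main__":
    layer = NonMaxSuppression(classes=4, bounding_box_format="xyWH")
    # predictions pack [x, y, W, H, class, confidence] along the last axis
    boxes = tf.cast(tf.random.uniform((1, 5, 4), 0, 480, tf.int32), tf.float32)
    classes = tf.cast(tf.random.uniform((1, 5, 1), 0, 4, tf.int32), tf.float32)
    scores = tf.random.uniform((1, 5, 1), 0, 1, tf.float32)
    predictions = tf.concat([boxes, classes, scores], axis=-1)
    # images may be omitted entirely, as in `test_without_images` above
    print(layer(predictions).shape)  # -> (1, None, 6)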
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from tensorflow.keras import layers
from keras_cv import bounding_box
class RetinaNetLabelEncoder(layers.Layer):
"""Transforms the raw labels into targets for training.
This class has operations to generate targets for a batch of samples, which
is made up of the input images, the bounding boxes for the objects present,
and their class IDs.
Args:
bounding_box_format: The format of bounding boxes of input dataset. Refer
[to the keras.io docs](https://keras.io/api/keras_cv/bounding_box/formats/)
for more details on supported bounding box formats.
anchor_generator: `keras_cv.layers.AnchorGenerator` instance to produce anchor
boxes. Boxes are then used to encode labels on a per-image basis.
box_variance: The scaling factors used to scale the bounding box targets.
Defaults to (0.1, 0.1, 0.2, 0.2).
background_class: (Optional) The class ID used for the background class.
Defaults to -1.
ignore_class: (Optional) The class ID used for the ignore class. Defaults to -2.
"""
def __init__(
self,
bounding_box_format,
anchor_generator,
box_variance=(0.1, 0.1, 0.2, 0.2),
background_class=-1.0,
ignore_class=-2.0,
**kwargs,
):
super().__init__(**kwargs)
self.bounding_box_format = bounding_box_format
self.anchor_generator = anchor_generator
self.box_variance = tf.convert_to_tensor(box_variance, dtype=self.dtype)
self.background_class = background_class
self.ignore_class = ignore_class
self.built = True
def _match_anchor_boxes(
self, anchor_boxes, gt_boxes, match_iou=0.5, ignore_iou=0.4
):
"""Matches ground truth boxes to anchor boxes based on IOU.
1. Calculates the pairwise IOU for the M `anchor_boxes` and N `gt_boxes`
to get a `(M, N)` shaped matrix.
2. The ground truth box with the maximum IOU in each row is assigned to
the anchor box provided the IOU is greater than `match_iou`.
3. If the maximum IOU in a row is less than `ignore_iou`, the anchor
box is assigned with the background class.
4. The remaining anchor boxes that do not have any class assigned are
ignored during training.
Arguments:
anchor_boxes: A float tensor with the shape `(total_anchors, 4)`
representing all the anchor boxes for a given input image shape,
where each anchor box is of the format `[x, y, width, height]`.
gt_boxes: A float tensor with shape `(num_objects, 4)` representing
the ground truth boxes, where each box is of the format
`[x, y, width, height]`.
match_iou: A float value representing the minimum IOU threshold for
determining if a ground truth box can be assigned to an anchor box.
ignore_iou: A float value representing the IOU threshold under which
an anchor box is assigned to the background class.
Returns:
matched_gt_idx: Index of the matched object
positive_mask: A mask for anchor boxes that have been assigned ground
truth boxes.
ignore_mask: A mask for anchor boxes that need to be ignored during
training.
"""
iou_matrix = bounding_box.compute_iou(
anchor_boxes, gt_boxes, bounding_box_format="xywh"
)
max_iou = tf.reduce_max(iou_matrix, axis=1)
matched_gt_idx = tf.argmax(iou_matrix, axis=1)
positive_mask = tf.greater_equal(max_iou, match_iou)
negative_mask = tf.less(max_iou, ignore_iou)
ignore_mask = tf.logical_not(tf.logical_or(positive_mask, negative_mask))
return (
matched_gt_idx,
tf.cast(positive_mask, dtype=self.dtype),
tf.cast(ignore_mask, dtype=self.dtype),
)
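# Worked example (illustrative only): with two anchors, one gt box, and
# pairwise IOUs [[0.8], [0.3]], the defaults match_iou=0.5 and
# ignore_iou=0.4 give max_iou = [0.8, 0.3], so anchor 0 is positive
# (0.8 >= 0.5), anchor 1 is background (0.3 < 0.4), and no anchor lands in
# the ignored band [0.4, 0.5).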
def _compute_box_target(self, anchor_boxes, matched_gt_boxes):
"""Transforms the ground truth boxes into targets for training"""
box_target = tf.concat(
[
(matched_gt_boxes[:, :2] - anchor_boxes[:, :2]) / anchor_boxes[:, 2:],
tf.math.log(matched_gt_boxes[:, 2:] / anchor_boxes[:, 2:]),
],
axis=-1,
)
box_target = box_target / self.box_variance
return box_target
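# The transform above is the standard RetinaNet box parameterization:
#   t_xy = ((gt_xy - anchor_xy) / anchor_wh) / variance_xy
#   t_wh = log(gt_wh / anchor_wh) / variance_wh
# For example, an anchor (x=10, y=10, w=4, h=4) matched to a gt box
# (x=11, y=10, w=8, h=4) yields targets (0.25, 0, log 2, 0) before the
# division by `box_variance`.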
def _encode_sample(self, gt_boxes, anchor_boxes):
"""Creates box and classification targets for a single sample"""
cls_ids = gt_boxes[:, 4]
gt_boxes = gt_boxes[:, :4]
cls_ids = tf.cast(cls_ids, dtype=self.dtype)
matched_gt_idx, positive_mask, ignore_mask = self._match_anchor_boxes(
anchor_boxes, gt_boxes
)
matched_gt_boxes = tf.gather(gt_boxes, matched_gt_idx)
box_target = self._compute_box_target(anchor_boxes, matched_gt_boxes)
matched_gt_cls_ids = tf.gather(cls_ids, matched_gt_idx)
cls_target = tf.where(
tf.not_equal(positive_mask, 1.0), self.background_class, matched_gt_cls_ids
)
cls_target = tf.where(tf.equal(ignore_mask, 1.0), self.ignore_class, cls_target)
cls_target = tf.expand_dims(cls_target, axis=-1)
label = tf.concat([box_target, cls_target], axis=-1)
# In the case that a box in the corner of an image matches with an all -1 box
# that is outside of the image, we should assign the box to the ignore class
# There are rare cases where a -1 box can be matched, resulting in a NaN during
# training. The unit test passing all -1s to the label encoder ensures that we
# properly handle this edge-case.
label = tf.where(
tf.expand_dims(tf.math.reduce_any(tf.math.is_nan(label), axis=-1), axis=-1),
self.ignore_class,
label,
)
return label
def call(self, images, target_boxes):
"""Creates box and classification targets for a batch"""
if isinstance(images, tf.RaggedTensor):
raise ValueError(
"`RetinaNetLabelEncoder`'s `call()` method does not "
"support RaggedTensor inputs for the `images` argument. Received "
f"`type(images)={type(images)}`."
)
target_boxes = bounding_box.convert_format(
target_boxes, source=self.bounding_box_format, target="xywh", images=images
)
anchor_boxes = self.anchor_generator(image_shape=tf.shape(images[0]))
anchor_boxes = tf.concat(list(anchor_boxes.values()), axis=0)
anchor_boxes = bounding_box.convert_format(
anchor_boxes,
source=self.anchor_generator.bounding_box_format,
target=self.bounding_box_format,
images=images[0],
)
if isinstance(target_boxes, tf.RaggedTensor):
target_boxes = target_boxes.to_tensor(default_value=-1)
result = tf.map_fn(
elems=(target_boxes),
fn=lambda box_set: self._encode_sample(box_set, anchor_boxes),
)
return bounding_box.convert_format(
result, source="xywh", target=self.bounding_box_format, images=images
)
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv import layers as cv_layers
class RetinaNetLabelEncoderTest(tf.test.TestCase):
def test_label_encoder_output_shapes(self):
images_shape = (8, 512, 512, 3)
boxes_shape = (8, 10, 5)
images = tf.random.uniform(shape=images_shape)
boxes = tf.random.uniform(
shape=boxes_shape, minval=0.0, maxval=1.0, dtype=tf.float32
)
strides = [2**i for i in range(3, 8)]
scales = [2**x for x in [0, 1 / 3, 2 / 3]]
sizes = [x**2 for x in [32.0, 64.0, 128.0, 256.0, 512.0]]
aspect_ratios = [0.5, 1.0, 2.0]
anchor_generator = cv_layers.AnchorGenerator(
bounding_box_format="yxyx",
sizes=sizes,
aspect_ratios=aspect_ratios,
scales=scales,
strides=strides,
)
encoder = cv_layers.RetinaNetLabelEncoder(
anchor_generator=anchor_generator,
bounding_box_format="rel_xyxy",
)
result = encoder(images, boxes)
self.assertEqual(result.shape, [8, 49104, 5])
def test_all_negative_1(self):
images_shape = (8, 512, 512, 3)
boxes_shape = (8, 10, 5)
images = tf.random.uniform(shape=images_shape)
boxes = -tf.ones(shape=boxes_shape, dtype=tf.float32)
strides = [2**i for i in range(3, 8)]
scales = [2**x for x in [0, 1 / 3, 2 / 3]]
sizes = [x**2 for x in [32.0, 64.0, 128.0, 256.0, 512.0]]
aspect_ratios = [0.5, 1.0, 2.0]
anchor_generator = cv_layers.AnchorGenerator(
bounding_box_format="yxyx",
sizes=sizes,
aspect_ratios=aspect_ratios,
scales=scales,
strides=strides,
)
encoder = cv_layers.RetinaNetLabelEncoder(
anchor_generator=anchor_generator,
bounding_box_format="rel_xyxy",
)
result = encoder(images, boxes)
self.assertFalse(tf.math.reduce_any(tf.math.is_nan(result)))
def test_ragged_encoding(self):
images_shape = (2, 512, 512, 3)
images = tf.random.uniform(shape=images_shape)
y_true = tf.ragged.stack(
[
tf.constant([[0, 0, 10, 10, 1], [5, 5, 10, 10, 1]], tf.float32),
tf.constant([[0, 0, 10, 10, 1]], tf.float32),
]
)
strides = [2**i for i in range(3, 8)]
scales = [2**x for x in [0, 1 / 3, 2 / 3]]
sizes = [x**2 for x in [32.0, 64.0, 128.0, 256.0, 512.0]]
aspect_ratios = [0.5, 1.0, 2.0]
anchor_generator = cv_layers.AnchorGenerator(
bounding_box_format="xywh",
sizes=sizes,
aspect_ratios=aspect_ratios,
scales=scales,
strides=strides,
)
encoder = cv_layers.RetinaNetLabelEncoder(
anchor_generator=anchor_generator,
bounding_box_format="xywh",
)
result = encoder(images, y_true)
# 49104 is the total number of anchors produced by the anchor generator
self.assertEqual(result.shape, [2, 49104, 5])
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict
from typing import Mapping
from typing import Optional
from typing import Tuple
import tensorflow as tf
from keras_cv import bounding_box
def _feature_bilinear_interpolation(
features: tf.Tensor, kernel_y: tf.Tensor, kernel_x: tf.Tensor
) -> tf.Tensor:
"""
Feature bilinear interpolation.
The RoIAlign feature f can be computed by bilinear interpolation
of four neighboring feature points f0, f1, f2, and f3.
f(y, x) = [hy, ly] * [[f00, f01], * [hx, lx]^T
[f10, f11]]
f(y, x) = (hy*hx)f00 + (hy*lx)f01 + (ly*hx)f10 + (lx*ly)f11
f(y, x) = w00*f00 + w01*f01 + w10*f10 + w11*f11
kernel_y = [hy, ly]
kernel_x = [hx, lx]
Args:
features: The input features, of shape [batch_size, num_boxes,
output_size * 2, output_size * 2, num_filters].
kernel_y: Tensor of size [batch_size, boxes, output_size, 2, 1].
kernel_x: Tensor of size [batch_size, boxes, output_size, 2, 1].
Returns:
A 5-D tensor representing feature crop of shape
[batch_size, num_boxes, output_size, output_size, num_filters].
"""
features_shape = tf.shape(features)
batch_size, num_boxes, output_size, num_filters = (
features_shape[0],
features_shape[1],
features_shape[2],
features_shape[4],
)
output_size = output_size // 2
kernel_y = tf.reshape(kernel_y, [batch_size, num_boxes, output_size * 2, 1])
kernel_x = tf.reshape(kernel_x, [batch_size, num_boxes, 1, output_size * 2])
# Use implicit broadcast to generate the interpolation kernel. The
# multiplier `4` is for avg pooling.
interpolation_kernel = kernel_y * kernel_x * 4
# Interpolate the gathered features with computed interpolation kernels.
features *= tf.cast(
tf.expand_dims(interpolation_kernel, axis=-1), dtype=features.dtype
)
features = tf.reshape(
features,
[batch_size * num_boxes, output_size * 2, output_size * 2, num_filters],
)
features = tf.nn.avg_pool(features, [1, 2, 2, 1], [1, 2, 2, 1], "VALID")
features = tf.reshape(
features, [batch_size, num_boxes, output_size, output_size, num_filters]
)
return features
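# Sanity check for the kernel construction above (illustrative only): a sample
# point a fraction 0.3 below/right of the integer grid gives ly = lx = 0.3 and
# hy = hx = 0.7, so the four bilinear weights are w00 = 0.49, w01 = 0.21,
# w10 = 0.21, and w11 = 0.09, which sum to 1. The extra factor of 4 in
# `interpolation_kernel` compensates for the subsequent 2x2 average pooling,
# which divides by 4.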
def _compute_grid_positions(
boxes: tf.Tensor, boundaries: tf.Tensor, output_size: int, sample_offset: float
) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor, tf.Tensor]:
"""
Computes the grid position w.r.t. the corresponding feature map.
Args:
boxes: a 3-D tensor of shape [batch_size, num_boxes, 4] encoding the
information of each box w.r.t. the corresponding feature map.
boxes[:, :, 0:2] are the grid position in (y, x) (float) of the top-left
corner of each box. boxes[:, :, 2:4] are the box sizes in (h, w) (float)
in terms of the number of pixels of the corresponding feature map size.
boundaries: a 3-D tensor of shape [batch_size, num_boxes, 2] representing
the boundary (in (y, x)) of the corresponding feature map for each box.
Any resampled grid points that go beyond the boundary will be clipped.
output_size: a scalar indicating the output crop size.
sample_offset: a float number in [0, 1] indicating the subpixel sample offset
from the grid point.
Returns:
kernel_y: Tensor of size [batch_size, boxes, output_size, 2, 1].
kernel_x: Tensor of size [batch_size, boxes, output_size, 2, 1].
box_grid_y0y1: Tensor of size [batch_size, boxes, output_size, 2]
box_grid_x0x1: Tensor of size [batch_size, boxes, output_size, 2]
"""
boxes_shape = tf.shape(boxes)
batch_size, num_boxes = boxes_shape[0], boxes_shape[1]
if batch_size is None:
batch_size = tf.shape(boxes)[0]
box_grid_x = []
box_grid_y = []
for i in range(output_size):
box_grid_x.append(
boxes[:, :, 1] + (i + sample_offset) * boxes[:, :, 3] / output_size
)
box_grid_y.append(
boxes[:, :, 0] + (i + sample_offset) * boxes[:, :, 2] / output_size
)
box_grid_x = tf.stack(box_grid_x, axis=2)
box_grid_y = tf.stack(box_grid_y, axis=2)
box_grid_y0 = tf.floor(box_grid_y)
box_grid_x0 = tf.floor(box_grid_x)
box_grid_x0 = tf.maximum(tf.cast(0.0, dtype=box_grid_x0.dtype), box_grid_x0)
box_grid_y0 = tf.maximum(tf.cast(0.0, dtype=box_grid_y0.dtype), box_grid_y0)
box_grid_x0 = tf.minimum(box_grid_x0, tf.expand_dims(boundaries[:, :, 1], -1))
box_grid_x1 = tf.minimum(box_grid_x0 + 1, tf.expand_dims(boundaries[:, :, 1], -1))
box_grid_y0 = tf.minimum(box_grid_y0, tf.expand_dims(boundaries[:, :, 0], -1))
box_grid_y1 = tf.minimum(box_grid_y0 + 1, tf.expand_dims(boundaries[:, :, 0], -1))
box_gridx0x1 = tf.stack([box_grid_x0, box_grid_x1], axis=-1)
box_gridy0y1 = tf.stack([box_grid_y0, box_grid_y1], axis=-1)
# The RoIAlign feature f can be computed by bilinear interpolation of four
# neighboring feature points f0, f1, f2, and f3.
# f(y, x) = [hy, ly] * [[f00, f01], * [hx, lx]^T
# [f10, f11]]
# f(y, x) = (hy*hx)f00 + (hy*lx)f01 + (ly*hx)f10 + (lx*ly)f11
# f(y, x) = w00*f00 + w01*f01 + w10*f10 + w11*f11
ly = box_grid_y - box_grid_y0
lx = box_grid_x - box_grid_x0
hy = 1.0 - ly
hx = 1.0 - lx
kernel_y = tf.reshape(
tf.stack([hy, ly], axis=3), [batch_size, num_boxes, output_size, 2, 1]
)
kernel_x = tf.reshape(
tf.stack([hx, lx], axis=3), [batch_size, num_boxes, output_size, 2, 1]
)
return kernel_y, kernel_x, box_gridy0y1, box_gridx0x1
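# Illustrative example: with output_size=2 and sample_offset=0.5, a box whose
# top edge is at y=1.0 with height 4.0 (in feature-map pixels) is sampled at
# box_grid_y = [1.0 + 0.5 * 4 / 2, 1.0 + 1.5 * 4 / 2] = [2.0, 4.0], i.e. at
# the center of each of the two output cells along y.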
def multilevel_crop_and_resize(
features: Dict[str, tf.Tensor],
boxes: tf.Tensor,
output_size: int = 7,
sample_offset: float = 0.5,
) -> tf.Tensor:
"""
Crop and resize on multilevel feature pyramid.
Generate the (output_size, output_size) set of pixels for each input box
by first locating the box at the correct feature level, and then cropping
and resizing it using the corresponding feature map of that level.
Args:
features: A dictionary keyed by pyramid level, with features as values.
The features have shape [batch_size, height_l, width_l, num_filters].
boxes: A 3-D Tensor of shape [batch_size, num_boxes, 4]. Each row represents
a box with [y1, x1, y2, x2] in un-normalized coordinates.
output_size: A scalar to indicate the output crop size.
sample_offset: a float number in [0, 1] indicating the subpixel sample offset
from the grid point.
Returns:
A 5-D tensor representing feature crop of shape
[batch_size, num_boxes, output_size, output_size, num_filters].
"""
with tf.name_scope("multilevel_crop_and_resize"):
levels = list(features.keys())
min_level = int(min(levels))
max_level = int(max(levels))
features_shape = tf.shape(features[min_level])
batch_size, max_feature_height, max_feature_width, num_filters = (
features_shape[0],
features_shape[1],
features_shape[2],
features_shape[3],
)
num_boxes = tf.shape(boxes)[1]
# Stack feature pyramid into a features_all of shape
# [batch_size, levels, height, width, num_filters].
features_all = []
feature_heights = []
feature_widths = []
for level in range(min_level, max_level + 1):
shape = features[level].get_shape().as_list()
feature_heights.append(shape[1])
feature_widths.append(shape[2])
# Concat tensor of [batch_size, height_l * width_l, num_filters] for each
# levels.
features_all.append(
tf.reshape(features[level], [batch_size, -1, num_filters])
)
features_r2 = tf.reshape(tf.concat(features_all, 1), [-1, num_filters])
# Calculate height_l * width_l for each level.
level_dim_sizes = [
feature_widths[i] * feature_heights[i] for i in range(len(feature_widths))
]
# level_dim_offsets is accumulated sum of level_dim_size.
level_dim_offsets = [0]
for i in range(len(feature_widths) - 1):
level_dim_offsets.append(level_dim_offsets[i] + level_dim_sizes[i])
batch_dim_size = level_dim_offsets[-1] + level_dim_sizes[-1]
level_dim_offsets = tf.constant(level_dim_offsets, tf.int32)
height_dim_sizes = tf.constant(feature_widths, tf.int32)
# Assigns boxes to the right level.
box_width = boxes[:, :, 3] - boxes[:, :, 1]
box_height = boxes[:, :, 2] - boxes[:, :, 0]
areas_sqrt = tf.sqrt(
tf.cast(box_height, tf.float32) * tf.cast(box_width, tf.float32)
)
# Following the FPN paper, divide sqrt(area) by 224 (the canonical
# ImageNet pre-training size).
levels = tf.cast(
tf.math.floordiv(
tf.math.log(tf.math.divide_no_nan(areas_sqrt, 224.0)), tf.math.log(2.0)
)
+ 4.0,
dtype=tf.int32,
)
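# e.g. a box with sqrt(area) == 224 maps to level 4 (log2(224/224) is 0),
# and each doubling of the box edge moves it up one level, per the FPN
# heuristic k = floor(log2(sqrt(area) / 224)) + 4.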
# Maps levels between [min_level, max_level].
levels = tf.minimum(max_level, tf.maximum(levels, min_level))
# Projects box location and sizes to corresponding feature levels.
scale_to_level = tf.cast(
tf.pow(tf.constant(2.0), tf.cast(levels, tf.float32)), dtype=boxes.dtype
)
boxes /= tf.expand_dims(scale_to_level, axis=2)
box_width /= scale_to_level
box_height /= scale_to_level
boxes = tf.concat(
[
boxes[:, :, 0:2],
tf.expand_dims(box_height, -1),
tf.expand_dims(box_width, -1),
],
axis=-1,
)
# Maps levels to [0, max_level-min_level].
levels -= min_level
level_strides = tf.pow([[2.0]], tf.cast(levels, tf.float32))
boundary = tf.cast(
tf.concat(
[
tf.expand_dims(
[[tf.cast(max_feature_height, tf.float32)]] / level_strides - 1,
axis=-1,
),
tf.expand_dims(
[[tf.cast(max_feature_width, tf.float32)]] / level_strides - 1,
axis=-1,
),
],
axis=-1,
),
boxes.dtype,
)
# Compute grid positions.
kernel_y, kernel_x, box_gridy0y1, box_gridx0x1 = _compute_grid_positions(
boxes, boundary, output_size, sample_offset
)
x_indices = tf.cast(
tf.reshape(box_gridx0x1, [batch_size, num_boxes, output_size * 2]),
dtype=tf.int32,
)
y_indices = tf.cast(
tf.reshape(box_gridy0y1, [batch_size, num_boxes, output_size * 2]),
dtype=tf.int32,
)
batch_size_offset = tf.tile(
tf.reshape(tf.range(batch_size) * batch_dim_size, [batch_size, 1, 1, 1]),
[1, num_boxes, output_size * 2, output_size * 2],
)
# Get level offset for each box. Each box belongs to one level.
levels_offset = tf.tile(
tf.reshape(
tf.gather(level_dim_offsets, levels), [batch_size, num_boxes, 1, 1]
),
[1, 1, output_size * 2, output_size * 2],
)
y_indices_offset = tf.tile(
tf.reshape(
y_indices * tf.expand_dims(tf.gather(height_dim_sizes, levels), -1),
[batch_size, num_boxes, output_size * 2, 1],
),
[1, 1, 1, output_size * 2],
)
x_indices_offset = tf.tile(
tf.reshape(x_indices, [batch_size, num_boxes, 1, output_size * 2]),
[1, 1, output_size * 2, 1],
)
indices = tf.reshape(
batch_size_offset + levels_offset + y_indices_offset + x_indices_offset,
[-1],
)
# TODO(tanzhenyu): replace tf.gather with tf.gather_nd and try to get similar
# performance.
features_per_box = tf.reshape(
tf.gather(features_r2, indices),
[batch_size, num_boxes, output_size * 2, output_size * 2, num_filters],
)
# Bilinear interpolation.
features_per_box = _feature_bilinear_interpolation(
features_per_box, kernel_y, kernel_x
)
return features_per_box
# TODO(tanzheny): Remove this implementation once roi_pool has better performance.
class _ROIAligner(tf.keras.layers.Layer):
"""Performs ROIAlign for the second stage processing."""
def __init__(
self, bounding_box_format, target_size=7, sample_offset: float = 0.5, **kwargs
):
"""
Generates ROI Aligner.
Args:
bounding_box_format: the input format for boxes.
target_size: An `int` of the output size of the cropped features.
sample_offset: A `float` in [0, 1] of the subpixel sample offset.
**kwargs: Additional keyword arguments passed to Layer.
"""
self._config_dict = {
"bounding_box_format": bounding_box_format,
"crop_size": target_size,
"sample_offset": sample_offset,
}
super().__init__(**kwargs)
def call(
self,
features: Mapping[str, tf.Tensor],
boxes: tf.Tensor,
training: Optional[bool] = None,
):
"""
Args:
features: A dictionary keyed by pyramid level, with features as values.
The features have shape
[batch_size, height_l, width_l, num_filters].
boxes: A 3-D `tf.Tensor` of shape [batch_size, num_boxes, 4]. Each row
represents a box with [y1, x1, y2, x2] in un-normalized coordinates.
training: A `bool` of whether it is in training mode.
Returns:
A 5-D `tf.Tensor` representing feature crop of shape
[batch_size, num_boxes, crop_size, crop_size, num_filters].
"""
boxes = bounding_box.convert_format(
boxes, source=self._config_dict["bounding_box_format"], target="yxyx"
)
roi_features = multilevel_crop_and_resize(
features,
boxes,
output_size=self._config_dict["crop_size"],
sample_offset=self._config_dict["sample_offset"],
)
return roi_features
def get_config(self):
return self._config_dict
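# A minimal runnable sketch of `multilevel_crop_and_resize` (illustrative
# only; the two-level feature pyramid and boxes below are arbitrary):
if __name__ == "__main__":
    features = {
        2: tf.random.normal([1, 64, 64, 16]),  # level 2 ~ stride 4
        3: tf.random.normal([1, 32, 32, 16]),  # level 3 ~ stride 8
    }
    # each box is [y1, x1, y2, x2] in un-normalized image coordinates
    boxes = tf.constant([[[0.0, 0.0, 32.0, 32.0], [8.0, 8.0, 24.0, 24.0]]])
    crops = multilevel_crop_and_resize(features, boxes, output_size=7)
    print(crops.shape)  # -> (1, 2, 7, 7, 16)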
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Mapping
from typing import Optional
from typing import Tuple
from typing import Union
import tensorflow as tf
from keras_cv import bounding_box
class ROIGenerator(tf.keras.layers.Layer):
"""
Generates regions of interest (ROIs, or proposals) from scores.
Mainly used in Region CNN (RCNN) networks.
This works for multi-level inputs, where both boxes and scores are
dictionary inputs with the same set of keys.
Users can configure top-k and thresholds differently for training and for
inference.
Users can choose to combine all levels if NMS across all levels is desired.
The following steps are applied to each pair of (boxes, scores):
1) the top pre_nms_topk scores and boxes are sorted and selected per level
2) NMS is applied, and the top post_nms_topk scores and ROIs are selected per level
3) scores and ROIs are combined across all levels
4) the top post_nms_topk scores and ROIs are sorted and selected
Args:
bounding_box_format: a case-insensitive string.
For detailed information on the supported format, see the
[KerasCV bounding box documentation](https://keras.io/api/keras_cv/bounding_box/formats/).
pre_nms_topk_train: int. number of top k scoring proposals to keep before applying NMS in training mode.
When RPN is run on multiple feature maps / levels (as in FPN) this number is per
feature map / level.
nms_score_threshold_train: float. score threshold to use for NMS in training mode.
nms_iou_threshold_train: float. IOU threshold to use for NMS in training mode.
post_nms_topk_train: int. number of top k scoring proposals to keep after applying NMS in training mode.
When RPN is run on multiple feature maps / levels (as in FPN) this number is per
feature map / level.
pre_nms_topk_test: int. number of top k scoring proposals to keep before applying NMS in inference mode.
When RPN is run on multiple feature maps / levels (as in FPN) this number is per
feature map / level.
nms_score_threshold_test: float. score threshold to use for NMS in inference mode.
nms_iou_threshold_test: float. IOU threshold to use for NMS in inference mode.
post_nms_topk_test: int. number of top k scoring proposals to keep after applying NMS in inference mode.
When RPN is run on multiple feature maps / levels (as in FPN) this number is per
feature map / level.
Usage:
```python
roi_generator = ROIGenerator("xyxy")
boxes = {2: tf.random.normal([32, 5, 4])}
scores = {2: tf.random.normal([32, 5])}
rois, roi_scores = roi_generator(boxes, scores, training=True)
```
"""
def __init__(
self,
bounding_box_format,
pre_nms_topk_train: int = 2000,
nms_score_threshold_train: float = 0.0,
nms_iou_threshold_train: float = 0.7,
post_nms_topk_train: int = 1000,
pre_nms_topk_test: int = 1000,
nms_score_threshold_test: float = 0.0,
nms_iou_threshold_test: float = 0.7,
post_nms_topk_test: int = 1000,
**kwargs,
):
super().__init__(**kwargs)
self.bounding_box_format = bounding_box_format
self.pre_nms_topk_train = pre_nms_topk_train
self.nms_score_threshold_train = nms_score_threshold_train
self.nms_iou_threshold_train = nms_iou_threshold_train
self.post_nms_topk_train = post_nms_topk_train
self.pre_nms_topk_test = pre_nms_topk_test
self.nms_score_threshold_test = nms_score_threshold_test
self.nms_iou_threshold_test = nms_iou_threshold_test
self.post_nms_topk_test = post_nms_topk_test
self.built = True
def call(
self,
multi_level_boxes: Union[tf.Tensor, Mapping[int, tf.Tensor]],
multi_level_scores: Union[tf.Tensor, Mapping[int, tf.Tensor]],
training: Optional[bool] = None,
) -> Tuple[tf.Tensor, tf.Tensor]:
"""
Args:
multi_level_boxes: float Tensor. A dictionary or single Tensor of boxes, one per level. The shape is
[batch_size, num_boxes, 4] for each level, in `bounding_box_format`.
Boxes from RPNs are usually encoded as deltas w.r.t. anchors and
need to be decoded before being passed in here.
multi_level_scores: float Tensor. A dictionary or single Tensor of scores, usually confidence scores,
one per level. The shape is [batch_size, num_boxes] for each level.
Returns:
rois: float Tensor of [batch_size, post_nms_topk, 4]
roi_scores: float Tensor of [batch_size, post_nms_topk]
"""
if training:
pre_nms_topk = self.pre_nms_topk_train
post_nms_topk = self.post_nms_topk_train
nms_score_threshold = self.nms_score_threshold_train
nms_iou_threshold = self.nms_iou_threshold_train
else:
pre_nms_topk = self.pre_nms_topk_test
post_nms_topk = self.post_nms_topk_test
nms_score_threshold = self.nms_score_threshold_test
nms_iou_threshold = self.nms_iou_threshold_test
def per_level_gen(boxes, scores):
scores_shape = scores.get_shape().as_list()
# scores can also be [batch_size, num_boxes, 1]
if len(scores_shape) == 3:
scores = tf.squeeze(scores, axis=-1)
_, num_boxes = scores.get_shape().as_list()
level_pre_nms_topk = min(num_boxes, pre_nms_topk)
level_post_nms_topk = min(num_boxes, post_nms_topk)
scores, sorted_indices = tf.nn.top_k(
scores, k=level_pre_nms_topk, sorted=True
)
boxes = tf.gather(boxes, sorted_indices, batch_dims=1)
# convert from input format to yxyx for the TF NMS operation
boxes = bounding_box.convert_format(
boxes,
source=self.bounding_box_format,
target="yxyx",
)
# TODO(tanzhenyu): consider supporting soft / batched NMS for acceleration
selected_indices, num_valid = tf.image.non_max_suppression_padded(
boxes,
scores,
max_output_size=level_post_nms_topk,
iou_threshold=nms_iou_threshold,
score_threshold=nms_score_threshold,
pad_to_max_output_size=True,
sorted_input=True,
canonicalized_coordinates=True,
)
# convert back to input format
boxes = bounding_box.convert_format(
boxes,
source="yxyx",
target=self.bounding_box_format,
)
level_rois = tf.gather(boxes, selected_indices, batch_dims=1)
level_roi_scores = tf.gather(scores, selected_indices, batch_dims=1)
level_rois = level_rois * tf.cast(
tf.reshape(tf.range(level_post_nms_topk), [1, -1, 1])
< tf.reshape(num_valid, [-1, 1, 1]),
level_rois.dtype,
)
level_roi_scores = level_roi_scores * tf.cast(
tf.reshape(tf.range(level_post_nms_topk), [1, -1])
< tf.reshape(num_valid, [-1, 1]),
level_roi_scores.dtype,
)
return level_rois, level_roi_scores
if not isinstance(multi_level_boxes, dict):
return per_level_gen(multi_level_boxes, multi_level_scores)
rois = []
roi_scores = []
for level in sorted(multi_level_scores.keys()):
boxes = multi_level_boxes[level]
scores = multi_level_scores[level]
level_rois, level_roi_scores = per_level_gen(boxes, scores)
rois.append(level_rois)
roi_scores.append(level_roi_scores)
rois = tf.concat(rois, axis=1)
roi_scores = tf.concat(roi_scores, axis=1)
_, num_valid_rois = roi_scores.get_shape().as_list()
overall_top_k = min(num_valid_rois, post_nms_topk)
roi_scores, sorted_indices = tf.nn.top_k(
roi_scores, k=overall_top_k, sorted=True
)
rois = tf.gather(rois, sorted_indices, batch_dims=1)
return rois, roi_scores
def get_config(self):
config = {
"bounding_box_format": self.bounding_box_format,
"pre_nms_topk_train": self.pre_nms_topk_train,
"nms_score_threshold_train": self.nms_score_threshold_train,
"nms_iou_threshold_train": self.nms_iou_threshold_train,
"post_nms_topk_train": self.post_nms_topk_train,
"pre_nms_topk_test": self.pre_nms_topk_test,
"nms_score_threshold_test": self.nms_score_threshold_test,
"nms_iou_threshold_test": self.nms_iou_threshold_test,
"post_nms_topk_test": self.post_nms_topk_test,
}
base_config = super().get_config()
return dict(list(base_config.items()) + list(config.items()))
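# A multi-level sketch complementing the single-level usage example in the
# class docstring above (inputs are random and purely illustrative):
if __name__ == "__main__":
    roi_generator = ROIGenerator("xyxy")
    boxes = {2: tf.random.normal([8, 16, 4]), 3: tf.random.normal([8, 16, 4])}
    scores = {2: tf.random.normal([8, 16]), 3: tf.random.normal([8, 16])}
    rois, roi_scores = roi_generator(boxes, scores, training=False)
    print(rois.shape, roi_scores.shape)  # -> (8, 32, 4) (8, 32)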
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv.layers.object_detection.roi_generator import ROIGenerator
class ROIGeneratorTest(tf.test.TestCase):
def test_single_tensor(self):
roi_generator = ROIGenerator("xyxy", nms_iou_threshold_train=0.96)
rpn_boxes = tf.constant(
[
[[0, 0, 10, 10], [0.1, 0.1, 9.9, 9.9], [5, 5, 10, 10], [2, 2, 8, 8]],
]
)
expected_rois = tf.gather(rpn_boxes, [[1, 3, 2]], batch_dims=1)
expected_rois = tf.concat([expected_rois, tf.zeros([1, 1, 4])], axis=1)
rpn_scores = tf.constant(
[
[0.6, 0.9, 0.2, 0.3],
]
)
# selecting the 1st, then 3rd, then 2nd as they don't overlap
# 0th box overlaps with 1st box
expected_roi_scores = tf.gather(rpn_scores, [[1, 3, 2]], batch_dims=1)
expected_roi_scores = tf.concat([expected_roi_scores, tf.zeros([1, 1])], axis=1)
rois, roi_scores = roi_generator(rpn_boxes, rpn_scores, training=True)
self.assertAllClose(expected_rois, rois)
self.assertAllClose(expected_roi_scores, roi_scores)
def test_single_level_single_batch_roi_ignore_box(self):
roi_generator = ROIGenerator("xyxy", nms_iou_threshold_train=0.96)
rpn_boxes = tf.constant(
[
[[0, 0, 10, 10], [0.1, 0.1, 9.9, 9.9], [5, 5, 10, 10], [2, 2, 8, 8]],
]
)
expected_rois = tf.gather(rpn_boxes, [[1, 3, 2]], batch_dims=1)
expected_rois = tf.concat([expected_rois, tf.zeros([1, 1, 4])], axis=1)
rpn_boxes = {2: rpn_boxes}
rpn_scores = tf.constant(
[
[0.6, 0.9, 0.2, 0.3],
]
)
# selecting the 1st, then 3rd, then 2nd as they don't overlap
# 0th box overlaps with 1st box
expected_roi_scores = tf.gather(rpn_scores, [[1, 3, 2]], batch_dims=1)
expected_roi_scores = tf.concat([expected_roi_scores, tf.zeros([1, 1])], axis=1)
rpn_scores = {2: rpn_scores}
rois, roi_scores = roi_generator(rpn_boxes, rpn_scores, training=True)
self.assertAllClose(expected_rois, rois)
self.assertAllClose(expected_roi_scores, roi_scores)
def test_single_level_single_batch_roi_all_box(self):
# the IOU between the 1st and 2nd box is 0.9604, so the threshold is set
# to 0.97 such that NMS treats them as different ROIs
roi_generator = ROIGenerator("xyxy", nms_iou_threshold_train=0.97)
rpn_boxes = tf.constant(
[
[[0, 0, 10, 10], [0.1, 0.1, 9.9, 9.9], [5, 5, 10, 10], [2, 2, 8, 8]],
]
)
expected_rois = tf.gather(rpn_boxes, [[1, 0, 3, 2]], batch_dims=1)
rpn_boxes = {2: rpn_boxes}
rpn_scores = tf.constant(
[
[0.6, 0.9, 0.2, 0.3],
]
)
# selecting the 1st, then 0th, then 3rd, then 2nd as they don't overlap
expected_roi_scores = tf.gather(rpn_scores, [[1, 0, 3, 2]], batch_dims=1)
rpn_scores = {2: rpn_scores}
rois, roi_scores = roi_generator(rpn_boxes, rpn_scores, training=True)
self.assertAllClose(expected_rois, rois)
self.assertAllClose(expected_roi_scores, roi_scores)
def test_single_level_propose_rois(self):
roi_generator = ROIGenerator("xyxy")
rpn_boxes = tf.constant(
[
[[0, 0, 10, 10], [0.1, 0.1, 9.9, 9.9], [5, 5, 10, 10], [2, 2, 8, 8]],
[[2, 2, 4, 4], [3, 3, 6, 6], [3.1, 3.1, 6.1, 6.1], [1, 1, 8, 8]],
]
)
expected_rois = tf.gather(rpn_boxes, [[1, 3, 2], [1, 3, 0]], batch_dims=1)
expected_rois = tf.concat([expected_rois, tf.zeros([2, 1, 4])], axis=1)
rpn_boxes = {2: rpn_boxes}
rpn_scores = tf.constant([[0.6, 0.9, 0.2, 0.3], [0.1, 0.8, 0.3, 0.5]])
# 1st batch -- selecting the 1st, then 3rd, then 2nd as they don't overlap
# 2nd batch -- selecting the 1st, then 3rd, then 0th as they don't overlap
expected_roi_scores = tf.gather(
rpn_scores, [[1, 3, 2], [1, 3, 0]], batch_dims=1
)
expected_roi_scores = tf.concat([expected_roi_scores, tf.zeros([2, 1])], axis=1)
rpn_scores = {2: rpn_scores}
rois, roi_scores = roi_generator(rpn_boxes, rpn_scores, training=True)
self.assertAllClose(expected_rois, rois)
self.assertAllClose(expected_roi_scores, roi_scores)
def test_two_level_single_batch_propose_rois_ignore_box(self):
roi_generator = ROIGenerator("xyxy")
rpn_boxes = tf.constant(
[
[[0, 0, 10, 10], [0.1, 0.1, 9.9, 9.9], [5, 5, 10, 10], [2, 2, 8, 8]],
[[2, 2, 4, 4], [3, 3, 6, 6], [3.1, 3.1, 6.1, 6.1], [1, 1, 8, 8]],
]
)
expected_rois = tf.constant(
[
[
[0.1, 0.1, 9.9, 9.9],
[3, 3, 6, 6],
[1, 1, 8, 8],
[2, 2, 8, 8],
[5, 5, 10, 10],
[2, 2, 4, 4],
[0, 0, 0, 0],
[0, 0, 0, 0],
]
]
)
rpn_boxes = {2: rpn_boxes[0:1], 3: rpn_boxes[1:2]}
rpn_scores = tf.constant([[0.6, 0.9, 0.2, 0.3], [0.1, 0.8, 0.3, 0.5]])
# 1st batch -- selecting the 1st, then 3rd, then 2nd as they don't overlap
# 2nd batch -- selecting the 1st, then 3rd, then 0th as they don't overlap
expected_roi_scores = [
[
0.9,
0.8,
0.5,
0.3,
0.2,
0.1,
0.0,
0.0,
]
]
rpn_scores = {2: rpn_scores[0:1], 3: rpn_scores[1:2]}
rois, roi_scores = roi_generator(rpn_boxes, rpn_scores, training=True)
self.assertAllClose(expected_rois, rois)
self.assertAllClose(expected_roi_scores, roi_scores)
def test_two_level_single_batch_propose_rois_all_box(self):
roi_generator = ROIGenerator("xyxy", nms_iou_threshold_train=0.99)
rpn_boxes = tf.constant(
[
[[0, 0, 10, 10], [0.1, 0.1, 9.9, 9.9], [5, 5, 10, 10], [2, 2, 8, 8]],
[[2, 2, 4, 4], [3, 3, 6, 6], [3.1, 3.1, 6.1, 6.1], [1, 1, 8, 8]],
]
)
expected_rois = tf.constant(
[
[
[0.1, 0.1, 9.9, 9.9],
[3, 3, 6, 6],
[0, 0, 10, 10],
[1, 1, 8, 8],
[2, 2, 8, 8],
[3.1, 3.1, 6.1, 6.1],
[5, 5, 10, 10],
[2, 2, 4, 4],
]
]
)
rpn_boxes = {2: rpn_boxes[0:1], 3: rpn_boxes[1:2]}
rpn_scores = tf.constant([[0.6, 0.9, 0.2, 0.3], [0.1, 0.8, 0.3, 0.5]])
# 1st batch -- selecting the 1st, then 0th, then 3rd, then 2nd as they don't overlap
# 2nd batch -- selecting the 1st, then 3rd, then 2nd, then 0th as they don't overlap
expected_roi_scores = [
[
0.9,
0.8,
0.6,
0.5,
0.3,
0.3,
0.2,
0.1,
]
]
rpn_scores = {2: rpn_scores[0:1], 3: rpn_scores[1:2]}
rois, roi_scores = roi_generator(rpn_boxes, rpn_scores, training=True)
self.assertAllClose(expected_rois, rois)
self.assertAllClose(expected_roi_scores, roi_scores)
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv import bounding_box
class ROIPooler(tf.keras.layers.Layer):
"""
Pools a feature map of dynamic shape into regions of interest (ROIs) of fixed shape.
Mainly used in Region CNN (RCNN) networks. This works for a single-level
input feature map.
This layer splits the feature map into [target_size[0], target_size[1]] areas,
and performs max pooling over each area. The area coordinates are quantized.
Args:
bounding_box_format: a case-insensitive string.
For detailed information on the supported format, see the
[KerasCV bounding box documentation](https://keras.io/api/keras_cv/bounding_box/formats/).
target_size: List or Tuple of 2 integers of the pooled shape
image_shape: List or Tuple of 3 integers, or `TensorShape`, of the input image shape.
Usage:
```python
feature_map = tf.random.normal([2, 16, 16, 512])
roi_pooler = ROIPooler(bounding_box_format="yxyx", target_size=[7, 7],
image_shape=[224, 224, 3])
rois = tf.constant([[[15., 30., 25., 45.]], [[22., 1., 30., 32.]]])
pooled_feature_map = roi_pooler(feature_map, rois)
```
"""
def __init__(
self,
bounding_box_format,
# TODO(consolidate size vs shape for KPL and here)
target_size,
image_shape,
**kwargs,
):
if not isinstance(target_size, (tuple, list)):
raise ValueError(
f"Expected `target_size` to be tuple or list, got {type(target_size)}"
)
if len(target_size) != 2:
raise ValueError(
f"Expected `target_size` to be size 2, got {len(target_size)}"
)
if image_shape[0] is None or image_shape[1] is None or image_shape[2] is None:
raise ValueError(
f"`image_shape` cannot have dynamic shape, got {image_shape}"
)
super().__init__(**kwargs)
self.bounding_box_format = bounding_box_format
self.target_height = target_size[0]
self.target_width = target_size[1]
self.image_shape = image_shape
self.built = True
def call(self, feature_map, rois):
"""
Args:
feature_map: [batch_size, H, W, C] float Tensor, the feature map extracted from image.
rois: [batch_size, N, 4] float Tensor, the region of interests to be pooled.
Returns:
pooled_feature_map: [batch_size, N, target_height, target_width, C] float Tensor
"""
# convert to relative format given feature map shape != image shape
rois = bounding_box.convert_format(
rois,
source=self.bounding_box_format,
target="rel_yxyx",
image_shape=self.image_shape,
)
pooled_feature_map = tf.vectorized_map(
self._pool_single_sample, (feature_map, rois)
)
return pooled_feature_map
def _pool_single_sample(self, args):
"""
Args: tuple of
feature_map: [H, W, C] float Tensor
rois: [N, 4] float Tensor
Returns:
pooled_feature_map: [target_height, target_width, C] float Tensor
"""
feature_map, rois = args
num_rois = rois.get_shape().as_list()[0]
height, width, channel = feature_map.get_shape().as_list()
# TODO (consider vectorize it for better performance)
for n in range(num_rois):
# [4]
roi = rois[n, :]
y_start = height * roi[0]
x_start = width * roi[1]
region_height = height * (roi[2] - roi[0])
region_width = width * (roi[3] - roi[1])
h_step = region_height / self.target_height
w_step = region_width / self.target_width
regions = []
for i in range(self.target_height):
for j in range(self.target_width):
height_start = y_start + i * h_step
height_end = height_start + h_step
height_start = tf.cast(height_start, tf.int32)
height_end = tf.cast(height_end, tf.int32)
# if the feature map is smaller than the roi grid, h_step can round down
# to 0; in that case force at least one pixel per region so the result
# falls back to feature_map[0, 0, ...]
height_end = height_start + tf.maximum(1, height_end - height_start)
width_start = x_start + j * w_step
width_end = width_start + w_step
width_start = tf.cast(width_start, tf.int32)
width_end = tf.cast(width_end, tf.int32)
width_end = width_start + tf.maximum(1, width_end - width_start)
# [h_step, w_step, C]
region = feature_map[
height_start:height_end, width_start:width_end, :
]
# target_height * target_width * [C]
regions.append(tf.reduce_max(region, axis=[0, 1]))
regions = tf.reshape(
tf.stack(regions), [self.target_height, self.target_width, channel]
)
return regions
def get_config(self):
config = {
"bounding_box_format": self.bounding_box_format,
"target_size": [self.target_height, self.target_width],
"image_shape": self.image_shape,
}
base_config = super().get_config()
return dict(list(base_config.items()) + list(config.items()))
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv.layers.object_detection.roi_pool import ROIPooler
class ROIPoolTest(tf.test.TestCase):
def test_no_quantize(self):
roi_pooler = ROIPooler(
"rel_yxyx", target_size=[2, 2], image_shape=[224, 224, 3]
)
feature_map = tf.expand_dims(tf.reshape(tf.range(64), [8, 8, 1]), axis=0)
rois = tf.reshape(tf.constant([0.0, 0.0, 1.0, 1.0]), [1, 1, 4])
pooled_feature_map = roi_pooler(feature_map, rois)
# the maximum value would be at bottom-right at each block, roi sharded into 2x2 blocks
# | 0, 1, 2, 3 | 4, 5, 6, 7 |
# | 8, 9, 10, 11 | 12, 13, 14, 15 |
# | 16, 17, 18, 19 | 20, 21, 22, 23 |
# | 24, 25, 26, 27(max) | 28, 29, 30, 31(max) |
# --------------------------------------------
# | 32, 33, 34, 35 | 36, 37, 38, 39 |
# | 40, 41, 42, 43 | 44, 45, 46, 47 |
# | 48, 49, 50, 51 | 52, 53, 54, 55 |
# | 56, 57, 58, 59(max) | 60, 61, 62, 63(max) |
# --------------------------------------------
expected_feature_map = tf.reshape(tf.constant([27, 31, 59, 63]), [1, 2, 2, 1])
self.assertAllClose(expected_feature_map, pooled_feature_map)
def test_roi_quantize_y(self):
roi_pooler = ROIPooler("yxyx", target_size=[2, 2], image_shape=[224, 224, 3])
feature_map = tf.expand_dims(tf.reshape(tf.range(64), [8, 8, 1]), axis=0)
rois = tf.reshape(tf.constant([0.0, 0.0, 224, 220]), [1, 1, 4])
pooled_feature_map = roi_pooler(feature_map, rois)
# the maximum value would be at bottom-right at each block, roi sharded into 2x2 blocks
# | 0, 1, 2 | 3, 4, 5, 6 | 7 (removed)
# | 8, 9, 10 | 11, 12, 13, 14 | 15 (removed)
# | 16, 17, 18 | 19, 20, 21, 22 | 23 (removed)
# | 24, 25, 26(max) | 27, 28, 29, 30(max) | 31 (removed)
# --------------------------------------------
# | 32, 33, 34 | 35, 36, 37, 38 | 39 (removed)
# | 40, 41, 42 | 43, 44, 45, 46 | 47 (removed)
# | 48, 49, 50 | 51, 52, 53, 54 | 55 (removed)
# | 56, 57, 58(max) | 59, 60, 61, 62(max) | 63 (removed)
# --------------------------------------------
expected_feature_map = tf.reshape(tf.constant([26, 30, 58, 62]), [1, 2, 2, 1])
self.assertAllClose(expected_feature_map, pooled_feature_map)
def test_roi_quantize_x(self):
roi_pooler = ROIPooler("yxyx", target_size=[2, 2], image_shape=[224, 224, 3])
feature_map = tf.expand_dims(tf.reshape(tf.range(64), [8, 8, 1]), axis=0)
rois = tf.reshape(tf.constant([0.0, 0.0, 220, 224]), [1, 1, 4])
pooled_feature_map = roi_pooler(feature_map, rois)
# the maximum value would be at bottom-right at each block, roi sharded into 2x2 blocks
# | 0, 1, 2, 3 | 4, 5, 6, 7 |
# | 8, 9, 10, 11 | 12, 13, 14, 15 |
# | 16, 17, 18, 19(max) | 20, 21, 22, 23(max) |
# --------------------------------------------
# | 24, 25, 26, 27 | 28, 29, 30, 31 |
# | 32, 33, 34, 35 | 36, 37, 38, 39 |
# | 40, 41, 42, 43 | 44, 45, 46, 47 |
# | 48, 49, 50, 51(max) | 52, 53, 54, 55(max) |
# --------------------------------------------
expected_feature_map = tf.reshape(tf.constant([19, 23, 51, 55]), [1, 2, 2, 1])
self.assertAllClose(expected_feature_map, pooled_feature_map)
def test_roi_quantize_h(self):
roi_pooler = ROIPooler("yxyx", target_size=[3, 2], image_shape=[224, 224, 3])
feature_map = tf.expand_dims(tf.reshape(tf.range(64), [8, 8, 1]), axis=0)
rois = tf.reshape(tf.constant([0.0, 0.0, 224, 224]), [1, 1, 4])
pooled_feature_map = roi_pooler(feature_map, rois)
# the maximum value would be at bottom-right at each block, roi sharded into 3x2 blocks
# | 0, 1, 2, 3 | 4, 5, 6, 7 |
# | 8, 9, 10, 11(max) | 12, 13, 14, 15(max) |
# --------------------------------------------
# | 16, 17, 18, 19 | 20, 21, 22, 23 |
# | 24, 25, 26, 27 | 28, 29, 30, 31 |
# | 32, 33, 34, 35(max) | 36, 37, 38, 39(max) |
# --------------------------------------------
# | 40, 41, 42, 43 | 44, 45, 46, 47 |
# | 48, 49, 50, 51 | 52, 53, 54, 55 |
# | 56, 57, 58, 59(max) | 60, 61, 62, 63(max) |
# --------------------------------------------
expected_feature_map = tf.reshape(
tf.constant([11, 15, 35, 39, 59, 63]), [1, 3, 2, 1]
)
self.assertAllClose(expected_feature_map, pooled_feature_map)
def test_roi_quantize_w(self):
roi_pooler = ROIPooler("yxyx", target_size=[2, 3], image_shape=[224, 224, 3])
feature_map = tf.expand_dims(tf.reshape(tf.range(64), [8, 8, 1]), axis=0)
rois = tf.reshape(tf.constant([0.0, 0.0, 224, 224]), [1, 1, 4])
pooled_feature_map = roi_pooler(feature_map, rois)
# the maximum value would be at bottom-right at each block, roi sharded into 2x3 blocks
# | 0, 1 | 2, 3, 4 | 5, 6, 7 |
# | 8, 9 | 10, 11, 12 | 13, 14, 15 |
# | 16, 17 | 18, 19, 20 | 21, 22, 23 |
# | 24, 25(max) | 26, 27, 28(max) | 29, 30, 31(max) |
# --------------------------------------------
# | 32, 33 | 34, 35, 36 | 37, 38, 39 |
# | 40, 41 | 42, 43, 44 | 45, 46, 47 |
# | 48, 49 | 50, 51, 52 | 53, 54, 55 |
# | 56, 57(max) | 58, 59, 60(max) | 61, 62, 63(max) |
# --------------------------------------------
expected_feature_map = tf.reshape(
tf.constant([25, 28, 31, 57, 60, 63]), [1, 2, 3, 1]
)
self.assertAllClose(expected_feature_map, pooled_feature_map)
def test_roi_feature_map_height_smaller_than_roi(self):
roi_pooler = ROIPooler("yxyx", target_size=[6, 2], image_shape=[224, 224, 3])
feature_map = tf.expand_dims(tf.reshape(tf.range(16), [4, 4, 1]), axis=0)
rois = tf.reshape(tf.constant([0.0, 0.0, 224, 224]), [1, 1, 4])
pooled_feature_map = roi_pooler(feature_map, rois)
# | 0, 1(max) | 2, 3(max) |
# ------------------repeated----------------------
# | 4, 5(max) | 6, 7(max) |
# --------------------------------------------
# | 8, 9(max) | 10, 11(max) |
# ------------------repeated----------------------
# | 12, 13(max) | 14, 15(max) |
expected_feature_map = tf.reshape(
tf.constant([1, 3, 1, 3, 5, 7, 9, 11, 9, 11, 13, 15]), [1, 6, 2, 1]
)
self.assertAllClose(expected_feature_map, pooled_feature_map)
def test_roi_feature_map_width_smaller_than_roi(self):
roi_pooler = ROIPooler("yxyx", target_size=[2, 6], image_shape=[224, 224, 3])
feature_map = tf.expand_dims(tf.reshape(tf.range(16), [4, 4, 1]), axis=0)
rois = tf.reshape(tf.constant([0.0, 0.0, 224, 224]), [1, 1, 4])
pooled_feature_map = roi_pooler(feature_map, rois)
# | 0 | 1 | 2 | 3 |
# | 4(max) | 5(max) | 6(max) | 7(max) |
# --------------------------------------------
# | 8 | 9 | 10 | 11 |
# | 12(max) | 13(max) | 14(max) | 15(max) |
# --------------------------------------------
expected_feature_map = tf.reshape(
tf.constant([4, 4, 5, 6, 6, 7, 12, 12, 13, 14, 14, 15]), [1, 2, 6, 1]
)
self.assertAllClose(expected_feature_map, pooled_feature_map)
def test_roi_empty(self):
roi_pooler = ROIPooler("yxyx", target_size=[2, 2], image_shape=[224, 224, 3])
feature_map = tf.expand_dims(tf.reshape(tf.range(1, 65), [8, 8, 1]), axis=0)
rois = tf.reshape(tf.constant([0.0, 0.0, 0.0, 0.0]), [1, 1, 4])
pooled_feature_map = roi_pooler(feature_map, rois)
# all outputs should be top-left pixel
self.assertAllClose(tf.ones([1, 2, 2, 1]), pooled_feature_map)
def test_invalid_image_shape(self):
with self.assertRaisesRegex(ValueError, "dynamic shape"):
_ = ROIPooler("rel_yxyx", target_size=[2, 2], image_shape=[None, 224, 3])
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv import bounding_box
from keras_cv.bounding_box import iou
from keras_cv.ops import box_matcher
from keras_cv.ops import sampling
from keras_cv.ops import target_gather
class _ROISampler(tf.keras.layers.Layer):
"""
Samples ROIs for loss-related calculation.
With proposals (ROIs) and ground truth, it performs the following:
1) computes the IOU similarity matrix
2) matches each proposal to a ground truth box based on IOU
3) samples positive matches and negative matches and returns them
`append_gt_boxes` augments proposals with ground truth boxes. This is
useful in two-stage detection networks during initialization, where the
first stage often cannot produce good proposals for the second stage.
Setting it to True allows it to generate more reasonable proposals at the
beginning.
`background_class` allows users to set the label for background proposals.
It defaults to 0, so users need to manually shift the incoming `gt_classes`
if their range is [0, num_classes).
Args:
bounding_box_format: The format of bounding boxes to generate. Refer
[to the keras.io docs](https://keras.io/api/keras_cv/bounding_box/formats/)
for more details on supported bounding box formats.
roi_matcher: an `ArgmaxBoxMatcher` object that matches proposals
with ground truth boxes. The positive match value must be 1 and the
negative match value must be -1; this assumption is not validated here.
positive_fraction: the positive ratio w.r.t. `num_sampled_rois`. Defaults to 0.25.
background_class: the class ID assigned to sampled ground truth boxes
that are classified as background.
num_sampled_rois: the number of sampled proposals per image for
further (loss) calculation. Defaults to 256.
append_gt_boxes: boolean, whether gt_boxes will be appended to rois
before sampling the rois. Defaults to True.
"""
def __init__(
self,
bounding_box_format: str,
roi_matcher: box_matcher.ArgmaxBoxMatcher,
positive_fraction: float = 0.25,
background_class: int = 0,
num_sampled_rois: int = 256,
append_gt_boxes: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self.bounding_box_format = bounding_box_format
self.roi_matcher = roi_matcher
self.positive_fraction = positive_fraction
self.background_class = background_class
self.num_sampled_rois = num_sampled_rois
self.append_gt_boxes = append_gt_boxes
self.built = True
# for debugging.
self._positives = tf.keras.metrics.Mean()
self._negatives = tf.keras.metrics.Mean()
def call(
self,
rois: tf.Tensor,
gt_boxes: tf.Tensor,
gt_classes: tf.Tensor,
):
"""
Args:
rois: [batch_size, num_rois, 4]
gt_boxes: [batch_size, num_gt, 4]
gt_classes: [batch_size, num_gt, 1]
Returns:
sampled_rois: [batch_size, num_sampled_rois, 4]
sampled_gt_boxes: [batch_size, num_sampled_rois, 4]
sampled_box_weights: [batch_size, num_sampled_rois, 1]
sampled_gt_classes: [batch_size, num_sampled_rois, 1]
sampled_class_weights: [batch_size, num_sampled_rois, 1]
"""
if self.append_gt_boxes:
# num_rois += num_gt
rois = tf.concat([rois, gt_boxes], axis=1)
num_rois = rois.get_shape().as_list()[1]
if num_rois is None:
raise ValueError(f"`rois` must have static shape, got {rois.get_shape()}")
if num_rois < self.num_sampled_rois:
raise ValueError(
f"`num_sampled_rois` ({self.num_sampled_rois}) must be less than or "
f"equal to the number of rois, got {num_rois}"
)
rois = bounding_box.convert_format(
rois, source=self.bounding_box_format, target="yxyx"
)
gt_boxes = bounding_box.convert_format(
gt_boxes, source=self.bounding_box_format, target="yxyx"
)
# [batch_size, num_rois, num_gt]
similarity_mat = iou.compute_iou(
rois, gt_boxes, bounding_box_format="yxyx", use_masking=True
)
# [batch_size, num_rois] | [batch_size, num_rois]
matched_gt_cols, matched_vals = self.roi_matcher(similarity_mat)
# [batch_size, num_rois]
positive_matches = tf.math.equal(matched_vals, 1)
negative_matches = tf.math.equal(matched_vals, -1)
self._positives.update_state(
tf.reduce_sum(tf.cast(positive_matches, tf.float32), axis=-1)
)
self._negatives.update_state(
tf.reduce_sum(tf.cast(negative_matches, tf.float32), axis=-1)
)
# [batch_size, num_rois, 1]
background_mask = tf.expand_dims(tf.logical_not(positive_matches), axis=-1)
# [batch_size, num_rois, 1]
matched_gt_classes = target_gather._target_gather(gt_classes, matched_gt_cols)
# also set all background matches to `background_class`
matched_gt_classes = tf.where(
background_mask,
tf.cast(
self.background_class * tf.ones_like(matched_gt_classes),
gt_classes.dtype,
),
matched_gt_classes,
)
# [batch_size, num_rois, 4]
matched_gt_boxes = target_gather._target_gather(gt_boxes, matched_gt_cols)
encoded_matched_gt_boxes = bounding_box._encode_box_to_deltas(
anchors=rois,
boxes=matched_gt_boxes,
anchor_format="yxyx",
box_format="yxyx",
variance=[0.1, 0.1, 0.2, 0.2],
)
# also set all background matches to 0 coordinates
encoded_matched_gt_boxes = tf.where(
background_mask, tf.zeros_like(matched_gt_boxes), encoded_matched_gt_boxes
)
# [batch_size, num_rois]
sampled_indicators = sampling.balanced_sample(
positive_matches,
negative_matches,
self.num_sampled_rois,
self.positive_fraction,
)
# [batch_size, num_sampled_rois] in the range of [0, num_rois)
sampled_indicators, sampled_indices = tf.math.top_k(
sampled_indicators, k=self.num_sampled_rois, sorted=True
)
# [batch_size, num_sampled_rois, 4]
sampled_rois = target_gather._target_gather(rois, sampled_indices)
# [batch_size, num_sampled_rois, 4]
sampled_gt_boxes = target_gather._target_gather(
encoded_matched_gt_boxes, sampled_indices
)
# [batch_size, num_sampled_rois, 1]
sampled_gt_classes = target_gather._target_gather(
matched_gt_classes, sampled_indices
)
# [batch_size, num_sampled_rois, 1]
# all negative samples will be ignored in regression
sampled_box_weights = target_gather._target_gather(
tf.cast(positive_matches[..., tf.newaxis], gt_boxes.dtype), sampled_indices
)
# [batch_size, num_sampled_rois, 1]
sampled_indicators = sampled_indicators[..., tf.newaxis]
sampled_class_weights = tf.cast(sampled_indicators, gt_classes.dtype)
return (
sampled_rois,
sampled_gt_boxes,
sampled_box_weights,
sampled_gt_classes,
sampled_class_weights,
)
def get_config(self):
config = {
"bounding_box_format": self.bounding_box_format,
"positive_fraction": self.positive_fraction,
"background_class": self.background_class,
"num_sampled_rois": self.num_sampled_rois,
"append_gt_boxes": self.append_gt_boxes,
"roi_matcher": self.roi_matcher.get_config(),
}
base_config = super().get_config()
return dict(list(base_config.items()) + list(config.items()))
@classmethod
def from_config(cls, config, custom_objects=None):
roi_matcher_config = config.pop("roi_matcher")
roi_matcher = box_matcher.ArgmaxBoxMatcher(**roi_matcher_config)
return cls(roi_matcher=roi_matcher, **config)
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv.layers.object_detection.roi_sampler import _ROISampler
from keras_cv.ops.box_matcher import ArgmaxBoxMatcher
class ROISamplerTest(tf.test.TestCase):
def test_roi_sampler(self):
box_matcher = ArgmaxBoxMatcher(thresholds=[0.3], match_values=[-1, 1])
roi_sampler = _ROISampler(
bounding_box_format="xyxy",
roi_matcher=box_matcher,
positive_fraction=0.5,
num_sampled_rois=2,
append_gt_boxes=False,
)
rois = tf.constant(
[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5], [5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]
)
rois = rois[tf.newaxis, ...]
# the 3rd box will generate 0 IOUs and will not be sampled.
gt_boxes = tf.constant(
[[10, 10, 15, 15], [2.5, 2.5, 7.5, 7.5], [-1, -1, -1, -1]]
)
gt_boxes = gt_boxes[tf.newaxis, ...]
gt_classes = tf.constant([[2, 10, -1]], dtype=tf.int32)
gt_classes = gt_classes[..., tf.newaxis]
_, sampled_gt_boxes, _, sampled_gt_classes, _ = roi_sampler(
rois, gt_boxes, gt_classes
)
# given we only choose 1 positive sample, and `append_gt_boxes` is False,
# only the 2nd ROI is chosen.
expected_gt_boxes = tf.constant([[0.0, 0.0, 0, 0.0], [0.0, 0.0, 0, 0.0]])
expected_gt_boxes = expected_gt_boxes[tf.newaxis, ...]
# only the 2nd ROI is chosen, and the negative ROI is mapped to 0.
expected_gt_classes = tf.constant([[10], [0]], dtype=tf.int32)
expected_gt_classes = expected_gt_classes[tf.newaxis, ...]
self.assertAllClose(
tf.reduce_max(expected_gt_boxes), tf.reduce_max(sampled_gt_boxes)
)
self.assertAllClose(
tf.reduce_min(expected_gt_classes), tf.reduce_min(sampled_gt_classes)
)
def test_roi_sampler_small_threshold(self):
box_matcher = ArgmaxBoxMatcher(thresholds=[0.1], match_values=[-1, 1])
roi_sampler = _ROISampler(
bounding_box_format="xyxy",
roi_matcher=box_matcher,
positive_fraction=0.5,
num_sampled_rois=2,
append_gt_boxes=False,
)
rois = tf.constant(
[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5], [5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]
)
rois = rois[tf.newaxis, ...]
# the 3rd box will generate 0 IOUs and will not be sampled.
gt_boxes = tf.constant(
[[10, 10, 15, 15], [2.6, 2.6, 7.6, 7.6], [-1, -1, -1, -1]]
)
gt_boxes = gt_boxes[tf.newaxis, ...]
gt_classes = tf.constant([[2, 10, -1]], dtype=tf.int32)
gt_classes = gt_classes[..., tf.newaxis]
sampled_rois, sampled_gt_boxes, _, sampled_gt_classes, _ = roi_sampler(
rois, gt_boxes, gt_classes
)
# given we only choose 1 positive sample, and `append_gt_boxes` is False,
# only the 2nd ROI is chosen as positive. No negative samples exist given we
# set positive_threshold to 0.1 (the minimum IOU is 1/7).
# given num_sampled_rois=2, the 1st ROI is selected as well.
expected_rois = tf.constant([[5, 5, 10, 10], [0.0, 0.0, 5.0, 5.0]])
expected_rois = expected_rois[tf.newaxis, ...]
# all ROIs are matched to the 2nd gt box.
# the boxes are encoded by dimensions, so the result is
# ty, tx = (5.1 - 5.0) / 5 = 0.02 for one ROI and
# ty, tx = (5.1 - 2.5) / 5 = 0.52 for the other,
# then divided by 0.1 as the box variance.
expected_gt_boxes = (
tf.constant([[0.02, 0.02, 0.0, 0.0], [0.52, 0.52, 0.0, 0.0]]) / 0.1
)
expected_gt_boxes = expected_gt_boxes[tf.newaxis, ...]
# only the 2nd ROI is chosen, and the negative ROI is mapped to 0.
expected_gt_classes = tf.constant([[10], [10]], dtype=tf.int32)
expected_gt_classes = expected_gt_classes[tf.newaxis, ...]
self.assertAllClose(
tf.reduce_max(expected_rois, 1), tf.reduce_max(sampled_rois, 1)
)
self.assertAllClose(
tf.reduce_max(expected_gt_boxes, 1), tf.reduce_max(sampled_gt_boxes, 1)
)
self.assertAllClose(expected_gt_classes, sampled_gt_classes)
def test_roi_sampler_large_threshold(self):
# the 2nd ROI and the 2nd gt box have an IOU of 0.923; setting positive_threshold to 0.95 ignores it
box_matcher = ArgmaxBoxMatcher(thresholds=[0.95], match_values=[-1, 1])
roi_sampler = _ROISampler(
bounding_box_format="xyxy",
roi_matcher=box_matcher,
positive_fraction=0.5,
num_sampled_rois=2,
append_gt_boxes=False,
)
rois = tf.constant(
[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5], [5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]
)
rois = rois[tf.newaxis, ...]
# the 3rd box will generate 0 IOUs and will not be sampled.
gt_boxes = tf.constant(
[[10, 10, 15, 15], [2.6, 2.6, 7.6, 7.6], [-1, -1, -1, -1]]
)
gt_boxes = gt_boxes[tf.newaxis, ...]
gt_classes = tf.constant([[2, 10, -1]], dtype=tf.int32)
gt_classes = gt_classes[..., tf.newaxis]
_, sampled_gt_boxes, _, sampled_gt_classes, _ = roi_sampler(
rois, gt_boxes, gt_classes
)
# all ROIs are negative matches, so they are mapped to 0.
expected_gt_boxes = tf.zeros([1, 2, 4], dtype=tf.float32)
# all sampled ROIs are negative and mapped to the background class 0.
expected_gt_classes = tf.constant([[0], [0]], dtype=tf.int32)
expected_gt_classes = expected_gt_classes[tf.newaxis, ...]
self.assertAllClose(expected_gt_boxes, sampled_gt_boxes)
self.assertAllClose(expected_gt_classes, sampled_gt_classes)
def test_roi_sampler_large_threshold_custom_bg_class(self):
# the 2nd ROI and the 2nd gt box have an IOU of 0.923; setting positive_threshold to 0.95 ignores it
box_matcher = ArgmaxBoxMatcher(thresholds=[0.95], match_values=[-1, 1])
roi_sampler = _ROISampler(
bounding_box_format="xyxy",
roi_matcher=box_matcher,
positive_fraction=0.5,
background_class=-1,
num_sampled_rois=2,
append_gt_boxes=False,
)
rois = tf.constant(
[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5], [5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]
)
rois = rois[tf.newaxis, ...]
# the 3rd box will generate 0 IOUs and will not be sampled.
gt_boxes = tf.constant(
[[10, 10, 15, 15], [2.6, 2.6, 7.6, 7.6], [-1, -1, -1, -1]]
)
gt_boxes = gt_boxes[tf.newaxis, ...]
gt_classes = tf.constant([[2, 10, -1]], dtype=tf.int32)
gt_classes = gt_classes[..., tf.newaxis]
_, sampled_gt_boxes, _, sampled_gt_classes, _ = roi_sampler(
rois, gt_boxes, gt_classes
)
# all ROIs are negative matches, so they are mapped to 0.
expected_gt_boxes = tf.zeros([1, 2, 4], dtype=tf.float32)
# all sampled ROIs are negative and mapped to the custom background class -1.
expected_gt_classes = tf.constant([[-1], [-1]], dtype=tf.int32)
expected_gt_classes = expected_gt_classes[tf.newaxis, ...]
self.assertAllClose(expected_gt_boxes, sampled_gt_boxes)
self.assertAllClose(expected_gt_classes, sampled_gt_classes)
def test_roi_sampler_large_threshold_append_gt_boxes(self):
# the 2nd ROI and the 2nd gt box have an IOU of 0.923; setting positive_threshold to 0.95 ignores it
box_matcher = ArgmaxBoxMatcher(thresholds=[0.95], match_values=[-1, 1])
roi_sampler = _ROISampler(
bounding_box_format="xyxy",
roi_matcher=box_matcher,
positive_fraction=0.5,
num_sampled_rois=2,
append_gt_boxes=True,
)
rois = tf.constant(
[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5], [5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]
)
rois = rois[tf.newaxis, ...]
# the 3rd box will generate 0 IOUs and will not be sampled.
gt_boxes = tf.constant(
[[10, 10, 15, 15], [2.6, 2.6, 7.6, 7.6], [-1, -1, -1, -1]]
)
gt_boxes = gt_boxes[tf.newaxis, ...]
gt_classes = tf.constant([[2, 10, -1]], dtype=tf.int32)
gt_classes = gt_classes[..., tf.newaxis]
_, sampled_gt_boxes, _, sampled_gt_classes, _ = roi_sampler(
rois, gt_boxes, gt_classes
)
# the selected gt boxes should be [0, 0, 0, 0], and [10, 10, 15, 15]
# but the 2nd will be encoded to 0.
self.assertAllClose(tf.reduce_min(sampled_gt_boxes), 0)
self.assertAllClose(tf.reduce_max(sampled_gt_boxes), 0)
# the selected gt classes should be [0, 2 or 10]
self.assertAllLessEqual(tf.reduce_max(sampled_gt_classes), 10)
self.assertAllGreaterEqual(tf.reduce_min(sampled_gt_classes), 0)
def test_roi_sampler_large_num_sampled_rois(self):
box_matcher = ArgmaxBoxMatcher(thresholds=[0.95], match_values=[-1, 1])
roi_sampler = _ROISampler(
bounding_box_format="xyxy",
roi_matcher=box_matcher,
positive_fraction=0.5,
num_sampled_rois=200,
append_gt_boxes=True,
)
rois = tf.constant(
[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5], [5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]
)
rois = rois[tf.newaxis, ...]
# the 3rd box will generate 0 IOUs and will not be sampled.
gt_boxes = tf.constant(
[[10, 10, 15, 15], [2.6, 2.6, 7.6, 7.6], [-1, -1, -1, -1]]
)
gt_boxes = gt_boxes[tf.newaxis, ...]
gt_classes = tf.constant([[2, 10, -1]], dtype=tf.int32)
gt_classes = gt_classes[..., tf.newaxis]
with self.assertRaisesRegex(ValueError, "must be less than"):
_ = roi_sampler(rois, gt_boxes, gt_classes)
def test_serialization(self):
box_matcher = ArgmaxBoxMatcher(thresholds=[0.95], match_values=[-1, 1])
roi_sampler = _ROISampler(
bounding_box_format="xyxy",
roi_matcher=box_matcher,
positive_fraction=0.5,
num_sampled_rois=200,
append_gt_boxes=True,
)
sampler_config = roi_sampler.get_config()
new_sampler = _ROISampler.from_config(sampler_config)
self.assertAllEqual(new_sampler.roi_matcher.match_values, [-1, 1])
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Mapping
import tensorflow as tf
from keras_cv import bounding_box
from keras_cv.bounding_box import iou
from keras_cv.ops import box_matcher
from keras_cv.ops import sampling
from keras_cv.ops import target_gather
class _RpnLabelEncoder(tf.keras.layers.Layer):
"""Transforms the raw labels into training targets for region proposal network (RPN).
# TODO(tanzhenyu): consider unifying with _ROISampler.
This is different from _ROISampler for a couple of reasons:
1) This deals with unbatched input, dict of anchors and potentially ragged labels
2) This deals with ground truth boxes, while _ROISampler deals with padded ground truth
boxes with value -1 and padded ground truth classes with value -1
3) this returns positive class target as 1, while _ROISampler returns
positive class target as-is. (All negative class target are 0)
The final classification loss will use one hot and #num_fg_classes + 1
4) this returns #num_anchors dense targets, while _ROISampler returns
#num_sampled_rois dense targets.
5) this returns all positive box targets, while _ROISampler still samples
positive box targets, while all negative box targets are also ignored
in regression loss.
Args:
anchor_format: The format of bounding boxes for anchors to generate. Refer
[to the keras.io docs](https://keras.io/api/keras_cv/bounding_box/formats/)
for more details on supported bounding box formats.
ground_truth_box_format: The format of bounding boxes for ground truth boxes to generate.
positive_threshold: the float threshold to set an anchor to positive match to gt box.
values above it are positive matches.
negative_threshold: the float threshold to set an anchor to negative match to gt box.
values below it are negative matches.
samples_per_image: for each image, the number of positive and negative samples
to generate.
positive_fraction: the fraction of positive samples to the total samples.
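Sample usage (a minimal sketch; the thresholds mirror the values exercised
in the tests below):
```python
encoder = _RpnLabelEncoder(
    anchor_format="xyxy",
    ground_truth_box_format="xyxy",
    positive_threshold=0.7,
    negative_threshold=0.3,
    samples_per_image=256,
    positive_fraction=0.5,
)
# anchors_dict: {level: [num_anchors, 4]}, gt_boxes: [num_gt, 4],
# gt_classes: [num_gt, 1]
box_targets, box_weights, class_targets, class_weights = encoder(
    anchors_dict, gt_boxes, gt_classes
)
```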
"""
def __init__(
self,
anchor_format,
ground_truth_box_format,
positive_threshold,
negative_threshold,
samples_per_image,
positive_fraction,
**kwargs,
):
super().__init__(**kwargs)
self.anchor_format = anchor_format
self.ground_truth_box_format = ground_truth_box_format
self.positive_threshold = positive_threshold
self.negative_threshold = negative_threshold
self.samples_per_image = samples_per_image
self.positive_fraction = positive_fraction
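# IOU below `negative_threshold` -> -1 (negative match), between the two
# thresholds -> -2 (ignored), above `positive_threshold` -> 1 (positive match).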
self.box_matcher = box_matcher.ArgmaxBoxMatcher(
thresholds=[negative_threshold, positive_threshold],
match_values=[-1, -2, 1],
force_match_for_each_col=False,
)
self.built = True
self._positives = tf.keras.metrics.Mean()
def call(
self,
anchors_dict: Mapping[str, tf.Tensor],
gt_boxes: tf.Tensor,
gt_classes: tf.Tensor,
):
"""
Args:
anchors_dict: dict of [num_anchors, 4] or [batch_size, num_anchors, 4]
float Tensors, one entry per level.
gt_boxes: [num_gt, 4] or [batch_size, num_gt, 4] float Tensor.
gt_classes: [num_gt, 1] float or integer Tensor.
Returns:
box_targets: dict of [num_anchors, 4] float Tensors for each level.
box_weights: dict of [num_anchors, 1] for each level.
class_targets: dict of [num_anchors, 1] for each level.
class_weights: dict of [num_anchors, 1] for each level.
"""
pack = False
anchors = anchors_dict
if isinstance(anchors, dict):
pack = True
anchors = tf.concat(tf.nest.flatten(anchors), axis=0)
anchors = bounding_box.convert_format(
anchors, source=self.anchor_format, target="yxyx"
)
gt_boxes = bounding_box.convert_format(
gt_boxes, source=self.ground_truth_box_format, target="yxyx"
)
# [num_anchors, num_gt] or [batch_size, num_anchors, num_gt]
similarity_mat = iou.compute_iou(anchors, gt_boxes, bounding_box_format="yxyx")
# [num_anchors] or [batch_size, num_anchors]
matched_gt_indices, matched_vals = self.box_matcher(similarity_mat)
# [num_anchors] or [batch_size, num_anchors]
positive_matches = tf.math.equal(matched_vals, 1)
# currently SyncOnReadVariable does not support `assign_add` in cross-replica.
# self._positives.update_state(
# tf.reduce_sum(tf.cast(positive_matches, tf.float32), axis=-1)
# )
negative_matches = tf.math.equal(matched_vals, -1)
# [num_anchors, 4] or [batch_size, num_anchors, 4]
matched_gt_boxes = target_gather._target_gather(gt_boxes, matched_gt_indices)
# [num_anchors, 4] or [batch_size, num_anchors, 4], used as `y_true` for regression loss
encoded_box_targets = bounding_box._encode_box_to_deltas(
anchors,
matched_gt_boxes,
anchor_format="yxyx",
box_format="yxyx",
variance=[0.1, 0.1, 0.2, 0.2],
)
# [num_anchors, 1] or [batch_size, num_anchors, 1]
box_sample_weights = tf.cast(positive_matches[..., tf.newaxis], gt_boxes.dtype)
# [num_anchors, 1] or [batch_size, num_anchors, 1]
positive_mask = tf.expand_dims(positive_matches, axis=-1)
# set all negative and ignored matches to 0, and all positive matches to 1
# [num_anchors, 1] or [batch_size, num_anchors, 1]
positive_classes = tf.ones_like(positive_mask, dtype=gt_classes.dtype)
negative_classes = tf.zeros_like(positive_mask, dtype=gt_classes.dtype)
# [num_anchors, 1] or [batch_size, num_anchors, 1]
class_targets = tf.where(positive_mask, positive_classes, negative_classes)
# [num_anchors] or [batch_size, num_anchors]
sampled_indicators = sampling.balanced_sample(
positive_matches,
negative_matches,
self.samples_per_image,
self.positive_fraction,
)
# [num_anchors, 1] or [batch_size, num_anchors, 1]
class_sample_weights = tf.cast(
sampled_indicators[..., tf.newaxis], gt_classes.dtype
)
if pack:
encoded_box_targets = self.unpack_targets(encoded_box_targets, anchors_dict)
box_sample_weights = self.unpack_targets(box_sample_weights, anchors_dict)
class_targets = self.unpack_targets(class_targets, anchors_dict)
class_sample_weights = self.unpack_targets(
class_sample_weights, anchors_dict
)
return (
encoded_box_targets,
box_sample_weights,
class_targets,
class_sample_weights,
)
def unpack_targets(self, targets, anchors_dict):
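# Split flat per-anchor targets back into the per-level dict layout of
# `anchors_dict`, slicing along the anchor axis.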
target_shape = len(targets.get_shape().as_list())
if target_shape != 2 and target_shape != 3:
raise ValueError(
f"unpacking targets must be rank 2 or rank 3, got {target_shape}"
)
unpacked_targets = {}
count = 0
for level, anchors in anchors_dict.items():
num_anchors_lvl = anchors.get_shape().as_list()[0]
if target_shape == 2:
unpacked_targets[level] = targets[count : count + num_anchors_lvl, ...]
else:
unpacked_targets[level] = targets[
:, count : count + num_anchors_lvl, ...
]
count += num_anchors_lvl
return unpacked_targets
def get_config(self):
config = {
"anchor_format": self.anchor_format,
"ground_truth_box_format": self.ground_truth_box_format,
"positive_threshold": self.positive_threshold,
"negative_threshold": self.negative_threshold,
"samples_per_image": self.samples_per_image,
"positive_fraction": self.positive_fraction,
}
return config
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv.layers.object_detection.rpn_label_encoder import _RpnLabelEncoder
class RpnLabelEncoderTest(tf.test.TestCase):
def test_rpn_label_encoder(self):
rpn_encoder = _RpnLabelEncoder(
anchor_format="xyxy",
ground_truth_box_format="xyxy",
positive_threshold=0.7,
negative_threshold=0.3,
positive_fraction=0.5,
samples_per_image=2,
)
rois = tf.constant(
[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5], [5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]
)
# the 3rd class entry (-1) has no corresponding gt box and is never matched.
gt_boxes = tf.constant([[10, 10, 15, 15], [2.5, 2.5, 7.5, 7.5]])
gt_classes = tf.constant([2, 10, -1], dtype=tf.int32)
gt_classes = gt_classes[..., tf.newaxis]
box_targets, box_weights, cls_targets, cls_weights = rpn_encoder(
rois, gt_boxes, gt_classes
)
# all rois will be matched to the 2nd gt boxes, and encoded
expected_box_targets = (
tf.constant(
[
[0.5, 0.5, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0],
[-0.5, -0.5, 0.0, 0.0],
[0.5, 0.5, 0.0, 0.0],
]
)
/ 0.1
)
self.assertAllClose(expected_box_targets, box_targets)
# only foreground and background classes
self.assertAllClose(tf.reduce_max(cls_targets), 1.0)
self.assertAllClose(tf.reduce_min(cls_targets), 0.0)
# all weights between 0 and 1
self.assertAllClose(tf.reduce_max(cls_weights), 1.0)
self.assertAllClose(tf.reduce_min(cls_weights), 0.0)
self.assertAllClose(tf.reduce_max(box_weights), 1.0)
self.assertAllClose(tf.reduce_min(box_weights), 0.0)
def test_rpn_label_encoder_multi_level(self):
rpn_encoder = _RpnLabelEncoder(
anchor_format="xyxy",
ground_truth_box_format="xyxy",
positive_threshold=0.7,
negative_threshold=0.3,
positive_fraction=0.5,
samples_per_image=2,
)
rois = {
2: tf.constant([[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5]]),
3: tf.constant([[5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]),
}
# the 3rd class entry (-1) has no corresponding gt box and is never matched.
gt_boxes = tf.constant([[10, 10, 15, 15], [2.5, 2.5, 7.5, 7.5]])
gt_classes = tf.constant([2, 10, -1], dtype=tf.float32)
gt_classes = gt_classes[..., tf.newaxis]
_, _, _, cls_weights = rpn_encoder(rois, gt_boxes, gt_classes)
# the 2nd level found 2 positive matches, the 3rd level found no match
expected_cls_weights = {
2: tf.constant([[0.0], [1.0]]),
3: tf.constant([[0.0], [1.0]]),
}
self.assertAllClose(expected_cls_weights[2], cls_weights[2])
self.assertAllClose(expected_cls_weights[3], cls_weights[3])
def test_rpn_label_encoder_batched(self):
rpn_encoder = _RpnLabelEncoder(
anchor_format="xyxy",
ground_truth_box_format="xyxy",
positive_threshold=0.7,
negative_threshold=0.3,
positive_fraction=0.5,
samples_per_image=2,
)
rois = tf.constant(
[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5], [5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]
)
# the 3rd class entry (-1) has no corresponding gt box and is never matched.
gt_boxes = tf.constant([[10, 10, 15, 15], [2.5, 2.5, 7.5, 7.5]])
gt_classes = tf.constant([2, 10, -1], dtype=tf.int32)
gt_classes = gt_classes[..., tf.newaxis]
rois = rois[tf.newaxis, ...]
gt_boxes = gt_boxes[tf.newaxis, ...]
gt_classes = gt_classes[tf.newaxis, ...]
box_targets, box_weights, cls_targets, cls_weights = rpn_encoder(
rois, gt_boxes, gt_classes
)
# all rois will be matched to the 2nd gt boxes, and encoded
expected_box_targets = (
tf.constant(
[
[0.5, 0.5, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0],
[-0.5, -0.5, 0.0, 0.0],
[0.5, 0.5, 0.0, 0.0],
]
)
/ 0.1
)
expected_box_targets = expected_box_targets[tf.newaxis, ...]
self.assertAllClose(expected_box_targets, box_targets)
# only foreground and background classes
self.assertAllClose(tf.reduce_max(cls_targets), 1.0)
self.assertAllClose(tf.reduce_min(cls_targets), 0.0)
# all weights between 0 and 1
self.assertAllClose(tf.reduce_max(cls_weights), 1.0)
self.assertAllClose(tf.reduce_min(cls_weights), 0.0)
self.assertAllClose(tf.reduce_max(box_weights), 1.0)
self.assertAllClose(tf.reduce_min(box_weights), 0.0)
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Also export the image KPLs from core keras, so that users can import all the
# image KPLs from one place.
from tensorflow.keras.layers import CenterCrop
from tensorflow.keras.layers import RandomBrightness
from tensorflow.keras.layers import RandomContrast
from tensorflow.keras.layers import RandomCrop
from tensorflow.keras.layers import RandomHeight
from tensorflow.keras.layers import RandomTranslation
from tensorflow.keras.layers import RandomWidth
from tensorflow.keras.layers import RandomZoom
from tensorflow.keras.layers import Rescaling
from tensorflow.keras.layers import Resizing
from keras_cv.layers.preprocessing.aug_mix import AugMix
from keras_cv.layers.preprocessing.augmenter import Augmenter
from keras_cv.layers.preprocessing.auto_contrast import AutoContrast
from keras_cv.layers.preprocessing.base_image_augmentation_layer import (
BaseImageAugmentationLayer,
)
from keras_cv.layers.preprocessing.channel_shuffle import ChannelShuffle
from keras_cv.layers.preprocessing.cut_mix import CutMix
from keras_cv.layers.preprocessing.equalization import Equalization
from keras_cv.layers.preprocessing.fourier_mix import FourierMix
from keras_cv.layers.preprocessing.grayscale import Grayscale
from keras_cv.layers.preprocessing.grid_mask import GridMask
from keras_cv.layers.preprocessing.maybe_apply import MaybeApply
from keras_cv.layers.preprocessing.mix_up import MixUp
from keras_cv.layers.preprocessing.mosaic import Mosaic
from keras_cv.layers.preprocessing.posterization import Posterization
from keras_cv.layers.preprocessing.rand_augment import RandAugment
from keras_cv.layers.preprocessing.random_augmentation_pipeline import (
RandomAugmentationPipeline,
)
from keras_cv.layers.preprocessing.random_channel_shift import RandomChannelShift
from keras_cv.layers.preprocessing.random_choice import RandomChoice
from keras_cv.layers.preprocessing.random_color_degeneration import (
RandomColorDegeneration,
)
from keras_cv.layers.preprocessing.random_color_jitter import RandomColorJitter
from keras_cv.layers.preprocessing.random_crop_and_resize import RandomCropAndResize
from keras_cv.layers.preprocessing.random_cutout import RandomCutout
from keras_cv.layers.preprocessing.random_flip import RandomFlip
from keras_cv.layers.preprocessing.random_gaussian_blur import RandomGaussianBlur
from keras_cv.layers.preprocessing.random_hue import RandomHue
from keras_cv.layers.preprocessing.random_jpeg_quality import RandomJpegQuality
from keras_cv.layers.preprocessing.random_rotation import RandomRotation
from keras_cv.layers.preprocessing.random_saturation import RandomSaturation
from keras_cv.layers.preprocessing.random_sharpness import RandomSharpness
from keras_cv.layers.preprocessing.random_shear import RandomShear
from keras_cv.layers.preprocessing.randomly_zoomed_crop import RandomlyZoomedCrop
from keras_cv.layers.preprocessing.solarization import Solarization
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv import layers
from keras_cv.layers.preprocessing.base_image_augmentation_layer import (
BaseImageAugmentationLayer,
)
from keras_cv.utils import preprocessing
@tf.keras.utils.register_keras_serializable(package="keras_cv")
class AugMix(BaseImageAugmentationLayer):
"""Performs the AugMix data augmentation technique.
AugMix aims to produce images with variety while preserving the
image semantics and local statistics. During the augmentation process, each image
is augmented `num_chains` different ways, each way consisting of `chain_depth`
augmentations. Augmentations are sampled from the list: translation, shearing,
rotation, posterization, histogram equalization, solarization and auto contrast.
The results of each chain are then mixed together with the original
image based on random samples from a Dirichlet distribution.
Args:
value_range: the range of values the incoming images will have.
Represented as a two number tuple written (low, high).
This is typically either `(0, 1)` or `(0, 255)` depending
on how your preprocessing pipeline is set up.
severity: A tuple of two floats, a single float or a `keras_cv.FactorSampler`.
A value is sampled from the provided range. If a float is passed, the
range is interpreted as `(0, severity)`. This value represents the
level of strength of augmentations and is in the range [0, 1].
Defaults to 0.3.
num_chains: an integer representing the number of different chains to
be mixed. Defaults to 3.
chain_depth: an integer or range representing the number of transformations in
the chains. Defaults to [1,3].
alpha: a float value used as the concentration parameter for the
Beta and Dirichlet distributions. Defaults to 1.0.
seed: Integer. Used to create a random seed.
References:
- [AugMix paper](https://arxiv.org/pdf/1912.02781)
- [Official Code](https://github.com/google-research/augmix)
- [Unofficial TF Code](https://github.com/szacho/augmix-tf)
Sample Usage:
```python
(images, labels), _ = tf.keras.datasets.cifar10.load_data()
augmix = keras_cv.layers.AugMix([0, 255])
augmented_images = augmix(images[:100])
```
"""
def __init__(
self,
value_range,
severity=0.3,
num_chains=3,
chain_depth=[1, 3],
alpha=1.0,
seed=None,
**kwargs,
):
super().__init__(seed=seed, **kwargs)
self.value_range = value_range
self.num_chains = num_chains
self.chain_depth = chain_depth
if isinstance(self.chain_depth, int):
self.chain_depth = [self.chain_depth, self.chain_depth]
self.alpha = alpha
self.seed = seed
self.auto_vectorize = False
self.severity = severity
self.severity_factor = preprocessing.parse_factor(
self.severity,
min_value=0.01,
max_value=1.0,
param_name="severity",
seed=self.seed,
)
# initialize layers
self.auto_contrast = layers.AutoContrast(value_range=self.value_range)
self.equalize = layers.Equalization(value_range=self.value_range)
def _sample_from_dirichlet(self, alpha):
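# Independent Gamma(alpha_i, 1) samples normalized by their sum follow a
# Dirichlet(alpha) distribution.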
gamma_sample = tf.random.gamma(
shape=(), alpha=alpha, seed=self._random_generator.make_legacy_seed()
)
return gamma_sample / tf.reduce_sum(gamma_sample, axis=-1, keepdims=True)
def _sample_from_beta(self, alpha, beta):
sample_alpha = tf.random.gamma(
(), 1.0, beta=alpha, seed=self._random_generator.make_legacy_seed()
)
sample_beta = tf.random.gamma(
(), 1.0, beta=beta, seed=self._random_generator.make_legacy_seed()
)
return sample_alpha / (sample_alpha + sample_beta)
def _sample_depth(self):
return self._random_generator.random_uniform(
shape=(),
minval=self.chain_depth[0],
maxval=self.chain_depth[1] + 1,
dtype=tf.int32,
)
def _loop_on_depth(self, depth_level, image_aug):
# 9 augmentation ops are defined in `_apply_op` (indices 0-8), so sample
# from [0, 9).
op_index = self._random_generator.random_uniform(
shape=(), minval=0, maxval=9, dtype=tf.int32
)
image_aug = self._apply_op(image_aug, op_index)
depth_level += 1
return depth_level, image_aug
def _loop_on_width(self, image, chain_mixing_weights, curr_chain, result):
image_aug = tf.identity(image)
chain_depth = self._sample_depth()
depth_level = tf.constant([0], dtype=tf.int32)
depth_level, image_aug = tf.while_loop(
lambda depth_level, image_aug: tf.less(depth_level, chain_depth),
self._loop_on_depth,
[depth_level, image_aug],
)
result += tf.gather(chain_mixing_weights, curr_chain) * image_aug
curr_chain += 1
return image, chain_mixing_weights, curr_chain, result
def _auto_contrast(self, image):
return self.auto_contrast(image)
def _equalize(self, image):
return self.equalize(image)
def _posterize(self, image):
image = preprocessing.transform_value_range(
images=image,
original_range=self.value_range,
target_range=[0, 255],
)
bits = tf.cast(self.severity_factor() * 3, tf.int32)
shift = tf.cast(4 - bits + 1, tf.uint8)
image = tf.cast(image, tf.uint8)
image = tf.bitwise.left_shift(tf.bitwise.right_shift(image, shift), shift)
image = tf.cast(image, self.compute_dtype)
return preprocessing.transform_value_range(
images=image,
original_range=[0, 255],
target_range=self.value_range,
)
def _rotate(self, image):
angle = tf.expand_dims(tf.cast(self.severity_factor() * 30, tf.float32), axis=0)
shape = tf.cast(tf.shape(image), tf.float32)
return preprocessing.transform(
tf.expand_dims(image, 0),
preprocessing.get_rotation_matrix(angle, shape[0], shape[1]),
)[0]
def _solarize(self, image):
threshold = tf.cast(tf.cast(self.severity_factor() * 255, tf.int32), tf.float32)
image = preprocessing.transform_value_range(
image, original_range=self.value_range, target_range=(0, 255)
)
result = tf.clip_by_value(image, 0, 255)
result = tf.where(result < threshold, result, 255 - result)
return preprocessing.transform_value_range(
result, original_range=(0, 255), target_range=self.value_range
)
def _shear_x(self, image):
x = tf.cast(self.severity_factor() * 0.3, tf.float32)
x *= preprocessing.random_inversion(self._random_generator)
transform_x = layers.RandomShear._format_transform(
[1.0, x, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]
)
return preprocessing.transform(
images=tf.expand_dims(image, 0), transforms=transform_x
)[0]
def _shear_y(self, image):
y = tf.cast(self.severity_factor() * 0.3, tf.float32)
y *= preprocessing.random_inversion(self._random_generator)
transform_x = layers.RandomShear._format_transform(
[1.0, 0.0, 0.0, y, 1.0, 0.0, 0.0, 0.0]
)
return preprocessing.transform(
images=tf.expand_dims(image, 0), transforms=transform_x
)[0]
def _translate_x(self, image):
shape = tf.cast(tf.shape(image), tf.float32)
x = tf.cast(self.severity_factor() * shape[1] / 3, tf.float32)
x = tf.expand_dims(tf.expand_dims(x, axis=0), axis=0)
x *= preprocessing.random_inversion(self._random_generator)
x = tf.cast(x, tf.int32)
translations = tf.cast(
tf.concat([x, tf.zeros_like(x)], axis=1), dtype=tf.float32
)
return preprocessing.transform(
tf.expand_dims(image, 0), preprocessing.get_translation_matrix(translations)
)[0]
def _translate_y(self, image):
shape = tf.cast(tf.shape(image), tf.float32)
y = tf.cast(self.severity_factor() * shape[0] / 3, tf.float32)
y = tf.expand_dims(tf.expand_dims(y, axis=0), axis=0)
y *= preprocessing.random_inversion(self._random_generator)
y = tf.cast(y, tf.int32)
translations = tf.cast(
tf.concat([tf.zeros_like(y), y], axis=1), dtype=tf.float32
)
return preprocessing.transform(
tf.expand_dims(image, 0), preprocessing.get_translation_matrix(translations)
)[0]
def _apply_op(self, image, op_index):
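# A chain of tf.cond ops: the single augmentation whose index equals
# `op_index` is applied; every other branch is the identity.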
augmented = image
augmented = tf.cond(
op_index == tf.constant([0], dtype=tf.int32),
lambda: self._auto_contrast(augmented),
lambda: augmented,
)
augmented = tf.cond(
op_index == tf.constant([1], dtype=tf.int32),
lambda: self._equalize(augmented),
lambda: augmented,
)
augmented = tf.cond(
op_index == tf.constant([2], dtype=tf.int32),
lambda: self._posterize(augmented),
lambda: augmented,
)
augmented = tf.cond(
op_index == tf.constant([3], dtype=tf.int32),
lambda: self._rotate(augmented),
lambda: augmented,
)
augmented = tf.cond(
op_index == tf.constant([4], dtype=tf.int32),
lambda: self._solarize(augmented),
lambda: augmented,
)
augmented = tf.cond(
op_index == tf.constant([5], dtype=tf.int32),
lambda: self._shear_x(augmented),
lambda: augmented,
)
augmented = tf.cond(
op_index == tf.constant([6], dtype=tf.int32),
lambda: self._shear_y(augmented),
lambda: augmented,
)
augmented = tf.cond(
op_index == tf.constant([7], dtype=tf.int32),
lambda: self._translate_x(augmented),
lambda: augmented,
)
augmented = tf.cond(
op_index == tf.constant([8], dtype=tf.int32),
lambda: self._translate_y(augmented),
lambda: augmented,
)
return augmented
def augment_image(self, image, transformation=None, **kwargs):
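# AugMix mixing: result = m * image + (1 - m) * sum_i(w_i * chain_i(image)),
# where the chain weights w come from `_sample_from_dirichlet` and the
# mixing weight m comes from `_sample_from_beta`.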
chain_mixing_weights = self._sample_from_dirichlet(
tf.ones([self.num_chains]) * self.alpha
)
weight_sample = self._sample_from_beta(self.alpha, self.alpha)
result = tf.zeros_like(image)
curr_chain = tf.constant([0], dtype=tf.int32)
image, chain_mixing_weights, curr_chain, result = tf.while_loop(
lambda image, chain_mixing_weights, curr_chain, result: tf.less(
curr_chain, self.num_chains
),
self._loop_on_width,
[image, chain_mixing_weights, curr_chain, result],
)
result = weight_sample * image + (1 - weight_sample) * result
return result
def augment_label(self, label, transformation=None, **kwargs):
return label
def get_config(self):
config = {
"value_range": self.value_range,
"severity": self.severity,
"num_chains": self.num_chains,
"chain_depth": self.chain_depth,
"alpha": self.alpha,
"seed": self.seed,
}
base_config = super().get_config()
return dict(list(base_config.items()) + list(config.items()))
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv.layers import preprocessing
class AugMixTest(tf.test.TestCase):
def test_return_shapes(self):
layer = preprocessing.AugMix([0, 255])
# RGB
xs = tf.ones((2, 512, 512, 3))
xs = layer(xs)
self.assertEqual(xs.shape, [2, 512, 512, 3])
# greyscale
xs = tf.ones((2, 512, 512, 1))
xs = layer(xs)
self.assertEqual(xs.shape, [2, 512, 512, 1])
def test_in_single_image(self):
layer = preprocessing.AugMix([0, 255])
# RGB
xs = tf.cast(
tf.ones((512, 512, 3)),
dtype=tf.float32,
)
xs = layer(xs)
self.assertEqual(xs.shape, [512, 512, 3])
# greyscale
xs = tf.cast(
tf.ones((512, 512, 1)),
dtype=tf.float32,
)
xs = layer(xs)
self.assertEqual(xs.shape, [512, 512, 1])
def test_non_square_images(self):
layer = preprocessing.AugMix([0, 255])
# RGB
xs = tf.ones((2, 256, 512, 3))
xs = layer(xs)
self.assertEqual(xs.shape, [2, 256, 512, 3])
# greyscale
xs = tf.ones((2, 256, 512, 1))
xs = layer(xs)
self.assertEqual(xs.shape, [2, 256, 512, 1])
def test_single_input_args(self):
layer = preprocessing.AugMix([0, 255])
# RGB
xs = tf.ones((2, 512, 512, 3))
xs = layer(xs)
self.assertEqual(xs.shape, [2, 512, 512, 3])
# greyscale
xs = tf.ones((2, 512, 512, 1))
xs = layer(xs)
self.assertEqual(xs.shape, [2, 512, 512, 1])
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
@tf.keras.utils.register_keras_serializable(package="keras_cv")
class Augmenter(tf.keras.layers.Layer):
"""Augmenter performs a series of preprocessing operations on input data.
Args:
layers: A list of Keras layers to be applied in sequence to input data.
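Sample usage (a sketch mirroring the tests below; assumes
`from keras_cv.layers import preprocessing`):
```python
augmenter = Augmenter(
    [
        preprocessing.Grayscale(output_channels=1),
        preprocessing.RandomCropAndResize(
            target_size=(100, 100),
            crop_area_factor=(1, 1),
            aspect_ratio_factor=(1, 1),
        ),
    ]
)
augmented_images = augmenter(images, training=True)
```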
"""
def __init__(self, layers, **kwargs):
super().__init__(**kwargs)
self.layers = layers
def call(self, inputs, training=True):
for layer in self.layers:
inputs = layer(inputs, training=training)
return inputs
def get_config(self):
config = super().get_config()
config.update({"layers": self.layers})
return config
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv.layers import preprocessing
class AugmenterTest(tf.test.TestCase):
def test_return_shapes(self):
input = tf.ones((2, 512, 512, 3))
layer = preprocessing.Augmenter(
[
preprocessing.Grayscale(
output_channels=1,
),
preprocessing.RandomCropAndResize(
target_size=(100, 100),
crop_area_factor=(1, 1),
aspect_ratio_factor=(1, 1),
),
]
)
output = layer(input, training=True)
self.assertEqual(output.shape, [2, 100, 100, 1])
def test_in_tf_function(self):
input = tf.ones((2, 512, 512, 3))
layer = preprocessing.Augmenter(
[
preprocessing.Grayscale(
output_channels=1,
),
preprocessing.RandomCropAndResize(
target_size=(100, 100),
crop_area_factor=(1, 1),
aspect_ratio_factor=(1, 1),
),
]
)
@tf.function
def augment(x):
return layer(x, training=True)
output = augment(input)
self.assertEqual(output.shape, [2, 100, 100, 1])
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv.layers.preprocessing.base_image_augmentation_layer import (
BaseImageAugmentationLayer,
)
from keras_cv.utils import preprocessing
@tf.keras.utils.register_keras_serializable(package="keras_cv")
class AutoContrast(BaseImageAugmentationLayer):
"""Performs the AutoContrast operation on an image.
Auto contrast stretches the values of an image across the entire available
`value_range`. This makes differences between pixels more obvious. An example of
this is if an image only has values `[0, 1]` out of the range `[0, 255]`, auto
contrast will change the `1` values to be `255`.
Args:
value_range: the range of values the incoming images will have.
Represented as a two number tuple written [low, high].
This is typically either `[0, 1]` or `[0, 255]` depending
on how your preprocessing pipeline is setup.
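Sample usage (a minimal sketch):
```python
images = tf.random.uniform((4, 64, 64, 3), minval=0, maxval=128)  # low contrast
auto_contrast = AutoContrast(value_range=(0, 255))
# per channel, the minimum value maps to 0 and the maximum maps to 255
stretched = auto_contrast(images)
```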
"""
def __init__(
self,
value_range,
**kwargs,
):
super().__init__(**kwargs)
self.value_range = value_range
def augment_image(self, image, transformation=None, **kwargs):
original_image = image
image = preprocessing.transform_value_range(
image, original_range=self.value_range, target_range=(0, 255)
)
low = tf.reduce_min(tf.reduce_min(image, axis=0), axis=0)
high = tf.reduce_max(tf.reduce_max(image, axis=0), axis=0)
scale = 255.0 / (high - low)
offset = -low * scale
image = image * scale[None, None] + offset[None, None]
result = tf.clip_by_value(image, 0.0, 255.0)
result = preprocessing.transform_value_range(
result, original_range=(0, 255), target_range=self.value_range
)
# don't process NaN channels
result = tf.where(tf.math.is_nan(result), original_image, result)
return result
def augment_bounding_boxes(self, bounding_boxes, **kwargs):
return bounding_boxes
def augment_label(self, label, transformation=None, **kwargs):
return label
def augment_segmentation_mask(self, segmentation_mask, transformation, **kwargs):
return segmentation_mask
def get_config(self):
config = super().get_config()
config.update({"value_range": self.value_range})
return config
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv.layers import preprocessing
class AutoContrastTest(tf.test.TestCase):
def test_constant_channels_dont_get_nanned(self):
img = tf.constant([1, 1], dtype=tf.float32)
img = tf.expand_dims(img, axis=-1)
img = tf.expand_dims(img, axis=-1)
img = tf.expand_dims(img, axis=0)
layer = preprocessing.AutoContrast(value_range=(0, 255))
ys = layer(img)
self.assertTrue(tf.math.reduce_any(ys[0] == 1.0))
def test_auto_contrast_expands_value_range(self):
img = tf.constant([0, 128], dtype=tf.float32)
img = tf.expand_dims(img, axis=-1)
img = tf.expand_dims(img, axis=-1)
img = tf.expand_dims(img, axis=0)
layer = preprocessing.AutoContrast(value_range=(0, 255))
ys = layer(img)
self.assertTrue(tf.math.reduce_any(ys[0] == 0.0))
self.assertTrue(tf.math.reduce_any(ys[0] == 255.0))
def test_auto_contrast_different_values_per_channel(self):
img = tf.constant(
[[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]], dtype=tf.float32
)
img = tf.expand_dims(img, axis=0)
layer = preprocessing.AutoContrast(value_range=(0, 255))
ys = layer(img)
self.assertTrue(tf.math.reduce_any(ys[0, ..., 0] == 0.0))
self.assertTrue(tf.math.reduce_any(ys[0, ..., 1] == 0.0))
self.assertTrue(tf.math.reduce_any(ys[0, ..., 0] == 255.0))
self.assertTrue(tf.math.reduce_any(ys[0, ..., 1] == 255.0))
self.assertAllClose(
ys,
[
[
[[0.0, 0.0, 0.0], [85.0, 85.0, 85.0]],
[[170.0, 170.0, 170.0], [255.0, 255.0, 255.0]],
]
],
)
def test_auto_contrast_expands_value_range_uint8(self):
img = tf.constant([0, 128], dtype=tf.uint8)
img = tf.expand_dims(img, axis=-1)
img = tf.expand_dims(img, axis=-1)
img = tf.expand_dims(img, axis=0)
layer = preprocessing.AutoContrast(value_range=(0, 255))
ys = layer(img)
self.assertTrue(tf.math.reduce_any(ys[0] == 0.0))
self.assertTrue(tf.math.reduce_any(ys[0] == 255.0))
def test_auto_contrast_properly_converts_value_range(self):
img = tf.constant([0, 0.5], dtype=tf.float32)
img = tf.expand_dims(img, axis=-1)
img = tf.expand_dims(img, axis=-1)
img = tf.expand_dims(img, axis=0)
layer = preprocessing.AutoContrast(value_range=(0, 1))
ys = layer(img)
self.assertTrue(tf.math.reduce_any(ys[0] == 0.0))
self.assertTrue(tf.math.reduce_any(ys[0] == 1.0))
# Copyright 2022 The KerasCV Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tensorflow as tf
from keras_cv import bounding_box
from keras_cv.utils import preprocessing
# In order to support both unbatched and batched inputs, the horizontal
# and vertical axes are reverse indexed
H_AXIS = -3
W_AXIS = -2
IMAGES = "images"
LABELS = "labels"
TARGETS = "targets"
BOUNDING_BOXES = "bounding_boxes"
KEYPOINTS = "keypoints"
RAGGED_BOUNDING_BOXES = "ragged_bounding_boxes"
SEGMENTATION_MASKS = "segmentation_masks"
IS_DICT = "is_dict"
USE_TARGETS = "use_targets"
@tf.keras.utils.register_keras_serializable(package="keras_cv")
class BaseImageAugmentationLayer(tf.keras.__internal__.layers.BaseRandomLayer):
"""Abstract base layer for image augmentaion.
This layer contains base functionalities for preprocessing layers which
augment image related data, eg. image and in future, label and bounding
boxes. The subclasses could avoid making certain mistakes and reduce code
duplications.
This layer requires you to implement one method: `augment_image()`, which
augments one single image during the training. There are a few additional
methods that you can implement for added functionality on the layer:
`augment_label()`, which handles label augmentation if the layer supports
that.
`augment_bounding_boxes()`, which handles the bounding box augmentation, if
the layer supports that.
`get_random_transformation()`, which should produce a random transformation
setting. The transformation object, which could be any type, will be passed
to `augment_image`, `augment_label` and `augment_bounding_boxes`, to
coordinate the randomness behavior, e.g., in the RandomFlip layer, the image
and bounding_boxes should be changed in the same way.
The `call()` method supports two formats of inputs:
1. A single image tensor with 3D (HWC) or 4D (NHWC) format.
2. A dict of tensors with stable keys. The supported keys are:
`"images"`, `"labels"` and `"bounding_boxes"` at the moment. We might add
more keys in the future as we support more types of augmentation.
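For example (a minimal sketch; `MyAugmentLayer` stands in for any concrete
subclass of this layer):
```python
layer = MyAugmentLayer()
outputs = layer({"images": images, "labels": labels})
# `outputs` is a dict with the same keys as the inputs.
```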
The output of `call()` will have the same structure as the inputs.
The `call()` method handles the logic of detecting the training/inference
mode, unpacking the inputs, forwarding them to the correct function, and
packing the output back into the same structure as the inputs.
By default the `call()` method leverages the `tf.vectorized_map()` function.
Auto-vectorization can be disabled by setting `self.auto_vectorize = False`
in your `__init__()` method. When disabled, `call()` instead relies
on `tf.map_fn()`. For example:
```python
class SubclassLayer(keras_cv.BaseImageAugmentationLayer):
def __init__(self):
super().__init__()
self.auto_vectorize = False
```
Example:
```python
class RandomContrast(keras_cv.BaseImageAugmentationLayer):
def __init__(self, factor=(0.5, 1.5), **kwargs):
super().__init__(**kwargs)
self._factor = factor
def augment_image(self, image, transformation):
random_factor = tf.random.uniform([], self._factor[0], self._factor[1])
mean = tf.math.reduce_mean(image, axis=-1, keepdims=True)
return (image - mean) * random_factor + mean
```
Note that since randomness is also a common functionality, this layer
also includes a tf.keras.backend.RandomGenerator, which can be used to
produce random numbers. The random number generator is stored in the
`self._random_generator` attribute.
"""
def __init__(self, seed=None, **kwargs):
super().__init__(seed=seed, **kwargs)
@property
def auto_vectorize(self):
"""Control whether automatic vectorization occurs.
By default the `call()` method leverages the `tf.vectorized_map()`
function. Auto-vectorization can be disabled by setting
`self.auto_vectorize = False` in your `__init__()` method. When
disabled, `call()` instead relies on `tf.map_fn()`. For example:
```python
class SubclassLayer(BaseImageAugmentationLayer):
def __init__(self):
super().__init__()
self.auto_vectorize = False
```
"""
return getattr(self, "_auto_vectorize", True)
@auto_vectorize.setter
def auto_vectorize(self, auto_vectorize):
self._auto_vectorize = auto_vectorize
@property
def _map_fn(self):
if self.auto_vectorize:
return tf.vectorized_map
else:
return tf.map_fn
def augment_image(self, image, transformation, **kwargs):
"""Augment a single image during training.
Args:
image: 3D image input tensor to the layer. Forwarded from
`layer.call()`.
transformation: The transformation object produced by
`get_random_transformation`. Used to coordinate the randomness
between image, label, bounding box, keypoints, and segmentation mask.
Returns:
output 3D tensor, which will be forwarded to `layer.call()`.
"""
raise NotImplementedError()
def augment_label(self, label, transformation, **kwargs):
"""Augment a single label during training.
Args:
label: 1D label to the layer. Forwarded from `layer.call()`.
transformation: The transformation object produced by
`get_random_transformation`. Used to coordinate the randomness
between image, label, bounding box, keypoints, and segmentation mask.
Returns:
output 1D tensor, which will be forwarded to `layer.call()`.
"""
raise NotImplementedError()
def augment_target(self, target, transformation, **kwargs):
"""Augment a single target during training.
Args:
target: 1D label to the layer. Forwarded from `layer.call()`.
transformation: The transformation object produced by
`get_random_transformation`. Used to coordinate the randomness
between image, label, bounding box, keypoints, and segmentation mask.
Returns:
output 1D tensor, which will be forwarded to `layer.call()`.
"""
return self.augment_label(target, transformation)
def augment_bounding_boxes(self, bounding_boxes, transformation, **kwargs):
"""Augment bounding boxes for one image during training.
Args:
image: 3D image input tensor to the layer. Forwarded from
`layer.call()`.
bounding_boxes: 2D bounding boxes to the layer. Forwarded from
`call()`.
transformation: The transformation object produced by
`get_random_transformation`. Used to coordinate the randomness
between image, label, bounding box, keypoints, and segmentation mask.
Returns:
output 2D tensor, which will be forwarded to `layer.call()`.
"""
raise NotImplementedError()
def augment_keypoints(self, keypoints, transformation, **kwargs):
"""Augment keypoints for one image during training.
Args:
keypoints: 2D keypoints input tensor to the layer. Forwarded from
`layer.call()`.
transformation: The transformation object produced by
`get_random_transformation`. Used to coordinate the randomness
between image, label, bounding box, keypoints, and segmentation mask.
Returns:
output 2D tensor, which will be forwarded to `layer.call()`.
"""
raise NotImplementedError()
def augment_segmentation_mask(self, segmentation_mask, transformation, **kwargs):
"""Augment a single image's segmentation mask during training.
Args:
segmentation_mask: 3D segmentation mask input tensor to the layer.
This should generally have the shape [H, W, 1], or in some cases [H, W, C] for multilabeled data.
Forwarded from `layer.call()`.
transformation: The transformation object produced by
`get_random_transformation`. Used to coordinate the randomness
between image, label, bounding box, keypoints, and segmentation mask.
Returns:
output 3D tensor containing the augmented segmentation mask, which will be forwarded to `layer.call()`.
"""
raise NotImplementedError()
def get_random_transformation(
self,
image=None,
label=None,
bounding_boxes=None,
keypoints=None,
segmentation_mask=None,
):
"""Produce random transformation config for one single input.
This is used to produce same randomness between
image/label/bounding_box.
Args:
image: 3D image tensor from inputs.
label: optional 1D label tensor from inputs.
bounding_box: optional 2D bounding boxes tensor from inputs.
segmentation_mask: optional 3D segmentation mask tensor from inputs.
Returns:
Any type of object, which will be forwarded to `augment_image`,
`augment_label` and `augment_bounding_box` as the `transformation`
parameter.
"""
return None
def call(self, inputs, training=True):
inputs = self._ensure_inputs_are_compute_dtype(inputs)
if training:
inputs, metadata = self._format_inputs(inputs)
images = inputs[IMAGES]
if images.shape.rank == 3:
return self._format_output(self._augment(inputs), metadata)
elif images.shape.rank == 4:
return self._format_output(self._batch_augment(inputs), metadata)
else:
raise ValueError(
"Image augmentation layers are expecting inputs to be "
"rank 3 (HWC) or 4D (NHWC) tensors. Got shape: "
f"{images.shape}"
)
else:
return inputs
def _augment(self, inputs):
image = inputs.get(IMAGES, None)
label = inputs.get(LABELS, None)
bounding_boxes = inputs.get(BOUNDING_BOXES, None)
keypoints = inputs.get(KEYPOINTS, None)
segmentation_mask = inputs.get(SEGMENTATION_MASKS, None)
transformation = self.get_random_transformation(
image=image,
label=label,
bounding_boxes=bounding_boxes,
keypoints=keypoints,
segmentation_mask=segmentation_mask,
)
image = self.augment_image(
image,
transformation=transformation,
bounding_boxes=bounding_boxes,
label=label,
)
result = {IMAGES: image}
if label is not None:
label = self.augment_target(
label,
transformation=transformation,
bounding_boxes=bounding_boxes,
image=image,
)
result[LABELS] = label
if bounding_boxes is not None:
bounding_boxes = self.augment_bounding_boxes(
bounding_boxes,
transformation=transformation,
label=label,
image=image,
)
result[BOUNDING_BOXES] = bounding_boxes
if keypoints is not None:
keypoints = self.augment_keypoints(
keypoints,
transformation=transformation,
label=label,
bounding_boxes=bounding_boxes,
image=image,
)
result[KEYPOINTS] = keypoints
if segmentation_mask is not None:
segmentation_mask = self.augment_segmentation_mask(
segmentation_mask,
transformation=transformation,
)
result[SEGMENTATION_MASKS] = segmentation_mask
# preserve any additional inputs unmodified by this layer.
for key in inputs.keys() - result.keys():
result[key] = inputs[key]
return result
def _batch_augment(self, inputs):
return self._map_fn(self._augment, inputs)
def _format_inputs(self, inputs):
metadata = {IS_DICT: True, USE_TARGETS: False}
if tf.is_tensor(inputs):
# single image input tensor
metadata[IS_DICT] = False
inputs = {IMAGES: inputs}
return inputs, metadata
if not isinstance(inputs, dict):
raise ValueError(
f"Expect the inputs to be image tensor or dict. Got inputs={inputs}"
)
if BOUNDING_BOXES in inputs:
inputs[BOUNDING_BOXES], updates = self._format_bounding_boxes(
inputs[BOUNDING_BOXES]
)
metadata.update(updates)
if isinstance(inputs, dict) and TARGETS in inputs:
# TODO(scottzhu): Check if it only contains the valid keys
inputs[LABELS] = inputs[TARGETS]
del inputs[TARGETS]
metadata[USE_TARGETS] = True
return inputs, metadata
return inputs, metadata
def _format_bounding_boxes(self, bounding_boxes):
metadata = {RAGGED_BOUNDING_BOXES: False}
if isinstance(bounding_boxes, tf.RaggedTensor):
metadata = {RAGGED_BOUNDING_BOXES: True}
bounding_boxes = bounding_box.pad_with_sentinels(bounding_boxes)
if bounding_boxes.shape[-1] < 5:
raise ValueError(
"Bounding boxes are missing class_id. If you would like to pad the "
"bounding boxes with class_id, use `keras_cv.bounding_box.add_class_id`"
)
return bounding_boxes, metadata
def _format_output(self, output, metadata):
if not metadata[IS_DICT]:
return output[IMAGES]
elif metadata[USE_TARGETS]:
output[TARGETS] = output[LABELS]
del output[LABELS]
if BOUNDING_BOXES in output:
if metadata[RAGGED_BOUNDING_BOXES]:
output[BOUNDING_BOXES] = bounding_box.filter_sentinels(
output[BOUNDING_BOXES]
)
return output
def _ensure_inputs_are_compute_dtype(self, inputs):
if isinstance(inputs, dict):
inputs[IMAGES] = preprocessing.ensure_tensor(
inputs[IMAGES],
self.compute_dtype,
)
else:
inputs = preprocessing.ensure_tensor(
inputs,
self.compute_dtype,
)
return inputs