Commit 482823c8 authored by A. Unique TensorFlower's avatar A. Unique TensorFlower
Browse files

Merge pull request #10263 from PurdueDualityLab:dataload_pr

PiperOrigin-RevId: 399483092
parents 61f8185d 77aa3ea9
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Yolo preprocess ops."""
import tensorflow as tf
import tensorflow_addons as tfa
from official.vision.beta.projects.yolo.ops import box_ops
def resize_crop_filter(image, boxes, default_width, default_height,
target_width, target_height):
"""Apply zooming to the image and boxes.
Args:
image: a `Tensor` representing the image.
boxes: a `Tensor` represeting the boxes.
default_width: a `Tensor` representing the width of the image.
default_height: a `Tensor` representing the height of the image.
target_width: a `Tensor` representing the desired width of the image.
target_height: a `Tensor` representing the desired height of the image.
Returns:
images: a `Tensor` representing the augmented image.
boxes: a `Tensor` representing the augmented boxes.
"""
with tf.name_scope('resize_crop_filter'):
image = tf.image.resize(image, (target_width, target_height))
image = tf.image.resize_with_crop_or_pad(image,
target_height=default_height,
target_width=default_width)
default_width = tf.cast(default_width, boxes.dtype)
default_height = tf.cast(default_height, boxes.dtype)
target_width = tf.cast(target_width, boxes.dtype)
target_height = tf.cast(target_height, boxes.dtype)
aspect_change_width = target_width / default_width
aspect_change_height = target_height / default_height
x, y, width, height = tf.split(boxes, 4, axis=-1)
x = (x - 0.5) * target_width / default_width + 0.5
y = (y - 0.5) * target_height / default_height + 0.5
width = width * aspect_change_width
height = height * aspect_change_height
boxes = tf.concat([x, y, width, height], axis=-1)
return image, boxes
def random_translate(image, box, t, seed=None):
"""Randomly translate the image and boxes.
Args:
image: a `Tensor` representing the image.
box: a `Tensor` represeting the boxes.
t: an `int` representing the translation factor
seed: an optional seed for tf.random operations
Returns:
image: a `Tensor` representing the augmented image.
box: a `Tensor` representing the augmented boxes.
"""
t_x = tf.random.uniform(minval=-t,
maxval=t,
shape=(),
dtype=tf.float32,
seed=seed)
t_y = tf.random.uniform(minval=-t,
maxval=t,
shape=(),
dtype=tf.float32,
seed=seed)
box = translate_boxes(box, t_x, t_y)
image = translate_image(image, t_x, t_y)
return image, box
def translate_boxes(box, translate_x, translate_y):
"""Randomly translate the boxes.
Args:
box: a `Tensor` represeitng the boxes.
translate_x: a `Tensor` represting the translation on the x-axis.
translate_y: a `Tensor` represting the translation on the y-axis.
Returns:
box: a `Tensor` representing the augmented boxes.
"""
with tf.name_scope('translate_boxs'):
x = box[..., 0] + translate_x
y = box[..., 1] + translate_y
box = tf.stack([x, y, box[..., 2], box[..., 3]], axis=-1)
box.set_shape([None, 4])
return box
def translate_image(image, translate_x, translate_y):
"""Randomly translate the image.
Args:
image: a `Tensor` representing the image.
translate_x: a `Tensor` represting the translation on the x-axis.
translate_y: a `Tensor` represting the translation on the y-axis.
Returns:
box: a `Tensor` representing the augmented boxes.
"""
with tf.name_scope('translate_image'):
if (translate_x != 0 and translate_y != 0):
image_jitter = tf.convert_to_tensor([translate_x, translate_y])
image_jitter.set_shape([2])
image = tfa.image.translate(
image, image_jitter * tf.cast(tf.shape(image)[1], tf.float32))
return image
def pad_max_instances(value, instances, pad_value=0, pad_axis=0):
"""Pads tensors to max number of instances."""
shape = tf.shape(value)
dim1 = shape[pad_axis]
take = tf.math.reduce_min([instances, dim1])
value, _ = tf.split(value, [take, -1],
axis=pad_axis) # value[:instances, ...]
pad = tf.convert_to_tensor([tf.math.reduce_max([instances - dim1, 0])])
nshape = tf.concat([shape[:pad_axis], pad, shape[(pad_axis + 1):]], axis=0)
pad_tensor = tf.fill(nshape, tf.cast(pad_value, dtype=value.dtype))
value = tf.concat([value, pad_tensor], axis=pad_axis)
return value
def fit_preserve_aspect_ratio(image,
boxes,
width=None,
height=None,
target_dim=None):
"""Resizes the image while peserving the image aspect ratio.
Args:
image: a `Tensor` representing the image.
boxes: a `Tensor` representing the boxes.
width: int for the image width.
height: int for the image height.
target_dim: list or a Tensor of height and width.
Returns:
image: a `Tensor` representing the image.
box: a `Tensor` representing the boxes.
"""
if width is None or height is None:
shape = tf.shape(image)
if tf.shape(shape)[0] == 4:
width = shape[1]
height = shape[2]
else:
width = shape[0]
height = shape[1]
clipper = tf.math.maximum(width, height)
if target_dim is None:
target_dim = clipper
pad_width = clipper - width
pad_height = clipper - height
image = tf.image.pad_to_bounding_box(image, pad_width // 2, pad_height // 2,
clipper, clipper)
boxes = box_ops.yxyx_to_xcycwh(boxes)
x, y, w, h = tf.split(boxes, 4, axis=-1)
y *= tf.cast(width / clipper, tf.float32)
x *= tf.cast(height / clipper, tf.float32)
y += tf.cast((pad_width / clipper) / 2, tf.float32)
x += tf.cast((pad_height / clipper) / 2, tf.float32)
h *= tf.cast(width / clipper, tf.float32)
w *= tf.cast(height / clipper, tf.float32)
boxes = tf.concat([x, y, w, h], axis=-1)
boxes = box_ops.xcycwh_to_yxyx(boxes)
image = tf.image.resize(image, (target_dim, target_dim))
return image, boxes
def get_best_anchor(y_true, anchors, width=1, height=1):
"""Gets the correct anchor that is assoiciated with each box using IOU.
Args:
y_true: `tf.Tensor[]` for the list of bounding boxes in the yolo format.
anchors: list or tensor for the anchor boxes to be used in prediction
found via Kmeans.
width: int for the image width.
height: int for the image height.
Returns:
tf.Tensor: y_true with the anchor associated with each ground truth
box known.
"""
with tf.name_scope('get_anchor'):
width = tf.cast(width, dtype=tf.float32)
height = tf.cast(height, dtype=tf.float32)
# split the boxes into center and width height
anchor_xy = y_true[..., 0:2]
# scale thhe boxes
anchors = tf.convert_to_tensor(anchors, dtype=tf.float32)
anchors_x = anchors[..., 0] / width
anchors_y = anchors[..., 1] / height
anchors = tf.stack([anchors_x, anchors_y], axis=-1)
k = tf.shape(anchors)[0]
# build a matrix of anchor boxes of shape [num_anchors, num_boxes, 4]
anchors = tf.transpose(anchors, perm=[1, 0])
anchor_xy = tf.tile(tf.expand_dims(anchor_xy, axis=-1),
[1, 1, tf.shape(anchors)[-1]])
anchors = tf.tile(tf.expand_dims(anchors, axis=0),
[tf.shape(anchor_xy)[0], 1, 1])
# stack the xy so, each anchor is asscoaited once with each center from
# the ground truth input
anchors = tf.concat([anchor_xy, anchors], axis=1)
anchors = tf.transpose(anchors, perm=[2, 0, 1])
# copy the gt n times so that each anchor from above can be compared to
# input ground truth to shape: [num_anchors, num_boxes, 4]
truth_comp = tf.tile(tf.expand_dims(y_true[..., 0:4], axis=-1),
[1, 1, tf.shape(anchors)[0]])
truth_comp = tf.transpose(truth_comp, perm=[2, 0, 1])
# compute intersection over union of the boxes, and take the argmax of
# comuted iou for each box. thus each box is associated with the
# largest interection over union
iou_raw = box_ops.compute_iou(truth_comp, anchors)
values, indexes = tf.math.top_k(tf.transpose(iou_raw, perm=[1, 0]),
k=tf.cast(k, dtype=tf.int32),
sorted=True)
ind_mask = tf.cast(values > 0.213, dtype=indexes.dtype)
# pad the indexs such that all values less than the thresh are -1
# add one, multiply the mask to zeros all the bad locations
# subtract 1 makeing all the bad locations 0.
iou_index = tf.concat([
tf.keras.backend.expand_dims(indexes[..., 0], axis=-1),
((indexes[..., 1:] + 1) * ind_mask[..., 1:]) - 1
],
axis=-1)
iou_index = iou_index[..., :6]
return tf.cast(iou_index, dtype=tf.float32)
def build_grided_gt(y_true, mask, size, dtype, use_tie_breaker):
"""Converts ground truth for use in loss functions.
Args:
y_true: tf.Tensor[] ground truth
[box coords[0:4], classes_onehot[0:-1], best_fit_anchor_box].
mask: list of the anchor boxes choresponding to the output,
ex. [1, 2, 3] tells this layer to predict only the first 3
anchors in the total.
size: The dimensions of this output, for regular, it progresses
from 13, to 26, to 52.
dtype: The expected output dtype.
use_tie_breaker: boolean value for wether or not to use the tie_breaker.
Returns:
tf.Tensor[] of shape [size, size, #of_anchors, 4, 1, num_classes].
"""
# unpack required components from the input ground truth
boxes = tf.cast(y_true['bbox'], dtype)
classes = tf.expand_dims(tf.cast(y_true['classes'], dtype=dtype), axis=-1)
anchors = tf.cast(y_true['best_anchors'], dtype)
# get the number of boxes in the ground truth boxs
num_boxes = tf.shape(boxes)[0]
# get the number of anchor boxes used for this anchor scale
len_masks = tf.shape(mask)[0]
# init a fixed memeory size grid for this prediction scale
# [size, size, # of anchors, 1 + 1 + number of anchors per scale]
full = tf.zeros([size, size, len_masks, 6], dtype=dtype)
# init a grid to use to track which locations have already
# been used before (for the tie breaker)
depth_track = tf.zeros((size, size, len_masks), dtype=tf.int32)
# rescale the x and y centers to the size of the grid [size, size]
x = tf.cast(boxes[..., 0] * tf.cast(size, dtype=dtype), dtype=tf.int32)
y = tf.cast(boxes[..., 1] * tf.cast(size, dtype=dtype), dtype=tf.int32)
# init all the tensorArrays to be used in storeing the index
# and the values to be used to update both depth_track and full
update_index = tf.TensorArray(tf.int32, size=0, dynamic_size=True)
update = tf.TensorArray(dtype, size=0, dynamic_size=True)
# init constants and match data types before entering loop
i = 0
anchor_id = 0
const = tf.cast(tf.convert_to_tensor([1.]), dtype=dtype)
mask = tf.cast(mask, dtype=dtype)
rand_update = 0.0
for box_id in range(num_boxes):
# If the width or height of the box is zero, skip it.
# After pre processing, if the box is not in the i image bounds anymore,
# skip it.
if tf.keras.backend.all(tf.math.equal(
boxes[box_id, 2:4], 0)) or tf.keras.backend.any(
tf.math.less(boxes[box_id, 0:2], 0.0)) or tf.keras.backend.any(
tf.math.greater_equal(boxes[box_id, 0:2], 1.0)):
continue
if use_tie_breaker:
for anchor_id in range(tf.shape(anchors)[-1]):
index = tf.math.equal(anchors[box_id, anchor_id], mask)
if tf.keras.backend.any(index):
# using the boolean index mask to determine exactly which
# anchor box was used
p = tf.cast(
tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
# determine if the index was used or not
used = depth_track[y[box_id], x[box_id], p]
# defualt used upadte value
uid = 1
# if anchor_id is 0, this is the best matched anchor for this box
# with the highest IOU
if anchor_id == 0:
# write the box to the update list
# create random numbr to trigger a replacment if the cell
# is used already
if tf.math.equal(used, 1):
rand_update = tf.random.uniform([], maxval=1)
else:
rand_update = 1.0
if rand_update > 0.5:
# write the box to the update list
update_index = update_index.write(i, [y[box_id], x[box_id], p])
value = tf.concat([boxes[box_id], const, classes[box_id]],
axis=-1)
update = update.write(i, value)
# if used is 2, this cell is filled with a non-optimal box
# if used is 0, the cell in the ground truth is not yet consumed
# in either case you can replace that cell with a new box, as long
# as it is not consumed by an optimal box with anchor_id = 0
elif tf.math.equal(used, 2) or tf.math.equal(used, 0):
uid = 2
# write the box to the update list
update_index = update_index.write(i, [y[box_id], x[box_id], p])
value = tf.concat([boxes[box_id], const, classes[box_id]], axis=-1)
update = update.write(i, value)
depth_track = tf.tensor_scatter_nd_update(
depth_track, [(y[box_id], x[box_id], p)], [uid])
i += 1
else:
index = tf.math.equal(anchors[box_id, 0], mask)
# if any there is an index match
if tf.keras.backend.any(index):
# find the index
p = tf.cast(
tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
# update the list of used boxes
update_index = update_index.write(i, [y[box_id], x[box_id], p])
value = tf.concat([boxes[box_id], const, classes[box_id]], axis=-1)
update = update.write(i, value)
i += 1
# if the size of the update list is not 0, do an update, other wise,
# no boxes and pass an empty grid
if tf.math.greater(update_index.size(), 0):
update_index = update_index.stack()
update = update.stack()
full = tf.tensor_scatter_nd_update(full, update_index, update)
return full
def build_batch_grided_gt(y_true, mask, size, dtype, use_tie_breaker):
"""Converts ground truth for use in loss functions.
Args:
y_true: tf.Tensor[] ground truth
[batch, box coords[0:4], classes_onehot[0:-1], best_fit_anchor_box].
mask: list of the anchor boxes choresponding to the output,
ex. [1, 2, 3] tells this layer to predict only the first 3 anchors
in the total.
size: the dimensions of this output, for regular, it progresses from
13, to 26, to 52.
dtype: expected output datatype.
use_tie_breaker: boolean value for whether or not to use the tie
breaker.
Returns:
tf.Tensor[] of shape [batch, size, size, #of_anchors, 4, 1, num_classes].
"""
# unpack required components from the input ground truth
boxes = tf.cast(y_true['bbox'], dtype)
classes = tf.expand_dims(tf.cast(y_true['classes'], dtype=dtype), axis=-1)
anchors = tf.cast(y_true['best_anchors'], dtype)
# get the batch size
batches = tf.shape(boxes)[0]
# get the number of boxes in the ground truth boxs
num_boxes = tf.shape(boxes)[1]
# get the number of anchor boxes used for this anchor scale
len_masks = tf.shape(mask)[0]
# init a fixed memeory size grid for this prediction scale
# [batch, size, size, # of anchors, 1 + 1 + number of anchors per scale]
full = tf.zeros([batches, size, size, len_masks, 1 + 4 + 1], dtype=dtype)
# init a grid to use to track which locations have already
# been used before (for the tie breaker)
depth_track = tf.zeros((batches, size, size, len_masks), dtype=tf.int32)
# rescale the x and y centers to the size of the grid [size, size]
x = tf.cast(boxes[..., 0] * tf.cast(size, dtype=dtype), dtype=tf.int32)
y = tf.cast(boxes[..., 1] * tf.cast(size, dtype=dtype), dtype=tf.int32)
# init all the tensorArrays to be used in storeing the index and the values
# to be used to update both depth_track and full
update_index = tf.TensorArray(tf.int32, size=0, dynamic_size=True)
update = tf.TensorArray(dtype, size=0, dynamic_size=True)
# init constants and match data types before entering loop
i = 0
anchor_id = 0
const = tf.cast(tf.convert_to_tensor([1.]), dtype=dtype)
mask = tf.cast(mask, dtype=dtype)
rand_update = 0.0
for batch in range(batches):
for box_id in range(num_boxes):
# if the width or height of the box is zero, skip it
if tf.keras.backend.all(tf.math.equal(boxes[batch, box_id, 2:4], 0)):
continue
# after pre processing, if the box is not in the image bounds anymore
# skip the box
if tf.keras.backend.any(tf.math.less(
boxes[batch, box_id, 0:2], 0.0)) or tf.keras.backend.any(
tf.math.greater_equal(boxes[batch, box_id, 0:2], 1.0)):
continue
if use_tie_breaker:
for anchor_id in range(tf.shape(anchors)[-1]):
index = tf.math.equal(anchors[batch, box_id, anchor_id], mask)
if tf.keras.backend.any(index):
# using the boolean index mask to determine exactly which anchor
# box was used
p = tf.cast(tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
# determine if the index was used or not
used = depth_track[batch, y[batch, box_id], x[batch, box_id], p]
# defualt used upadte value
uid = 1
# if anchor_id is 0, this is the best matched anchor for this box
# with the highest IOU
if anchor_id == 0:
# create random number to trigger a replacment if the cell
# is used already
if tf.math.equal(used, 1):
rand_update = tf.random.uniform([], maxval=1)
else:
rand_update = 1.0
if rand_update > 0.5:
# write the box to the update list
update_index = update_index.write(
i, [batch, y[batch, box_id], x[batch, box_id], p])
value = tf.concat(
[boxes[batch, box_id], const, classes[batch, box_id]],
axis=-1)
update = update.write(i, value)
# if used is 2, this cell is filled with a non-optimal box
# if used is 0, the cell in the ground truth is not yet consumed
# in either case you can replace that cell with a new box, as long
# as it is not consumed by an optimal box with anchor_id = 0
elif tf.math.equal(used, 2) or tf.math.equal(used, 0):
uid = 2
# write the box to the update list
update_index = update_index.write(
i, [batch, y[batch, box_id], x[batch, box_id], p])
value = ([boxes[batch, box_id], const, classes[batch, box_id]])
update = update.write(i, value)
# update the used index for where and how the box was placed
depth_track = tf.tensor_scatter_nd_update(
depth_track, [(batch, y[batch, box_id], x[batch, box_id], p)],
[uid])
i += 1
else:
index = tf.math.equal(anchors[batch, box_id, 0], mask)
if tf.keras.backend.any(index):
# if any there is an index match
p = tf.cast(
tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
# write the box to the update list
update_index = update_index.write(
i, [batch, y[batch, box_id], x[batch, box_id], p])
value = tf.concat(
[boxes[batch, box_id], const, classes[batch, box_id]], axis=-1)
update = update.write(i, value)
i += 1
# if the size of the update list is not 0, do an update, other wise,
# no boxes and pass an empty grid
if tf.math.greater(update_index.size(), 0):
update_index = update_index.stack()
update = update.stack()
full = tf.tensor_scatter_nd_update(full, update_index, update)
return full
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""preprocess_ops tests."""
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.vision.beta.projects.yolo.ops import preprocess_ops
class PreprocessOpsTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters((416, 416, 5, 300, 300), (100, 200, 6, 50, 50))
def test_resize_crop_filter(self, default_width, default_height, num_boxes,
target_width, target_height):
image = tf.convert_to_tensor(
np.random.rand(default_width, default_height, 3))
boxes = tf.convert_to_tensor(np.random.rand(num_boxes, 4))
resized_image, resized_boxes = preprocess_ops.resize_crop_filter(
image, boxes, default_width, default_height, target_width,
target_height)
resized_image_shape = tf.shape(resized_image)
resized_boxes_shape = tf.shape(resized_boxes)
self.assertAllEqual([default_height, default_width, 3],
resized_image_shape.numpy())
self.assertAllEqual([num_boxes, 4], resized_boxes_shape.numpy())
@parameterized.parameters((7, 7., 5.), (25, 35., 45.))
def test_translate_boxes(self, num_boxes, translate_x, translate_y):
boxes = tf.convert_to_tensor(np.random.rand(num_boxes, 4))
translated_boxes = preprocess_ops.translate_boxes(
boxes, translate_x, translate_y)
translated_boxes_shape = tf.shape(translated_boxes)
self.assertAllEqual([num_boxes, 4], translated_boxes_shape.numpy())
@parameterized.parameters((100, 200, 75., 25.), (400, 600, 25., 75.))
def test_translate_image(self, image_height, image_width, translate_x,
translate_y):
image = tf.convert_to_tensor(np.random.rand(image_height, image_width, 4))
translated_image = preprocess_ops.translate_image(
image, translate_x, translate_y)
translated_image_shape = tf.shape(translated_image)
self.assertAllEqual([image_height, image_width, 4],
translated_image_shape.numpy())
@parameterized.parameters(([1, 2], 20, 0), ([13, 2, 4], 15, 0))
def test_pad_max_instances(self, input_shape, instances, pad_axis):
expected_output_shape = input_shape
expected_output_shape[pad_axis] = instances
output = preprocess_ops.pad_max_instances(
np.ones(input_shape), instances, pad_axis=pad_axis)
self.assertAllEqual(expected_output_shape, tf.shape(output).numpy())
if __name__ == '__main__':
tf.test.main()
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Preprocessing ops for yolo."""
import random
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
from official.vision.beta.ops import box_ops as bbox_ops
PAD_VALUE = 114
GLOBAL_SEED_SET = False
def set_random_seeds(seed=0):
"""Sets all accessible global seeds to properly apply randomization.
This is not the same as passing the seed as a variable to each call
to tf.random.For more, see the documentation for tf.random on the tensorflow
website https://www.tensorflow.org/api_docs/python/tf/random/set_seed. Note
that passing the seed to each random number generator will not give you the
expected behavior if you use more than one generator in a single function.
Args:
seed: `Optional[int]` representing the seed you want to use.
"""
if seed is not None:
global GLOBAL_SEED_SET
random.seed(seed)
GLOBAL_SEED_SET = True
tf.random.set_seed(seed)
np.random.seed(seed)
def random_uniform_strong(minval,
maxval,
dtype=tf.float32,
seed=None,
shape=None):
"""A unified function for consistent random number generation.
Equivalent to tf.random.uniform, except that minval and maxval are flipped if
minval is greater than maxval. Seed Safe random number generator.
Args:
minval: An `int` for a lower or upper endpoint of the interval from which to
choose the random number.
maxval: An `int` for the other endpoint.
dtype: The output type of the tensor.
seed: An `int` used to set the seed.
shape: List or 1D tf.Tensor, output shape of the random generator.
Returns:
A random tensor of type `dtype` that falls between `minval` and `maxval`
excluding the larger one.
"""
if GLOBAL_SEED_SET:
seed = None
if minval > maxval:
minval, maxval = maxval, minval
return tf.random.uniform(
shape=shape or [], minval=minval, maxval=maxval, seed=seed, dtype=dtype)
def random_scale(val, dtype=tf.float32, seed=None):
"""Generates a random number for scaling a parameter by multiplication.
Generates a random number for the scale. Half of the time, the value is
between [1.0, val) with uniformly distributed probability. In the other half,
the value is the reciprocal of this value. The function is identical to the
one in the original implementation:
https://github.com/AlexeyAB/darknet/blob/a3714d0a/src/utils.c#L708-L713
Args:
val: A float representing the maximum scaling allowed.
dtype: The output type of the tensor.
seed: An `int` used to set the seed.
Returns:
The random scale.
"""
scale = random_uniform_strong(1.0, val, dtype=dtype, seed=seed)
do_ret = random_uniform_strong(minval=0, maxval=2, dtype=tf.int32, seed=seed)
if do_ret == 1:
return scale
return 1.0 / scale
def pad_max_instances(value, instances, pad_value=0, pad_axis=0):
"""Pad or clip the tensor value to a fixed length along a given axis.
Pads a dimension of the tensor to have a maximum number of instances filling
additional entries with the `pad_value`. Allows for selection of the padding
axis.
Args:
value: An input tensor.
instances: An `int` representing the maximum number of instances.
pad_value: An `int` representing the value used for padding until the
maximum number of instances is obtained.
pad_axis: An `int` representing the axis index to pad.
Returns:
The output tensor whose dimensions match the input tensor except with the
size along the `pad_axis` replaced by `instances`.
"""
# get the real shape of value
shape = tf.shape(value)
# compute the padding axis
if pad_axis < 0:
pad_axis = tf.rank(value) + pad_axis
# determin how much of the tensor value to keep
dim1 = shape[pad_axis]
take = tf.math.reduce_min([instances, dim1])
value, _ = tf.split(value, [take, -1], axis=pad_axis)
# pad the clipped tensor to the right shape
pad = tf.convert_to_tensor([tf.math.reduce_max([instances - dim1, 0])])
nshape = tf.concat([shape[:pad_axis], pad, shape[(pad_axis + 1):]], axis=0)
pad_tensor = tf.fill(nshape, tf.cast(pad_value, dtype=value.dtype))
value = tf.concat([value, pad_tensor], axis=pad_axis)
if isinstance(instances, int):
vshape = value.get_shape().as_list()
vshape[pad_axis] = instances
value.set_shape(vshape)
return value
def get_image_shape(image):
"""Consistently gets the width and height of the image.
Gets the shape of the image regardless of if the image is in the
(batch_size, x, y, c) format or the (x, y, c) format.
Args:
image: A tensor who has either 3 or 4 dimensions.
Returns:
A tuple (height, width), where height is the height of the image
and width is the width of the image.
"""
shape = tf.shape(image)
if shape.get_shape().as_list()[0] == 4:
width = shape[2]
height = shape[1]
else:
width = shape[1]
height = shape[0]
return height, width
def _augment_hsv_darknet(image, rh, rs, rv, seed=None):
"""Randomize the hue, saturation, and brightness via the darknet method."""
if rh > 0.0:
delta = random_uniform_strong(-rh, rh, seed=seed)
image = tf.image.adjust_hue(image, delta)
if rs > 0.0:
delta = random_scale(rs, seed=seed)
image = tf.image.adjust_saturation(image, delta)
if rv > 0.0:
delta = random_scale(rv, seed=seed)
image *= delta
# clip the values of the image between 0.0 and 1.0
image = tf.clip_by_value(image, 0.0, 1.0)
return image
def _augment_hsv_torch(image, rh, rs, rv, seed=None):
"""Randomize the hue, saturation, and brightness via the pytorch method."""
dtype = image.dtype
image = tf.cast(image, tf.float32)
image = tf.image.rgb_to_hsv(image)
gen_range = tf.cast([rh, rs, rv], image.dtype)
scale = tf.cast([180, 255, 255], image.dtype)
r = random_uniform_strong(
-1, 1, shape=[3], dtype=image.dtype, seed=seed) * gen_range + 1
image = tf.math.floor(tf.cast(image, scale.dtype) * scale)
image = tf.math.floor(tf.cast(image, r.dtype) * r)
h, s, v = tf.split(image, 3, axis=-1)
h = h % 180
s = tf.clip_by_value(s, 0, 255)
v = tf.clip_by_value(v, 0, 255)
image = tf.concat([h, s, v], axis=-1)
image = tf.cast(image, scale.dtype) / scale
image = tf.image.hsv_to_rgb(image)
return tf.cast(image, dtype)
def image_rand_hsv(image, rh, rs, rv, seed=None, darknet=False):
"""Randomly alters the hue, saturation, and brightness of an image.
Args:
image: `Tensor` of shape [None, None, 3] that needs to be altered.
rh: `float32` used to indicate the maximum delta that can be multiplied to
the hue.
rs: `float32` used to indicate the maximum delta that can be multiplied to
the saturation.
rv: `float32` used to indicate the maximum delta that can be multiplied to
the brightness.
seed: `Optional[int]` for the seed to use in the random number generation.
darknet: `bool` indicating whether the model was originally built in the
Darknet or PyTorch library.
Returns:
The HSV altered image in the same datatype as the input image.
"""
if darknet:
image = _augment_hsv_darknet(image, rh, rs, rv, seed=seed)
else:
image = _augment_hsv_torch(image, rh, rs, rv, seed=seed)
return image
def mosaic_cut(image, original_width, original_height, width, height, center,
ptop, pleft, pbottom, pright, shiftx, shifty):
"""Generates a random center location to use for the mosaic operation.
Given a center location, cuts the input image into a slice that will be
concatenated with other slices with the same center in order to construct
a final mosaicked image.
Args:
image: `Tensor` of shape [None, None, 3] that needs to be altered.
original_width: `float` value indicating the original width of the image.
original_height: `float` value indicating the original height of the image.
width: `float` value indicating the final width of the image.
height: `float` value indicating the final height of the image.
center: `float` value indicating the desired center of the final patched
image.
ptop: `float` value indicating the top of the image without padding.
pleft: `float` value indicating the left of the image without padding.
pbottom: `float` value indicating the bottom of the image without padding.
pright: `float` value indicating the right of the image without padding.
shiftx: `float` 0.0 or 1.0 value indicating if the image is on the left or
right.
shifty: `float` 0.0 or 1.0 value indicating if the image is at the top or
bottom.
Returns:
image: The cropped image in the same datatype as the input image.
crop_info: `float` tensor that is applied to the boxes in order to select
the boxes still contained within the image.
"""
def cast(values, dtype):
return [tf.cast(value, dtype) for value in values]
with tf.name_scope('mosaic_cut'):
center = tf.cast(center, width.dtype)
zero = tf.cast(0.0, width.dtype)
cut_x, cut_y = center[1], center[0]
# Select the crop of the image to use
left_shift = tf.minimum(
tf.minimum(cut_x, tf.maximum(zero, -pleft * width / original_width)),
width - cut_x)
top_shift = tf.minimum(
tf.minimum(cut_y, tf.maximum(zero, -ptop * height / original_height)),
height - cut_y)
right_shift = tf.minimum(
tf.minimum(width - cut_x,
tf.maximum(zero, -pright * width / original_width)), cut_x)
bot_shift = tf.minimum(
tf.minimum(height - cut_y,
tf.maximum(zero, -pbottom * height / original_height)),
cut_y)
(left_shift, top_shift, right_shift, bot_shift,
zero) = cast([left_shift, top_shift, right_shift, bot_shift, zero],
tf.float32)
# Build a crop offset and a crop size tensor to use for slicing.
crop_offset = [zero, zero, zero]
crop_size = [zero - 1, zero - 1, zero - 1]
if shiftx == 0.0 and shifty == 0.0:
crop_offset = [top_shift, left_shift, zero]
crop_size = [cut_y, cut_x, zero - 1]
elif shiftx == 1.0 and shifty == 0.0:
crop_offset = [top_shift, cut_x - right_shift, zero]
crop_size = [cut_y, width - cut_x, zero - 1]
elif shiftx == 0.0 and shifty == 1.0:
crop_offset = [cut_y - bot_shift, left_shift, zero]
crop_size = [height - cut_y, cut_x, zero - 1]
elif shiftx == 1.0 and shifty == 1.0:
crop_offset = [cut_y - bot_shift, cut_x - right_shift, zero]
crop_size = [height - cut_y, width - cut_x, zero - 1]
# Contain and crop the image.
ishape = tf.cast(tf.shape(image)[:2], crop_size[0].dtype)
crop_size[0] = tf.minimum(crop_size[0], ishape[0])
crop_size[1] = tf.minimum(crop_size[1], ishape[1])
crop_offset = tf.cast(crop_offset, tf.int32)
crop_size = tf.cast(crop_size, tf.int32)
image = tf.slice(image, crop_offset, crop_size)
crop_info = tf.stack([
tf.cast(ishape, tf.float32),
tf.cast(tf.shape(image)[:2], dtype=tf.float32),
tf.ones_like(ishape, dtype=tf.float32),
tf.cast(crop_offset[:2], tf.float32)
])
return image, crop_info
def resize_and_jitter_image(image,
desired_size,
jitter=0.0,
letter_box=None,
random_pad=True,
crop_only=False,
shiftx=0.5,
shifty=0.5,
cut=None,
method=tf.image.ResizeMethod.BILINEAR,
seed=None):
"""Resize, Pad, and distort a given input image.
Args:
image: a `Tensor` of shape [height, width, 3] representing an image.
desired_size: a `Tensor` or `int` list/tuple of two elements representing
[height, width] of the desired actual output image size.
jitter: an `int` representing the maximum jittering that can be applied to
the image.
letter_box: a `bool` representing if letterboxing should be applied.
random_pad: a `bool` representing if random padding should be applied.
crop_only: a `bool` representing if only cropping will be applied.
shiftx: a `float` indicating if the image is in the left or right.
shifty: a `float` value indicating if the image is in the top or bottom.
cut: a `float` value indicating the desired center of the final patched
image.
method: function to resize input image to scaled image.
seed: seed for random scale jittering.
Returns:
image_: a `Tensor` of shape [height, width, 3] where [height, width]
equals to `desired_size`.
infos: a 2D `Tensor` that encodes the information of the image and the
applied preprocessing. It is in the format of
[[original_height, original_width], [desired_height, desired_width],
[y_scale, x_scale], [y_offset, x_offset]], where [desired_height,
desired_width] is the actual scaled image size, and [y_scale, x_scale] is
the scaling factor, which is the ratio of
scaled dimension / original dimension.
cast([original_width, original_height, width, height, ptop, pleft, pbottom,
pright], tf.float32): a `Tensor` containing the information of the image
andthe applied preprocessing.
"""
def intersection(a, b):
"""Finds the intersection between 2 crops."""
minx = tf.maximum(a[0], b[0])
miny = tf.maximum(a[1], b[1])
maxx = tf.minimum(a[2], b[2])
maxy = tf.minimum(a[3], b[3])
return tf.convert_to_tensor([minx, miny, maxx, maxy])
def cast(values, dtype):
return [tf.cast(value, dtype) for value in values]
if jitter > 0.5 or jitter < 0:
raise ValueError('maximum change in aspect ratio must be between 0 and 0.5')
with tf.name_scope('resize_and_jitter_image'):
# Cast all parameters to a usable float data type.
jitter = tf.cast(jitter, tf.float32)
original_dtype, original_dims = image.dtype, tf.shape(image)[:2]
# original width, original height, desigered width, desired height
original_width, original_height, width, height = cast(
[original_dims[1], original_dims[0], desired_size[1], desired_size[0]],
tf.float32)
# Compute the random delta width and height etc. and randomize the
# location of the corner points.
jitter_width = original_width * jitter
jitter_height = original_height * jitter
pleft = random_uniform_strong(
-jitter_width, jitter_width, jitter_width.dtype, seed=seed)
pright = random_uniform_strong(
-jitter_width, jitter_width, jitter_width.dtype, seed=seed)
ptop = random_uniform_strong(
-jitter_height, jitter_height, jitter_height.dtype, seed=seed)
pbottom = random_uniform_strong(
-jitter_height, jitter_height, jitter_height.dtype, seed=seed)
# Letter box the image.
if letter_box:
(image_aspect_ratio,
input_aspect_ratio) = original_width / original_height, width / height
distorted_aspect = image_aspect_ratio / input_aspect_ratio
delta_h, delta_w = 0.0, 0.0
pullin_h, pullin_w = 0.0, 0.0
if distorted_aspect > 1:
delta_h = ((original_width / input_aspect_ratio) - original_height) / 2
else:
delta_w = ((original_height * input_aspect_ratio) - original_width) / 2
ptop = ptop - delta_h - pullin_h
pbottom = pbottom - delta_h - pullin_h
pright = pright - delta_w - pullin_w
pleft = pleft - delta_w - pullin_w
# Compute the width and height to crop or pad too, and clip all crops to
# to be contained within the image.
swidth = original_width - pleft - pright
sheight = original_height - ptop - pbottom
src_crop = intersection([ptop, pleft, sheight + ptop, swidth + pleft],
[0, 0, original_height, original_width])
# Random padding used for mosaic.
h_ = src_crop[2] - src_crop[0]
w_ = src_crop[3] - src_crop[1]
if random_pad:
rmh = tf.maximum(0.0, -ptop)
rmw = tf.maximum(0.0, -pleft)
else:
rmw = (swidth - w_) * shiftx
rmh = (sheight - h_) * shifty
# Cast cropping params to usable dtype.
src_crop = tf.cast(src_crop, tf.int32)
# Compute padding parmeters.
dst_shape = [rmh, rmw, rmh + h_, rmw + w_]
ptop, pleft, pbottom, pright = dst_shape
pad = dst_shape * tf.cast([1, 1, -1, -1], ptop.dtype)
pad += tf.cast([0, 0, sheight, swidth], ptop.dtype)
pad = tf.cast(pad, tf.int32)
infos = []
# Crop the image to desired size.
cropped_image = tf.slice(
image, [src_crop[0], src_crop[1], 0],
[src_crop[2] - src_crop[0], src_crop[3] - src_crop[1], -1])
crop_info = tf.stack([
tf.cast(original_dims, tf.float32),
tf.cast(tf.shape(cropped_image)[:2], dtype=tf.float32),
tf.ones_like(original_dims, dtype=tf.float32),
tf.cast(src_crop[:2], tf.float32)
])
infos.append(crop_info)
if crop_only:
if not letter_box:
h_, w_ = cast(get_image_shape(cropped_image), width.dtype)
width = tf.cast(tf.round((w_ * width) / swidth), tf.int32)
height = tf.cast(tf.round((h_ * height) / sheight), tf.int32)
cropped_image = tf.image.resize(
cropped_image, [height, width], method=method)
cropped_image = tf.cast(cropped_image, original_dtype)
return cropped_image, infos, cast([
original_width, original_height, width, height, ptop, pleft, pbottom,
pright
], tf.int32)
# Pad the image to desired size.
image_ = tf.pad(
cropped_image, [[pad[0], pad[2]], [pad[1], pad[3]], [0, 0]],
constant_values=PAD_VALUE)
pad_info = tf.stack([
tf.cast(tf.shape(cropped_image)[:2], tf.float32),
tf.cast(tf.shape(image_)[:2], dtype=tf.float32),
tf.ones_like(original_dims, dtype=tf.float32),
(-tf.cast(pad[:2], tf.float32))
])
infos.append(pad_info)
temp = tf.shape(image_)[:2]
cond = temp > tf.cast(desired_size, temp.dtype)
if tf.reduce_any(cond):
size = tf.cast(desired_size, temp.dtype)
size = tf.where(cond, size, temp)
image_ = tf.image.resize(
image_, (size[0], size[1]), method=tf.image.ResizeMethod.AREA)
image_ = tf.cast(image_, original_dtype)
image_ = tf.image.resize(
image_, (desired_size[0], desired_size[1]),
method=tf.image.ResizeMethod.BILINEAR,
antialias=False)
image_ = tf.cast(image_, original_dtype)
if cut is not None:
image_, crop_info = mosaic_cut(image_, original_width, original_height,
width, height, cut, ptop, pleft, pbottom,
pright, shiftx, shifty)
infos.append(crop_info)
return image_, infos, cast([
original_width, original_height, width, height, ptop, pleft, pbottom,
pright
], tf.float32)
def _build_transform(image,
perspective=0.00,
degrees=0.0,
scale_min=1.0,
scale_max=1.0,
translate=0.0,
random_pad=False,
desired_size=None,
seed=None):
"""Builds a unified affine transformation to spatially augment the image."""
height, width = get_image_shape(image)
ch = height = tf.cast(height, tf.float32)
cw = width = tf.cast(width, tf.float32)
deg_to_rad = lambda x: tf.cast(x, tf.float32) * np.pi / 180.0
if desired_size is not None:
desired_size = tf.cast(desired_size, tf.float32)
ch = desired_size[0]
cw = desired_size[1]
# Compute the center of the image in the output resulution.
center = tf.eye(3, dtype=tf.float32)
center = tf.tensor_scatter_nd_update(center, [[0, 2], [1, 2]],
[-cw / 2, -ch / 2])
center_boxes = tf.tensor_scatter_nd_update(center, [[0, 2], [1, 2]],
[cw / 2, ch / 2])
# Compute a random rotation to apply.
rotation = tf.eye(3, dtype=tf.float32)
a = deg_to_rad(random_uniform_strong(-degrees, degrees, seed=seed))
cos = tf.math.cos(a)
sin = tf.math.sin(a)
rotation = tf.tensor_scatter_nd_update(rotation,
[[0, 0], [0, 1], [1, 0], [1, 1]],
[cos, -sin, sin, cos])
rotation_boxes = tf.tensor_scatter_nd_update(rotation,
[[0, 0], [0, 1], [1, 0], [1, 1]],
[cos, sin, -sin, cos])
# Compute a random prespective change to apply.
prespective_warp = tf.eye(3)
px = random_uniform_strong(-perspective, perspective, seed=seed)
py = random_uniform_strong(-perspective, perspective, seed=seed)
prespective_warp = tf.tensor_scatter_nd_update(prespective_warp,
[[2, 0], [2, 1]], [px, py])
prespective_warp_boxes = tf.tensor_scatter_nd_update(prespective_warp,
[[2, 0], [2, 1]],
[-px, -py])
# Compute a random scaling to apply.
scale = tf.eye(3, dtype=tf.float32)
s = random_uniform_strong(scale_min, scale_max, seed=seed)
scale = tf.tensor_scatter_nd_update(scale, [[0, 0], [1, 1]], [1 / s, 1 / s])
scale_boxes = tf.tensor_scatter_nd_update(scale, [[0, 0], [1, 1]], [s, s])
# Compute a random Translation to apply.
translation = tf.eye(3)
if (random_pad and height * s < ch and width * s < cw):
# The image is contained within the image and arbitrarily translated to
# locations with in the image.
center = center_boxes = tf.eye(3, dtype=tf.float32)
tx = random_uniform_strong(-1, 0, seed=seed) * (cw / s - width)
ty = random_uniform_strong(-1, 0, seed=seed) * (ch / s - height)
else:
# The image can be translated outside of the output resolution window
# but the image is translated relative to the output resolution not the
# input image resolution.
tx = random_uniform_strong(0.5 - translate, 0.5 + translate, seed=seed)
ty = random_uniform_strong(0.5 - translate, 0.5 + translate, seed=seed)
# Center and Scale the image such that the window of translation is
# contained to the output resolution.
dx, dy = (width - cw / s) / width, (height - ch / s) / height
sx, sy = 1 - dx, 1 - dy
bx, by = dx / 2, dy / 2
tx, ty = bx + (sx * tx), by + (sy * ty)
# Scale the translation to width and height of the image.
tx *= width
ty *= height
translation = tf.tensor_scatter_nd_update(translation, [[0, 2], [1, 2]],
[tx, ty])
translation_boxes = tf.tensor_scatter_nd_update(translation, [[0, 2], [1, 2]],
[-tx, -ty])
# Use repeated matric multiplications to combine all the image transforamtions
# into a single unified augmentation operation M is applied to the image
# Mb is to apply to the boxes. The order of matrix multiplication is
# important. First, Translate, then Scale, then Rotate, then Center, then
# finally alter the Prepsective.
affine = (translation @ scale @ rotation @ center @ prespective_warp)
affine_boxes = (
prespective_warp_boxes @ center_boxes @ rotation_boxes @ scale_boxes
@ translation_boxes)
return affine, affine_boxes, s
def affine_warp_image(image,
desired_size,
perspective=0.00,
degrees=0.0,
scale_min=1.0,
scale_max=1.0,
translate=0.0,
random_pad=False,
seed=None):
"""Applies random spatial augmentation to the image.
Args:
image: A `Tensor` for the image.
desired_size: A `tuple` for desired output image size.
perspective: An `int` for the maximum that can be applied to random
perspective change.
degrees: An `int` for the maximum degrees that can be applied to random
rotation.
scale_min: An `int` for the minimum scaling factor that can be applied to
random scaling.
scale_max: An `int` for the maximum scaling factor that can be applied to
random scaling.
translate: An `int` for the maximum translation that can be applied to
random translation.
random_pad: A `bool` for using random padding.
seed: An `Optional[int]` for the seed to use in random number generation.
Returns:
image: A `Tensor` representing the augmented image.
affine_matrix: A `Tensor` representing the augmenting matrix for the image.
affine_info: A `List` containing the size of the original image, the desired
output_size of the image and the augmenting matrix for the boxes.
"""
# Build an image transformation matrix.
image_size = tf.cast(get_image_shape(image), tf.float32)
affine_matrix, affine_boxes, _ = _build_transform(
image,
perspective=perspective,
degrees=degrees,
scale_min=scale_min,
scale_max=scale_max,
translate=translate,
random_pad=random_pad,
desired_size=desired_size,
seed=seed)
affine = tf.reshape(affine_matrix, [-1])
affine = tf.cast(affine[:-1], tf.float32)
# Apply the transformation to image.
image = tfa.image.transform(
image,
affine,
fill_value=PAD_VALUE,
output_shape=desired_size,
interpolation='bilinear')
desired_size = tf.cast(desired_size, tf.float32)
affine_info = [image_size, desired_size, affine_boxes]
return image, affine_matrix, affine_info
def affine_warp_boxes(affine, boxes, output_size, box_history):
"""Applies random rotation, random perspective change and random translation.
and random scaling to the boxes.
Args:
affine: A `Tensor` for the augmenting matrix for the boxes.
boxes: A `Tensor` for the boxes.
output_size: A `list` of two integers, a two-element vector or a tensor such
that all but the last dimensions are `broadcastable` to `boxes`. The last
dimension is 2, which represents [height, width].
box_history: A `Tensor` for the boxes history, which are the boxes that
undergo the same augmentations as `boxes`, but no clipping was applied. We
can keep track of how much changes are done to the boxes by keeping track
of this tensor.
Returns:
clipped_boxes: A `Tensor` representing the augmented boxes.
box_history: A `Tensor` representing the augmented box_history.
"""
def _get_corners(box):
"""Get the corner of each box as a tuple of (x, y) coordinates."""
ymi, xmi, yma, xma = tf.split(box, 4, axis=-1)
tl = tf.concat([xmi, ymi], axis=-1)
bl = tf.concat([xmi, yma], axis=-1)
tr = tf.concat([xma, ymi], axis=-1)
br = tf.concat([xma, yma], axis=-1)
return tf.concat([tl, bl, tr, br], axis=-1)
def _corners_to_boxes(corner):
"""Convert (x, y) corners back into boxes [ymin, xmin, ymax, xmax]."""
corner = tf.reshape(corner, [-1, 4, 2])
y = corner[..., 1]
x = corner[..., 0]
y_min = tf.reduce_min(y, axis=-1)
x_min = tf.reduce_min(x, axis=-1)
y_max = tf.reduce_max(y, axis=-1)
x_max = tf.reduce_max(x, axis=-1)
return tf.stack([y_min, x_min, y_max, x_max], axis=-1)
def _aug_boxes(affine_matrix, box):
"""Apply an affine transformation matrix M to the boxes augmente boxes."""
corners = _get_corners(box)
corners = tf.reshape(corners, [-1, 4, 2])
z = tf.expand_dims(tf.ones_like(corners[..., 1]), axis=-1)
corners = tf.concat([corners, z], axis=-1)
corners = tf.transpose(
tf.matmul(affine_matrix, corners, transpose_b=True), perm=(0, 2, 1))
corners, p = tf.split(corners, [2, 1], axis=-1)
corners /= p
corners = tf.reshape(corners, [-1, 8])
box = _corners_to_boxes(corners)
return box
boxes = _aug_boxes(affine, boxes)
box_history = _aug_boxes(affine, box_history)
clipped_boxes = bbox_ops.clip_boxes(boxes, output_size)
return clipped_boxes, box_history
def boxes_candidates(clipped_boxes,
box_history,
wh_thr=2,
ar_thr=20,
area_thr=0.1):
"""Filters the boxes that don't satisfy the width/height and area constraints.
Args:
clipped_boxes: A `Tensor` for the boxes.
box_history: A `Tensor` for the boxes history, which are the boxes that
undergo the same augmentations as `boxes`, but no clipping was applied. We
can keep track of how much changes are done to the boxes by keeping track
of this tensor.
wh_thr: An `int` for the width/height threshold.
ar_thr: An `int` for the aspect ratio threshold.
area_thr: An `int` for the area threshold.
Returns:
indices[:, 0]: A `Tensor` representing valid boxes after filtering.
"""
area_thr = tf.math.abs(area_thr)
# Get the scaled and shifted heights of the original
# unclipped boxes.
og_height = tf.maximum(box_history[:, 2] - box_history[:, 0], 0.0)
og_width = tf.maximum(box_history[:, 3] - box_history[:, 1], 0.0)
# Get the scaled and shifted heights of the clipped boxes.
clipped_height = tf.maximum(clipped_boxes[:, 2] - clipped_boxes[:, 0], 0.0)
clipped_width = tf.maximum(clipped_boxes[:, 3] - clipped_boxes[:, 1], 0.0)
# Determine the aspect ratio of the clipped boxes.
ar = tf.maximum(clipped_width / (clipped_height + 1e-16),
clipped_height / (clipped_width + 1e-16))
# Ensure the clipped width adn height are larger than a preset threshold.
conda = clipped_width > wh_thr
condb = clipped_height > wh_thr
# Ensure the area of the clipped box is larger than the area threshold.
area = (clipped_height * clipped_width) / (og_width * og_height + 1e-16)
condc = area > area_thr
# Ensure the aspect ratio is not too extreme.
condd = ar < ar_thr
cond = tf.expand_dims(
tf.logical_and(
tf.logical_and(conda, condb), tf.logical_and(condc, condd)),
axis=-1)
# Set all the boxes that fail the test to be equal to zero.
indices = tf.where(cond)
return indices[:, 0]
def resize_and_crop_boxes(boxes, image_scale, output_size, offset, box_history):
"""Resizes and crops the boxes.
Args:
boxes: A `Tensor` for the boxes.
image_scale: A `Tensor` for the scaling factor of the image.
output_size: A `list` of two integers, a two-element vector or a tensor such
that all but the last dimensions are `broadcastable` to `boxes`. The last
dimension is 2, which represents [height, width].
offset: A `Tensor` for how much translation was applied to the image.
box_history: A `Tensor` for the boxes history, which are the boxes that
undergo the same augmentations as `boxes`, but no clipping was applied. We
can keep track of how much changes are done to the boxes by keeping track
of this tensor.
Returns:
clipped_boxes: A `Tensor` representing the augmented boxes.
box_history: A `Tensor` representing the augmented box_history.
"""
# Shift and scale the input boxes.
boxes *= tf.tile(tf.expand_dims(image_scale, axis=0), [1, 2])
boxes -= tf.tile(tf.expand_dims(offset, axis=0), [1, 2])
# Check the hitory of the boxes.
box_history *= tf.tile(tf.expand_dims(image_scale, axis=0), [1, 2])
box_history -= tf.tile(tf.expand_dims(offset, axis=0), [1, 2])
# Clip the shifted and scaled boxes.
clipped_boxes = bbox_ops.clip_boxes(boxes, output_size)
return clipped_boxes, box_history
def transform_and_clip_boxes(boxes,
infos,
affine=None,
shuffle_boxes=False,
area_thresh=0.1,
seed=None,
augment=True):
"""Clips and cleans the boxes.
Args:
boxes: A `Tensor` for the boxes.
infos: A `list` that contains the image infos.
affine: A `list` that contains parameters for resize and crop.
shuffle_boxes: A `bool` for shuffling the boxes.
area_thresh: An `int` for the area threshold.
seed: seed for random number generation.
augment: A `bool` for clipping the boxes to [0, 1].
Returns:
boxes: A `Tensor` representing the augmented boxes.
ind: A `Tensor` valid box indices.
"""
# Clip and clean boxes.
def get_valid_boxes(boxes):
"""Get indices for non-empty boxes."""
# Convert the boxes to center width height formatting.
height = boxes[:, 2] - boxes[:, 0]
width = boxes[:, 3] - boxes[:, 1]
base = tf.logical_and(tf.greater(height, 0), tf.greater(width, 0))
return base
# Initialize history to track operation applied to boxes
box_history = boxes
# Make sure all boxes are valid to start, clip to [0, 1] and get only the
# valid boxes.
output_size = tf.cast([640, 640], tf.float32)
if augment:
boxes = tf.math.maximum(tf.math.minimum(boxes, 1.0), 0.0)
cond = get_valid_boxes(boxes)
if infos is None:
infos = []
for info in infos:
# Denormalize the boxes.
boxes = bbox_ops.denormalize_boxes(boxes, info[0])
box_history = bbox_ops.denormalize_boxes(box_history, info[0])
# Shift and scale all boxes, and keep track of box history with no
# box clipping, history is used for removing boxes that have become
# too small or exit the image area.
(boxes, box_history) = resize_and_crop_boxes(
boxes, info[2, :], info[1, :], info[3, :], box_history=box_history)
# Get all the boxes that still remain in the image and store
# in a bit vector for later use.
cond = tf.logical_and(get_valid_boxes(boxes), cond)
# Normalize the boxes to [0, 1].
output_size = info[1]
boxes = bbox_ops.normalize_boxes(boxes, output_size)
box_history = bbox_ops.normalize_boxes(box_history, output_size)
if affine is not None:
# Denormalize the boxes.
boxes = bbox_ops.denormalize_boxes(boxes, affine[0])
box_history = bbox_ops.denormalize_boxes(box_history, affine[0])
# Clipped final boxes.
(boxes, box_history) = affine_warp_boxes(
affine[2], boxes, affine[1], box_history=box_history)
# Get all the boxes that still remain in the image and store
# in a bit vector for later use.
cond = tf.logical_and(get_valid_boxes(boxes), cond)
# Normalize the boxes to [0, 1].
output_size = affine[1]
boxes = bbox_ops.normalize_boxes(boxes, output_size)
box_history = bbox_ops.normalize_boxes(box_history, output_size)
# Remove the bad boxes.
boxes *= tf.cast(tf.expand_dims(cond, axis=-1), boxes.dtype)
# Threshold the existing boxes.
if augment:
boxes_ = bbox_ops.denormalize_boxes(boxes, output_size)
box_history_ = bbox_ops.denormalize_boxes(box_history, output_size)
inds = boxes_candidates(boxes_, box_history_, area_thr=area_thresh)
# Select and gather the good boxes.
if shuffle_boxes:
inds = tf.random.shuffle(inds, seed=seed)
else:
boxes = box_history
boxes_ = bbox_ops.denormalize_boxes(boxes, output_size)
inds = bbox_ops.get_non_empty_box_indices(boxes_)
boxes = tf.gather(boxes, inds)
return boxes, inds
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for preprocessing_ops.py."""
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.vision.beta.ops import box_ops as bbox_ops
from official.vision.beta.projects.yolo.ops import preprocessing_ops
class InputUtilsTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(([1, 2], 20, 0), ([13, 2, 4], 15, 0))
def testPadMaxInstances(self, input_shape, instances, pad_axis):
expected_output_shape = input_shape
expected_output_shape[pad_axis] = instances
output = preprocessing_ops.pad_max_instances(
np.ones(input_shape), instances, pad_axis=pad_axis)
self.assertAllEqual(expected_output_shape, tf.shape(output).numpy())
@parameterized.parameters((100, 200))
def testGetImageShape(self, image_height, image_width):
image = tf.convert_to_tensor(np.random.rand(image_height, image_width, 3))
image_shape = preprocessing_ops.get_image_shape(image)
self.assertAllEqual((image_height, image_width), image_shape)
@parameterized.parameters((400, 600, .5, .5, .0, True),
(100, 200, .5, .5, .5))
def testImageRandHSV(self,
image_height,
image_width,
rh,
rs,
rv,
is_darknet=False):
image = tf.convert_to_tensor(np.random.rand(image_height, image_width, 3))
processed_image = preprocessing_ops.image_rand_hsv(
image, rh, rs, rv, darknet=is_darknet)
processed_image_shape = tf.shape(processed_image)
self.assertAllEqual([image_height, image_width, 3],
processed_image_shape.numpy())
@parameterized.parameters((100, 200, [50, 100]))
def testResizeAndJitterImage(self, image_height, image_width, desired_size):
image = tf.convert_to_tensor(np.random.rand(image_height, image_width, 3))
processed_image, _, _ = preprocessing_ops.resize_and_jitter_image(
image, desired_size)
processed_image_shape = tf.shape(processed_image)
self.assertAllEqual([desired_size[0], desired_size[1], 3],
processed_image_shape.numpy())
@parameterized.parameters((400, 600, [200, 300]))
def testAffineWarpImage(self,
image_height,
image_width,
desired_size,
degrees=7.0,
scale_min=0.1,
scale_max=1.9):
image = tf.convert_to_tensor(np.random.rand(image_height, image_width, 3))
processed_image, _, _ = preprocessing_ops.affine_warp_image(
image,
desired_size,
degrees=degrees,
scale_min=scale_min,
scale_max=scale_max)
processed_image_shape = tf.shape(processed_image)
self.assertAllEqual([desired_size[0], desired_size[1], 3],
processed_image_shape.numpy())
# Working Test
@parameterized.parameters(([[400, 600], [200, 300],
[[0, 0, 0], [0, 0, 0], [0, 0, 0]]], 50))
def testAffineWarpBoxes(self, affine, num_boxes):
boxes = tf.convert_to_tensor(np.random.rand(num_boxes, 4))
boxes = bbox_ops.denormalize_boxes(boxes, affine[0])
processed_boxes, _ = preprocessing_ops.affine_warp_boxes(
tf.cast(affine[2], tf.double), boxes, affine[1], box_history=boxes)
processed_boxes_shape = tf.shape(processed_boxes)
self.assertAllEqual([num_boxes, 4], processed_boxes_shape.numpy())
# Working Test
@parameterized.parameters(([100, 100], [[-0.489, 51.28, 0.236, 51.686],
[65, 100, 200, 150],
[150, 80, 200, 130]]))
def testBoxCandidates(self, output_size, boxes):
boxes = tf.cast(bbox_ops.denormalize_boxes(boxes, output_size), tf.double)
clipped_ind = preprocessing_ops.boxes_candidates(
boxes, boxes, ar_thr=1e32, wh_thr=0, area_thr=tf.cast(0, tf.double))
clipped_ind_shape = tf.shape(clipped_ind)
self.assertAllEqual([3], clipped_ind_shape.numpy())
self.assertAllEqual([0, 1, 2], clipped_ind.numpy())
# Working Test
@parameterized.parameters((
50,
[0.5, 0.5],
[0, 0], # Clipping all boxes
[0.0, 0.0]))
def testResizeAndCropBoxes(self, num_boxes, image_scale, output_size, offset):
boxes = tf.convert_to_tensor(np.random.rand(num_boxes, 4))
processed_boxes, _ = preprocessing_ops.resize_and_crop_boxes(
boxes, tf.cast(image_scale, tf.double), output_size,
tf.cast(offset, tf.double), boxes)
processed_boxes_shape = tf.shape(processed_boxes)
self.assertAllEqual([num_boxes, 4], processed_boxes_shape.numpy())
self.assertAllEqual(
tf.math.reduce_sum(processed_boxes), tf.convert_to_tensor(0))
if __name__ == '__main__':
tf.test.main()
......@@ -12,15 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Image classification task definition."""
import tensorflow as tf
from official.core import input_reader
from official.common import dataset_fn
from official.core import task_factory
from official.vision.beta.dataloaders import classification_input
from official.vision.beta.dataloaders import classification_input as classification_input_base
from official.vision.beta.dataloaders import input_reader_factory
from official.vision.beta.dataloaders import tfds_factory
from official.vision.beta.projects.yolo.configs import darknet_classification as exp_cfg
from official.vision.beta.projects.yolo.dataloaders import classification_tfds_decoder as cli
from official.vision.beta.projects.yolo.dataloaders import classification_input
from official.vision.beta.tasks import image_classification
......@@ -33,82 +32,34 @@ class ImageClassificationTask(image_classification.ImageClassificationTask):
num_classes = self.task_config.model.num_classes
input_size = self.task_config.model.input_size
image_field_key = self.task_config.train_data.image_field_key
label_field_key = self.task_config.train_data.label_field_key
is_multilabel = self.task_config.train_data.is_multilabel
if params.tfds_name:
decoder = cli.Decoder()
decoder = tfds_factory.get_classification_decoder(params.tfds_name)
else:
decoder = classification_input.Decoder()
decoder = classification_input_base.Decoder(
image_field_key=image_field_key,
label_field_key=label_field_key,
is_multilabel=is_multilabel)
parser = classification_input.Parser(
output_size=input_size[:2],
num_classes=num_classes,
image_field_key=image_field_key,
label_field_key=label_field_key,
decode_jpeg_only=params.decode_jpeg_only,
aug_rand_hflip=params.aug_rand_hflip,
aug_type=params.aug_type,
is_multilabel=is_multilabel,
dtype=params.dtype)
reader = input_reader.InputReader(
reader = input_reader_factory.input_reader_generator(
params,
dataset_fn=tf.data.TFRecordDataset,
dataset_fn=dataset_fn.pick_dataset_fn(params.file_type),
decoder_fn=decoder.decode,
parser_fn=parser.parse_fn(params.is_training))
dataset = reader.read(input_context=input_context)
return dataset
def train_step(self, inputs, model, optimizer, metrics=None):
"""Does forward and backward.
Args:
inputs: a dictionary of input tensors.
model: the model, forward pass definition.
optimizer: the optimizer for this training step.
metrics: a nested structure of metrics objects.
Returns:
A dictionary of logs.
"""
features, labels = inputs
if self.task_config.losses.one_hot:
labels = tf.one_hot(labels, self.task_config.model.num_classes)
num_replicas = tf.distribute.get_strategy().num_replicas_in_sync
with tf.GradientTape() as tape:
outputs = model(features, training=True)
# Casting output layer as float32 is necessary when mixed_precision is
# mixed_float16 or mixed_bfloat16 to ensure output is casted as float32.
outputs = tf.nest.map_structure(
lambda x: tf.cast(x, tf.float32), outputs)
# Computes per-replica loss.
loss = self.build_losses(
model_outputs=outputs, labels=labels, aux_losses=model.losses)
# Scales loss as the default gradients allreduce performs sum inside the
# optimizer.
scaled_loss = loss / num_replicas
# For mixed_precision policy, when LossScaleOptimizer is used, loss is
# scaled for numerical stability.
if isinstance(optimizer, tf.keras.mixed_precision.LossScaleOptimizer):
scaled_loss = optimizer.get_scaled_loss(scaled_loss)
tvars = model.trainable_variables
grads = tape.gradient(scaled_loss, tvars)
# Scales back gradient before apply_gradients when LossScaleOptimizer is
# used.
if isinstance(optimizer, tf.keras.mixed_precision.LossScaleOptimizer):
grads = optimizer.get_unscaled_gradients(grads)
# Apply gradient clipping.
if self.task_config.gradient_clip_norm > 0:
grads, _ = tf.clip_by_global_norm(
grads, self.task_config.gradient_clip_norm)
optimizer.apply_gradients(list(zip(grads, tvars)))
logs = {self.loss: loss}
if metrics:
self.process_metrics(metrics, labels, outputs)
logs.update({m.name: m.result() for m in metrics})
elif model.compiled_metrics:
self.process_compiled_metrics(model.compiled_metrics, labels, outputs)
logs.update({m.name: m.result() for m in model.metrics})
return logs
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment