Commit a15e242e authored by Vishnu Banna

config

parent b768c248
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TFDS Classification decoder."""
import tensorflow as tf
from official.vision.beta.dataloaders import decoder
class Decoder(decoder.Decoder):
"""A tf.Example decoder for classification task."""
def __init__(self):
return
def decode(self, serialized_example):
sample_dict = {
'image/encoded':
tf.io.encode_jpeg(serialized_example['image'], quality=100),
'image/class/label':
serialized_example['label'],
}
return sample_dict
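# Illustrative usage of the decoder above (assumed sample values): for a TFDS
# classification sample such as
#   {'image': <uint8 [H, W, 3] tensor>, 'label': <int64 scalar>}
# decode() re-encodes the image as JPEG bytes and renames the keys so the
# sample mimics a parsed tf.Example:
#   decoder = Decoder()
#   features = decoder.decode(sample)
#   # features['image/encoded'] -> JPEG string, features['image/class/label'] -> label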
"""Classification parser."""
# Import libraries
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_addons as tfa
from official.vision.beta.dataloaders import parser
from official.vision.beta.ops import preprocess_ops
from official.vision.beta.ops import augment
class Parser(parser.Parser):
"""Parser to parse an image and its annotations into a dictionary of tensors."""
def __init__(self,
output_size,
aug_policy,
scale=[128, 448],
dtype='float32'):
"""Initializes parameters for parsing annotations in the dataset.
Args:
output_size: `Tensor` or `list` for [height, width] of output image. The
output_size should be divided by the largest feature stride 2^max_level.
aug_policy: `str` or None, the augmentation policy to apply to the training
images. Supported values are 'autoaug' and 'randaug'; None disables
policy-based augmentation.
scale: a `Tensor` or `list` for [low, high] bounds of the random scale.
dtype: `str`, the dtype of the output image. One of 'float32', 'float16'
or 'bfloat16'.
"""
self._output_size = output_size
if aug_policy:
if aug_policy == 'autoaug':
self._augmenter = augment.AutoAugment()
elif aug_policy == 'randaug':
self._augmenter = augment.RandAugment(num_layers=2, magnitude=20)
else:
raise ValueError(
'Augmentation policy {} not supported.'.format(aug_policy))
else:
self._augmenter = None
self._scale = scale
if dtype == 'float32':
self._dtype = tf.float32
elif dtype == 'float16':
self._dtype = tf.float16
elif dtype == 'bfloat16':
self._dtype = tf.bfloat16
else:
raise ValueError('dtype {!r} is not supported!'.format(dtype))
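# Illustrative construction of the parser above (values are assumptions, not
# defaults from any config): Parser(output_size=[224, 224],
# aug_policy='randaug', scale=[128, 448], dtype='float32') applies RandAugment
# and a random scale crop between 128 and 448 pixels during training.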
def _parse_train_data(self, decoded_tensors):
"""Generates images and labels that are usable for model training.
Args:
decoded_tensors: a dict of Tensors produced by the decoder.
Returns:
images: the image tensor.
labels: a dict of Tensors that contains labels.
"""
image = tf.io.decode_image(decoded_tensors['image/encoded'])
image.set_shape((None, None, 3))
image = tf.image.resize_with_pad(
image,
target_height=self._output_size[0],
target_width=self._output_size[1])
scale = tf.random.uniform([],
minval=self._scale[0],
maxval=self._scale[1],
dtype=tf.int32)
if scale > self._output_size[0]:
image = tf.image.resize_with_crop_or_pad(
image, target_height=scale, target_width=scale)
else:
image = tf.image.random_crop(image, (scale, scale, 3))
if self._augmenter is not None:
image = self._augmenter.distort(image)
image = tf.image.random_flip_left_right(image)
image = tf.cast(image, tf.float32) / 255
image = tf.image.resize(image, (self._output_size[0], self._output_size[1]))
label = decoded_tensors['image/class/label']
return image, label
def _parse_eval_data(self, decoded_tensors):
"""Generates images and labels that are usable for model evaluation.
Args:
decoded_tensors: a dict of Tensors produced by the decoder.
Returns:
images: the image tensor.
labels: a dict of Tensors that contains labels.
"""
image = tf.io.decode_image(decoded_tensors['image/encoded'])
image.set_shape((None, None, 3))
image = tf.cast(image, tf.float32)
image = tf.image.resize_with_pad(
image,
target_height=self._output_size[0],
target_width=self._output_size[1])  # Final output shape.
image = image / 255. # Normalize
#label = tf.one_hot(decoded_tensors['image/class/label'], self._num_classes)
label = decoded_tensors['image/class/label']
return image, label
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Detection Data parser and processing for YOLO.
Parse image and ground truths in a dataset to training targets and package them
into (image, labels) tuple for RetinaNet.
"""
import tensorflow as tf
from official.vision.beta.dataloaders import parser
from official.vision.beta.ops import box_ops
from official.vision.beta.ops import preprocess_ops
from official.vision.beta.projects.yolo.ops import box_ops as yolo_box_ops
from official.vision.beta.projects.yolo.ops import preprocess_ops as yolo_preprocess_ops
class Parser(parser.Parser):
"""Parser to parse an image and its annotations into a dictionary of tensors."""
def __init__(self,
output_size,
num_classes,
fixed_size=True,
jitter_im=0.1,
jitter_boxes=0.005,
use_tie_breaker=True,
min_level=3,
max_level=5,
masks=None,
max_process_size=608,
min_process_size=320,
max_num_instances=200,
random_flip=True,
aug_rand_saturation=True,
aug_rand_brightness=True,
aug_rand_zoom=True,
aug_rand_hue=True,
anchors=None,
seed=10,
dtype=tf.float32):
"""Initializes parameters for parsing annotations in the dataset.
Args:
output_size: a `Tuple` for (width, height) of input image.
num_classes: a `Tensor` or `int` for the number of classes.
fixed_size: a `bool` if True all output images have the same size.
jitter_im: a `float` representing a pixel value that is the maximum jitter
applied to the image for data augmentation during training.
jitter_boxes: a `float` representing a pixel value that is the maximum
jitter applied to the bounding box for data augmentation during
training.
use_tie_breaker: a `bool` for whether or not to use the tie breaker.
min_level: `int` number of minimum level of the output feature pyramid.
max_level: `int` number of maximum level of the output feature pyramid.
masks: a `Tensor`, `List` or `numpy.ndarray` for anchor masks.
max_process_size: an `int` for maximum image width and height.
min_process_size: an `int` for minimum image width and height.
max_num_instances: an `int` number of maximum number of instances in an
image.
random_flip: a `bool` if True, augment training with random horizontal
flip.
aug_rand_saturation: `bool`, if True, augment training with random
saturation.
aug_rand_brightness: `bool`, if True, augment training with random
brightness.
aug_rand_zoom: `bool`, if True, augment training with random zoom.
aug_rand_hue: `bool`, if True, augment training with random hue.
anchors: a `Tensor`, `List` or `numpy.ndarray` for bounding box priors.
seed: an `int` for the seed used by tf.random
dtype: a `tf.dtypes.DType` object that represents the dtype the outputs
will be casted to. The available types are tf.float32, tf.float16, or
tf.bfloat16.
"""
self._net_down_scale = 2**max_level
self._num_classes = num_classes
self._image_w = (output_size[0] //
self._net_down_scale) * self._net_down_scale
self._image_h = (output_size[1] //
self._net_down_scale) * self._net_down_scale
self._max_process_size = max_process_size
self._min_process_size = min_process_size
self._fixed_size = fixed_size
self._anchors = anchors
self._masks = {
key: tf.convert_to_tensor(value) for key, value in masks.items()
}
self._use_tie_breaker = use_tie_breaker
self._jitter_im = 0.0 if jitter_im is None else jitter_im
self._jitter_boxes = 0.0 if jitter_boxes is None else jitter_boxes
self._max_num_instances = max_num_instances
self._random_flip = random_flip
self._aug_rand_saturation = aug_rand_saturation
self._aug_rand_brightness = aug_rand_brightness
self._aug_rand_zoom = aug_rand_zoom
self._aug_rand_hue = aug_rand_hue
self._seed = seed
self._dtype = dtype
def _build_grid(self, raw_true, width, batch=False, use_tie_breaker=False):
# Build the grids into a new dict so the anchor masks are not overwritten.
mask = {}
for key in self._masks.keys():
if not batch:
mask[key] = yolo_preprocess_ops.build_grided_gt(
raw_true, self._masks[key], width // 2**int(key),
raw_true['bbox'].dtype, use_tie_breaker)
else:
mask[key] = yolo_preprocess_ops.build_batch_grided_gt(
raw_true, self._masks[key], width // 2**int(key),
raw_true['bbox'].dtype, use_tie_breaker)
return mask
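# Illustrative shapes for _build_grid above (assumed values): with masks
# {'3': [0, 1, 2], '4': [3, 4, 5], '5': [6, 7, 8]} and width 416, the level-3
# grid is built at 416 // 2**3 = 52 cells per side, level 4 at 26 and
# level 5 at 13.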
def _parse_train_data(self, data):
"""Generates images and labels that are usable for model training.
Args:
data: a dict of Tensors produced by the decoder.
Returns:
images: the image tensor.
labels: a dict of Tensors that contains labels.
"""
shape = tf.shape(data['image'])
image = data['image'] / 255
boxes = data['groundtruth_boxes']
width = shape[0]
height = shape[1]
image, boxes = yolo_preprocess_ops.fit_preserve_aspect_ratio(
image,
boxes,
width=width,
height=height,
target_dim=self._max_process_size)
image_shape = tf.shape(image)[:2]
if self._random_flip:
image, boxes, _ = preprocess_ops.random_horizontal_flip(
image, boxes, seed=self._seed)
randscale = self._image_w // self._net_down_scale
if not self._fixed_size:
do_scale = tf.greater(
tf.random.uniform([], minval=0, maxval=1, seed=self._seed), 0.5)
if do_scale:
# This scales the image to a random multiple of net_down_scale
# between 320 and 608
randscale = tf.random.uniform(
[],
minval=self._min_process_size // self._net_down_scale,
maxval=self._max_process_size // self._net_down_scale,
seed=self._seed,
dtype=tf.int32) * self._net_down_scale
if self._jitter_boxes != 0.0:
boxes = box_ops.denormalize_boxes(boxes, image_shape)
boxes = box_ops.jitter_boxes(boxes, self._jitter_boxes)
boxes = box_ops.normalize_boxes(boxes, image_shape)
# YOLO loss function uses x-center, y-center format
boxes = yolo_box_ops.yxyx_to_xcycwh(boxes)
if self._jitter_im != 0.0:
image, boxes = yolo_preprocess_ops.random_translate(
image, boxes, self._jitter_im, seed=self._seed)
if self._aug_rand_zoom:
image, boxes = yolo_preprocess_ops.resize_crop_filter(
image,
boxes,
default_width=self._image_w,
default_height=self._image_h,
target_width=randscale,
target_height=randscale)
image = tf.image.resize(
image, (self._image_h, self._image_w), preserve_aspect_ratio=False)
if self._aug_rand_brightness:
image = tf.image.random_brightness(
image=image, max_delta=.1) # Brightness
if self._aug_rand_saturation:
image = tf.image.random_saturation(
image=image, lower=0.75, upper=1.25) # Saturation
if self._aug_rand_hue:
image = tf.image.random_hue(image=image, max_delta=.3) # Hue
image = tf.clip_by_value(image, 0.0, 1.0)
# Find the best anchor for the ground truth labels to maximize the iou
best_anchors = yolo_preprocess_ops.get_best_anchor(
boxes, self._anchors, width=self._image_w, height=self._image_h)
# Padding
boxes = preprocess_ops.clip_or_pad_to_fixed_size(boxes,
self._max_num_instances, 0)
classes = preprocess_ops.clip_or_pad_to_fixed_size(
data['groundtruth_classes'], self._max_num_instances, -1)
best_anchors = preprocess_ops.clip_or_pad_to_fixed_size(
best_anchors, self._max_num_instances, 0)
area = preprocess_ops.clip_or_pad_to_fixed_size(data['groundtruth_area'],
self._max_num_instances, 0)
is_crowd = preprocess_ops.clip_or_pad_to_fixed_size(
tf.cast(data['groundtruth_is_crowd'], tf.int32),
self._max_num_instances, 0)
labels = {
'source_id': data['source_id'],
'bbox': tf.cast(boxes, self._dtype),
'classes': tf.cast(classes, self._dtype),
'area': tf.cast(area, self._dtype),
'is_crowd': is_crowd,
'best_anchors': tf.cast(best_anchors, self._dtype),
'width': width,
'height': height,
'num_detections': tf.shape(data['groundtruth_classes'])[0],
}
if self._fixed_size:
grid = self._build_grid(
labels, self._image_w, use_tie_breaker=self._use_tie_breaker)
labels.update({'grid_form': grid})
return image, labels
def _parse_eval_data(self, data):
"""Generates images and labels that are usable for model training.
Args:
data: a dict of Tensors produced by the decoder.
Returns:
images: the image tensor.
labels: a dict of Tensors that contains labels.
"""
shape = tf.shape(data['image'])
image = data['image'] / 255
boxes = data['groundtruth_boxes']
width = shape[0]
height = shape[1]
image, boxes = yolo_preprocess_ops.fit_preserve_aspect_ratio(
image, boxes, width=width, height=height, target_dim=self._image_w)
boxes = yolo_box_ops.yxyx_to_xcycwh(boxes)
# Find the best anchor for the ground truth labels to maximize the iou
best_anchors = yolo_preprocess_ops.get_best_anchor(
boxes, self._anchors, width=self._image_w, height=self._image_h)
boxes = yolo_preprocess_ops.pad_max_instances(boxes,
self._max_num_instances, 0)
classes = yolo_preprocess_ops.pad_max_instances(data['groundtruth_classes'],
self._max_num_instances, 0)
best_anchors = yolo_preprocess_ops.pad_max_instances(
best_anchors, self._max_num_instances, 0)
area = yolo_preprocess_ops.pad_max_instances(data['groundtruth_area'],
self._max_num_instances, 0)
is_crowd = yolo_preprocess_ops.pad_max_instances(
tf.cast(data['groundtruth_is_crowd'], tf.int32),
self._max_num_instances, 0)
labels = {
'source_id': data['source_id'],
'bbox': tf.cast(boxes, self._dtype),
'classes': tf.cast(classes, self._dtype),
'area': tf.cast(area, self._dtype),
'is_crowd': is_crowd,
'best_anchors': tf.cast(best_anchors, self._dtype),
'width': width,
'height': height,
'num_detections': tf.shape(data['groundtruth_classes'])[0],
}
grid = self._build_grid(
labels,
self._image_w,
batch=False,
use_tie_breaker=self._use_tie_breaker)
labels.update({'grid_form': grid})
return image, labels
def _postprocess_fn(self, image, label):
randscale = self._image_w // self._net_down_scale
if not self._fixed_size:
do_scale = tf.greater(
tf.random.uniform([], minval=0, maxval=1, seed=self._seed), 0.5)
if do_scale:
# This scales the image to a random multiple of net_down_scale
# between 320 and 608
randscale = tf.random.uniform(
[],
minval=self._min_process_size // self._net_down_scale,
maxval=self._max_process_size // self._net_down_scale,
seed=self._seed,
dtype=tf.int32) * self._net_down_scale
width = randscale
image = tf.image.resize(image, (width, width))
grid = self._build_grid(
label, width, batch=True, use_tie_breaker=self._use_tie_breaker)
label.update({'grid_form': grid})
return image, label
def postprocess_fn(self, is_training=True):
return self._postprocess_fn if not self._fixed_size and is_training else None
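# Rough usage sketch (assumed wiring; mirrors the unit test below): the
# per-example parse_fn runs before batching, while the batched postprocess_fn
# (returned only when fixed_size is False during training) resizes every image
# in a batch to the same randomly chosen multiple of the network stride:
#   parse = parser.parse_fn(is_training=True)
#   post = parser.postprocess_fn(is_training=True)
#   dataset = dataset.map(parse).batch(batch_size)
#   if post is not None:
#     dataset = dataset.map(post)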
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test case for YOLO detection dataloader configuration definition."""
from absl.testing import parameterized
import dataclasses
import tensorflow as tf
from official.core import config_definitions as cfg
from official.core import input_reader
from official.modeling import hyperparams
from official.vision.beta.dataloaders import tfds_detection_decoders
from official.vision.beta.projects.yolo.dataloaders import yolo_detection_input
@dataclasses.dataclass
class Parser(hyperparams.Config):
"""Dummy configuration for parser."""
output_size: tuple = (416, 416)
num_classes: int = 80
fixed_size: bool = True
jitter_im: float = 0.1
jitter_boxes: float = 0.005
min_process_size: int = 320
max_process_size: int = 608
max_num_instances: int = 200
random_flip: bool = True
seed: int = 10
shuffle_buffer_size: int = 10000
@dataclasses.dataclass
class DataConfig(cfg.DataConfig):
"""Input config for training."""
input_path: str = ''
tfds_name: str = 'coco/2017'
tfds_split: str = 'train'
global_batch_size: int = 10
is_training: bool = True
dtype: str = 'float16'
decoder = None
parser: Parser = Parser()
shuffle_buffer_size: int = 10
class YoloDetectionInputTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(('training', True), ('testing', False))
def test_yolo_input(self, is_training):
params = DataConfig(is_training=is_training)
decoder = tfds_detection_decoders.MSCOCODecoder()
anchors = [[12.0, 19.0], [31.0, 46.0], [96.0, 54.0], [46.0, 114.0],
[133.0, 127.0], [79.0, 225.0], [301.0, 150.0], [172.0, 286.0],
[348.0, 340.0]]
masks = {'3': [0, 1, 2], '4': [3, 4, 5], '5': [6, 7, 8]}
parser = yolo_detection_input.Parser(
output_size=params.parser.output_size,
num_classes=params.parser.num_classes,
fixed_size=params.parser.fixed_size,
jitter_im=params.parser.jitter_im,
jitter_boxes=params.parser.jitter_boxes,
min_process_size=params.parser.min_process_size,
max_process_size=params.parser.max_process_size,
max_num_instances=params.parser.max_num_instances,
random_flip=params.parser.random_flip,
seed=params.parser.seed,
anchors=anchors,
masks=masks)
postprocess_fn = parser.postprocess_fn(is_training=is_training)
reader = input_reader.InputReader(params,
dataset_fn=tf.data.TFRecordDataset,
decoder_fn=decoder.decode,
parser_fn=parser.parse_fn(
params.is_training))
dataset = reader.read(input_context=None).batch(10).take(1)
if postprocess_fn:
image, _ = postprocess_fn(
*tf.data.experimental.get_single_element(dataset))
else:
image, _ = tf.data.experimental.get_single_element(dataset)
print(image.shape)
self.assertAllEqual(image.shape, (10, 10, 416, 416, 3))
self.assertTrue(
tf.reduce_all(tf.math.logical_and(image >= 0, image <= 1)))
if __name__ == '__main__':
tf.test.main()
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Detection Data parser and processing for YOLO."""
import tensorflow as tf
import numpy as np
from official.vision.beta.projects.yolo.ops import preprocessing_ops
from official.vision.beta.projects.yolo.ops import box_ops as box_utils
from official.vision.beta.ops import preprocess_ops
from official.vision.beta.dataloaders import parser, utils
def _coco91_to_80(classif, box, areas, iscrowds):
"""Function used to reduce COCO 91 to COCO 80, or to convert from the 2017
foramt to the 2014 format"""
# Vector where index i coralates to the class at index[i].
x = [
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22,
23, 24, 25, 27, 28, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43,
44, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62,
63, 64, 65, 67, 70, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 84, 85,
86, 87, 88, 89, 90
]
no = tf.expand_dims(tf.convert_to_tensor(x), axis=0)
# Reshape the classes in order to build a class mask.
ce = tf.expand_dims(classif, axis=-1)
# One hot the classifications to match the 80 class format.
ind = ce == tf.cast(no, ce.dtype)
# Select the max values.
co = tf.reshape(tf.math.argmax(tf.cast(ind, tf.float32), axis=-1), [-1])
ind = tf.where(tf.reduce_any(ind, axis=-1))
# Gather the valuable instances.
classif = tf.gather_nd(co, ind)
box = tf.gather_nd(box, ind)
areas = tf.gather_nd(areas, ind)
iscrowds = tf.gather_nd(iscrowds, ind)
# Restate the number of viable detections, ideally it should be the same.
num_detections = tf.shape(classif)[0]
return classif, box, areas, iscrowds, num_detections
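# Illustrative example for _coco91_to_80 above (assumed values): a 91-format
# class id of 13 maps to index 11 of `x`, so it becomes class 11 in the
# 80-class format, while an id such as 12 that never appears in `x` produces
# an all-False row in the one-hot comparison and is dropped along with its
# box, area and is_crowd entries.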
class Parser(parser.Parser):
"""Parse the dataset in to the YOLO model format. """
def __init__(
self,
output_size,
masks,
anchors,
strides,
anchor_free_limits=None,
max_num_instances=200,
area_thresh=0.1,
aug_rand_hue=1.0,
aug_rand_saturation=1.0,
aug_rand_brightness=1.0,
letter_box=False,
random_pad=True,
random_flip=True,
jitter=0.0,
aug_scale_min=1.0,
aug_scale_max=1.0,
aug_rand_translate=0.0,
aug_rand_perspective=0.0,
aug_rand_angle=0.0,
anchor_t=4.0,
scale_xy=None,
best_match_only=False,
coco91to80=False,
darknet=False,
use_tie_breaker=True,
dtype='float32',
seed=None,
):
"""Initializes parameters for parsing annotations in the dataset.
Args:
output_size: `Tensor` or `List` for [height, width] of output image. The
output_size should be divided by the largest feature stride 2^max_level.
masks: `Dict[List[int]]` of values indicating the indexes in the
list of anchor boxes to use at each prediction level between min_level
and max_level. Each level must have a list of indexes.
anchors: `List[List[Union[int, float]]]` values for each anchor box.
strides: `Dict[int]` for how much the model scales down the images at the
largest level.
anchor_free_limits: `List` of the box sizes that will be allowed at each
FPN level, as is done in the FCOS and YOLOX papers for anchor-free box
assignment. Anchor-free assignment performs slightly worse than
anchor-based assignment.
max_num_instances: `int` for the number of boxes to compute loss on.
area_thresh: `float` for the minimum area of a box to allow to pass
through for optimization.
aug_rand_hue: `float` indicating the maximum scaling value for hue. Hue
will be scaled between 1 - value and 1 + value.
aug_rand_saturation: `float` indicating the maximum scaling value for
saturation. Saturation will be scaled between 1/value and value.
aug_rand_brightness: `float` indicating the maximum scaling value for
brightness. Brightness will be scaled between 1/value and value.
letter_box: `boolean` indicating whether the aspect ratio of the images
should be preserved at the start of the data pipeline, regardless of the
preprocessing ops that are used.
random_pad: `bool` indicating whether to use padding to apply random
translation; True for darknet YOLO, False for scaled YOLO.
random_flip: `boolean` indicating whether or not to randomly flip the
image horizontally.
jitter: `float` for the maximum change in aspect ratio expected in
each preprocessing step.
aug_scale_min: `float` indicating the minimum scaling value for image
scale jitter.
aug_scale_max: `float` indicating the maximum scaling value for image
scale jitter.
aug_rand_translate: `float` ranging from 0 to 1 indicating the maximum
amount to randomly translate an image.
aug_rand_perspective: `float` ranging from 0.000 to 0.001 indicating
how much to perspective warp the image.
aug_rand_angle: `float` indicating the maximum rotation angle. The angle
will be changed between 0 and value.
anchor_t: `float` indicating the threshold over which an anchor will be
considered for prediction. At zero, all the anchors will be used, and at
1.0 only the best will be used. For anchor thresholds larger than 1.0 we
stop using the IOU for anchor comparison and resort directly to comparing
the width and height; this is used for the scaled models.
scale_xy: dictionary of `float` values indicating how far each pixel can
see outside of its containment of 1.0. A value of 1.2 indicates there is a
20% extended radius around each pixel within which this specific pixel can
predict a center. The center can range from 0 - value/2 to 1 + value/2.
This value is set in the yolo filter and reused here; there should be one
value of scale_xy for each level from min_level to max_level.
best_match_only: `boolean` indicating how boxes are selected for
optimization.
coco91to80: `bool` for whether to convert coco91 to coco80 to minimize
model parameters.
darknet: `boolean` indicating which data pipeline to use. Setting to True
swaps the pipeline to output images in the format used by Yolov4 and older.
use_tie_breaker: `boolean` indicating whether to use the anchor threshold
value.
dtype: `str` indicating the output datatype of the datapipeline selecting
from {"float32", "float16", "bfloat16"}.
seed: `int` the seed for random number generation.
"""
for key in masks.keys():
# Assert that the width and height is viable
assert output_size[1] % strides[str(key)] == 0
assert output_size[0] % strides[str(key)] == 0
# scale of each FPN level
self._strides = strides
# Set the width and height properly and base init:
self._coco91to80 = coco91to80
self._image_w = output_size[1]
self._image_h = output_size[0]
# Set the anchor boxes and masks for each scale
self._anchors = anchors
self._anchor_free_limits = anchor_free_limits
self._masks = {
key: tf.convert_to_tensor(value) for key, value in masks.items()
}
self._use_tie_breaker = use_tie_breaker
self._best_match_only = best_match_only
self._max_num_instances = max_num_instances
# Image scaling params
self._jitter = 0.0 if jitter is None else jitter
self._aug_scale_min = aug_scale_min
self._aug_scale_max = aug_scale_max
self._aug_rand_translate = aug_rand_translate
self._aug_rand_perspective = aug_rand_perspective
# Image spatial distortion
self._random_flip = random_flip
self._letter_box = letter_box
self._random_pad = random_pad
self._aug_rand_angle = aug_rand_angle
# Color space distortion of the image
self._aug_rand_saturation = aug_rand_saturation
self._aug_rand_brightness = aug_rand_brightness
self._aug_rand_hue = aug_rand_hue
# Set the per level values needed for operation
self._scale_xy = scale_xy
self._anchor_t = anchor_t
self._darknet = darknet
self._area_thresh = area_thresh
keys = list(self._masks.keys())
if self._anchor_free_limits is not None:
maxim = 2000
self._scale_up = {key: maxim // self._max_num_instances for key in keys}
self._anchor_t = -0.01
elif not self._darknet:
self._scale_up = {key: 6 - i for i, key in enumerate(keys)}
else:
self._scale_up = {key: 1 for key in keys}
self._seed = seed
# Set the data type based on input string
self._dtype = dtype
def _get_identity_info(self, image):
"""Get an identity image op to pad all info vectors, this is used because
graph compilation if there are a variable number of info objects in a list.
"""
shape_ = tf.shape(image)
val = tf.stack([
tf.cast(shape_[:2], tf.float32),
tf.cast(shape_[:2], tf.float32),
tf.ones_like(tf.cast(shape_[:2], tf.float32)),
tf.zeros_like(tf.cast(shape_[:2], tf.float32)),
])
return val
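# The rows stacked in _get_identity_info above mirror the image_info
# convention used by the resize ops: [original (h, w), resized (h, w),
# scale, offset]; every entry here describes an identity transform
# (a scale of ones and an offset of zeros).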
def _jitter_scale(self, image, shape, letter_box, jitter, random_pad,
aug_scale_min, aug_scale_max, translate, angle,
perspective):
if (aug_scale_min != 1.0 or aug_scale_max != 1.0):
crop_only = True
# jitter gives you only one info object and resize and crop gives you one;
# if crop_only is set there can be 1 from jitter and 1 from crop
reps = 1
else:
crop_only = False
reps = 0
infos = []
image, info_a, _ = preprocessing_ops.resize_and_jitter_image(
image,
shape,
letter_box=letter_box,
jitter=jitter,
crop_only=crop_only,
random_pad=random_pad,
seed=self._seed,
)
infos.extend(info_a)
stale_a = self._get_identity_info(image)
for _ in range(reps):
infos.append(stale_a)
image, _, affine = preprocessing_ops.affine_warp_image(
image,
shape,
scale_min=aug_scale_min,
scale_max=aug_scale_max,
translate=translate,
degrees=angle,
perspective=perspective,
random_pad=random_pad,
seed=self._seed,
)
return image, infos, affine
def reorg91to80(self, data):
"""Function used to reduce COCO 91 to COCO 80, or to convert from the 2017
foramt to the 2014 format"""
if self._coco91to80:
(data['groundtruth_classes'], data['groundtruth_boxes'],
data['groundtruth_area'], data['groundtruth_is_crowd'],
_) = _coco91_to_80(data['groundtruth_classes'],
data['groundtruth_boxes'], data['groundtruth_area'],
data['groundtruth_is_crowd'])
return data
def _parse_train_data(self, data):
"""Parses data for training and evaluation."""
# Down size coco 91 to coco 80 if the option is selected.
data = self.reorg91to80(data)
# Initialize the shape constants.
image = data['image']
boxes = data['groundtruth_boxes']
classes = data['groundtruth_classes']
if self._random_flip:
# Randomly flip the image horizontally.
image, boxes, _ = preprocess_ops.random_horizontal_flip(
image, boxes, seed=self._seed)
if not data['is_mosaic']:
image, infos, affine = self._jitter_scale(
image, [self._image_h, self._image_w], self._letter_box, self._jitter,
self._random_pad, self._aug_scale_min, self._aug_scale_max,
self._aug_rand_translate, self._aug_rand_angle,
self._aug_rand_perspective)
# Clip and clean boxes.
boxes, inds = preprocessing_ops.apply_infos(
boxes,
infos,
affine=affine,
shuffle_boxes=False,
area_thresh=self._area_thresh,
augment=True,
seed=self._seed)
classes = tf.gather(classes, inds)
info = infos[-1]
else:
image = tf.image.resize(
image, (self._image_h, self._image_w), method='nearest')
inds = tf.cast(tf.range(0, tf.shape(boxes)[0]), tf.int64)
info = self._get_identity_info(image)
# Apply scaling to the hue saturation and brightness of an image.
image = tf.cast(image, dtype=self._dtype)
image = image / 255
image = preprocessing_ops.image_rand_hsv(
image,
self._aug_rand_hue,
self._aug_rand_saturation,
self._aug_rand_brightness,
seed=self._seed,
darknet=self._darknet)
# Build the labels for the processed image.
image, labels = self._build_label(
image,
boxes,
classes,
self._image_w,
self._image_h,
info,
inds,
data,
is_training=True)
return image, labels
def _parse_eval_data(self, data):
# Down size coco 91 to coco 80 if the option is selected.
data = self.reorg91to80(data)
# Get the image shape constants and cast the image to the selected datatype.
image = tf.cast(data['image'], dtype=self._dtype)
boxes = data['groundtruth_boxes']
classes = data['groundtruth_classes']
height, width = self._image_h, self._image_w
image, infos, _ = preprocessing_ops.resize_and_jitter_image(
image, [height, width],
letter_box=self._letter_box,
random_pad=False,
shiftx=0.5,
shifty=0.5,
jitter=0.0)
# Clip and clean boxes.
image = image / 255
boxes, inds = preprocessing_ops.apply_infos(
boxes, infos, shuffle_boxes=False, area_thresh=0.0, augment=True)
classes = tf.gather(classes, inds)
info = infos[-1]
image, labels = self._build_label(
image,
boxes,
classes,
width,
height,
info,
inds,
data,
is_training=False)
return image, labels
def set_shape(self, values, pad_axis=0, pad_value=0, inds=None, scale=1):
if inds is not None:
values = tf.gather(values, inds)
vshape = values.get_shape().as_list()
if pad_value is not None:
values = preprocessing_ops.pad_max_instances(
values,
self._max_num_instances,
pad_axis=pad_axis,
pad_value=pad_value)
vshape[pad_axis] = self._max_num_instances * scale
values.set_shape(vshape)
return values
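# Illustrative behavior of set_shape above (assumed values): with
# max_num_instances=200 and scale=1, an [n, 4] box tensor is padded (or
# clipped) along pad_axis=0 to a static [200, 4] shape, so batched label
# tensors always share the same dimensions.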
def _build_grid(self, raw_true, width, height, use_tie_breaker=False):
"""Private function for building the full scale object and class grid."""
indexes = {}
updates = {}
true_grids = {}
if self._anchor_free_limits is not None:
self._anchor_free_limits = [0.0] + self._anchor_free_limits + [np.inf]
# for each prediction path generate a properly scaled output prediction map
for i, key in enumerate(self._masks.keys()):
if self._anchor_free_limits is not None:
fpn_limits = self._anchor_free_limits[i:i + 2]
else:
fpn_limits = None
# build the actual grid, as well as the list of boxes and classes and
# their indexes in the prediction grid
scale_xy = self._scale_xy[key] if not self._darknet else 1
(indexes[key], updates[key],
true_grids[key]) = preprocessing_ops.build_grided_gt_ind(
raw_true,
self._masks[key],
width // self._strides[str(key)],
height // self._strides[str(key)],
raw_true['bbox'].dtype,
scale_xy,
self._scale_up[key],
use_tie_breaker,
self._strides[str(key)],
fpn_limits=fpn_limits)
# set/fix the shapes
indexes[key] = self.set_shape(indexes[key], -2, None, None,
self._scale_up[key])
updates[key] = self.set_shape(updates[key], -2, None, None,
self._scale_up[key])
# add all the values to the final dictionary
updates[key] = tf.cast(updates[key], dtype=self._dtype)
return indexes, updates, true_grids
def _build_label(self,
image,
gt_boxes,
gt_classes,
width,
height,
info,
inds,
data,
is_training=True):
"""Label construction for both the train and eval data. """
# Set the image shape.
imshape = image.get_shape().as_list()
imshape[-1] = 3
image.set_shape(imshape)
# Get the best anchors.
boxes = box_utils.yxyx_to_xcycwh(gt_boxes)
best_anchors, ious = preprocessing_ops.get_best_anchor(
boxes,
self._anchors,
width=width,
height=height,
iou_thresh=self._anchor_t,
best_match_only=self._best_match_only)
# Set/fix the boxes shape.
boxes = self.set_shape(boxes, pad_axis=0, pad_value=0)
classes = self.set_shape(gt_classes, pad_axis=0, pad_value=-1)
best_anchors = self.set_shape(best_anchors, pad_axis=0, pad_value=-1)
ious = self.set_shape(ious, pad_axis=0, pad_value=0)
area = self.set_shape(
data['groundtruth_area'], pad_axis=0, pad_value=0, inds=inds)
is_crowd = self.set_shape(
data['groundtruth_is_crowd'], pad_axis=0, pad_value=0, inds=inds)
# Build the dictionary set.
labels = {
'source_id': utils.process_source_id(data['source_id']),
'bbox': tf.cast(boxes, dtype=self._dtype),
'classes': tf.cast(classes, dtype=self._dtype),
'best_anchors': tf.cast(best_anchors, dtype=self._dtype),
'best_iou_match': ious,
}
# Build the grid formatted for loss computation in model output format.
labels['inds'], labels['upds'], labels['true_conf'] = self._build_grid(
labels, width, height, use_tie_breaker=self._use_tie_breaker)
# Update the labels dictionary.
labels['bbox'] = box_utils.xcycwh_to_yxyx(labels['bbox'])
if not is_training:
# Sets up groundtruth data for evaluation.
groundtruths = {
'source_id': labels['source_id'],
'height': height,
'width': width,
'num_detections': tf.shape(gt_boxes)[0],
'image_info': info,
'boxes': gt_boxes,
'classes': gt_classes,
'areas': area,
'is_crowds': tf.cast(is_crowd, tf.int32),
}
groundtruths['source_id'] = utils.process_source_id(
groundtruths['source_id'])
groundtruths = utils.pad_groundtruths_to_fixed_size(
groundtruths, self._max_num_instances)
labels['groundtruths'] = groundtruths
return image, labels
@@ -535,8 +535,13 @@ def _darknet_new_coord_boxes(encoded_boxes, width, height, anchor_grid,
   return (scaler, scaled_box, pred_box), delta


-def _anchor_free_scale_boxes(encoded_boxes, width, height, stride, grid_points,
-                             scale_xy):
+def _anchor_free_scale_boxes(encoded_boxes,
+                             width,
+                             height,
+                             stride,
+                             grid_points,
+                             scale_xy,
+                             darknet=False):
   """Decode models boxes using FPN stride under anchor free conditions."""
   # split the boxes
   pred_xy = encoded_boxes[..., 0:2]
@@ -546,21 +551,30 @@ def _anchor_free_scale_boxes(encoded_boxes, width, height, stride, grid_points,
   scaler = tf.convert_to_tensor([height, width, height, width])
   scale_xy = tf.cast(scale_xy, encoded_boxes.dtype)

+  scale_down = lambda x, y: x / y
+  scale_up = lambda x, y: x * y
+  if darknet:
+    scale_down = tf.grad_pass_through(scale_down)
+    scale_up = tf.grad_pass_through(scale_up)
+
   # scale the centers and find the offset of each box relative to
   # their center pixel
   pred_xy = pred_xy * scale_xy - 0.5 * (scale_xy - 1)

   # scale the offsets and add them to the grid points or a tensor that is
   # the realtive location of each pixel
-  box_xy = (grid_points + pred_xy) * stride
+  box_xy = (grid_points + pred_xy)

   # scale the width and height of the predictions and corlate them
   # to anchor boxes
-  box_wh = tf.math.exp(pred_wh) * stride
+  box_wh = tf.math.exp(pred_wh)

   # build the final predicted box
   scaled_box = tf.concat([box_xy, box_wh], axis=-1)
-  pred_box = scaled_box / scaler
+
+  # properly scaling boxes gradeints
+  scaled_box = scale_up(scaled_box, stride)
+  pred_box = scale_down(scaled_box, (scaler * stride))
   return (scaler, scaled_box, pred_box)
@@ -610,7 +624,7 @@ def get_predicted_box(width,
   if box_type == 'anchor_free':
     (scaler, scaled_box,
      pred_box) = _anchor_free_scale_boxes(encoded_boxes, width, height, stride,
-                                          grid_points, scale_xy)
+                                          grid_points, scale_xy, darknet=darknet)
   elif darknet:
     # pylint:disable=unbalanced-tuple-unpacking
...
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Mosaic data aug for YOLO."""
import random
import tensorflow as tf
import tensorflow_addons as tfa
from official.vision.beta.projects.yolo.ops import preprocessing_ops
from official.vision.beta.ops import box_ops
class Mosaic(object):
"""Stitch together sets of 4 images to generate samples with more boxes."""
def __init__(self,
output_size,
mosaic_frequency=1.0,
mixup_frequency=0.0,
letter_box=True,
jitter=0.0,
mosaic_crop_mode='scale',
mosaic_center=0.25,
aug_scale_min=1.0,
aug_scale_max=1.0,
aug_rand_angle=0.0,
aug_rand_perspective=0.0,
aug_rand_translate=0.0,
random_pad=False,
area_thresh=0.1,
seed=None):
"""Initializes parameters for mosaic.
Args:
output_size: `Tensor` or `List` for [height, width] of output image.
mosaic_frequency: `float` indicating how often to apply mosaic.
mixup_frequency: `float` indicating how often to apply mixup.
letter_box: `boolean` indicating whether the aspect ratio of the images
should be preserved at the start of the data pipeline, regardless of the
preprocessing ops that are used.
jitter: `float` for the maximum change in aspect ratio expected in
each preprocessing step.
mosaic_crop_mode: `str` for the type of mosaic to apply. The options are
{crop, scale, None}; crop will construct a mosaic by slicing images
together, scale will create a mosaic by concatenating and shifting the
image, and None will default to scale and apply no post processing to
the created mosaic.
mosaic_center: `float` indicating how much to randomly deviate from the
center of the image when creating a mosaic.
aug_scale_min: `float` indicating the minimum scaling value for image
scale jitter.
aug_scale_max: `float` indicating the maximum scaling value for image
scale jitter.
aug_rand_angle: `float` indicating the maximum rotation angle. The angle
will be changed between 0 and value.
aug_rand_translate: `float` ranging from 0 to 1 indicating the maximum
amount to randomly translate an image.
aug_rand_perspective: `float` ranging from 0.000 to 0.001 indicating
how much to perspective warp the image.
random_pad: `bool` indicating whether to use padding to apply random
translation; True for darknet YOLO, False for scaled YOLO.
area_thresh: `float` for the minimum area of a box to allow to pass
through for optimization.
seed: `int` the seed for random number generation.
"""
self._output_size = output_size
self._area_thresh = area_thresh
self._mosaic_frequency = mosaic_frequency
self._mixup_frequency = mixup_frequency
self._letter_box = letter_box
self._random_crop = jitter
self._mosaic_crop_mode = mosaic_crop_mode
self._mosaic_center = mosaic_center
self._aug_scale_min = aug_scale_min
self._aug_scale_max = aug_scale_max
self._random_pad = random_pad
self._aug_rand_translate = aug_rand_translate
self._aug_rand_angle = aug_rand_angle
self._aug_rand_perspective = aug_rand_perspective
self._deterministic = seed is not None
self._seed = seed if seed is not None else random.randint(0, 2**30)
def _generate_cut(self):
"""Generate a random center to use for slicing and patching the images."""
if self._mosaic_crop_mode == 'crop':
min_offset = self._mosaic_center
cut_x = preprocessing_ops.rand_uniform_strong(
self._output_size[1] * min_offset,
self._output_size[1] * (1 - min_offset),
seed=self._seed)
cut_y = preprocessing_ops.rand_uniform_strong(
self._output_size[0] * min_offset,
self._output_size[0] * (1 - min_offset),
seed=self._seed)
cut = [cut_x, cut_y]
ishape = tf.convert_to_tensor(
[self._output_size[1], self._output_size[0], 3])
else:
cut = None
ishape = tf.convert_to_tensor(
[self._output_size[1] * 2, self._output_size[0] * 2, 3])
return cut, ishape
def _augment_image(self,
image,
boxes,
classes,
is_crowd,
area,
xs=0.0,
ys=0.0,
cut=None):
"""Process a single image prior to the application of patching."""
# Resize and jitter the image prior to patching.
letter_box = self._letter_box
image, infos, crop_points = preprocessing_ops.resize_and_jitter_image(
image, [self._output_size[0], self._output_size[1]],
random_pad=False,
letter_box=letter_box,
jitter=self._random_crop,
shiftx=xs,
shifty=ys,
cut=cut,
seed=self._seed)
# Clip and clean boxes.
boxes, inds = preprocessing_ops.apply_infos(
boxes,
infos,
area_thresh=self._area_thresh,
shuffle_boxes=False,
augment=True,
seed=self._seed)
classes = tf.gather(classes, inds)
is_crowd = tf.gather(is_crowd, inds)
area = tf.gather(area, inds)
return image, boxes, classes, is_crowd, area, crop_points
def _mosaic_crop_image(self, image, boxes, classes, is_crowd, area):
"""Process a patched image in preperation for final output."""
if self._mosaic_crop_mode != "crop":
shape = tf.cast(preprocessing_ops.get_image_shape(image), tf.float32)
center = shape * self._mosaic_center
# shift the center of the image by applying a translation to the whole
# image
ch = tf.math.round(
preprocessing_ops.rand_uniform_strong(
-center[0], center[0], seed=self._seed))
cw = tf.math.round(
preprocessing_ops.rand_uniform_strong(
-center[1], center[1], seed=self._seed))
# translate the image and clip the boxes to those within the image
image = tfa.image.translate(
image, [cw, ch], fill_value=preprocessing_ops.get_pad_value())
boxes = box_ops.denormalize_boxes(boxes, shape[:2])
boxes = boxes + tf.cast([ch, cw, ch, cw], boxes.dtype)
boxes = box_ops.clip_boxes(boxes, shape[:2])
boxes = box_ops.normalize_boxes(boxes, shape[:2])
# warp and scale the fully stitched sample
image, _, affine = preprocessing_ops.affine_warp_image(
image, [self._output_size[0], self._output_size[1]],
scale_min=self._aug_scale_min,
scale_max=self._aug_scale_max,
translate=self._aug_rand_translate,
degrees=self._aug_rand_angle,
perspective=self._aug_rand_perspective,
random_pad=self._random_pad,
seed=self._seed)
height, width = self._output_size[0], self._output_size[1]
image = tf.image.resize(image, (height, width))
# clip and clean boxes
boxes, inds = preprocessing_ops.apply_infos(
boxes,
None,
affine=affine,
area_thresh=self._area_thresh,
augment=True,
seed=self._seed)
classes = tf.gather(classes, inds)
is_crowd = tf.gather(is_crowd, inds)
area = tf.gather(area, inds)
return image, boxes, classes, is_crowd, area, area
def scale_boxes(self, patch, ishape, boxes, classes, xs, ys):
"""Scale and translate the boxes for each image prior to patching."""
xs = tf.cast(xs, boxes.dtype)
ys = tf.cast(ys, boxes.dtype)
pshape = tf.cast(tf.shape(patch), boxes.dtype)
ishape = tf.cast(ishape, boxes.dtype)
translate = tf.cast((ishape - pshape), boxes.dtype)
boxes = box_ops.denormalize_boxes(boxes, pshape[:2])
boxes = boxes + tf.cast([
translate[0] * ys, translate[1] * xs, translate[0] * ys,
translate[1] * xs
], boxes.dtype)
boxes = box_ops.normalize_boxes(boxes, ishape[:2])
return boxes, classes
# mosaic full frequency doubles model speed
def _process_image(self, sample, shiftx, shifty, cut, ishape):
"""Process and augment each image."""
(image, boxes, classes, is_crowd, area, crop_points) = self._augment_image(
sample['image'], sample['groundtruth_boxes'],
sample['groundtruth_classes'], sample['groundtruth_is_crowd'],
sample['groundtruth_area'], shiftx, shifty, cut)
if cut is None and ishape is None:
cut, ishape = self._generate_cut()
(boxes, classes) = self.scale_boxes(image, ishape, boxes, classes,
1 - shiftx, 1 - shifty)
sample['image'] = image
sample['groundtruth_boxes'] = boxes
sample['groundtruth_classes'] = classes
sample['groundtruth_is_crowd'] = is_crowd
sample['groundtruth_area'] = area
sample['cut'] = cut
sample['shiftx'] = shiftx
sample['shifty'] = shifty
sample['crop_points'] = crop_points
return sample
def _patch2(self, one, two):
"""Stitch together 2 images in totality"""
sample = one
sample['image'] = tf.concat([one["image"], two["image"]], axis=-2)
sample['groundtruth_boxes'] = tf.concat(
[one['groundtruth_boxes'], two['groundtruth_boxes']], axis=0)
sample['groundtruth_classes'] = tf.concat(
[one['groundtruth_classes'], two['groundtruth_classes']], axis=0)
sample['groundtruth_is_crowd'] = tf.concat(
[one['groundtruth_is_crowd'], two['groundtruth_is_crowd']], axis=0)
sample['groundtruth_area'] = tf.concat(
[one['groundtruth_area'], two['groundtruth_area']], axis=0)
return sample
def _patch(self, one, two):
"""Build the full 4 patch of images from sets of 2 images."""
image = tf.concat([one["image"], two["image"]], axis=-3)
boxes = tf.concat([one['groundtruth_boxes'], two['groundtruth_boxes']],
axis=0)
classes = tf.concat(
[one['groundtruth_classes'], two['groundtruth_classes']], axis=0)
is_crowd = tf.concat(
[one['groundtruth_is_crowd'], two['groundtruth_is_crowd']], axis=0)
area = tf.concat([one['groundtruth_area'], two['groundtruth_area']], axis=0)
if self._mosaic_crop_mode is not None:
image, boxes, classes, is_crowd, area, _ = self._mosaic_crop_image(
image, boxes, classes, is_crowd, area)
sample = one
height, width = preprocessing_ops.get_image_shape(image)
sample['image'] = tf.cast(image, tf.uint8)
sample['groundtruth_boxes'] = boxes
sample['groundtruth_area'] = area
sample['groundtruth_classes'] = tf.cast(classes,
sample['groundtruth_classes'].dtype)
sample['groundtruth_is_crowd'] = tf.cast(is_crowd, tf.bool)
sample['width'] = tf.cast(width, sample['width'].dtype)
sample['height'] = tf.cast(height, sample['height'].dtype)
sample['num_detections'] = tf.shape(sample['groundtruth_boxes'])[1]
sample['is_mosaic'] = tf.cast(1.0, tf.bool)
del sample['shiftx'], sample['shifty'], sample['crop_points'], sample['cut']
return sample
def _mosaic(self, one, two, three, four):
"""Stitch together 4 images to build a mosaic."""
if self._mosaic_frequency >= 1.0:
domo = 1.0
else:
domo = preprocessing_ops.rand_uniform_strong(
0.0, 1.0, dtype=tf.float32, seed=self._seed)
noop = one.copy()
if domo >= (1 - self._mosaic_frequency):
cut, ishape = self._generate_cut()
one = self._process_image(one, 1.0, 1.0, cut, ishape)
two = self._process_image(two, 0.0, 1.0, cut, ishape)
three = self._process_image(three, 1.0, 0.0, cut, ishape)
four = self._process_image(four, 0.0, 0.0, cut, ishape)
patch1 = self._patch2(one, two)
patch2 = self._patch2(three, four)
stitched = self._patch(patch1, patch2)
return stitched
else:
return self._add_param(noop)
def _mixup(self, one, two):
"""Blend together 2 images for the mixup data augmentation."""
if self._mixup_frequency >= 1.0:
domo = 1.0
else:
domo = preprocessing_ops.rand_uniform_strong(
0.0, 1.0, dtype=tf.float32, seed=self._seed)
noop = one.copy()
if domo >= (1 - self._mixup_frequency):
sample = one
otype = one["image"].dtype
r = preprocessing_ops.rand_uniform_strong(
0.4, 0.6, tf.float32, seed=self._seed)
sample['image'] = (
r * tf.cast(one["image"], tf.float32) +
(1 - r) * tf.cast(two["image"], tf.float32))
sample['image'] = tf.cast(sample['image'], otype)
sample['groundtruth_boxes'] = tf.concat(
[one['groundtruth_boxes'], two['groundtruth_boxes']], axis=0)
sample['groundtruth_classes'] = tf.concat(
[one['groundtruth_classes'], two['groundtruth_classes']], axis=0)
sample['groundtruth_is_crowd'] = tf.concat(
[one['groundtruth_is_crowd'], two['groundtruth_is_crowd']], axis=0)
sample['groundtruth_area'] = tf.concat(
[one['groundtruth_area'], two['groundtruth_area']], axis=0)
return sample
else:
return self._add_param(noop)
def _add_param(self, sample):
"""Add parameters to handle skipped images."""
sample['is_mosaic'] = tf.cast(0.0, tf.bool)
sample['num_detections'] = tf.shape(sample['groundtruth_boxes'])[0]
return sample
def _apply(self, dataset):
"""Apply mosaic to an input dataset."""
determ = self._deterministic
one = dataset.shuffle(100, seed=self._seed, reshuffle_each_iteration=True)
two = dataset.shuffle(
100, seed=self._seed + 1, reshuffle_each_iteration=True)
three = dataset.shuffle(
100, seed=self._seed + 2, reshuffle_each_iteration=True)
four = dataset.shuffle(
100, seed=self._seed + 3, reshuffle_each_iteration=True)
dataset = tf.data.Dataset.zip((one, two, three, four))
dataset = dataset.map(
self._mosaic, num_parallel_calls=tf.data.AUTOTUNE, deterministic=determ)
if self._mixup_frequency > 0:
one = dataset.shuffle(
100, seed=self._seed + 4, reshuffle_each_iteration=True)
two = dataset.shuffle(
100, seed=self._seed + 5, reshuffle_each_iteration=True)
dataset = tf.data.Dataset.zip((one, two))
dataset = dataset.map(
self._mixup,
num_parallel_calls=tf.data.AUTOTUNE,
deterministic=determ)
return dataset
def _skip(self, dataset):
"""Skip samples in a dataset."""
determ = self._deterministic
return dataset.map(
self._add_param,
num_parallel_calls=tf.data.AUTOTUNE,
deterministic=determ)
def mosaic_fn(self, is_training=True):
"""Determine which function to apply based on whether model is training"""
if is_training and self._mosaic_frequency > 0.0:
return self._apply
else:
return self._skip
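# Rough usage sketch for the Mosaic op above (constructor arguments here are
# illustrative): the mosaic function is applied to the decoded dataset before
# the detection parser runs, e.g.
#   mosaic = Mosaic(output_size=[640, 640], mosaic_frequency=1.0,
#                   mixup_frequency=0.2)
#   dataset = mosaic.mosaic_fn(is_training=True)(dataset)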
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Yolo preprocess ops."""
import tensorflow as tf
import tensorflow_addons as tfa
from official.vision.beta.projects.yolo.ops import box_ops
def resize_crop_filter(image, boxes, default_width, default_height,
target_width, target_height):
"""Apply zooming to the image and boxes.
Args:
image: a `Tensor` representing the image.
boxes: a `Tensor` representing the boxes.
default_width: a `Tensor` representing the width of the image.
default_height: a `Tensor` representing the height of the image.
target_width: a `Tensor` representing the desired width of the image.
target_height: a `Tensor` representing the desired height of the image.
Returns:
images: a `Tensor` representing the augmented image.
boxes: a `Tensor` representing the augmented boxes.
"""
with tf.name_scope('resize_crop_filter'):
image = tf.image.resize(image, (target_width, target_height))
image = tf.image.resize_with_crop_or_pad(image,
target_height=default_height,
target_width=default_width)
default_width = tf.cast(default_width, boxes.dtype)
default_height = tf.cast(default_height, boxes.dtype)
target_width = tf.cast(target_width, boxes.dtype)
target_height = tf.cast(target_height, boxes.dtype)
aspect_change_width = target_width / default_width
aspect_change_height = target_height / default_height
x, y, width, height = tf.split(boxes, 4, axis=-1)
x = (x - 0.5) * target_width / default_width + 0.5
y = (y - 0.5) * target_height / default_height + 0.5
width = width * aspect_change_width
height = height * aspect_change_height
boxes = tf.concat([x, y, width, height], axis=-1)
return image, boxes
def random_translate(image, box, t, seed=None):
"""Randomly translate the image and boxes.
Args:
image: a `Tensor` representing the image.
box: a `Tensor` representing the boxes.
t: a `float` representing the translation factor.
seed: an optional seed for tf.random operations.
Returns:
image: a `Tensor` representing the augmented image.
box: a `Tensor` representing the augmented boxes.
"""
t_x = tf.random.uniform(minval=-t,
maxval=t,
shape=(),
dtype=tf.float32,
seed=seed)
t_y = tf.random.uniform(minval=-t,
maxval=t,
shape=(),
dtype=tf.float32,
seed=seed)
box = translate_boxes(box, t_x, t_y)
image = translate_image(image, t_x, t_y)
return image, box
def translate_boxes(box, translate_x, translate_y):
"""Randomly translate the boxes.
Args:
box: a `Tensor` representing the boxes.
translate_x: a `Tensor` representing the translation on the x-axis.
translate_y: a `Tensor` representing the translation on the y-axis.
Returns:
box: a `Tensor` representing the augmented boxes.
"""
with tf.name_scope('translate_boxs'):
x = box[..., 0] + translate_x
y = box[..., 1] + translate_y
box = tf.stack([x, y, box[..., 2], box[..., 3]], axis=-1)
box.set_shape([None, 4])
return box
def translate_image(image, translate_x, translate_y):
"""Randomly translate the image.
Args:
image: a `Tensor` representing the image.
translate_x: a `Tensor` representing the translation on the x-axis.
translate_y: a `Tensor` representing the translation on the y-axis.
Returns:
image: a `Tensor` representing the translated image.
"""
with tf.name_scope('translate_image'):
if (translate_x != 0 and translate_y != 0):
image_jitter = tf.convert_to_tensor([translate_x, translate_y])
image_jitter.set_shape([2])
image = tfa.image.translate(
image, image_jitter * tf.cast(tf.shape(image)[1], tf.float32))
return image
def pad_max_instances(value, instances, pad_value=0, pad_axis=0):
"""Pads tensors to max number of instances."""
shape = tf.shape(value)
dim1 = shape[pad_axis]
take = tf.math.reduce_min([instances, dim1])
value, _ = tf.split(value, [take, -1],
axis=pad_axis) # value[:instances, ...]
pad = tf.convert_to_tensor([tf.math.reduce_max([instances - dim1, 0])])
nshape = tf.concat([shape[:pad_axis], pad, shape[(pad_axis + 1):]], axis=0)
pad_tensor = tf.fill(nshape, tf.cast(pad_value, dtype=value.dtype))
value = tf.concat([value, pad_tensor], axis=pad_axis)
return value
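# Illustrative behavior of pad_max_instances above (assumed values):
# pad_max_instances(tf.ones([3, 4]), 5) returns a [5, 4] tensor whose last
# two rows are zeros, while a [7, 4] input is clipped down to [5, 4].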
def fit_preserve_aspect_ratio(image,
boxes,
width=None,
height=None,
target_dim=None):
"""Resizes the image while peserving the image aspect ratio.
Args:
image: a `Tensor` representing the image.
boxes: a `Tensor` representing the boxes.
width: int for the image width.
height: int for the image height.
target_dim: list or a Tensor of height and width.
Returns:
image: a `Tensor` representing the image.
box: a `Tensor` representing the boxes.
"""
if width is None or height is None:
shape = tf.shape(image)
if tf.shape(shape)[0] == 4:
width = shape[1]
height = shape[2]
else:
width = shape[0]
height = shape[1]
clipper = tf.math.maximum(width, height)
if target_dim is None:
target_dim = clipper
pad_width = clipper - width
pad_height = clipper - height
image = tf.image.pad_to_bounding_box(image, pad_width // 2, pad_height // 2,
clipper, clipper)
boxes = box_ops.yxyx_to_xcycwh(boxes)
x, y, w, h = tf.split(boxes, 4, axis=-1)
y *= tf.cast(width / clipper, tf.float32)
x *= tf.cast(height / clipper, tf.float32)
y += tf.cast((pad_width / clipper) / 2, tf.float32)
x += tf.cast((pad_height / clipper) / 2, tf.float32)
h *= tf.cast(width / clipper, tf.float32)
w *= tf.cast(height / clipper, tf.float32)
boxes = tf.concat([x, y, w, h], axis=-1)
boxes = box_ops.xcycwh_to_yxyx(boxes)
image = tf.image.resize(image, (target_dim, target_dim))
return image, boxes
def get_best_anchor(y_true, anchors, width=1, height=1):
"""Gets the correct anchor that is assoiciated with each box using IOU.
Args:
y_true: `tf.Tensor[]` for the list of bounding boxes in the yolo format.
anchors: list or tensor for the anchor boxes to be used in prediction
found via Kmeans.
width: int for the image width.
height: int for the image height.
Returns:
tf.Tensor: y_true with the anchor associated with each ground truth
box known.
"""
with tf.name_scope('get_anchor'):
width = tf.cast(width, dtype=tf.float32)
height = tf.cast(height, dtype=tf.float32)
# split the boxes into center and width height
anchor_xy = y_true[..., 0:2]
    # scale the boxes
anchors = tf.convert_to_tensor(anchors, dtype=tf.float32)
anchors_x = anchors[..., 0] / width
anchors_y = anchors[..., 1] / height
anchors = tf.stack([anchors_x, anchors_y], axis=-1)
k = tf.shape(anchors)[0]
# build a matrix of anchor boxes of shape [num_anchors, num_boxes, 4]
anchors = tf.transpose(anchors, perm=[1, 0])
anchor_xy = tf.tile(tf.expand_dims(anchor_xy, axis=-1),
[1, 1, tf.shape(anchors)[-1]])
anchors = tf.tile(tf.expand_dims(anchors, axis=0),
[tf.shape(anchor_xy)[0], 1, 1])
    # stack the xy so each anchor is associated once with each center from
    # the ground truth input
anchors = tf.concat([anchor_xy, anchors], axis=1)
anchors = tf.transpose(anchors, perm=[2, 0, 1])
# copy the gt n times so that each anchor from above can be compared to
# input ground truth to shape: [num_anchors, num_boxes, 4]
truth_comp = tf.tile(tf.expand_dims(y_true[..., 0:4], axis=-1),
[1, 1, tf.shape(anchors)[0]])
truth_comp = tf.transpose(truth_comp, perm=[2, 0, 1])
    # compute intersection over union of the boxes, and take the argmax of
    # the computed iou for each box. thus each box is associated with the
    # largest intersection over union
iou_raw = box_ops.compute_iou(truth_comp, anchors)
values, indexes = tf.math.top_k(tf.transpose(iou_raw, perm=[1, 0]),
k=tf.cast(k, dtype=tf.int32),
sorted=True)
ind_mask = tf.cast(values > 0.213, dtype=indexes.dtype)
    # pad the indices such that all values less than the thresh are -1;
    # add one, multiply by the mask to zero out the bad locations, then
    # subtract 1, making all the bad locations -1.
iou_index = tf.concat([
tf.keras.backend.expand_dims(indexes[..., 0], axis=-1),
((indexes[..., 1:] + 1) * ind_mask[..., 1:]) - 1
],
axis=-1)
iou_index = iou_index[..., :6]
return tf.cast(iou_index, dtype=tf.float32)
def build_grided_gt(y_true, mask, size, dtype, use_tie_breaker):
"""Converts ground truth for use in loss functions.
Args:
y_true: tf.Tensor[] ground truth
[box coords[0:4], classes_onehot[0:-1], best_fit_anchor_box].
mask: list of the anchor boxes choresponding to the output,
ex. [1, 2, 3] tells this layer to predict only the first 3
anchors in the total.
size: The dimensions of this output, for regular, it progresses
from 13, to 26, to 52.
dtype: The expected output dtype.
use_tie_breaker: boolean value for wether or not to use the tie_breaker.
Returns:
tf.Tensor[] of shape [size, size, #of_anchors, 4, 1, num_classes].
"""
# unpack required components from the input ground truth
boxes = tf.cast(y_true['bbox'], dtype)
classes = tf.expand_dims(tf.cast(y_true['classes'], dtype=dtype), axis=-1)
anchors = tf.cast(y_true['best_anchors'], dtype)
  # get the number of boxes in the ground truth boxes
num_boxes = tf.shape(boxes)[0]
# get the number of anchor boxes used for this anchor scale
len_masks = tf.shape(mask)[0]
  # init a fixed memory size grid for this prediction scale
# [size, size, # of anchors, 1 + 1 + number of anchors per scale]
full = tf.zeros([size, size, len_masks, 6], dtype=dtype)
# init a grid to use to track which locations have already
# been used before (for the tie breaker)
depth_track = tf.zeros((size, size, len_masks), dtype=tf.int32)
# rescale the x and y centers to the size of the grid [size, size]
x = tf.cast(boxes[..., 0] * tf.cast(size, dtype=dtype), dtype=tf.int32)
y = tf.cast(boxes[..., 1] * tf.cast(size, dtype=dtype), dtype=tf.int32)
  # init all the TensorArrays to be used in storing the index
  # and the values to be used to update both depth_track and full
update_index = tf.TensorArray(tf.int32, size=0, dynamic_size=True)
update = tf.TensorArray(dtype, size=0, dynamic_size=True)
# init constants and match data types before entering loop
i = 0
anchor_id = 0
const = tf.cast(tf.convert_to_tensor([1.]), dtype=dtype)
mask = tf.cast(mask, dtype=dtype)
rand_update = 0.0
for box_id in range(num_boxes):
    # If the width or height of the box is zero, skip it.
    # After preprocessing, if the box is not in the image bounds anymore,
    # skip it.
if tf.keras.backend.all(tf.math.equal(
boxes[box_id, 2:4], 0)) or tf.keras.backend.any(
tf.math.less(boxes[box_id, 0:2], 0.0)) or tf.keras.backend.any(
tf.math.greater_equal(boxes[box_id, 0:2], 1.0)):
continue
if use_tie_breaker:
for anchor_id in range(tf.shape(anchors)[-1]):
index = tf.math.equal(anchors[box_id, anchor_id], mask)
if tf.keras.backend.any(index):
# using the boolean index mask to determine exactly which
# anchor box was used
p = tf.cast(
tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
# determine if the index was used or not
used = depth_track[y[box_id], x[box_id], p]
          # default used update value
uid = 1
# if anchor_id is 0, this is the best matched anchor for this box
# with the highest IOU
if anchor_id == 0:
# write the box to the update list
            # create a random number to trigger a replacement if the cell
# is used already
if tf.math.equal(used, 1):
rand_update = tf.random.uniform([], maxval=1)
else:
rand_update = 1.0
if rand_update > 0.5:
# write the box to the update list
update_index = update_index.write(i, [y[box_id], x[box_id], p])
value = tf.concat([boxes[box_id], const, classes[box_id]],
axis=-1)
update = update.write(i, value)
# if used is 2, this cell is filled with a non-optimal box
# if used is 0, the cell in the ground truth is not yet consumed
# in either case you can replace that cell with a new box, as long
# as it is not consumed by an optimal box with anchor_id = 0
elif tf.math.equal(used, 2) or tf.math.equal(used, 0):
uid = 2
# write the box to the update list
update_index = update_index.write(i, [y[box_id], x[box_id], p])
value = tf.concat([boxes[box_id], const, classes[box_id]], axis=-1)
update = update.write(i, value)
depth_track = tf.tensor_scatter_nd_update(
depth_track, [(y[box_id], x[box_id], p)], [uid])
i += 1
else:
index = tf.math.equal(anchors[box_id, 0], mask)
      # if there is an index match
if tf.keras.backend.any(index):
# find the index
p = tf.cast(
tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
# update the list of used boxes
update_index = update_index.write(i, [y[box_id], x[box_id], p])
value = tf.concat([boxes[box_id], const, classes[box_id]], axis=-1)
update = update.write(i, value)
i += 1
  # if the size of the update list is not 0, do an update; otherwise
  # there are no boxes and the empty grid is returned
if tf.math.greater(update_index.size(), 0):
update_index = update_index.stack()
update = update.stack()
full = tf.tensor_scatter_nd_update(full, update_index, update)
return full
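# Illustrative sketch (shapes and values assumed, not part of the original
# file): for a 13x13 output scale with 3 anchors, the grid built above has
# shape [13, 13, 3, 6], where the last axis holds [x, y, w, h, confidence,
# class].
#
#   grid = build_grided_gt(
#       y_true={'bbox': bbox, 'classes': classes, 'best_anchors': best_anchors},
#       mask=[0, 1, 2], size=13, dtype=tf.float32, use_tie_breaker=True)
#   # grid.shape == [13, 13, 3, 6]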
def build_batch_grided_gt(y_true, mask, size, dtype, use_tie_breaker):
"""Converts ground truth for use in loss functions.
Args:
y_true: tf.Tensor[] ground truth
[batch, box coords[0:4], classes_onehot[0:-1], best_fit_anchor_box].
    mask: list of the anchor boxes corresponding to the output,
ex. [1, 2, 3] tells this layer to predict only the first 3 anchors
in the total.
size: the dimensions of this output, for regular, it progresses from
13, to 26, to 52.
dtype: expected output datatype.
use_tie_breaker: boolean value for whether or not to use the tie
breaker.
Returns:
tf.Tensor[] of shape [batch, size, size, #of_anchors, 4, 1, num_classes].
"""
# unpack required components from the input ground truth
boxes = tf.cast(y_true['bbox'], dtype)
classes = tf.expand_dims(tf.cast(y_true['classes'], dtype=dtype), axis=-1)
anchors = tf.cast(y_true['best_anchors'], dtype)
# get the batch size
batches = tf.shape(boxes)[0]
  # get the number of boxes in the ground truth boxes
num_boxes = tf.shape(boxes)[1]
# get the number of anchor boxes used for this anchor scale
len_masks = tf.shape(mask)[0]
  # init a fixed memory size grid for this prediction scale
# [batch, size, size, # of anchors, 1 + 1 + number of anchors per scale]
full = tf.zeros([batches, size, size, len_masks, 1 + 4 + 1], dtype=dtype)
# init a grid to use to track which locations have already
# been used before (for the tie breaker)
depth_track = tf.zeros((batches, size, size, len_masks), dtype=tf.int32)
# rescale the x and y centers to the size of the grid [size, size]
x = tf.cast(boxes[..., 0] * tf.cast(size, dtype=dtype), dtype=tf.int32)
y = tf.cast(boxes[..., 1] * tf.cast(size, dtype=dtype), dtype=tf.int32)
  # init all the TensorArrays to be used in storing the index and the values
# to be used to update both depth_track and full
update_index = tf.TensorArray(tf.int32, size=0, dynamic_size=True)
update = tf.TensorArray(dtype, size=0, dynamic_size=True)
# init constants and match data types before entering loop
i = 0
anchor_id = 0
const = tf.cast(tf.convert_to_tensor([1.]), dtype=dtype)
mask = tf.cast(mask, dtype=dtype)
rand_update = 0.0
for batch in range(batches):
for box_id in range(num_boxes):
# if the width or height of the box is zero, skip it
if tf.keras.backend.all(tf.math.equal(boxes[batch, box_id, 2:4], 0)):
continue
      # after preprocessing, if the box is not in the image bounds anymore,
      # skip the box
if tf.keras.backend.any(tf.math.less(
boxes[batch, box_id, 0:2], 0.0)) or tf.keras.backend.any(
tf.math.greater_equal(boxes[batch, box_id, 0:2], 1.0)):
continue
if use_tie_breaker:
for anchor_id in range(tf.shape(anchors)[-1]):
index = tf.math.equal(anchors[batch, box_id, anchor_id], mask)
if tf.keras.backend.any(index):
# using the boolean index mask to determine exactly which anchor
# box was used
p = tf.cast(tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
# determine if the index was used or not
used = depth_track[batch, y[batch, box_id], x[batch, box_id], p]
            # default used update value
uid = 1
# if anchor_id is 0, this is the best matched anchor for this box
# with the highest IOU
if anchor_id == 0:
              # create a random number to trigger a replacement if the cell
# is used already
if tf.math.equal(used, 1):
rand_update = tf.random.uniform([], maxval=1)
else:
rand_update = 1.0
if rand_update > 0.5:
# write the box to the update list
update_index = update_index.write(
i, [batch, y[batch, box_id], x[batch, box_id], p])
value = tf.concat(
[boxes[batch, box_id], const, classes[batch, box_id]],
axis=-1)
update = update.write(i, value)
# if used is 2, this cell is filled with a non-optimal box
# if used is 0, the cell in the ground truth is not yet consumed
# in either case you can replace that cell with a new box, as long
# as it is not consumed by an optimal box with anchor_id = 0
elif tf.math.equal(used, 2) or tf.math.equal(used, 0):
uid = 2
# write the box to the update list
update_index = update_index.write(
i, [batch, y[batch, box_id], x[batch, box_id], p])
              value = tf.concat(
                  [boxes[batch, box_id], const, classes[batch, box_id]],
                  axis=-1)
update = update.write(i, value)
# update the used index for where and how the box was placed
depth_track = tf.tensor_scatter_nd_update(
depth_track, [(batch, y[batch, box_id], x[batch, box_id], p)],
[uid])
i += 1
else:
index = tf.math.equal(anchors[batch, box_id, 0], mask)
if tf.keras.backend.any(index):
          # if there is an index match
p = tf.cast(
tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
# write the box to the update list
update_index = update_index.write(
i, [batch, y[batch, box_id], x[batch, box_id], p])
value = tf.concat(
[boxes[batch, box_id], const, classes[batch, box_id]], axis=-1)
update = update.write(i, value)
i += 1
  # if the size of the update list is not 0, do an update; otherwise
  # there are no boxes and the empty grid is returned
if tf.math.greater(update_index.size(), 0):
update_index = update_index.stack()
update = update.stack()
full = tf.tensor_scatter_nd_update(full, update_index, update)
return full
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""preprocess_ops tests."""
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.vision.beta.projects.yolo.ops import preprocess_ops
class PreprocessOpsTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters((416, 416, 5, 300, 300), (100, 200, 6, 50, 50))
def test_resize_crop_filter(self, default_width, default_height, num_boxes,
target_width, target_height):
image = tf.convert_to_tensor(
np.random.rand(default_width, default_height, 3))
boxes = tf.convert_to_tensor(np.random.rand(num_boxes, 4))
resized_image, resized_boxes = preprocess_ops.resize_crop_filter(
image, boxes, default_width, default_height, target_width,
target_height)
resized_image_shape = tf.shape(resized_image)
resized_boxes_shape = tf.shape(resized_boxes)
self.assertAllEqual([default_height, default_width, 3],
resized_image_shape.numpy())
self.assertAllEqual([num_boxes, 4], resized_boxes_shape.numpy())
@parameterized.parameters((7, 7., 5.), (25, 35., 45.))
def test_translate_boxes(self, num_boxes, translate_x, translate_y):
boxes = tf.convert_to_tensor(np.random.rand(num_boxes, 4))
translated_boxes = preprocess_ops.translate_boxes(
boxes, translate_x, translate_y)
translated_boxes_shape = tf.shape(translated_boxes)
self.assertAllEqual([num_boxes, 4], translated_boxes_shape.numpy())
@parameterized.parameters((100, 200, 75., 25.), (400, 600, 25., 75.))
def test_translate_image(self, image_height, image_width, translate_x,
translate_y):
image = tf.convert_to_tensor(np.random.rand(image_height, image_width, 4))
translated_image = preprocess_ops.translate_image(
image, translate_x, translate_y)
translated_image_shape = tf.shape(translated_image)
self.assertAllEqual([image_height, image_width, 4],
translated_image_shape.numpy())
@parameterized.parameters(([1, 2], 20, 0), ([13, 2, 4], 15, 0))
def test_pad_max_instances(self, input_shape, instances, pad_axis):
expected_output_shape = input_shape
expected_output_shape[pad_axis] = instances
output = preprocess_ops.pad_max_instances(
np.ones(input_shape), instances, pad_axis=pad_axis)
self.assertAllEqual(expected_output_shape, tf.shape(output).numpy())
if __name__ == '__main__':
tf.test.main()
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Preproceesing operations for YOLO."""
import tensorflow as tf
import numpy as np
import random
import os
import tensorflow_addons as tfa
from official.vision.beta.projects.yolo.ops import box_ops
from official.vision.beta.projects.yolo.ops import loss_utils
from official.vision.beta.ops import box_ops as bbox_ops
PAD_VALUE = 114
GLOBAL_SEED_SET = False
def set_random_seeds(seed=0):
"""Sets all accessible global seeds to properly apply randomization.
This is not the same as passing seed as a variable to each call to tf.random.
For more, see the documentation for tf.random on the tensorflow website
https://www.tensorflow.org/api_docs/python/tf/random/set_seed. Note that
  passing seed to each random number generator will not give you the expected
  behavior if you use more than one generator in a single function.
Args:
seed: `Optional[int]` representing the seed you want to use.
"""
if seed is not None:
global GLOBAL_SEED_SET
os.environ['PYTHONHASHSEED'] = str(seed)
random.seed(seed)
GLOBAL_SEED_SET = True
tf.random.set_seed(seed)
np.random.seed(seed)
def get_pad_value():
return PAD_VALUE
def rand_uniform_strong(minval, maxval, dtype=tf.float32, seed=None, shape=[]):
"""A unified fucntion for consistant random number generation.
Equivalent to tf.random.uniform, except that minval and maxval are flipped if
minval is greater than maxval. Seed Safe random number generator.
Args:
minval: An `int` for a lower or upper endpoint of the interval from which to
choose the random number.
maxval: An `int` for the other endpoint.
dtype: The output type of the tensor.
Returns:
A random tensor of type dtype that falls between minval and maxval excluding
the bigger one.
"""
if GLOBAL_SEED_SET:
seed = None
if minval > maxval:
minval, maxval = maxval, minval
return tf.random.uniform(
shape=shape, minval=minval, maxval=maxval, seed=seed, dtype=dtype)
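# Example (illustrative usage, not part of the original file): unlike
# tf.random.uniform, flipped bounds are tolerated, so both calls below sample
# from the same interval [-0.2, 0.2).
#
#   a = rand_uniform_strong(-0.2, 0.2)
#   b = rand_uniform_strong(0.2, -0.2)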
def rand_scale(val, dtype=tf.float32, seed=None):
"""Generate a random number for scaling a parameter by multiplication.
Generates a random number for the scale. Half the time, the value is between
[1.0, val) with uniformly distributed probability. The other half, the value
is the reciprocal of this value.
The function is identical to the one in the original implementation:
https://github.com/AlexeyAB/darknet/blob/a3714d0a/src/utils.c#L708-L713
Args:
val: A float representing the maximum scaling allowed.
dtype: The output type of the tensor.
Returns:
The random scale.
"""
scale = rand_uniform_strong(1.0, val, dtype=dtype, seed=seed)
do_ret = rand_uniform_strong(minval=0, maxval=2, dtype=tf.int32, seed=seed)
if (do_ret == 1):
return scale
return 1.0 / scale
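# Example (illustrative usage): with val=1.5, the returned multiplier is drawn
# from [1.0, 1.5) half the time and from (1/1.5, 1.0] the other half, which is
# how the Darknet-style HSV augmentation jitters saturation and brightness.
#
#   delta = rand_scale(1.5)
#   image = image * delta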
def pad_max_instances(value, instances, pad_value=0, pad_axis=0):
"""Pad pr clip the tensor value to a fixed length along a given axis.
Pad a dimension of the tensor to have a maximum number of instances filling
additional entries with the `pad_value`. Allows for selection of the padding
axis
Args:
value: An input tensor.
instances: An int representing the maximum number of instances.
pad_value: An int representing the value used for padding until the maximum
number of instances is obtained.
pad_axis: An int representing the axis index to pad.
Returns:
The output tensor whose dimensions match the input tensor except with the
size along the `pad_axis` replaced by `instances`.
"""
# get the real shape of value
shape = tf.shape(value)
# compute the padding axis
if pad_axis < 0:
pad_axis = tf.rank(value) + pad_axis
  # determine how much of the tensor value to keep
dim1 = shape[pad_axis]
take = tf.math.reduce_min([instances, dim1])
value, _ = tf.split(value, [take, -1], axis=pad_axis)
# pad the clipped tensor to the right shape
pad = tf.convert_to_tensor([tf.math.reduce_max([instances - dim1, 0])])
nshape = tf.concat([shape[:pad_axis], pad, shape[(pad_axis + 1):]], axis=0)
pad_tensor = tf.fill(nshape, tf.cast(pad_value, dtype=value.dtype))
value = tf.concat([value, pad_tensor], axis=pad_axis)
return value
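# Example (illustrative usage): pad 3 boxes up to a fixed budget of 5 rows
# along axis 0; a tensor with more than 5 rows would instead be clipped to 5.
#
#   boxes = tf.ones([3, 4])
#   padded = pad_max_instances(boxes, 5, pad_value=0, pad_axis=0)
#   # padded.shape == [5, 4]; the last two rows are filled with zeros.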
def get_image_shape(image):
""" Consitently get the width and height of the image.
Get the shape of the image regardless of if the image is in the
(batch_size, x, y, c) format or the (x, y, c) format.
Args:
image: A tensor who has either 3 or 4 dimensions.
Returns:
A tuple representing the (height, width) of the image.
"""
shape = tf.shape(image)
if shape.get_shape().as_list()[0] == 4:
width = shape[2]
height = shape[1]
else:
width = shape[1]
height = shape[0]
return height, width
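# Example (illustrative usage): the same (height, width) pair is returned for
# batched and unbatched images.
#
#   h, w = get_image_shape(tf.zeros([416, 640, 3]))     # 416, 640
#   h, w = get_image_shape(tf.zeros([8, 416, 640, 3]))  # 416, 640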
def _augment_hsv_darknet(image, rh, rs, rv, seed=None):
"""Randomly alter the hue, saturation, and brightness of an image.
  Applies randomization the same way as Darknet by scaling the saturation and
brightness of the image and adding/rotating the hue.
Args:
image: Tensor of shape [None, None, 3] that needs to be altered.
rh: `float32` used to indicate the maximum delta that can be added to hue.
rs: `float32` used to indicate the maximum delta that can be multiplied to
saturation.
rv: `float32` used to indicate the maximum delta that can be multiplied to
brightness.
seed: `Optional[int]` for the seed to use in random number generation.
Returns:
The HSV altered image in the same datatype as the input image
"""
if rh > 0.0:
delta = rand_uniform_strong(-rh, rh, seed=seed)
image = tf.image.adjust_hue(image, delta)
if rs > 0.0:
delta = rand_scale(rs, seed=seed)
image = tf.image.adjust_saturation(image, delta)
if rv > 0.0:
delta = rand_scale(rv, seed=seed)
image *= delta
# clip the values of the image between 0.0 and 1.0
image = tf.clip_by_value(image, 0.0, 1.0)
return image
def _augment_hsv_torch(image, rh, rs, rv, seed=None):
"""Randomly alter the hue, saturation, and brightness of an image.
  Applies randomization in the same way as the PyTorch implementation, by
  scaling the hue, saturation, and brightness of the image.
Args:
image: Tensor of shape [None, None, 3] that needs to be altered.
rh: `float32` used to indicate the maximum delta that can be multiplied to
hue.
rs: `float32` used to indicate the maximum delta that can be multiplied to
saturation.
rv: `float32` used to indicate the maximum delta that can be multiplied to
brightness.
seed: `Optional[int]` for the seed to use in random number generation.
Returns:
The HSV altered image in the same datatype as the input image
"""
dtype = image.dtype
image = tf.cast(image, tf.float32)
image = tf.image.rgb_to_hsv(image)
gen_range = tf.cast([rh, rs, rv], image.dtype)
scale = tf.cast([180, 255, 255], image.dtype)
r = rand_uniform_strong(
-1, 1, shape=[3], dtype=image.dtype, seed=seed) * gen_range + 1
# image = tf.cast(tf.cast(image, r.dtype) * (r * scale), tf.int32)
image = tf.math.floor(tf.cast(image, scale.dtype) * scale)
image = tf.math.floor(tf.cast(image, r.dtype) * r)
h, s, v = tf.split(image, 3, axis=-1)
h = h % 180
s = tf.clip_by_value(s, 0, 255)
v = tf.clip_by_value(v, 0, 255)
image = tf.concat([h, s, v], axis=-1)
image = tf.cast(image, scale.dtype) / scale
image = tf.image.hsv_to_rgb(image)
return tf.cast(image, dtype)
def image_rand_hsv(image, rh, rs, rv, seed=None, darknet=False):
"""Randomly alter the hue, saturation, and brightness of an image.
Args:
image: Tensor of shape [None, None, 3] that needs to be altered.
rh: `float32` used to indicate the maximum delta that can be multiplied to
hue.
rs: `float32` used to indicate the maximum delta that can be multiplied to
saturation.
rv: `float32` used to indicate the maximum delta that can be multiplied to
brightness.
seed: `Optional[int]` for the seed to use in random number generation.
    darknet: `bool` indicating whether the model was originally built in the
      Darknet or the PyTorch library.
Returns:
The HSV altered image in the same datatype as the input image
"""
if darknet:
image = _augment_hsv_darknet(image, rh, rs, rv, seed=seed)
else:
image = _augment_hsv_torch(image, rh, rs, rv, seed=seed)
return image
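# Example sketch (parameter values assumed, image normalized to [0, 1] for the
# darknet path): jitter hue by up to +/-0.1 and scale saturation and
# brightness by a random factor of up to 1.5 (or its reciprocal).
#
#   image = image_rand_hsv(image, rh=0.1, rs=1.5, rv=1.5, darknet=True)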
def mosaic_cut(image, original_width, original_height, width, height, center,
ptop, pleft, pbottom, pright, shiftx, shifty):
"""Use a provided center to take slices of 4 images to apply mosaic.
  Given a center location, cut the input image into a slice that will be
  concatenated with other slices with the same center in order to construct
  a final mosaicked image.
  Args:
    image: Tensor of shape [None, None, 3] that needs to be altered.
    original_width: `float` value indicating the original width of the image.
    original_height: `float` value indicating the original height of the image.
    width: `float` value indicating the final width of the image.
    height: `float` value indicating the final height of the image.
center: `float` value indicating the desired center of the final patched
image.
ptop: `float` value indicating the top of the image without padding.
pleft: `float` value indicating the left of the image without padding.
pbottom: `float` value indicating the bottom of the image without padding.
pright: `float` value indicating the right of the image without padding.
shiftx: `float` 0.0 or 1.0 value indicating if the image is in the
left or right.
shifty: `float` 0.0 or 1.0 value indicating if the image is in the
top or bottom.
Returns:
image: The cropped image in the same datatype as the input image.
crop_info: `float` tensor that is applied to the boxes in order to select
the boxes still contained within the image.
"""
def cast(values, dtype):
return [tf.cast(value, dtype) for value in values]
with tf.name_scope('mosaic_cut'):
center = tf.cast(center, width.dtype)
zero = tf.cast(0.0, width.dtype)
cut_x, cut_y = center[1], center[0]
# Select the crop of the image to use
left_shift = tf.minimum(
tf.minimum(cut_x, tf.maximum(zero, -pleft * width / original_width)),
width - cut_x)
top_shift = tf.minimum(
tf.minimum(cut_y, tf.maximum(zero, -ptop * height / original_height)),
height - cut_y)
right_shift = tf.minimum(
tf.minimum(width - cut_x,
tf.maximum(zero, -pright * width / original_width)), cut_x)
bot_shift = tf.minimum(
tf.minimum(height - cut_y,
tf.maximum(zero, -pbottom * height / original_height)),
cut_y)
(left_shift, top_shift, right_shift, bot_shift,
zero) = cast([left_shift, top_shift, right_shift, bot_shift, zero],
tf.float32)
# Build a crop offset and a crop size tensor to use for slicing.
crop_offset = [zero, zero, zero]
crop_size = [zero - 1, zero - 1, zero - 1]
if shiftx == 0.0 and shifty == 0.0:
crop_offset = [top_shift, left_shift, zero]
crop_size = [cut_y, cut_x, zero - 1]
elif shiftx == 1.0 and shifty == 0.0:
crop_offset = [top_shift, cut_x - right_shift, zero]
crop_size = [cut_y, width - cut_x, zero - 1]
elif shiftx == 0.0 and shifty == 1.0:
crop_offset = [cut_y - bot_shift, left_shift, zero]
crop_size = [height - cut_y, cut_x, zero - 1]
elif shiftx == 1.0 and shifty == 1.0:
crop_offset = [cut_y - bot_shift, cut_x - right_shift, zero]
crop_size = [height - cut_y, width - cut_x, zero - 1]
# Contain and crop the image.
ishape = tf.cast(tf.shape(image)[:2], crop_size[0].dtype)
crop_size[0] = tf.minimum(crop_size[0], ishape[0])
crop_size[1] = tf.minimum(crop_size[1], ishape[1])
crop_offset = tf.cast(crop_offset, tf.int32)
crop_size = tf.cast(crop_size, tf.int32)
image = tf.slice(image, crop_offset, crop_size)
crop_info = tf.stack([
tf.cast(ishape, tf.float32),
tf.cast(tf.shape(image)[:2], dtype=tf.float32),
tf.ones_like(ishape, dtype=tf.float32),
tf.cast(crop_offset[:2], tf.float32)
])
return image, crop_info
def resize_and_jitter_image(image,
desired_size,
jitter=0.0,
letter_box=None,
random_pad=True,
crop_only=False,
shiftx=0.5,
shifty=0.5,
cut=None,
method=tf.image.ResizeMethod.BILINEAR,
seed=None):
"""Resize, Pad, and distort a given input image following Darknet.
"""
def intersection(a, b):
minx = tf.maximum(a[0], b[0])
miny = tf.maximum(a[1], b[1])
maxx = tf.minimum(a[2], b[2])
maxy = tf.minimum(a[3], b[3])
return tf.convert_to_tensor([minx, miny, maxx, maxy])
def cast(values, dtype):
return [tf.cast(value, dtype) for value in values]
  if jitter > 0.5 or jitter < 0:
    raise ValueError(
        'maximum change in aspect ratio (jitter) must be between 0 and 0.5')
with tf.name_scope('resize_and_jitter_image'):
# Cast all parameters to a usable float data type.
jitter = tf.cast(jitter, tf.float32)
original_dtype, original_dims = image.dtype, tf.shape(image)[:2]
    # original width, original height, desired width, desired height
original_width, original_height, width, height = cast(
[original_dims[1], original_dims[0], desired_size[1], desired_size[0]],
tf.float32)
# Compute the random delta width and height etc. and randomize the
# location of the corner points.
jitter_width = original_width * jitter
jitter_height = original_height * jitter
pleft = rand_uniform_strong(
-jitter_width, jitter_width, jitter_width.dtype, seed=seed)
pright = rand_uniform_strong(
-jitter_width, jitter_width, jitter_width.dtype, seed=seed)
ptop = rand_uniform_strong(
-jitter_height, jitter_height, jitter_height.dtype, seed=seed)
pbottom = rand_uniform_strong(
-jitter_height, jitter_height, jitter_height.dtype, seed=seed)
# Letter box the image.
if letter_box == True or letter_box is None:
image_aspect_ratio, input_aspect_ratio = original_width / original_height, width / height
distorted_aspect = image_aspect_ratio / input_aspect_ratio
delta_h, delta_w = 0.0, 0.0
pullin_h, pullin_w = 0.0, 0.0
if distorted_aspect > 1:
delta_h = ((original_width / input_aspect_ratio) - original_height) / 2
else:
delta_w = ((original_height * input_aspect_ratio) - original_width) / 2
if letter_box is None:
rwidth = original_width + delta_w + delta_w
rheight = original_height + delta_h + delta_h
if rheight < height and rwidth < width:
pullin_h = ((height - rheight) * rheight / height) / 2
pullin_w = ((width - rwidth) * rwidth / width) / 2
ptop = ptop - delta_h - pullin_h
pbottom = pbottom - delta_h - pullin_h
pright = pright - delta_w - pullin_w
pleft = pleft - delta_w - pullin_w
    # Compute the width and height to crop or pad to, and clip all crops to
    # be contained within the image.
swidth = original_width - pleft - pright
sheight = original_height - ptop - pbottom
src_crop = intersection([ptop, pleft, sheight + ptop, swidth + pleft],
[0, 0, original_height, original_width])
# Random padding used for mosaic.
h_ = src_crop[2] - src_crop[0]
w_ = src_crop[3] - src_crop[1]
if random_pad:
rmh = tf.maximum(0.0, -ptop)
rmw = tf.maximum(0.0, -pleft)
else:
rmw = (swidth - w_) * shiftx
rmh = (sheight - h_) * shifty
# Cast cropping params to usable dtype.
src_crop = tf.cast(src_crop, tf.int32)
    # Compute padding parameters.
dst_shape = [rmh, rmw, rmh + h_, rmw + w_]
ptop, pleft, pbottom, pright = dst_shape
pad = dst_shape * tf.cast([1, 1, -1, -1], ptop.dtype)
pad += tf.cast([0, 0, sheight, swidth], ptop.dtype)
pad = tf.cast(pad, tf.int32)
infos = []
# Crop the image to desired size.
cropped_image = tf.slice(
image, [src_crop[0], src_crop[1], 0],
[src_crop[2] - src_crop[0], src_crop[3] - src_crop[1], -1])
crop_info = tf.stack([
tf.cast(original_dims, tf.float32),
tf.cast(tf.shape(cropped_image)[:2], dtype=tf.float32),
tf.ones_like(original_dims, dtype=tf.float32),
tf.cast(src_crop[:2], tf.float32)
])
infos.append(crop_info)
if crop_only:
if not letter_box:
h_, w_ = cast(get_image_shape(cropped_image), width.dtype)
width = tf.cast(tf.round((w_ * width) / swidth), tf.int32)
height = tf.cast(tf.round((h_ * height) / sheight), tf.int32)
cropped_image = tf.image.resize(
cropped_image, [height, width], method=method)
cropped_image = tf.cast(cropped_image, original_dtype)
return cropped_image, infos, cast([
original_width, original_height, width, height, ptop, pleft, pbottom,
pright
], tf.int32)
# Pad the image to desired size.
image_ = tf.pad(
cropped_image, [[pad[0], pad[2]], [pad[1], pad[3]], [0, 0]],
constant_values=get_pad_value())
pad_info = tf.stack([
tf.cast(tf.shape(cropped_image)[:2], tf.float32),
tf.cast(tf.shape(image_)[:2], dtype=tf.float32),
tf.ones_like(original_dims, dtype=tf.float32),
(-tf.cast(pad[:2], tf.float32))
])
infos.append(pad_info)
temp = tf.shape(image_)[:2]
cond = temp > tf.cast(desired_size, temp.dtype)
if tf.reduce_any(cond):
size = tf.cast(desired_size, temp.dtype)
size = tf.where(cond, size, temp)
image_ = tf.image.resize(
image_, (size[0], size[1]), method=tf.image.ResizeMethod.AREA)
image_ = tf.cast(image_, original_dtype)
image_ = tf.image.resize(
image_, (desired_size[0], desired_size[1]),
method=tf.image.ResizeMethod.BILINEAR,
antialias=False)
image_ = tf.cast(image_, original_dtype)
if cut is not None:
image_, crop_info = mosaic_cut(image_, original_width, original_height,
width, height, cut, ptop, pleft, pbottom,
pright, shiftx, shifty)
infos.append(crop_info)
return image_, infos, cast([
original_width, original_height, width, height, ptop, pleft, pbottom,
pright
], tf.float32)
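# Example sketch (assumed call, parameter values are illustrative): resize to
# 640x640 with up to 30% aspect-ratio jitter and letter boxing, keeping the
# crop/pad records so the boxes can be adjusted afterwards with apply_infos.
#
#   image_, infos, _ = resize_and_jitter_image(
#       image, desired_size=[640, 640], jitter=0.3, letter_box=True)
#   # infos holds one [4, 2] record per crop or pad step applied to the image.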
def _build_transform(image,
perspective=0.00,
degrees=0.0,
scale_min=1.0,
scale_max=1.0,
translate=0.0,
random_pad=False,
desired_size=None,
seed=None):
"""Builds a unifed affine transformation to spatially augment the image."""
height, width = get_image_shape(image)
ch = height = tf.cast(height, tf.float32)
cw = width = tf.cast(width, tf.float32)
deg_to_rad = lambda x: tf.cast(x, tf.float32) * np.pi / 180.0
if desired_size is not None:
desired_size = tf.cast(desired_size, tf.float32)
ch = desired_size[0]
cw = desired_size[1]
  # Compute the center of the image in the output resolution.
center = tf.eye(3, dtype=tf.float32)
center = tf.tensor_scatter_nd_update(center, [[0, 2], [1, 2]],
[-cw / 2, -ch / 2])
center_boxes = tf.tensor_scatter_nd_update(center, [[0, 2], [1, 2]],
[cw / 2, ch / 2])
# Compute a random rotation to apply.
rotation = tf.eye(3, dtype=tf.float32)
a = deg_to_rad(rand_uniform_strong(-degrees, degrees, seed=seed))
cos = tf.math.cos(a)
sin = tf.math.sin(a)
rotation = tf.tensor_scatter_nd_update(rotation,
[[0, 0], [0, 1], [1, 0], [1, 1]],
[cos, -sin, sin, cos])
rotation_boxes = tf.tensor_scatter_nd_update(rotation,
[[0, 0], [0, 1], [1, 0], [1, 1]],
[cos, sin, -sin, cos])
  # Compute a random perspective change to apply.
prespective_warp = tf.eye(3)
Px = rand_uniform_strong(-perspective, perspective, seed=seed)
Py = rand_uniform_strong(-perspective, perspective, seed=seed)
prespective_warp = tf.tensor_scatter_nd_update(prespective_warp,
[[2, 0], [2, 1]], [Px, Py])
prespective_warp_boxes = tf.tensor_scatter_nd_update(prespective_warp,
[[2, 0], [2, 1]],
[-Px, -Py])
# Compute a random scaling to apply.
scale = tf.eye(3, dtype=tf.float32)
s = rand_uniform_strong(scale_min, scale_max, seed=seed)
scale = tf.tensor_scatter_nd_update(scale, [[0, 0], [1, 1]], [1 / s, 1 / s])
scale_boxes = tf.tensor_scatter_nd_update(scale, [[0, 0], [1, 1]], [s, s])
# Compute a random Translation to apply.
translation = tf.eye(3)
if (random_pad and height * s < ch and width * s < cw):
    # The image is fully contained in the output window and is arbitrarily
    # translated to locations within that window.
center = center_boxes = tf.eye(3, dtype=tf.float32)
Tx = rand_uniform_strong(-1, 0, seed=seed) * (cw / s - width)
Ty = rand_uniform_strong(-1, 0, seed=seed) * (ch / s - height)
else:
# The image can be translated outside of the output resolution window
# but the image is translated relative to the output resolution not the
# input image resolution.
Tx = rand_uniform_strong(0.5 - translate, 0.5 + translate, seed=seed)
Ty = rand_uniform_strong(0.5 - translate, 0.5 + translate, seed=seed)
# Center and Scale the image such that the window of translation is
# contained to the output resolution.
dx, dy = (width - cw / s) / width, (height - ch / s) / height
sx, sy = 1 - dx, 1 - dy
bx, by = dx / 2, dy / 2
Tx, Ty = bx + (sx * Tx), by + (sy * Ty)
# Scale the translation to width and height of the image.
Tx *= width
Ty *= height
translation = tf.tensor_scatter_nd_update(translation, [[0, 2], [1, 2]],
[Tx, Ty])
translation_boxes = tf.tensor_scatter_nd_update(translation, [[0, 2], [1, 2]],
[-Tx, -Ty])
  # Use repeated matrix multiplications to combine all the image
  # transformations into a single unified augmentation operation. M is applied
  # to the image and Mb is applied to the boxes. The order of matrix
  # multiplication is important: first translate, then scale, then rotate,
  # then center, and finally alter the perspective.
affine = (translation @ scale @ rotation @ center @ prespective_warp)
affine_boxes = (
prespective_warp_boxes @ center_boxes @ rotation_boxes @ scale_boxes
@ translation_boxes)
return affine, affine_boxes, s
def affine_warp_image(image,
desired_size,
perspective=0.00,
degrees=0.0,
scale_min=1.0,
scale_max=1.0,
translate=0.0,
random_pad=False,
seed=None):
# Build an image transformation matrix.
image_size = tf.cast(get_image_shape(image), tf.float32)
affine_matrix, affine_boxes, _ = _build_transform(
image,
perspective=perspective,
degrees=degrees,
scale_min=scale_min,
scale_max=scale_max,
translate=translate,
random_pad=random_pad,
desired_size=desired_size,
seed=seed)
affine = tf.reshape(affine_matrix, [-1])
affine = tf.cast(affine[:-1], tf.float32)
# Apply the transformation to image.
image = tfa.image.transform(
image,
affine,
fill_value=get_pad_value(),
output_shape=desired_size,
interpolation='bilinear')
desired_size = tf.cast(desired_size, tf.float32)
return image, affine_matrix, [image_size, desired_size, affine_boxes]
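# Example sketch (parameter values assumed): apply a random rotation of up to
# 10 degrees, scaling between 0.5x and 1.5x, and 10% translation; the returned
# records are later used to warp the ground truth boxes via apply_infos.
#
#   image, affine_matrix, affine = affine_warp_image(
#       image, desired_size=[640, 640], degrees=10.0,
#       scale_min=0.5, scale_max=1.5, translate=0.1)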
# ops for box clipping and cleaning
def affine_warp_boxes(affine, boxes, output_size, box_history):
def _get_corners(box):
"""Get the corner of each box as a tuple of (x, y) coordinates"""
ymi, xmi, yma, xma = tf.split(box, 4, axis=-1)
tl = tf.concat([xmi, ymi], axis=-1)
bl = tf.concat([xmi, yma], axis=-1)
tr = tf.concat([xma, ymi], axis=-1)
br = tf.concat([xma, yma], axis=-1)
return tf.concat([tl, bl, tr, br], axis=-1)
def _corners_to_boxes(corner):
"""Convert (x, y) corner tuples back into boxes in the format
[ymin, xmin, ymax, xmax]"""
corner = tf.reshape(corner, [-1, 4, 2])
y = corner[..., 1]
x = corner[..., 0]
y_min = tf.reduce_min(y, axis=-1)
x_min = tf.reduce_min(x, axis=-1)
y_max = tf.reduce_max(y, axis=-1)
x_max = tf.reduce_max(x, axis=-1)
return tf.stack([y_min, x_min, y_max, x_max], axis=-1)
def _aug_boxes(affine_matrix, box):
"""Apply an affine transformation matrix M to the boxes to get the
randomly augmented boxes"""
corners = _get_corners(box)
corners = tf.reshape(corners, [-1, 4, 2])
z = tf.expand_dims(tf.ones_like(corners[..., 1]), axis=-1)
corners = tf.concat([corners, z], axis=-1)
corners = tf.transpose(
tf.matmul(affine_matrix, corners, transpose_b=True), perm=(0, 2, 1))
corners, p = tf.split(corners, [2, 1], axis=-1)
corners /= p
corners = tf.reshape(corners, [-1, 8])
box = _corners_to_boxes(corners)
return box
boxes = _aug_boxes(affine, boxes)
box_history = _aug_boxes(affine, box_history)
clipped_boxes = bbox_ops.clip_boxes(boxes, output_size)
return clipped_boxes, box_history
def boxes_candidates(clipped_boxes,
box_history,
wh_thr=2,
ar_thr=20,
area_thr=0.1):
area_thr = tf.math.abs(area_thr)
# Get the scaled and shifted heights of the original
# unclipped boxes.
og_height = tf.maximum(box_history[:, 2] - box_history[:, 0], 0.0)
og_width = tf.maximum(box_history[:, 3] - box_history[:, 1], 0.0)
# Get the scaled and shifted heights of the clipped boxes.
clipped_height = tf.maximum(clipped_boxes[:, 2] - clipped_boxes[:, 0], 0.0)
clipped_width = tf.maximum(clipped_boxes[:, 3] - clipped_boxes[:, 1], 0.0)
# Determine the aspect ratio of the clipped boxes.
ar = tf.maximum(clipped_width / (clipped_height + 1e-16),
clipped_height / (clipped_width + 1e-16))
  # Ensure the clipped width and height are larger than a preset threshold.
conda = clipped_width > wh_thr
condb = clipped_height > wh_thr
# Ensure the area of the clipped box is larger than the area threshold.
area = (clipped_height * clipped_width) / (og_width * og_height + 1e-16)
condc = area > area_thr
# Ensure the aspect ratio is not too extreme.
condd = ar < ar_thr
cond = tf.expand_dims(
tf.logical_and(
tf.logical_and(conda, condb), tf.logical_and(condc, condd)),
axis=-1)
# Set all the boxes that fail the test to be equal to zero.
indices = tf.where(cond)
return indices[:, 0]
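# Example (illustrative usage): with the default thresholds, keep only the
# boxes that are still more than 2 pixels on each side after clipping, retain
# more than 10% of their pre-clipping area, and have an aspect ratio below 20.
#
#   keep = boxes_candidates(clipped_boxes, box_history)
#   boxes = tf.gather(clipped_boxes, keep)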
def resize_and_crop_boxes(boxes, image_scale, output_size, offset, box_history):
# Shift and scale the input boxes.
boxes *= tf.tile(tf.expand_dims(image_scale, axis=0), [1, 2])
boxes -= tf.tile(tf.expand_dims(offset, axis=0), [1, 2])
  # Apply the same shift and scale to the box history.
box_history *= tf.tile(tf.expand_dims(image_scale, axis=0), [1, 2])
box_history -= tf.tile(tf.expand_dims(offset, axis=0), [1, 2])
# Clip the shifted and scaled boxes.
clipped_boxes = bbox_ops.clip_boxes(boxes, output_size)
return clipped_boxes, box_history
def apply_infos(boxes,
infos,
affine=None,
shuffle_boxes=False,
area_thresh=0.1,
seed=None,
augment=True):
# Clip and clean boxes.
def get_valid_boxes(boxes):
"""Get indices for non-empty boxes."""
# Convert the boxes to center width height formatting.
height = boxes[:, 2] - boxes[:, 0]
width = boxes[:, 3] - boxes[:, 1]
base = tf.logical_and(tf.greater(height, 0), tf.greater(width, 0))
return base
# Initialize history to track operation applied to boxes
box_history = boxes
# Make sure all boxes are valid to start, clip to [0, 1] and get only the
# valid boxes.
output_size = tf.cast([640, 640], tf.float32)
if augment:
boxes = tf.math.maximum(tf.math.minimum(boxes, 1.0), 0.0)
cond = get_valid_boxes(boxes)
if infos is None:
infos = []
for info in infos:
# Denormalize the boxes.
boxes = bbox_ops.denormalize_boxes(boxes, info[0])
box_history = bbox_ops.denormalize_boxes(box_history, info[0])
# Shift and scale all boxes, and keep track of box history with no
# box clipping, history is used for removing boxes that have become
# too small or exit the image area.
(
boxes, # Clipped final boxes.
box_history) = resize_and_crop_boxes(
boxes, info[2, :], info[1, :], info[3, :], box_history=box_history)
# Get all the boxes that still remain in the image and store
# in a bit vector for later use.
cond = tf.logical_and(get_valid_boxes(boxes), cond)
# Normalize the boxes to [0, 1].
output_size = info[1]
boxes = bbox_ops.normalize_boxes(boxes, output_size)
box_history = bbox_ops.normalize_boxes(box_history, output_size)
if affine is not None:
# Denormalize the boxes.
boxes = bbox_ops.denormalize_boxes(boxes, affine[0])
box_history = bbox_ops.denormalize_boxes(box_history, affine[0])
(
boxes, # Clipped final boxes.
box_history) = affine_warp_boxes(
affine[2], boxes, affine[1], box_history=box_history)
# Get all the boxes that still remain in the image and store
# in a bit vector for later use.
cond = tf.logical_and(get_valid_boxes(boxes), cond)
# Normalize the boxes to [0, 1].
output_size = affine[1]
boxes = bbox_ops.normalize_boxes(boxes, output_size)
box_history = bbox_ops.normalize_boxes(box_history, output_size)
# Remove the bad boxes.
boxes *= tf.cast(tf.expand_dims(cond, axis=-1), boxes.dtype)
# Threshold the existing boxes.
if augment:
boxes_ = bbox_ops.denormalize_boxes(boxes, output_size)
box_history_ = bbox_ops.denormalize_boxes(box_history, output_size)
inds = boxes_candidates(boxes_, box_history_, area_thr=area_thresh)
# Select and gather the good boxes.
if shuffle_boxes:
inds = tf.random.shuffle(inds, seed=seed)
else:
boxes = box_history
boxes_ = bbox_ops.denormalize_boxes(boxes, output_size)
inds = bbox_ops.get_non_empty_box_indices(boxes_)
boxes = tf.gather(boxes, inds)
return boxes, inds
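# Example sketch (assumed usage): replay the crop/pad records from
# resize_and_jitter_image and the affine records from affine_warp_image on
# normalized [ymin, xmin, ymax, xmax] boxes, then drop the boxes that were
# cropped out or became too small.
#
#   boxes, inds = apply_infos(boxes, infos, affine=affine, area_thresh=0.1)
#   classes = tf.gather(classes, inds)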
def _gen_viable_box_mask(boxes):
"""Generate a mask to filter the boxes to only those with in the image. """
equal = tf.reduce_all(tf.math.less_equal(boxes[..., 2:4], 0), axis=-1)
lower_bound = tf.reduce_any(tf.math.less(boxes[..., 0:2], 0.0), axis=-1)
upper_bound = tf.reduce_any(
tf.math.greater_equal(boxes[..., 0:2], 1.0), axis=-1)
negative_mask = tf.logical_or(tf.logical_or(equal, lower_bound), upper_bound)
return tf.logical_not(negative_mask)
def _get_box_locations(anchors, mask, boxes):
"""Calculate the number of anchors associated with each ground truth box."""
box_mask = _gen_viable_box_mask(boxes)
mask = tf.reshape(mask, [1, 1, 1, -1])
box_mask = tf.reshape(box_mask, [-1, 1, 1])
anchors = tf.expand_dims(anchors, axis=-1)
  # split the anchors into the best match and the alternates
anchors_primary, anchors_alternate = tf.split(anchors, [1, -1], axis=-2)
anchors_alternate = tf.concat(
[-tf.ones_like(anchors_primary), anchors_alternate], axis=-2)
# convert all the masks into index locations
viable_primary = tf.where(
tf.squeeze(tf.logical_and(box_mask, anchors_primary == mask), axis=0))
viable_alternate = tf.where(
tf.squeeze(tf.logical_and(box_mask, anchors_alternate == mask), axis=0))
viable_full = tf.where(
tf.squeeze(tf.logical_and(box_mask, anchors == mask), axis=0))
# compute the number of anchors associated with each ground truth box.
acheck = tf.reduce_any(anchors == mask, axis=-1)
repititions = tf.squeeze(
tf.reduce_sum(tf.cast(acheck, mask.dtype), axis=-1), axis=0)
# cast to int32
viable_primary = tf.cast(viable_primary, tf.int32)
viable_alternate = tf.cast(viable_alternate, tf.int32)
viable_full = tf.cast(viable_full, tf.int32)
return repititions, viable_primary, viable_alternate, viable_full
def _write_sample(box, anchor_id, offset, sample, ind_val, ind_sample, height,
width, num_written):
"""Find the correct x,y indexs for each box in the output groundtruth."""
anchor_index = tf.convert_to_tensor([tf.cast(anchor_id, tf.int32)])
gain = tf.cast(tf.convert_to_tensor([width, height]), box.dtype)
y = box[1] * height
x = box[0] * width
y_index = tf.convert_to_tensor([tf.cast(y, tf.int32)])
x_index = tf.convert_to_tensor([tf.cast(x, tf.int32)])
grid_idx = tf.concat([y_index, x_index, anchor_index], axis=-1)
ind_val = ind_val.write(num_written, grid_idx)
ind_sample = ind_sample.write(num_written, sample)
num_written += 1
if offset > 0:
offset = tf.cast(offset, x.dtype)
grid_xy = tf.cast(tf.convert_to_tensor([x, y]), x.dtype)
clamp = lambda x, ma: tf.maximum(
tf.minimum(x, tf.cast(ma, x.dtype)), tf.zeros_like(x))
grid_xy_index = grid_xy - tf.floor(grid_xy)
positive_shift = ((grid_xy_index < offset) & (grid_xy > 1.))
negative_shift = ((grid_xy_index > (1 - offset)) & (grid_xy < (gain - 1.)))
shifts = [
positive_shift[0], positive_shift[1], negative_shift[0],
negative_shift[1]
]
offset = tf.cast([[1, 0], [0, 1], [-1, 0], [0, -1]], offset.dtype) * offset
for i in range(4):
if shifts[i]:
x_index = tf.convert_to_tensor([tf.cast(x - offset[i, 0], tf.int32)])
y_index = tf.convert_to_tensor([tf.cast(y - offset[i, 1], tf.int32)])
grid_idx = tf.concat([
clamp(y_index, height - 1),
clamp(x_index, width - 1), anchor_index
],
axis=-1)
ind_val = ind_val.write(num_written, grid_idx)
ind_sample = ind_sample.write(num_written, sample)
num_written += 1
return ind_val, ind_sample, num_written
def _write_grid(viable, num_reps, boxes, classes, ious, ind_val, ind_sample,
height, width, num_written, num_instances, offset):
"""Iterate all viable anchor boxes and write each sample to groundtruth."""
const = tf.cast(tf.convert_to_tensor([1.]), dtype=boxes.dtype)
num_viable = tf.shape(viable)[0]
for val in range(num_viable):
idx = viable[val]
obj_id, anchor, anchor_idx = idx[0], idx[1], idx[2]
if num_written >= num_instances:
break
reps = tf.convert_to_tensor([num_reps[obj_id]])
box = boxes[obj_id]
cls_ = classes[obj_id]
iou = tf.convert_to_tensor([ious[obj_id, anchor]])
sample = tf.concat([box, const, cls_, iou, reps], axis=-1)
ind_val, ind_sample, num_written = _write_sample(box, anchor_idx, offset,
sample, ind_val,
ind_sample, height, width,
num_written)
return ind_val, ind_sample, num_written
def _write_anchor_free_grid(boxes,
classes,
height,
width,
num_written,
stride,
fpn_limits,
center_radius=2.5):
"""Iterate all boxes and write to grid without anchors boxes."""
gen = loss_utils.GridGenerator(
masks=None, anchors=[[1, 1]], scale_anchors=stride)
grid_points = gen(width, height, 1, boxes.dtype)[0]
grid_points = tf.squeeze(grid_points, axis=0)
box_list = boxes
class_list = classes
grid_points = (grid_points + 0.5) * stride
x_centers, y_centers = grid_points[..., 0], grid_points[..., 1]
boxes *= (tf.convert_to_tensor([width, height, width, height]) * stride)
tlbr_boxes = box_ops.xcycwh_to_yxyx(boxes)
boxes = tf.reshape(boxes, [1, 1, -1, 4])
tlbr_boxes = tf.reshape(tlbr_boxes, [1, 1, -1, 4])
mask = tf.reshape(class_list != -1, [1, 1, -1])
  # check if the box is in the receptive field of this FPN level
b_t = y_centers - tlbr_boxes[..., 0]
b_l = x_centers - tlbr_boxes[..., 1]
b_b = tlbr_boxes[..., 2] - y_centers
b_r = tlbr_boxes[..., 3] - x_centers
box_delta = tf.stack([b_t, b_l, b_b, b_r], axis=-1)
if fpn_limits is not None:
max_reg_targets_per_im = tf.reduce_max(box_delta, axis=-1)
gt_min = max_reg_targets_per_im >= fpn_limits[0]
gt_max = max_reg_targets_per_im <= fpn_limits[1]
is_in_boxes = tf.logical_and(gt_min, gt_max)
else:
is_in_boxes = tf.reduce_min(box_delta, axis=-1) > 0.0
is_in_boxes = tf.logical_and(is_in_boxes, mask)
is_in_boxes_all = tf.reduce_any(is_in_boxes, axis=(0, 1), keepdims=True)
  # check if the center is in the receptive field of this FPN level
c_t = y_centers - (boxes[..., 1] - center_radius * stride)
c_l = x_centers - (boxes[..., 0] - center_radius * stride)
c_b = (boxes[..., 1] + center_radius * stride) - y_centers
c_r = (boxes[..., 0] + center_radius * stride) - x_centers
centers_delta = tf.stack([c_t, c_l, c_b, c_r], axis=-1)
is_in_centers = tf.reduce_min(centers_delta, axis=-1) > 0.0
is_in_centers = tf.logical_and(is_in_centers, mask)
is_in_centers_all = tf.reduce_any(is_in_centers, axis=(0, 1), keepdims=True)
  # collate all masks to get the final locations
is_in_index = tf.logical_or(is_in_boxes_all, is_in_centers_all)
is_in_boxes_and_center = tf.logical_and(is_in_boxes, is_in_centers)
is_in_boxes_and_center = tf.logical_and(is_in_index, is_in_boxes_and_center)
# construct the index update grid
reps = tf.reduce_sum(tf.cast(is_in_boxes_and_center, tf.int16), axis=-1)
indexes = tf.cast(tf.where(is_in_boxes_and_center), tf.int32)
y, x, t = tf.split(indexes, 3, axis=-1)
boxes = tf.gather_nd(box_list, t)
classes = tf.cast(tf.gather_nd(class_list, t), boxes.dtype)
reps = tf.gather_nd(reps, tf.concat([y, x], axis=-1))
reps = tf.cast(tf.expand_dims(reps, axis=-1), boxes.dtype)
conf = tf.ones_like(classes)
# return the samples and the indexes
samples = tf.concat([boxes, conf, classes, conf, reps], axis=-1)
indexes = tf.concat([y, x, tf.zeros_like(t)], axis=-1)
num_written = tf.shape(reps)[0]
return indexes, samples, num_written
def build_grided_gt_ind(y_true,
mask,
sizew,
sizeh,
dtype,
scale_xy,
scale_num_inst,
use_tie_breaker,
stride,
fpn_limits=None):
"""Convert ground truth for use in loss functions.
Args:
y_true: tf.Tensor[] ground truth
[batch, box coords[0:4], classes_onehot[0:-1], best_fit_anchor_box]
mask: list of the anchor boxes choresponding to the output,
ex. [1, 2, 3] tells this layer to predict only the first 3 anchors
in the total.
size: the dimensions of this output, for regular, it progresses from
13, to 26, to 52
num_classes: `integer` for the number of classes
dtype: expected output datatype
scale_xy: A `float` to represent the amount the boxes are scaled in the
loss function.
scale_num_inst: A `float` to represent the scale at which to multiply the
number of predicted boxes by to get the number of instances to write
to the grid.
Return:
tf.Tensor[] of shape [batch, size, size, #of_anchors, 4, 1, num_classes]
"""
# unpack required components from the input ground truth
boxes = tf.cast(y_true['bbox'], dtype)
classes = tf.expand_dims(tf.cast(y_true['classes'], dtype=dtype), axis=-1)
anchors = tf.cast(y_true['best_anchors'], dtype)
ious = tf.cast(y_true['best_iou_match'], dtype)
width = tf.cast(sizew, boxes.dtype)
height = tf.cast(sizeh, boxes.dtype)
# get the number of anchor boxes used for this anchor scale
len_masks = len(mask)
# number of anchors
num_instances = tf.shape(boxes)[-2] * scale_num_inst
# rescale the x and y centers to the size of the grid [size, size]
pull_in = tf.cast(0.5 * (scale_xy - 1), boxes.dtype)
mask = tf.cast(mask, dtype=dtype)
num_reps, viable_primary, viable_alternate, viable = _get_box_locations(
anchors, mask, boxes)
# tensor arrays for tracking samples
num_written = 0
if fpn_limits is not None:
(indexes, samples,
num_written) = _write_anchor_free_grid(boxes, classes, height, width,
num_written, stride, fpn_limits)
else:
ind_val = tf.TensorArray(
tf.int32, size=0, dynamic_size=True, element_shape=[
3,
])
ind_sample = tf.TensorArray(
dtype, size=0, dynamic_size=True, element_shape=[
8,
])
if pull_in > 0.0:
(ind_val, ind_sample,
num_written) = _write_grid(viable, num_reps, boxes, classes, ious,
ind_val, ind_sample, height, width,
num_written, num_instances, pull_in)
else:
(ind_val, ind_sample,
num_written) = _write_grid(viable_primary, num_reps, boxes, classes,
ious, ind_val, ind_sample, height, width,
num_written, num_instances, 0.0)
if use_tie_breaker:
(ind_val, ind_sample,
num_written) = _write_grid(viable_alternate, num_reps, boxes, classes,
ious, ind_val, ind_sample, height, width,
num_written, num_instances, 0.0)
indexes = ind_val.stack()
samples = ind_sample.stack()
(_, ind_mask, _, _, num_reps) = tf.split(samples, [4, 1, 1, 1, 1], axis=-1)
full = tf.zeros([sizeh, sizew, len_masks, 1], dtype=dtype)
full = tf.tensor_scatter_nd_add(full, indexes, ind_mask)
if num_written >= num_instances:
tf.print("clipped")
  indexes = pad_max_instances(indexes, num_instances, pad_value=0, pad_axis=0)
  samples = pad_max_instances(samples, num_instances, pad_value=0, pad_axis=0)
  return indexes, samples, full
def get_best_anchor(y_true,
anchors,
width=1,
height=1,
iou_thresh=0.25,
best_match_only=False):
"""
get the correct anchor that is assoiciated with each box using IOU
Args:
y_true: tf.Tensor[] for the list of bounding boxes in the yolo format
anchors: list or tensor for the anchor boxes to be used in prediction
found via Kmeans
width: int for the image width
height: int for the image height
Return:
tf.Tensor: y_true with the anchor associated with each ground truth
box known
"""
with tf.name_scope('get_best_anchor'):
is_batch = True
ytrue_shape = y_true.get_shape()
if ytrue_shape.ndims == 2:
is_batch = False
y_true = tf.expand_dims(y_true, 0)
elif ytrue_shape.ndims is None:
is_batch = False
y_true = tf.expand_dims(y_true, 0)
y_true.set_shape([None] * 3)
elif ytrue_shape.ndims != 3:
      raise ValueError('\'y_true\' (shape %s) must have either 2 or 3 '
                       'dimensions.' % ytrue_shape)
width = tf.cast(width, dtype=tf.float32)
height = tf.cast(height, dtype=tf.float32)
scaler = tf.convert_to_tensor([width, height])
true_wh = tf.cast(y_true[..., 2:4], dtype=tf.float32) * scaler
anchors = tf.cast(anchors, dtype=tf.float32)
k = tf.shape(anchors)[0]
anchors = tf.expand_dims(
tf.concat([tf.zeros_like(anchors), anchors], axis=-1), axis=0)
truth_comp = tf.concat([tf.zeros_like(true_wh), true_wh], axis=-1)
if iou_thresh >= 1.0:
anchors = tf.expand_dims(anchors, axis=-2)
truth_comp = tf.expand_dims(truth_comp, axis=-3)
aspect = truth_comp[..., 2:4] / anchors[..., 2:4]
aspect = tf.where(tf.math.is_nan(aspect), tf.zeros_like(aspect), aspect)
aspect = tf.maximum(aspect, 1 / aspect)
aspect = tf.where(tf.math.is_nan(aspect), tf.zeros_like(aspect), aspect)
aspect = tf.reduce_max(aspect, axis=-1)
values, indexes = tf.math.top_k(
tf.transpose(-aspect, perm=[0, 2, 1]),
k=tf.cast(k, dtype=tf.int32),
sorted=True)
values = -values
ind_mask = tf.cast(values < iou_thresh, dtype=indexes.dtype)
else:
# iou_raw = box_ops.compute_iou(truth_comp, anchors)
truth_comp = box_ops.xcycwh_to_yxyx(truth_comp)
anchors = box_ops.xcycwh_to_yxyx(anchors)
iou_raw = box_ops.aggregated_comparitive_iou(
truth_comp,
anchors,
iou_type=3,
)
values, indexes = tf.math.top_k(
iou_raw, #tf.transpose(iou_raw, perm=[0, 2, 1]),
k=tf.cast(k, dtype=tf.int32),
sorted=True)
ind_mask = tf.cast(values >= iou_thresh, dtype=indexes.dtype)
    # pad the indices such that all values less than the thresh are -1;
    # add one, multiply by the mask to zero out the bad locations, then
    # subtract 1, making all the bad locations -1.
if best_match_only:
iou_index = ((indexes[..., 0:] + 1) * ind_mask[..., 0:]) - 1
else:
iou_index = tf.concat([
tf.expand_dims(indexes[..., 0], axis=-1),
((indexes[..., 1:] + 1) * ind_mask[..., 1:]) - 1
],
axis=-1)
true_prod = tf.reduce_prod(true_wh, axis=-1, keepdims=True)
iou_index = tf.where(true_prod > 0, iou_index, tf.zeros_like(iou_index) - 1)
if not is_batch:
iou_index = tf.squeeze(iou_index, axis=0)
values = tf.squeeze(values, axis=0)
return tf.cast(iou_index, dtype=tf.float32), tf.cast(values, dtype=tf.float32)
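# Example sketch (anchor values are illustrative, given in pixels): associate
# each ground truth box with up to k anchors whose IOU exceeds the threshold;
# entries below the threshold are set to -1.
#
#   anchors = [[10, 13], [16, 30], [33, 23]]
#   iou_index, iou_values = get_best_anchor(
#       y_true, anchors, width=416, height=416, iou_thresh=0.25)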