Unverified Commit 83b992c5 authored by Akhil Chinnakotla, committed by GitHub

YOLO Family: Data Loaders (#9493)



* Data Loading

* data loading

* data loading

* data loading

* Bug fixes

* p

* documentation

* preprocessing_ops

* Dataloader PR Clean-Up

* First Set Of Code Review Fixes

* deleting imagenet

* Testing Functions

* Preserve aspect ratio
Co-Authored-By: Vishnu Banna <vishnubanna@users.noreply.github.com>

* Put back the testing functions
Co-Authored-By: Akhil Chinnakotla <The-Indian-Chinna@users.noreply.github.com>

* Lint
Co-Authored-By: Akhil Chinnakotla <The-Indian-Chinna@users.noreply.github.com>

* Add forgotten build_grided_gt function
Co-Authored-By: Vishnu Banna <vishnubanna@users.noreply.github.com>

* Add postprocessing function back
Co-Authored-By: Vishnu Banna <vishnubanna@users.noreply.github.com>

* Change set to list
Co-Authored-By: Vishnu Banna <vishnubanna@users.noreply.github.com>

* Fix small bugs
Co-Authored-By: Vishnu Banna <vishnubanna@users.noreply.github.com>

* Add test case for training
Co-Authored-By: Vishnu Banna <vishnubanna@users.noreply.github.com>

* Rename utils to ops
Co-Authored-By: Vishnu Banna <vishnubanna@users.noreply.github.com>

* Remove pct_rand
Co-Authored-By: Vishnu Banna <vishnubanna@users.noreply.github.com>

* More correct documentation in box_ops

* Remove junk files

* added tests for preprocessing_ops

* Add more descriptive comments

* Do not hardcode randomscale

* Remove useless BoxOps
Co-Authored-By: Vishnu Banna <vishnubanna@users.noreply.github.com>

* added test for box_ops

* Merge branch 'dataloaders_pr' of https://github.com/PurdueCAM2Project/tf-models into dataloaders_pr

* Updated Docstring

* Lint
Co-Authored-By: Akhil Chinnakotla <The-Indian-Chinna@users.noreply.github.com>
Co-authored-by: anivegesana <anirudh.vegesana@gmail.com>
Co-authored-by: Tyan3001 <yan262@purdue.edu>
Co-authored-by: Vishnu Banna <vishnubanna@users.noreply.github.com>
Co-authored-by: Akhil Chinnakotla <The-Indian-Chinna@users.noreply.github.com>
parent 6e01e1cd
@@ -15,7 +15,6 @@
 """TFDS Classification decoder."""
 import tensorflow as tf
 from official.vision.beta.dataloaders import decoder
@@ -27,10 +26,9 @@ class Decoder(decoder.Decoder):
   def decode(self, serialized_example):
     sample_dict = {
-        'image/encoded': tf.io.encode_jpeg(
-            serialized_example['image'], quality=100),
-        'image/class/label': serialized_example['label'],
+        'image/encoded':
+            tf.io.encode_jpeg(serialized_example['image'], quality=100),
+        'image/class/label':
+            serialized_example['label'],
     }
     return sample_dict
import tensorflow as tf
from official.vision.beta.dataloaders import decoder
class MSCOCODecoder(decoder.Decoder):
  """TFDS COCO example decoder."""
def __init__(self, include_mask=False, regenerate_source_id=False):
self._include_mask = include_mask
self._regenerate_source_id = regenerate_source_id
def decode(self, sample):
"""Decode the serialized example
Args:
sample: a dictonary example produced by tfds.
Returns:
decoded_tensors: a dictionary of tensors with the following fields:
- source_id: a string scalar tensor.
- image: a uint8 tensor of shape [None, None, 3].
- height: an integer scalar tensor.
- width: an integer scalar tensor.
- groundtruth_classes: a int64 tensor of shape [None].
- groundtruth_is_crowd: a bool tensor of shape [None].
- groundtruth_area: a float32 tensor of shape [None].
- groundtruth_boxes: a float32 tensor of shape [None, 4].
- groundtruth_instance_masks: a float32 tensor of shape
[None, None, None].
"""
decoded_tensors = {
'source_id': sample['image/id'],
'image': sample['image'],
'height': tf.shape(sample['image'])[0],
'width': tf.shape(sample['image'])[1],
'groundtruth_classes': sample['objects']['label'],
'groundtruth_is_crowd': sample['objects']['is_crowd'],
'groundtruth_area': sample['objects']['area'],
'groundtruth_boxes': sample['objects']['bbox'],
}
return decoded_tensors
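# A minimal usage sketch (an illustrative example, not part of this module;
# it assumes `tensorflow_datasets` is installed and 'coco/2017' is available
# locally, and the variable names are hypothetical):
#
#   import tensorflow_datasets as tfds
#   ds = tfds.load('coco/2017', split='validation')
#   coco_decoder = MSCOCODecoder()
#   for sample in ds.take(1):
#     decoded = coco_decoder.decode(sample)
#     # decoded['image'] is a uint8 [height, width, 3] tensor and
#     # decoded['groundtruth_boxes'] holds normalized yxyx boxes.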
""" Detection Data parser and processing for YOLO.
Parse image and ground truths in a dataset to training targets and package them
into (image, labels) tuple for RetinaNet.
"""
import tensorflow as tf
from official.vision.beta.dataloaders import parser
from official.vision.beta.ops import box_ops
from official.vision.beta.ops import preprocess_ops
from official.vision.beta.projects.yolo.ops import preprocessing_ops
from official.vision.beta.projects.yolo.ops import box_ops as yolo_box_ops
class Parser(parser.Parser):
"""Parser to parse an image and its annotations into a dictionary of tensors."""
def __init__(self,
output_size,
num_classes,
fixed_size=True,
jitter_im=0.1,
jitter_boxes=0.005,
use_tie_breaker=True,
min_level=3,
max_level=5,
masks=None,
max_process_size=608,
min_process_size=320,
max_num_instances=200,
random_flip=True,
aug_rand_saturation=True,
aug_rand_brightness=True,
aug_rand_zoom=True,
aug_rand_hue=True,
anchors=None,
seed=10,
dtype=tf.float32):
"""Initializes parameters for parsing annotations in the dataset.
Args:
output_size: a `Tuple` for (width, height) of input image.
num_classes: a `Tensor` or `int` for the number of classes.
fixed_size: a `bool` if True all output images have the same size.
jitter_im: a `float` representing a pixel value that is the maximum jitter
applied to the image for data augmentation during training.
jitter_boxes: a `float` representing a pixel value that is the maximum
jitter applied to the bounding box for data augmentation during training.
net_down_scale: an `int` that down scales the image width and height to
the closest multiple of net_down_scale.
max_process_size: an `int` for maximum image width and height.
min_process_size: an `int` for minimum image width and height ,
max_num_instances: an `int` number of maximum number of instances in an image.
random_flip: a `bool` if True, augment training with random horizontal flip.
masks: a `Tensor`, `List` or `numpy.ndarray` for anchor masks.
aug_rand_saturation: `bool`, if True, augment training with random
saturation.
aug_rand_brightness: `bool`, if True, augment training with random
brightness.
aug_rand_zoom: `bool`, if True, augment training with random
zoom.
aug_rand_hue: `bool`, if True, augment training with random
hue.
anchors: a `Tensor`, `List` or `numpy.ndarrray` for bounding box priors.
seed: an `int` for the seed used by tf.random
dtype: a `tf.dtypes.DType` object that represents the dtype the outputs will
be casted to. The available types are tf.float32, tf.float16, or
tf.bfloat16.
"""
self._net_down_scale = 2**max_level
self._num_classes = num_classes
self._image_w = (output_size[0] //
self._net_down_scale) * self._net_down_scale
self._image_h = (output_size[1] //
self._net_down_scale) * self._net_down_scale
self._max_process_size = max_process_size
self._min_process_size = min_process_size
self._fixed_size = fixed_size
self._anchors = anchors
self._masks = {
key: tf.convert_to_tensor(value) for key, value in masks.items()
}
self._use_tie_breaker = use_tie_breaker
self._jitter_im = 0.0 if jitter_im is None else jitter_im
self._jitter_boxes = 0.0 if jitter_boxes is None else jitter_boxes
self._max_num_instances = max_num_instances
self._random_flip = random_flip
self._aug_rand_saturation = aug_rand_saturation
self._aug_rand_brightness = aug_rand_brightness
self._aug_rand_zoom = aug_rand_zoom
self._aug_rand_hue = aug_rand_hue
self._seed = seed
self._dtype = dtype
  def _build_grid(self, raw_true, width, batch=False, use_tie_breaker=False):
    # Build the grids into a fresh dictionary; assigning into self._masks
    # directly would overwrite the anchor masks with ground-truth grids.
    mask = {}
    for key in self._masks.keys():
      if not batch:
        mask[key] = preprocessing_ops.build_grided_gt(
            raw_true, self._masks[key], width // 2**int(key),
            self._num_classes, raw_true['bbox'].dtype, use_tie_breaker)
      else:
        mask[key] = preprocessing_ops.build_batch_grided_gt(
            raw_true, self._masks[key], width // 2**int(key),
            self._num_classes, raw_true['bbox'].dtype, use_tie_breaker)
    return mask
def _parse_train_data(self, data):
"""Generates images and labels that are usable for model training.
Args:
data: a dict of Tensors produced by the decoder.
Returns:
images: the image tensor.
labels: a dict of Tensors that contains labels.
"""
shape = tf.shape(data['image'])
image = data['image'] / 255
boxes = data['groundtruth_boxes']
width = shape[0]
height = shape[1]
image, boxes = preprocessing_ops.fit_preserve_aspect_ratio(
image,
boxes,
width=width,
height=height,
target_dim=self._max_process_size)
image_shape = tf.shape(image)[:2]
if self._random_flip:
image, boxes, _ = preprocess_ops.random_horizontal_flip(
image, boxes, seed=self._seed)
randscale = self._image_w // self._net_down_scale
if not self._fixed_size:
do_scale = tf.greater(
tf.random.uniform([], minval=0, maxval=1, seed=self._seed), 0.5)
if do_scale:
        # This scales the image to a random multiple of net_down_scale
        # between min_process_size and max_process_size.
randscale = tf.random.uniform(
[],
minval=self._min_process_size // self._net_down_scale,
maxval=self._max_process_size // self._net_down_scale,
seed=self._seed,
dtype=tf.int32) * self._net_down_scale
    if self._jitter_boxes != 0.0:
      boxes = box_ops.denormalize_boxes(boxes, image_shape)
      # Use the configured jitter amount instead of a hardcoded constant.
      boxes = box_ops.jitter_boxes(boxes, self._jitter_boxes)
      boxes = box_ops.normalize_boxes(boxes, image_shape)
# YOLO loss function uses x-center, y-center format
boxes = yolo_box_ops.yxyx_to_xcycwh(boxes)
if self._jitter_im != 0.0:
image, boxes = preprocessing_ops.random_translate(
image, boxes, self._jitter_im, seed=self._seed)
if self._aug_rand_zoom:
image, boxes = preprocessing_ops.resize_crop_filter(
image,
boxes,
default_width=self._image_w,
default_height=self._image_h,
target_width=randscale,
target_height=randscale)
    image = tf.image.resize(
        image, (self._image_h, self._image_w), preserve_aspect_ratio=False)
if self._aug_rand_brightness:
image = tf.image.random_brightness(
image=image, max_delta=.1) # Brightness
if self._aug_rand_saturation:
image = tf.image.random_saturation(
image=image, lower=0.75, upper=1.25) # Saturation
if self._aug_rand_hue:
image = tf.image.random_hue(image=image, max_delta=.3) # Hue
image = tf.clip_by_value(image, 0.0, 1.0)
# find the best anchor for the ground truth labels to maximize the iou
best_anchors = preprocessing_ops.get_best_anchor(
boxes, self._anchors, width=self._image_w, height=self._image_h)
# padding
boxes = preprocess_ops.clip_or_pad_to_fixed_size(boxes,
self._max_num_instances, 0)
classes = preprocess_ops.clip_or_pad_to_fixed_size(
data['groundtruth_classes'], self._max_num_instances, -1)
best_anchors = preprocess_ops.clip_or_pad_to_fixed_size(
best_anchors, self._max_num_instances, 0)
area = preprocess_ops.clip_or_pad_to_fixed_size(data['groundtruth_area'],
self._max_num_instances, 0)
is_crowd = preprocess_ops.clip_or_pad_to_fixed_size(
tf.cast(data['groundtruth_is_crowd'], tf.int32),
self._max_num_instances, 0)
labels = {
'source_id': data['source_id'],
'bbox': tf.cast(boxes, self._dtype),
'classes': tf.cast(classes, self._dtype),
'area': tf.cast(area, self._dtype),
'is_crowd': is_crowd,
'best_anchors': tf.cast(best_anchors, self._dtype),
'width': width,
'height': height,
'num_detections': tf.shape(data['groundtruth_classes'])[0],
}
if self._fixed_size:
grid = self._build_grid(
labels, self._image_w, use_tie_breaker=self._use_tie_breaker)
labels.update({'grid_form': grid})
return image, labels
  # TODO: evaluation is currently broken in the task; the labels dictionary
  # handed to the COCO evaluator appears to be incompatible.
def _parse_eval_data(self, data):
"""Generates images and labels that are usable for model training.
Args:
data: a dict of Tensors produced by the decoder.
Returns:
images: the image tensor.
labels: a dict of Tensors that contains labels.
"""
shape = tf.shape(data['image'])
image = data['image'] / 255
boxes = data['groundtruth_boxes']
width = shape[0]
height = shape[1]
image, boxes = preprocessing_ops.fit_preserve_aspect_ratio(
image, boxes, width=width, height=height, target_dim=self._image_w)
boxes = yolo_box_ops.yxyx_to_xcycwh(boxes)
# find the best anchor for the ground truth labels to maximize the iou
best_anchors = preprocessing_ops.get_best_anchor(
boxes, self._anchors, width=self._image_w, height=self._image_h)
boxes = preprocessing_ops.pad_max_instances(boxes, self._max_num_instances,
0)
classes = preprocessing_ops.pad_max_instances(data['groundtruth_classes'],
self._max_num_instances, 0)
best_anchors = preprocessing_ops.pad_max_instances(best_anchors,
self._max_num_instances,
0)
area = preprocessing_ops.pad_max_instances(data['groundtruth_area'],
self._max_num_instances, 0)
is_crowd = preprocessing_ops.pad_max_instances(
tf.cast(data['groundtruth_is_crowd'], tf.int32),
self._max_num_instances, 0)
labels = {
'source_id': data['source_id'],
'bbox': tf.cast(boxes, self._dtype),
'classes': tf.cast(classes, self._dtype),
'area': tf.cast(area, self._dtype),
'is_crowd': is_crowd,
'best_anchors': tf.cast(best_anchors, self._dtype),
'width': width,
'height': height,
'num_detections': tf.shape(data['groundtruth_classes'])[0],
}
grid = self._build_grid(
labels,
self._image_w,
batch=False,
use_tie_breaker=self._use_tie_breaker)
labels.update({'grid_form': grid})
return image, labels
def _postprocess_fn(self, image, label):
randscale = self._image_w // self._net_down_scale
if not self._fixed_size:
do_scale = tf.greater(
tf.random.uniform([], minval=0, maxval=1, seed=self._seed), 0.5)
if do_scale:
        # This scales the image to a random multiple of net_down_scale
        # between min_process_size and max_process_size.
randscale = tf.random.uniform(
[],
minval=self._min_process_size // self._net_down_scale,
maxval=self._max_process_size // self._net_down_scale,
seed=self._seed,
dtype=tf.int32) * self._net_down_scale
width = randscale
image = tf.image.resize(image, (width, width))
grid = self._build_grid(
label, width, batch=True, use_tie_breaker=self._use_tie_breaker)
label.update({'grid_form': grid})
return image, label
def postprocess_fn(self, is_training=True):
return self._postprocess_fn if not self._fixed_size and is_training else None
"""Test case for YOLO detection dataloader configuration definition."""
import dataclasses
from typing import Tuple
import tensorflow as tf
from absl.testing import parameterized
from official.core import config_definitions as cfg
from official.core import input_reader
from official.modeling import hyperparams
from official.vision.beta.projects.yolo.dataloaders import yolo_detection_input
from official.vision.beta.projects.yolo.dataloaders.decoders import tfds_coco_decoder
from official.vision.beta.projects.yolo.ops import box_ops
@dataclasses.dataclass
class Parser(hyperparams.Config):
"""Dummy configuration for parser"""
output_size: int = (416, 416)
num_classes: int = 80
fixed_size: bool = True
jitter_im: float = 0.1
jitter_boxes: float = 0.005
min_process_size: int = 320
max_process_size: int = 608
max_num_instances: int = 200
random_flip: bool = True
seed: int = 10
shuffle_buffer_size: int = 10000
@dataclasses.dataclass
class DataConfig(cfg.DataConfig):
"""Input config for training."""
input_path: str = ''
tfds_name: str = 'coco/2017'
tfds_split: str = 'train'
global_batch_size: int = 10
is_training: bool = True
dtype: str = 'float16'
decoder = None
parser: Parser = Parser()
shuffle_buffer_size: int = 10000
tfds_download: bool = True
class YoloDetectionInputTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(('training', True), ('testing', False))
def test_yolo_input(self, is_training):
with tf.device('/CPU:0'):
params = DataConfig(is_training=is_training)
decoder = tfds_coco_decoder.MSCOCODecoder()
anchors = [[12.0, 19.0], [31.0, 46.0], [96.0, 54.0], [46.0, 114.0],
[133.0, 127.0], [79.0, 225.0], [301.0, 150.0], [172.0, 286.0],
[348.0, 340.0]]
masks = {'3': [0, 1, 2], '4': [3, 4, 5], '5': [6, 7, 8]}
parser = yolo_detection_input.Parser(
output_size=params.parser.output_size,
num_classes=params.parser.num_classes,
fixed_size=params.parser.fixed_size,
jitter_im=params.parser.jitter_im,
jitter_boxes=params.parser.jitter_boxes,
min_process_size=params.parser.min_process_size,
max_process_size=params.parser.max_process_size,
max_num_instances=params.parser.max_num_instances,
random_flip=params.parser.random_flip,
seed=params.parser.seed,
anchors=anchors,
masks=masks)
postprocess_fn = parser.postprocess_fn(is_training=is_training)
reader = input_reader.InputReader(
params,
dataset_fn=tf.data.TFRecordDataset,
decoder_fn=decoder.decode,
parser_fn=parser.parse_fn(params.is_training))
dataset = reader.read(input_context=None)
for one_batch in dataset.batch(1):
self.assertAllEqual(one_batch[0].shape, (1, 10, 416, 416, 3))
break
for l, (i, j) in enumerate(dataset):
if postprocess_fn:
i, j = postprocess_fn(i, j)
boxes = box_ops.xcycwh_to_yxyx(j['bbox'])
self.assertTrue(tf.reduce_all(tf.math.logical_and(i >= 0, i <= 1)))
if l > 10:
break
if __name__ == '__main__':
tf.test.main()
""" bounding box utils file """
import tensorflow as tf
from typing import Tuple, Union
import math
def yxyx_to_xcycwh(box: tf.Tensor):
"""Converts boxes from ymin, xmin, ymax, xmax to x_center, y_center, width,
height.
Args:
box: a `Tensor` whose shape is [..., 4] and represents the coordinates
of boxes in ymin, xmin, ymax, xmax.
Returns:
a `Tensor` whose shape is [..., 4] and contains the new format.
Raises:
ValueError: If the last dimension of box is not 4 or if box's dtype isn't
a floating point type.
"""
with tf.name_scope('yxyx_to_xcycwh'):
ymin, xmin, ymax, xmax = tf.split(box, 4, axis=-1)
x_center = (xmax + xmin) / 2
y_center = (ymax + ymin) / 2
width = xmax - xmin
height = ymax - ymin
box = tf.concat([x_center, y_center, width, height], axis=-1)
return box
def xcycwh_to_yxyx(box: tf.Tensor, split_min_max: bool = False):
"""Converts boxes from x_center, y_center, width, height to ymin, xmin, ymax,
xmax.
  Args:
    box: a `Tensor` whose shape is [..., 4] and represents the coordinates
      of boxes in x_center, y_center, width, height.
    split_min_max: a `bool`; if True, split the output into separate min and
      max corner tensors.
  Returns:
    box: a `Tensor` whose shape is [..., 4] and contains the new format.
Raises:
ValueError: If the last dimension of box is not 4 or if box's dtype isn't
a floating point type.
"""
with tf.name_scope('xcycwh_to_yxyx'):
xy, wh = tf.split(box, 2, axis=-1)
xy_min = xy - wh / 2
xy_max = xy + wh / 2
x_min, y_min = tf.split(xy_min, 2, axis=-1)
x_max, y_max = tf.split(xy_max, 2, axis=-1)
box = tf.concat([y_min, x_min, y_max, x_max], axis=-1)
if split_min_max:
box = tf.split(box, 2, axis=-1)
return box
def xcycwh_to_xyxy(box: tf.Tensor, split_min_max: bool = False):
"""Converts boxes from x_center, y_center, width, height to xmin, ymin, xmax,
ymax.
  Args:
    box: a `Tensor` whose shape is [..., 4] and represents the
      coordinates of boxes in x_center, y_center, width, height.
    split_min_max: a `bool`; if True, split the output into separate min and
      max corner tensors.
  Returns:
    box: a `Tensor` whose shape is [..., 4] and contains the new format.
  Raises:
    ValueError: If the last dimension of box is not 4 or if box's dtype isn't
      a floating point type.
  """
  with tf.name_scope('xcycwh_to_xyxy'):
xy, wh = tf.split(box, 2, axis=-1)
xy_min = xy - wh / 2
xy_max = xy + wh / 2
box = (xy_min, xy_max)
if not split_min_max:
box = tf.concat(box, axis=-1)
return box
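# A quick round-trip sanity check for the converters above (a sketch; the
# values are illustrative):
#
#   import tensorflow as tf
#   box = tf.constant([[0.2, 0.3, 0.6, 0.5]])  # ymin, xmin, ymax, xmax
#   xcycwh = yxyx_to_xcycwh(box)               # [[0.4, 0.4, 0.2, 0.4]]
#   back = xcycwh_to_yxyx(xcycwh)              # recovers the original box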
def center_distance(center_1: tf.Tensor, center_2: tf.Tensor):
"""Calculates the squared distance between two points.
This function is mathematically equivalent to the following code, but has
smaller rounding errors.
tf.norm(center_1 - center_2, axis=-1)**2
Args:
center_1: a `Tensor` whose shape is [..., 2] and represents a point.
center_2: a `Tensor` whose shape is [..., 2] and represents a point.
Returns:
dist: a `Tensor` whose shape is [...] and value represents the squared
distance between center_1 and center_2.
Raises:
ValueError: If the last dimension of either center_1 or center_2 is not 2.
"""
with tf.name_scope('center_distance'):
dist = (center_1[..., 0] - center_2[..., 0])**2 + (center_1[..., 1] -
center_2[..., 1])**2
return dist
# IOU
def compute_iou(box1, box2, yxyx=False):
"""Calculates the intersection of union between box1 and box2.
Args:
box1: a `Tensor` whose shape is [..., 4] and represents the coordinates of boxes in
x_center, y_center, width, height.
box2: a `Tensor` whose shape is [..., 4] and represents the coordinates of boxes in
x_center, y_center, width, height.
Returns:
iou: a `Tensor` whose shape is [...] and value represents the intersection over union.
Raises:
ValueError: If the last dimension of either box1 or box2 is not 4.
"""
# get box corners
with tf.name_scope('iou'):
if not yxyx:
box1 = xcycwh_to_yxyx(box1)
box2 = xcycwh_to_yxyx(box2)
b1mi, b1ma = tf.split(box1, 2, axis=-1)
b2mi, b2ma = tf.split(box2, 2, axis=-1)
intersect_mins = tf.math.maximum(b1mi, b2mi)
intersect_maxes = tf.math.minimum(b1ma, b2ma)
intersect_wh = tf.math.maximum(intersect_maxes - intersect_mins,
tf.zeros_like(intersect_mins))
intersection = tf.reduce_prod(
intersect_wh, axis=-1) # intersect_wh[..., 0] * intersect_wh[..., 1]
box1_area = tf.math.abs(tf.reduce_prod(b1ma - b1mi, axis=-1))
box2_area = tf.math.abs(tf.reduce_prod(b2ma - b2mi, axis=-1))
union = box1_area + box2_area - intersection
iou = intersection / (union + 1e-7
) # tf.math.divide_no_nan(intersection, union)
iou = tf.clip_by_value(iou, clip_value_min=0.0, clip_value_max=1.0)
return iou
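# A small worked example (a sketch; boxes are x_center, y_center, width,
# height):
#
#   import tensorflow as tf
#   box1 = tf.constant([0.5, 0.5, 0.2, 0.2])   # area 0.04
#   box2 = tf.constant([0.5, 0.5, 0.4, 0.4])   # area 0.16
#   # box1 lies inside box2: intersection 0.04, union 0.16, so IoU is 0.25.
#   iou = compute_iou(box1, box2)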
def compute_giou(box1, box2):
"""Calculates the generalized intersection of union between box1 and box2.
Args:
box1: a `Tensor` whose shape is [..., 4] and represents the coordinates of boxes in
x_center, y_center, width, height.
box2: a `Tensor` whose shape is [..., 4] and represents the coordinates of boxes in
x_center, y_center, width, height.
Returns:
iou: a `Tensor` whose shape is [...] and value represents the generalized intersection over union.
Raises:
ValueError: If the last dimension of either box1 or box2 is not 4.
"""
with tf.name_scope('giou'):
# get box corners
box1 = xcycwh_to_yxyx(box1)
box2 = xcycwh_to_yxyx(box2)
# compute IOU
intersect_mins = tf.math.maximum(box1[..., 0:2], box2[..., 0:2])
intersect_maxes = tf.math.minimum(box1[..., 2:4], box2[..., 2:4])
intersect_wh = tf.math.maximum(intersect_maxes - intersect_mins,
tf.zeros_like(intersect_mins))
intersection = intersect_wh[..., 0] * intersect_wh[..., 1]
box1_area = tf.math.abs(
tf.reduce_prod(box1[..., 2:4] - box1[..., 0:2], axis=-1))
box2_area = tf.math.abs(
tf.reduce_prod(box2[..., 2:4] - box2[..., 0:2], axis=-1))
union = box1_area + box2_area - intersection
iou = tf.math.divide_no_nan(intersection, union)
iou = tf.clip_by_value(iou, clip_value_min=0.0, clip_value_max=1.0)
    # find the smallest box to encompass both box1 and box2
c_mins = tf.math.minimum(box1[..., 0:2], box2[..., 0:2])
c_maxes = tf.math.maximum(box1[..., 2:4], box2[..., 2:4])
c = tf.math.abs(tf.reduce_prod(c_mins - c_maxes, axis=-1))
# compute giou
giou = iou - tf.math.divide_no_nan((c - union), c)
return iou, giou
def compute_diou(box1, box2):
"""Calculates the distance intersection of union between box1 and box2.
Args:
box1: a `Tensor` whose shape is [..., 4] and represents the coordinates of boxes in
x_center, y_center, width, height.
box2: a `Tensor` whose shape is [..., 4] and represents the coordinates of boxes in
x_center, y_center, width, height.
Returns:
iou: a `Tensor` whose shape is [...] and value represents the distance intersection over union.
Raises:
ValueError: If the last dimension of either box1 or box2 is not 4.
"""
with tf.name_scope('diou'):
# compute center distance
dist = center_distance(box1[..., 0:2], box2[..., 0:2])
# get box corners
box1 = xcycwh_to_yxyx(box1)
box2 = xcycwh_to_yxyx(box2)
# compute IOU
intersect_mins = tf.math.maximum(box1[..., 0:2], box2[..., 0:2])
intersect_maxes = tf.math.minimum(box1[..., 2:4], box2[..., 2:4])
intersect_wh = tf.math.maximum(intersect_maxes - intersect_mins,
tf.zeros_like(intersect_mins))
intersection = intersect_wh[..., 0] * intersect_wh[..., 1]
box1_area = tf.math.abs(
tf.reduce_prod(box1[..., 2:4] - box1[..., 0:2], axis=-1))
box2_area = tf.math.abs(
tf.reduce_prod(box2[..., 2:4] - box2[..., 0:2], axis=-1))
union = box1_area + box2_area - intersection
iou = tf.math.divide_no_nan(intersection, union)
iou = tf.clip_by_value(iou, clip_value_min=0.0, clip_value_max=1.0)
    # compute the diagonal of the smallest box enclosing both box1 and box2
    c_mins = tf.math.minimum(box1[..., 0:2], box2[..., 0:2])
    c_maxes = tf.math.maximum(box1[..., 2:4], box2[..., 2:4])
    diag_dist = tf.reduce_sum((c_maxes - c_mins)**2, axis=-1)
    regularization = tf.math.divide_no_nan(dist, diag_dist)
    # subtract the distance penalty, per the standard distance-IoU
    # formulation (DIoU = IoU - d^2 / c^2)
    diou = iou - regularization
return iou, diou
def compute_ciou(box1, box2):
"""Calculates the complete intersection of union between box1 and box2.
Args:
box1: a `Tensor` whose shape is [..., 4] and represents the coordinates of boxes in
x_center, y_center, width, height.
box2: a `Tensor` whose shape is [..., 4] and represents the coordinates of boxes in
x_center, y_center, width, height.
Returns:
iou: a `Tensor` whose shape is [...] and value represents the complete intersection over union.
Raises:
ValueError: If the last dimension of either box1 or box2 is not 4.
"""
with tf.name_scope('ciou'):
# compute DIOU and IOU
iou, diou = compute_diou(box1, box2)
    # compute aspect ratio consistency
arcterm = (
tf.math.atan(tf.math.divide_no_nan(box1[..., 2], box1[..., 3])) -
tf.math.atan(tf.math.divide_no_nan(box2[..., 2], box2[..., 3])))**2
    v = 4 * arcterm / (math.pi)**2
    a = tf.math.divide_no_nan(v, ((1 - iou) + v))
    # subtract the aspect-ratio penalty, per the standard complete-IoU
    # formulation (CIoU = DIoU - alpha * v)
    ciou = diou - v * a
return iou, ciou
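# For identical boxes every penalty term vanishes, so IoU, GIoU, DIoU and
# CIoU all evaluate to 1 (a quick sketch):
#
#   import tensorflow as tf
#   box = tf.constant([0.5, 0.5, 0.2, 0.4])
#   iou, ciou = compute_ciou(box, box)         # both ~1.0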
import numpy as np
import tensorflow as tf
from absl.testing import parameterized
from official.vision.beta.projects.yolo.ops import box_ops
class InputUtilsTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters((1), (4))
def testBoxConversions(self, num_boxes):
boxes = tf.convert_to_tensor(np.random.rand(num_boxes, 4))
expected_shape = np.array([num_boxes, 4])
xywh_box = box_ops.yxyx_to_xcycwh(boxes)
yxyx_box = box_ops.xcycwh_to_yxyx(boxes)
xyxy_box = box_ops.xcycwh_to_xyxy(boxes)
self.assertAllEqual(tf.shape(xywh_box).numpy(), expected_shape)
self.assertAllEqual(tf.shape(yxyx_box).numpy(), expected_shape)
self.assertAllEqual(tf.shape(xyxy_box).numpy(), expected_shape)
@parameterized.parameters((1), (5), (7))
def testIOUs(self, num_boxes):
boxes = tf.convert_to_tensor(np.random.rand(num_boxes, 4))
expected_shape = np.array([
num_boxes,
])
expected_iou = np.ones([
num_boxes,
])
iou = box_ops.compute_iou(boxes, boxes)
_, giou = box_ops.compute_giou(boxes, boxes)
_, ciou = box_ops.compute_ciou(boxes, boxes)
_, diou = box_ops.compute_diou(boxes, boxes)
self.assertAllEqual(tf.shape(iou).numpy(), expected_shape)
self.assertArrayNear(iou, expected_iou, 0.001)
self.assertArrayNear(giou, expected_iou, 0.001)
self.assertArrayNear(ciou, expected_iou, 0.001)
self.assertArrayNear(diou, expected_iou, 0.001)
if __name__ == '__main__':
tf.test.main()
import tensorflow as tf
import tensorflow_addons as tfa
from official.vision.beta.projects.yolo.ops import box_ops
def resize_crop_filter(image, boxes, default_width, default_height,
target_width, target_height):
"""Apply zooming to the image and boxes.
Args:
image: a `Tensor` representing the image.
    boxes: a `Tensor` representing the boxes.
default_width: a `Tensor` representing the width of the image.
default_height: a `Tensor` representing the height of the image.
target_width: a `Tensor` representing the desired width of the image.
target_height: a `Tensor` representing the desired height of the image.
Returns:
images: a `Tensor` representing the augmented image.
boxes: a `Tensor` representing the augmented boxes.
"""
with tf.name_scope('resize_crop_filter'):
image = tf.image.resize(image, (target_width, target_height))
image = tf.image.resize_with_crop_or_pad(
image, target_height=default_height, target_width=default_width)
default_width = tf.cast(default_width, boxes.dtype)
default_height = tf.cast(default_height, boxes.dtype)
target_width = tf.cast(target_width, boxes.dtype)
target_height = tf.cast(target_height, boxes.dtype)
aspect_change_width = target_width / default_width
aspect_change_height = target_height / default_height
x, y, width, height = tf.split(boxes, 4, axis=-1)
    x = (x - 0.5) * aspect_change_width + 0.5
    y = (y - 0.5) * aspect_change_height + 0.5
width = width * aspect_change_width
height = height * aspect_change_height
boxes = tf.concat([x, y, width, height], axis=-1)
return image, boxes
def random_translate(image, box, t, seed=None):
"""Randomly translate the image and boxes.
Args:
image: a `Tensor` representing the image.
box: a `Tensor` represeting the boxes.
t: an `int` representing the translation factor
seed: an optional seed for tf.random operations
Returns:
image: a `Tensor` representing the augmented image.
box: a `Tensor` representing the augmented boxes.
"""
t_x = tf.random.uniform(
minval=-t, maxval=t, shape=(), dtype=tf.float32, seed=seed)
t_y = tf.random.uniform(
minval=-t, maxval=t, shape=(), dtype=tf.float32, seed=seed)
box = translate_boxes(box, t_x, t_y)
image = translate_image(image, t_x, t_y)
return image, box
def translate_boxes(box, translate_x, translate_y):
"""Randomly translate the boxes.
Args:
boxes: a `Tensor` represeitng the boxes.
translate_x: a `Tensor` represting the translation on the x-axis.
translate_y: a `Tensor` represting the translation on the y-axis.
Returns:
box: a `Tensor` representing the augmented boxes.
"""
with tf.name_scope('translate_boxs'):
x = box[..., 0] + translate_x
y = box[..., 1] + translate_y
box = tf.stack([x, y, box[..., 2], box[..., 3]], axis=-1)
box.set_shape([None, 4])
return box
def translate_image(image, translate_x, translate_y):
"""Randomly translate the image.
Args:
image: a `Tensor` representing the image.
translate_x: a `Tensor` represting the translation on the x-axis.
translate_y: a `Tensor` represting the translation on the y-axis.
Returns:
box: a `Tensor` representing the augmented boxes.
"""
with tf.name_scope('translate_image'):
if (translate_x != 0 and translate_y != 0):
image_jitter = tf.convert_to_tensor([translate_x, translate_y])
image_jitter.set_shape([2])
image = tfa.image.translate(
image, image_jitter * tf.cast(tf.shape(image)[1], tf.float32))
return image
def pad_max_instances(value, instances, pad_value=0, pad_axis=0):
  """Pads or clips `value` along `pad_axis` to exactly `instances` entries.
  Args:
    value: a `Tensor` to pad or clip.
    instances: an `int` for the output size along `pad_axis`.
    pad_value: the scalar value used for padding.
    pad_axis: the axis to pad or clip along.
  Returns:
    a `Tensor` whose `pad_axis` dimension is exactly `instances`.
  """
shape = tf.shape(value)
dim1 = shape[pad_axis]
take = tf.math.reduce_min([instances, dim1])
value, _ = tf.split(
value, [take, -1], axis=pad_axis) # value[:instances, ...]
pad = tf.convert_to_tensor([tf.math.reduce_max([instances - dim1, 0])])
nshape = tf.concat([shape[:pad_axis], pad, shape[(pad_axis + 1):]], axis=0)
pad_tensor = tf.fill(nshape, tf.cast(pad_value, dtype=value.dtype))
value = tf.concat([value, pad_tensor], axis=pad_axis)
return value
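# A usage sketch for pad_max_instances:
#
#   import tensorflow as tf
#   boxes = tf.ones([3, 4])
#   padded = pad_max_instances(boxes, 5)   # shape [5, 4]; rows 3-4 are zeros
#   clipped = pad_max_instances(boxes, 2)  # shape [2, 4]; keeps rows 0-1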
def fit_preserve_aspect_ratio(image,
boxes,
width=None,
height=None,
target_dim=None):
"""Resizes the image while peserving the image aspect ratio.
Args:
image: a `Tensor` representing the image.
box: a `Tensor` representing the boxes.
Returns:
image: a `Tensor` representing the image.
box: a `Tensor` representing the boxes.
"""
if width is None or height is None:
shape = tf.shape(image)
if tf.shape(shape)[0] == 4:
width = shape[1]
height = shape[2]
else:
width = shape[0]
height = shape[1]
clipper = tf.math.maximum(width, height)
if target_dim is None:
target_dim = clipper
pad_width = clipper - width
pad_height = clipper - height
image = tf.image.pad_to_bounding_box(image, pad_width // 2, pad_height // 2,
clipper, clipper)
boxes = box_ops.yxyx_to_xcycwh(boxes)
x, y, w, h = tf.split(boxes, 4, axis=-1)
y *= tf.cast(width / clipper, tf.float32)
x *= tf.cast(height / clipper, tf.float32)
y += tf.cast((pad_width / clipper) / 2, tf.float32)
x += tf.cast((pad_height / clipper) / 2, tf.float32)
h *= tf.cast(width / clipper, tf.float32)
w *= tf.cast(height / clipper, tf.float32)
boxes = tf.concat([x, y, w, h], axis=-1)
boxes = box_ops.xcycwh_to_yxyx(boxes)
image = tf.image.resize(image, (target_dim, target_dim))
return image, boxes
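# A usage sketch (illustrative shapes):
#
#   import tensorflow as tf
#   image = tf.zeros([300, 400, 3])
#   boxes = tf.constant([[0.1, 0.1, 0.5, 0.5]])  # normalized yxyx
#   image, boxes = fit_preserve_aspect_ratio(image, boxes, target_dim=416)
#   # image is letterboxed to [416, 416, 3]; boxes are shifted and scaled
#   # into the padded frame.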
def get_best_anchor(y_true, anchors, width=1, height=1):
"""Gets the correct anchor that is assoiciated with each box using IOU between
input anchors and ground truth.
Args:
y_true: tf.Tensor[] for the list of bounding boxes in the yolo format
anchors: list or tensor for the anchor boxes to be used in prediction
found via Kmeans
size: size of the image that the bounding boxes were selected at 416 is
the default for the original YOLO model
return:
tf.Tensor: y_true with the anchor associated with each ground truth box
known
"""
with tf.name_scope('get_anchor'):
width = tf.cast(width, dtype=tf.float32)
height = tf.cast(height, dtype=tf.float32)
anchor_xy = y_true[..., 0:2]
true_wh = y_true[..., 2:4]
    # scale the boxes
anchors = tf.convert_to_tensor(anchors, dtype=tf.float32)
anchors_x = anchors[..., 0] / width
anchors_y = anchors[..., 1] / height
anchors = tf.stack([anchors_x, anchors_y], axis=-1)
# build a matrix of anchor boxes
anchors = tf.transpose(anchors, perm=[1, 0])
anchor_xy = tf.tile(
tf.expand_dims(anchor_xy, axis=-1), [1, 1, tf.shape(anchors)[-1]])
anchors = tf.tile(
tf.expand_dims(anchors, axis=0), [tf.shape(anchor_xy)[0], 1, 1])
    # stack the xy so that each anchor is associated once with each center
    # from the ground truth input
anchors = tf.keras.layers.concatenate([anchor_xy, anchors], axis=1)
anchors = tf.transpose(anchors, perm=[2, 0, 1])
# copy the gt n times so that each anchor from above can be compared to
# input ground truth
truth_comp = tf.tile(
tf.expand_dims(y_true[..., 0:4], axis=-1),
[1, 1, tf.shape(anchors)[0]])
truth_comp = tf.transpose(truth_comp, perm=[2, 0, 1])
    # compute the intersection over union of the boxes, and take the argmax
    # of the computed IOU for each box; thus each box is associated with the
    # largest intersection over union
iou_raw = box_ops.compute_iou(truth_comp, anchors)
    # 0.213 is the IOU threshold for anchor matching, which appears to follow
    # the darknet YOLOv4 `iou_thresh` setting.
    gt_mask = tf.cast(iou_raw > 0.213, dtype=iou_raw.dtype)
num_k = tf.reduce_max(
tf.reduce_sum(tf.transpose(gt_mask, perm=[1, 0]), axis=1))
if num_k <= 0:
num_k = 1.0
values, indexes = tf.math.top_k(
tf.transpose(iou_raw, perm=[1, 0]),
k=tf.cast(num_k, dtype=tf.int32),
sorted=True)
ind_mask = tf.cast(values > 0.213, dtype=indexes.dtype)
iou_index = tf.concat([
tf.expand_dims(indexes[..., 0], axis=-1),
((indexes[..., 1:] + 1) * ind_mask[..., 1:]) - 1
],
axis=-1)
stack = tf.zeros(
[tf.shape(iou_index)[0],
tf.cast(1, dtype=iou_index.dtype)],
dtype=iou_index.dtype) - 1
while num_k < 5:
iou_index = tf.concat([iou_index, stack], axis=-1)
num_k += 1
iou_index = iou_index[..., :5]
values = tf.concat([
tf.expand_dims(values[..., 0], axis=-1),
((values[..., 1:]) * tf.cast(ind_mask[..., 1:], dtype=tf.float32))
],
axis=-1)
return tf.cast(iou_index, dtype=tf.float32)
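# A usage sketch (the anchors below are illustrative pixel-space priors):
#
#   import tensorflow as tf
#   anchors = [[12., 19.], [31., 46.], [96., 54.]]
#   boxes = tf.constant([[0.5, 0.5, 0.1, 0.1]])  # x_center, y_center, w, h
#   best = get_best_anchor(boxes, anchors, width=416, height=416)
#   # best has shape [num_boxes, 5]: matching anchor indices, padded with -1.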
def build_grided_gt(y_true, mask, size, num_classes, dtype, use_tie_breaker):
"""convert ground truth for use in loss functions
Args:
y_true: tf.Tensor[] ground truth
[box coords[0:4], classes_onehot[0:-1], best_fit_anchor_box]
mask: list of the anchor boxes choresponding to the output,
ex. [1, 2, 3] tells this layer to predict only the first 3 anchors
in the total.
size: the dimensions of this output, for regular, it progresses from
13, to 26, to 52
Return:
tf.Tensor[] of shape [size, size, #of_anchors, 4, 1, num_classes]
"""
boxes = tf.cast(y_true['bbox'], dtype)
classes = tf.one_hot(
tf.cast(y_true['classes'], dtype=tf.int32),
depth=num_classes,
dtype=dtype)
anchors = tf.cast(y_true['best_anchors'], dtype)
num_boxes = tf.shape(boxes)[0]
len_masks = tf.shape(mask)[0]
full = tf.zeros([size, size, len_masks, num_classes + 4 + 1], dtype=dtype)
depth_track = tf.zeros((size, size, len_masks), dtype=tf.int32)
x = tf.cast(boxes[..., 0] * tf.cast(size, dtype=dtype), dtype=tf.int32)
y = tf.cast(boxes[..., 1] * tf.cast(size, dtype=dtype), dtype=tf.int32)
anchors = tf.repeat(tf.expand_dims(anchors, axis=-1), len_masks, axis=-1)
update_index = tf.TensorArray(tf.int32, size=0, dynamic_size=True)
update = tf.TensorArray(dtype, size=0, dynamic_size=True)
const = tf.cast(tf.convert_to_tensor([1.]), dtype=dtype)
mask = tf.cast(mask, dtype=dtype)
i = 0
anchor_id = 0
for box_id in range(num_boxes):
if tf.keras.backend.all(tf.math.equal(boxes[box_id, 2:4], 0)):
continue
if tf.keras.backend.any(tf.math.less(
boxes[box_id, 0:2], 0.0)) or tf.keras.backend.any(
tf.math.greater_equal(boxes[box_id, 0:2], 1.0)):
continue
if use_tie_breaker:
for anchor_id in range(tf.shape(anchors)[-1]):
index = tf.math.equal(anchors[box_id, anchor_id], mask)
if tf.keras.backend.any(index):
p = tf.cast(
tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
uid = 1
used = depth_track[y[box_id], x[box_id], p]
if anchor_id == 0:
# write the box to the update list
update_index = update_index.write(i, [y[box_id], x[box_id], p])
value = tf.keras.backend.concatenate(
[boxes[box_id], const, classes[box_id]])
update = update.write(i, value)
elif tf.math.equal(used, 2) or tf.math.equal(used, 0):
# write the box to the update list
uid = 2
update_index = update_index.write(i, [y[box_id], x[box_id], p])
value = tf.keras.backend.concatenate(
[boxes[box_id], const, classes[box_id]])
update = update.write(i, value)
depth_track = tf.tensor_scatter_nd_update(depth_track,
[(y[box_id], x[box_id], p)],
[uid])
i += 1
else:
index = tf.math.equal(anchors[box_id, 0], mask)
if tf.keras.backend.any(index):
p = tf.cast(
tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
update_index = update_index.write(i, [y[box_id], x[box_id], p])
value = tf.keras.backend.concatenate(
[boxes[box_id], const, classes[box_id]])
update = update.write(i, value)
i += 1
  # if the size of the update list is not 0, do an update; otherwise there
  # are no boxes, so pass an empty grid
if tf.math.greater(update_index.size(), 0):
update_index = update_index.stack()
update = update.stack()
full = tf.tensor_scatter_nd_add(full, update_index, update)
return full
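# A usage sketch (a hypothetical single ground truth box on a 13x13 grid
# with 80 classes):
#
#   import tensorflow as tf
#   y_true = {
#       'bbox': tf.constant([[0.5, 0.5, 0.1, 0.1]]),  # xcycwh, normalized
#       'classes': tf.constant([3.]),
#       'best_anchors': tf.constant([[0., -1., -1., -1., -1.]]),
#   }
#   grid = build_grided_gt(y_true, [0, 1, 2], 13, 80, tf.float32, False)
#   # grid has shape [13, 13, 3, 85]; the cell at (y=6, x=6) holds the box.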
def build_batch_grided_gt(y_true, mask, size, num_classes, dtype,
use_tie_breaker):
"""convert ground truth for use in loss functions
Args:
y_true: tf.Tensor[] ground truth
[box coords[0:4], classes_onehot[0:-1], best_fit_anchor_box]
mask: list of the anchor boxes choresponding to the output,
ex. [1, 2, 3] tells this layer to predict only the first 3 anchors in
the total.
size: the dimensions of this output, for regular, it progresses from
13, to 26, to 52
Return:
tf.Tensor[] of shape [batch, size, size, #of_anchors, 4, 1, num_classes]
"""
boxes = tf.cast(y_true['bbox'], dtype)
classes = tf.one_hot(
tf.cast(y_true['classes'], dtype=tf.int32),
depth=num_classes,
dtype=dtype)
anchors = tf.cast(y_true['best_anchors'], dtype)
batches = tf.shape(boxes)[0]
num_boxes = tf.shape(boxes)[1]
len_masks = tf.shape(mask)[0]
full = tf.zeros([batches, size, size, len_masks, num_classes + 4 + 1],
dtype=dtype)
depth_track = tf.zeros((batches, size, size, len_masks), dtype=tf.int32)
x = tf.cast(boxes[..., 0] * tf.cast(size, dtype=dtype), dtype=tf.int32)
y = tf.cast(boxes[..., 1] * tf.cast(size, dtype=dtype), dtype=tf.int32)
anchors = tf.repeat(tf.expand_dims(anchors, axis=-1), len_masks, axis=-1)
update_index = tf.TensorArray(tf.int32, size=0, dynamic_size=True)
update = tf.TensorArray(dtype, size=0, dynamic_size=True)
const = tf.cast(tf.convert_to_tensor([1.]), dtype=dtype)
mask = tf.cast(mask, dtype=dtype)
i = 0
anchor_id = 0
for batch in range(batches):
for box_id in range(num_boxes):
if tf.keras.backend.all(tf.math.equal(boxes[batch, box_id, 2:4], 0)):
continue
if tf.keras.backend.any(tf.math.less(
boxes[batch, box_id, 0:2], 0.0)) or tf.keras.backend.any(
tf.math.greater_equal(boxes[batch, box_id, 0:2], 1.0)):
continue
if use_tie_breaker:
for anchor_id in range(tf.shape(anchors)[-1]):
index = tf.math.equal(anchors[batch, box_id, anchor_id], mask)
if tf.keras.backend.any(index):
p = tf.cast(
tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
uid = 1
used = depth_track[batch, y[batch, box_id], x[batch, box_id], p]
if anchor_id == 0:
# write the box to the update list
update_index = update_index.write(
i, [batch, y[batch, box_id], x[batch, box_id], p])
value = tf.keras.backend.concatenate(
[boxes[batch, box_id], const, classes[batch, box_id]])
update = update.write(i, value)
elif tf.math.equal(used, 2) or tf.math.equal(used, 0):
uid = 2
# write the box to the update list
update_index = update_index.write(
i, [batch, y[batch, box_id], x[batch, box_id], p])
value = tf.keras.backend.concatenate(
[boxes[batch, box_id], const, classes[batch, box_id]])
update = update.write(i, value)
depth_track = tf.tensor_scatter_nd_update(
depth_track, [(batch, y[batch, box_id], x[batch, box_id], p)],
[uid])
i += 1
else:
index = tf.math.equal(anchors[batch, box_id, 0], mask)
if tf.keras.backend.any(index):
p = tf.cast(
tf.keras.backend.argmax(tf.cast(index, dtype=tf.int32)),
dtype=tf.int32)
update_index = update_index.write(
i, [batch, y[batch, box_id], x[batch, box_id], p])
value = tf.keras.backend.concatenate(
[boxes[batch, box_id], const, classes[batch, box_id]])
update = update.write(i, value)
i += 1
  # if the size of the update list is not 0, do an update; otherwise there
  # are no boxes, so pass an empty grid
if tf.math.greater(update_index.size(), 0):
update_index = update_index.stack()
update = update.stack()
full = tf.tensor_scatter_nd_add(full, update_index, update)
return full
import numpy as np
import tensorflow as tf
from absl.testing import parameterized
from official.vision.beta.projects.yolo.ops import preprocessing_ops
class InputUtilsTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters((416, 416, 5, 300, 300), (100, 200, 6, 50, 50))
def testResizeCropFilter(self, default_width, default_height, num_boxes,
target_width, target_height):
image = tf.convert_to_tensor(
np.random.rand(default_width, default_height, 3))
boxes = tf.convert_to_tensor(np.random.rand(num_boxes, 4))
resized_image, resized_boxes = preprocessing_ops.resize_crop_filter(
image, boxes, default_width, default_height, target_width,
target_height)
resized_image_shape = tf.shape(resized_image)
resized_boxes_shape = tf.shape(resized_boxes)
self.assertAllEqual([default_height, default_width, 3],
resized_image_shape.numpy())
self.assertAllEqual([num_boxes, 4], resized_boxes_shape.numpy())
@parameterized.parameters((7, 7., 5.), (25, 35., 45.))
def testTranslateBoxes(self, num_boxes, translate_x, translate_y):
boxes = tf.convert_to_tensor(np.random.rand(num_boxes, 4))
translated_boxes = preprocessing_ops.translate_boxes(
boxes, translate_x, translate_y)
translated_boxes_shape = tf.shape(translated_boxes)
self.assertAllEqual([num_boxes, 4], translated_boxes_shape.numpy())
@parameterized.parameters((100, 200, 75., 25.), (400, 600, 25., 75.))
def testTranslateImage(self, image_height, image_width, translate_x,
translate_y):
image = tf.convert_to_tensor(np.random.rand(image_height, image_width, 4))
translated_image = preprocessing_ops.translate_image(
image, translate_x, translate_y)
translated_image_shape = tf.shape(translated_image)
self.assertAllEqual([image_height, image_width, 4],
translated_image_shape.numpy())
@parameterized.parameters(([1, 2], 20, 0), ([13, 2, 4], 15, 0))
def testPadMaxInstances(self, input_shape, instances, pad_axis):
expected_output_shape = input_shape
expected_output_shape[pad_axis] = instances
output = preprocessing_ops.pad_max_instances(
np.ones(input_shape), instances, pad_axis=pad_axis)
self.assertAllEqual(expected_output_shape, tf.shape(output).numpy())
if __name__ == '__main__':
tf.test.main()