"test/srt/vscode:/vscode.git/clone" did not exist on "7f875f1293aa4dab646e312b1e67edda372102c7"
Commit 18623283 authored by Abdullah Rashwan's avatar Abdullah Rashwan Committed by A. Unique TensorFlower
Browse files

Internal change

PiperOrigin-RevId: 331625146
parent 83c4844e
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests classification_input.py."""
import io
# Import libraries
from absl.testing import parameterized
import numpy as np
from PIL import Image
import tensorflow as tf
from official.core import input_reader
from official.modeling.hyperparams import config_definitions as cfg
from official.vision.beta.dataloaders import classification_input
def _encode_image(image_array, fmt):
image = Image.fromarray(image_array)
with io.BytesIO() as output:
image.save(output, format=fmt)
return output.getvalue()
class DecoderTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.parameters(
(100, 100, 0), (100, 100, 1), (100, 100, 2),
)
def test_decoder(self, image_height, image_width, num_instances):
decoder = classification_input.Decoder()
image = _encode_image(
np.uint8(np.random.rand(image_height, image_width, 3) * 255),
fmt='JPEG')
label = 2
serialized_example = tf.train.Example(
features=tf.train.Features(
feature={
'image/encoded': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[image]))),
'image/class/label': (
tf.train.Feature(
int64_list=tf.train.Int64List(value=[label]))),
})).SerializeToString()
decoded_tensors = decoder.decode(tf.convert_to_tensor(serialized_example))
results = tf.nest.map_structure(lambda x: x.numpy(), decoded_tensors)
self.assertCountEqual(
['image/encoded', 'image/class/label'], results.keys())
self.assertEqual(label, results['image/class/label'])
class ParserTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
([224, 224, 3], 'float32', True),
([224, 224, 3], 'float16', True),
([224, 224, 3], 'float32', False),
([224, 224, 3], 'float16', False),
([512, 640, 3], 'float32', True),
([512, 640, 3], 'float16', True),
([512, 640, 3], 'float32', False),
([512, 640, 3], 'float16', False),
([640, 640, 3], 'float32', True),
([640, 640, 3], 'bfloat16', True),
([640, 640, 3], 'float32', False),
([640, 640, 3], 'bfloat16', False),
)
def test_parser(self, output_size, dtype, is_training):
params = cfg.DataConfig(
input_path='imagenet-2012-tfrecord/train*',
global_batch_size=2,
is_training=True,
examples_consume=4)
decoder = classification_input.Decoder()
parser = classification_input.Parser(
output_size=output_size[:2],
num_classes=1001,
aug_rand_hflip=False,
dtype=dtype)
reader = input_reader.InputReader(
params,
dataset_fn=tf.data.TFRecordDataset,
decoder_fn=decoder.decode,
parser_fn=parser.parse_fn(params.is_training))
dataset = reader.read()
images, labels = next(iter(dataset))
self.assertAllEqual(images.numpy().shape,
[params.global_batch_size] + output_size)
self.assertAllEqual(labels.numpy().shape, [params.global_batch_size])
if dtype == 'float32':
self.assertAllEqual(images.dtype, tf.float32)
elif dtype == 'float16':
self.assertAllEqual(images.dtype, tf.float16)
elif dtype == 'bfloat16':
self.assertAllEqual(images.dtype, tf.bfloat16)
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for coco_evaluator."""
import io
import os
# Import libraries
from absl import logging
from absl.testing import absltest
from absl.testing import parameterized
import numpy as np
from PIL import Image
from pycocotools.coco import COCO
import six
import tensorflow as tf
from official.vision.beta.evaluation import coco_evaluator
from official.vision.beta.evaluation import coco_utils
_COCO_JSON_FILE = '/placer/prod/home/snaggletooth/test/data/coco/instances_val2017.json'
_SAVED_COCO_JSON_FILE = 'tmp.json'
def get_groundtruth_annotations(image_id, coco, include_mask=False):
anns = coco.loadAnns(coco.getAnnIds([image_id]))
if not anns:
return None
image = coco.loadImgs([image_id])[0]
groundtruths = {
'boxes': [],
'classes': [],
'is_crowds': [],
'areas': [],
}
if include_mask:
groundtruths['masks'] = []
for ann in anns:
# Creates detections from groundtruths.
# Converts [x, y, w, h] to [y1, x1, y2, x2] box format.
box = [ann['bbox'][1],
ann['bbox'][0],
(ann['bbox'][1] + ann['bbox'][3]),
(ann['bbox'][0] + ann['bbox'][2])]
# Creates groundtruths.
groundtruths['boxes'].append(box)
groundtruths['classes'].append(ann['category_id'])
groundtruths['is_crowds'].append(ann['iscrowd'])
groundtruths['areas'].append(ann['area'])
if include_mask:
mask_img = Image.fromarray(coco.annToMask(ann).astype(np.uint8))
with io.BytesIO() as stream:
mask_img.save(stream, format='PNG')
mask_bytes = stream.getvalue()
groundtruths['masks'].append(mask_bytes)
for key, val in groundtruths.items():
groundtruths[key] = np.stack(val, axis=0)
groundtruths['source_id'] = image['id']
groundtruths['height'] = image['height']
groundtruths['width'] = image['width']
groundtruths['num_detections'] = len(anns)
for k, v in six.iteritems(groundtruths):
groundtruths[k] = np.expand_dims(v, axis=0)
return groundtruths
def get_predictions(image_id, coco, include_mask=False):
anns = coco.loadAnns(coco.getAnnIds([image_id]))
if not anns:
return None
image = coco.loadImgs([image_id])[0]
predictions = {
'detection_boxes': [],
'detection_classes': [],
'detection_scores': [],
}
if include_mask:
predictions['detection_masks'] = []
for ann in anns:
# Creates detections from groundtruths.
# Converts [x, y, w, h] to [y1, x1, y2, x2] box format and
# does the denormalization.
box = [ann['bbox'][1],
ann['bbox'][0],
(ann['bbox'][1] + ann['bbox'][3]),
(ann['bbox'][0] + ann['bbox'][2])]
predictions['detection_boxes'].append(box)
predictions['detection_classes'].append(ann['category_id'])
predictions['detection_scores'].append(1)
if include_mask:
mask = coco.annToMask(ann)
predictions['detection_masks'].append(mask)
for key, val in predictions.items():
predictions[key] = np.expand_dims(np.stack(val, axis=0), axis=0)
predictions['source_id'] = np.array([image['id']])
predictions['num_detections'] = np.array([len(anns)])
predictions['image_info'] = np.array(
[[[image['height'], image['width']],
[image['height'], image['width']],
[1, 1],
[0, 0]]], dtype=np.float32)
return predictions
def get_fake_predictions(image_id, coco, include_mask=False):
anns = coco.loadAnns(coco.getAnnIds([image_id]))
if not anns:
return None
label_id_max = max([ann['category_id'] for ann in anns])
image = coco.loadImgs([image_id])[0]
num_detections = 100
xmin = np.random.randint(
low=0, high=int(image['width'] / 2), size=(1, num_detections))
xmax = np.random.randint(
low=int(image['width'] / 2), high=image['width'],
size=(1, num_detections))
ymin = np.random.randint(
low=0, high=int(image['height'] / 2), size=(1, num_detections))
ymax = np.random.randint(
low=int(image['height'] / 2), high=image['height'],
size=(1, num_detections))
predictions = {
'detection_boxes': np.stack([ymin, xmin, ymax, xmax], axis=-1),
'detection_classes': np.random.randint(
low=0, high=(label_id_max + 1), size=(1, num_detections)),
'detection_scores': np.random.random(size=(1, num_detections)),
}
if include_mask:
predictions['detection_masks'] = np.random.randint(
1, size=(1, num_detections, image['height'], image['width']))
predictions['source_id'] = np.array([image['id']])
predictions['num_detections'] = np.array([num_detections])
predictions['image_info'] = np.array(
[[[image['height'], image['width']],
[image['height'], image['width']],
[1, 1],
[0, 0]]], dtype=np.float32)
return predictions
class DummyGroundtruthGenerator(object):
def __init__(self, include_mask, image_id, coco):
self._include_mask = include_mask
self._image_id = image_id
self._coco = coco
def __call__(self):
yield get_groundtruth_annotations(
self._image_id, self._coco, self._include_mask)
class COCOEvaluatorTest(parameterized.TestCase, absltest.TestCase):
def setUp(self):
super(COCOEvaluatorTest, self).setUp()
temp = self.create_tempdir()
self._saved_coco_json_file = os.path.join(temp.full_path,
_SAVED_COCO_JSON_FILE)
def tearDown(self):
super(COCOEvaluatorTest, self).tearDown()
@parameterized.parameters(
(False, False), (False, True), (True, False), (True, True))
def testEval(self, include_mask, use_fake_predictions):
coco = COCO(annotation_file=_COCO_JSON_FILE)
index = np.random.randint(len(coco.dataset['images']))
image_id = coco.dataset['images'][index]['id']
# image_id = 26564
# image_id = 324158
if use_fake_predictions:
predictions = get_fake_predictions(
image_id, coco, include_mask=include_mask)
else:
predictions = get_predictions(image_id, coco, include_mask=include_mask)
if not predictions:
logging.info('Empty predictions for index=%d', index)
return
predictions = tf.nest.map_structure(
lambda x: tf.convert_to_tensor(x) if x is not None else None,
predictions)
evaluator_w_json = coco_evaluator.COCOEvaluator(
annotation_file=_COCO_JSON_FILE, include_mask=include_mask)
evaluator_w_json.update_state(groundtruths=None, predictions=predictions)
results_w_json = evaluator_w_json.result()
dummy_generator = DummyGroundtruthGenerator(
include_mask=include_mask, image_id=image_id, coco=coco)
coco_utils.generate_annotation_file(dummy_generator,
self._saved_coco_json_file)
evaluator_no_json = coco_evaluator.COCOEvaluator(
annotation_file=self._saved_coco_json_file, include_mask=include_mask)
evaluator_no_json.update_state(groundtruths=None, predictions=predictions)
results_no_json = evaluator_no_json.result()
for k, v in results_w_json.items():
self.assertEqual(v, results_no_json[k])
@parameterized.parameters(
(False, False), (False, True), (True, False), (True, True))
def testEvalOnTheFly(self, include_mask, use_fake_predictions):
coco = COCO(annotation_file=_COCO_JSON_FILE)
index = np.random.randint(len(coco.dataset['images']))
image_id = coco.dataset['images'][index]['id']
# image_id = 26564
# image_id = 324158
if use_fake_predictions:
predictions = get_fake_predictions(
image_id, coco, include_mask=include_mask)
else:
predictions = get_predictions(image_id, coco, include_mask=include_mask)
if not predictions:
logging.info('Empty predictions for index=%d', index)
return
predictions = tf.nest.map_structure(
lambda x: tf.convert_to_tensor(x) if x is not None else None,
predictions)
evaluator_w_json = coco_evaluator.COCOEvaluator(
annotation_file=_COCO_JSON_FILE, include_mask=include_mask)
evaluator_w_json.update_state(groundtruths=None, predictions=predictions)
results_w_json = evaluator_w_json.result()
groundtruths = get_groundtruth_annotations(image_id, coco, include_mask)
groundtruths = tf.nest.map_structure(
lambda x: tf.convert_to_tensor(x) if x is not None else None,
groundtruths)
evaluator_no_json = coco_evaluator.COCOEvaluator(
annotation_file=None, include_mask=include_mask)
evaluator_no_json.update_state(groundtruths, predictions)
results_no_json = evaluator_no_json.result()
for k, v in results_w_json.items():
self.assertEqual(v, results_no_json[k])
if __name__ == '__main__':
absltest.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for box_matcher.py."""
# Import libraries
import numpy as np
import tensorflow as tf
from official.vision.beta.modeling.layers import box_matcher
class BoxMatcherTest(tf.test.TestCase):
def test_box_matcher(self):
boxes_np = np.array(
[[
[0, 0, 1, 1],
[5, 0, 10, 5],
]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
gt_boxes_np = np.array(
[[
[0, 0, 5, 5],
[0, 5, 5, 10],
[5, 0, 10, 5],
[5, 5, 10, 10],
]])
gt_boxes = tf.constant(gt_boxes_np, dtype=tf.float32)
gt_classes_np = np.array([[2, 10, 3, -1]])
gt_classes = tf.constant(gt_classes_np, dtype=tf.int32)
fg_threshold = 0.5
bg_thresh_hi = 0.2
bg_thresh_lo = 0.0
matcher = box_matcher.BoxMatcher(fg_threshold, bg_thresh_hi, bg_thresh_lo)
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
(matched_gt_boxes_tpu, matched_gt_classes_tpu, matched_gt_indices_tpu,
positive_matches_tpu, negative_matches_tpu, ignored_matches_tpu) = (
matcher(boxes, gt_boxes, gt_classes))
# Runs on CPU.
(matched_gt_boxes_cpu, matched_gt_classes_cpu, matched_gt_indices_cpu,
positive_matches_cpu, negative_matches_cpu, ignored_matches_cpu) = (
matcher(boxes, gt_boxes, gt_classes))
# correctness
self.assertNDArrayNear(
matched_gt_boxes_cpu.numpy(),
[[[0, 0, 0, 0], [5, 0, 10, 5]]], 1e-4)
self.assertAllEqual(
matched_gt_classes_cpu.numpy(), [[0, 3]])
self.assertAllEqual(
matched_gt_indices_cpu.numpy(), [[-1, 2]])
self.assertAllEqual(
positive_matches_cpu.numpy(), [[False, True]])
self.assertAllEqual(
negative_matches_cpu.numpy(), [[True, False]])
self.assertAllEqual(
ignored_matches_cpu.numpy(), [[False, False]])
# consistency.
self.assertNDArrayNear(
matched_gt_boxes_cpu.numpy(), matched_gt_boxes_tpu.numpy(), 1e-4)
self.assertAllEqual(
matched_gt_classes_cpu.numpy(), matched_gt_classes_tpu.numpy())
self.assertAllEqual(
matched_gt_indices_cpu.numpy(), matched_gt_indices_tpu.numpy())
self.assertAllEqual(
positive_matches_cpu.numpy(), positive_matches_tpu.numpy())
self.assertAllEqual(
negative_matches_cpu.numpy(), negative_matches_tpu.numpy())
self.assertAllEqual(
ignored_matches_cpu.numpy(), ignored_matches_tpu.numpy())
def test_serialize_deserialize(self):
kwargs = dict(
foreground_iou_threshold=0.5,
background_iou_high_threshold=0.5,
background_iou_low_threshold=0.5,
)
matcher = box_matcher.BoxMatcher(**kwargs)
expected_config = dict(kwargs)
self.assertEqual(matcher.get_config(), expected_config)
new_matcher = box_matcher.BoxMatcher.from_config(matcher.get_config())
self.assertAllEqual(matcher.get_config(), new_matcher.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for roi_sampler.py."""
# Import libraries
import numpy as np
import tensorflow as tf
from official.vision.beta.modeling.layers import box_sampler
class BoxSamplerTest(tf.test.TestCase):
def test_box_sampler(self):
positive_matches = np.array(
[[True, False, False, False, True, True, False],
[False, False, False, False, False, True, True]])
negative_matches = np.array(
[[False, True, True, True, False, False, False],
[True, True, True, True, False, False, False]])
ignored_matches = np.array(
[[False, False, False, False, False, False, True],
[False, False, False, False, True, False, False]])
sampler = box_sampler.BoxSampler(num_samples=2, foreground_fraction=0.5)
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
selected_indices_tpu = sampler(
positive_matches, negative_matches, ignored_matches)
self.assertEqual(2, tf.shape(selected_indices_tpu)[1])
# Runs on CPU.
selected_indices_cpu = sampler(
positive_matches, negative_matches, ignored_matches)
self.assertEqual(2, tf.shape(selected_indices_cpu)[1])
def test_serialize_deserialize(self):
kwargs = dict(
num_samples=512,
foreground_fraction=0.25,
)
sampler = box_sampler.BoxSampler(**kwargs)
expected_config = dict(kwargs)
self.assertEqual(sampler.get_config(), expected_config)
new_sampler = box_sampler.BoxSampler.from_config(
sampler.get_config())
self.assertAllEqual(sampler.get_config(), new_sampler.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for mask_sampler.py."""
# Import libraries
import numpy as np
import tensorflow as tf
from official.vision.beta.modeling.layers import mask_sampler
class SampleAndCropForegroundMasksTest(tf.test.TestCase):
def test_sample_and_crop_foreground_masks(self):
candidate_rois_np = np.array(
[[[0, 0, 0.5, 0.5], [0.5, 0.5, 1, 1],
[2, 2, 4, 4], [1, 1, 5, 5]]])
candidate_rois = tf.constant(candidate_rois_np, dtype=tf.float32)
candidate_gt_boxes_np = np.array(
[[[0, 0, 0.6, 0.6], [0, 0, 0, 0],
[1, 1, 3, 3], [1, 1, 3, 3]]])
candidate_gt_boxes = tf.constant(candidate_gt_boxes_np, dtype=tf.float32)
candidate_gt_classes_np = np.array([[4, 0, 0, 2]])
candidate_gt_classes = tf.constant(
candidate_gt_classes_np, dtype=tf.float32)
candidate_gt_indices_np = np.array([[10, -1, -1, 20]])
candidate_gt_indices = tf.constant(
candidate_gt_indices_np, dtype=tf.int32)
gt_masks_np = np.random.rand(1, 100, 32, 32)
gt_masks = tf.constant(gt_masks_np, dtype=tf.float32)
num_mask_samples_per_image = 2
mask_target_size = 28
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
foreground_rois, foreground_classes, cropped_foreground_masks = (
mask_sampler._sample_and_crop_foreground_masks(
candidate_rois, candidate_gt_boxes, candidate_gt_classes,
candidate_gt_indices, gt_masks, num_mask_samples_per_image,
mask_target_size))
foreground_rois_tpu = foreground_rois.numpy()
foreground_classes_tpu = foreground_classes.numpy()
cropped_foreground_masks_tpu = cropped_foreground_masks.numpy()
foreground_rois, foreground_classes, cropped_foreground_masks = (
mask_sampler._sample_and_crop_foreground_masks(
candidate_rois, candidate_gt_boxes, candidate_gt_classes,
candidate_gt_indices, gt_masks, num_mask_samples_per_image,
mask_target_size))
foreground_rois_cpu = foreground_rois.numpy()
foreground_classes_cpu = foreground_classes.numpy()
cropped_foreground_masks_cpu = cropped_foreground_masks.numpy()
# consistency.
self.assertAllEqual(foreground_rois_tpu.shape, foreground_rois_cpu.shape)
self.assertAllEqual(
foreground_classes_tpu.shape, foreground_classes_cpu.shape)
self.assertAllEqual(
cropped_foreground_masks_tpu.shape, cropped_foreground_masks_cpu.shape)
# correctnesss.
self.assertAllEqual(foreground_rois_tpu.shape, [1, 2, 4])
self.assertAllEqual(foreground_classes_tpu.shape, [1, 2])
self.assertAllEqual(cropped_foreground_masks_tpu.shape, [1, 2, 28, 28])
class MaskSamplerTest(tf.test.TestCase):
def test_mask_sampler(self):
candidate_rois_np = np.array(
[[[0, 0, 0.5, 0.5], [0.5, 0.5, 1, 1],
[2, 2, 4, 4], [1, 1, 5, 5]]])
candidate_rois = tf.constant(candidate_rois_np, dtype=tf.float32)
candidate_gt_boxes_np = np.array(
[[[0, 0, 0.6, 0.6], [0, 0, 0, 0],
[1, 1, 3, 3], [1, 1, 3, 3]]])
candidate_gt_boxes = tf.constant(candidate_gt_boxes_np, dtype=tf.float32)
candidate_gt_classes_np = np.array([[4, 0, 0, 2]])
candidate_gt_classes = tf.constant(
candidate_gt_classes_np, dtype=tf.float32)
candidate_gt_indices_np = np.array([[10, -1, -1, 20]])
candidate_gt_indices = tf.constant(
candidate_gt_indices_np, dtype=tf.int32)
gt_masks_np = np.random.rand(1, 100, 32, 32)
gt_masks = tf.constant(gt_masks_np, dtype=tf.float32)
sampler = mask_sampler.MaskSampler(28, 2)
foreground_rois, foreground_classes, cropped_foreground_masks = sampler(
candidate_rois, candidate_gt_boxes, candidate_gt_classes,
candidate_gt_indices, gt_masks)
# correctnesss.
self.assertAllEqual(foreground_rois.numpy().shape, [1, 2, 4])
self.assertAllEqual(foreground_classes.numpy().shape, [1, 2])
self.assertAllEqual(cropped_foreground_masks.numpy().shape, [1, 2, 28, 28])
def test_serialize_deserialize(self):
kwargs = dict(
mask_target_size=7,
num_sampled_masks=10,
)
sampler = mask_sampler.MaskSampler(**kwargs)
expected_config = dict(kwargs)
self.assertEqual(sampler.get_config(), expected_config)
new_sampler = mask_sampler.MaskSampler.from_config(
sampler.get_config())
self.assertAllEqual(sampler.get_config(), new_sampler.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for roi_generator.py."""
# Import libraries
import numpy as np
import tensorflow as tf
from official.vision.beta.modeling.layers import roi_generator
class MultilevelProposeRoisTest(tf.test.TestCase):
def test_multilevel_propose_rois_single_level(self):
rpn_boxes_np = np.array(
[[[[0, 0, 10, 10], [0.01, 0.01, 9.9, 9.9]],
[[5, 5, 10, 10], [2, 2, 8, 8]]],
[[[2, 2, 4, 4], [3, 3, 6, 6]],
[[3.1, 3.1, 6.1, 6.1], [1, 1, 8, 8]]]])
rpn_boxes = {
'2': tf.constant(rpn_boxes_np, dtype=tf.float32)
}
rpn_scores_np = np.array(
[[[[0.6], [0.9]], [[0.2], [0.3]]], [[[0.1], [0.8]], [[0.3], [0.5]]]])
rpn_scores = {
'2': tf.constant(rpn_scores_np, dtype=tf.float32)
}
anchor_boxes_np = np.array(
[[[[0, 0, 10, 10], [0.01, 0.01, 9.9, 9.9]],
[[5, 5, 10, 10], [2, 2, 8, 8]]],
[[[2, 2, 4, 4], [3, 3, 6, 6]],
[[3.1, 3.1, 6.1, 6.1], [1, 1, 8, 8]]]])
anchor_boxes = {
'2': tf.constant(anchor_boxes_np, dtype=tf.float32)
}
image_shape = tf.constant([[20, 20], [20, 20]], dtype=tf.int32)
selected_rois_np = np.array(
[[[0.01, 0.01, 9.9, 9.9], [2, 2, 8, 8], [5, 5, 10, 10], [0, 0, 0, 0]],
[[3, 3, 6, 6], [1, 1, 8, 8], [2, 2, 4, 4], [0, 0, 0, 0]]])
selected_roi_scores_np = np.array(
[[0.9, 0.3, 0.2, 0], [0.8, 0.5, 0.1, 0]])
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
selected_rois_tpu, selected_roi_scores_tpu = (
roi_generator._multilevel_propose_rois(
rpn_boxes,
rpn_scores,
anchor_boxes=anchor_boxes,
image_shape=image_shape,
pre_nms_top_k=4,
pre_nms_score_threshold=0.0,
pre_nms_min_size_threshold=0.0,
nms_iou_threshold=0.5,
num_proposals=4,
use_batched_nms=False,
decode_boxes=False,
clip_boxes=False,
apply_sigmoid_to_score=False))
# Runs on CPU.
selected_rois_cpu, selected_roi_scores_cpu = (
roi_generator._multilevel_propose_rois(
rpn_boxes,
rpn_scores,
anchor_boxes=anchor_boxes,
image_shape=image_shape,
pre_nms_top_k=4,
pre_nms_score_threshold=0.0,
pre_nms_min_size_threshold=0.0,
nms_iou_threshold=0.5,
num_proposals=4,
use_batched_nms=False,
decode_boxes=False,
clip_boxes=False,
apply_sigmoid_to_score=False))
self.assertNDArrayNear(
selected_rois_tpu.numpy(), selected_rois_cpu.numpy(), 1e-5)
self.assertNDArrayNear(
selected_roi_scores_tpu.numpy(), selected_roi_scores_cpu.numpy(), 1e-5)
self.assertNDArrayNear(
selected_rois_tpu.numpy(), selected_rois_np, 1e-5)
self.assertNDArrayNear(
selected_roi_scores_tpu.numpy(), selected_roi_scores_np, 1e-5)
def test_multilevel_propose_rois_two_levels(self):
rpn_boxes_1_np = np.array(
[[[[0, 0, 10, 10], [0.01, 0.01, 9.99, 9.99]],
[[5, 5, 10, 10], [2, 2, 8, 8]]],
[[[2, 2, 2.5, 2.5], [3, 3, 6, 6]],
[[3.1, 3.1, 6.1, 6.1], [1, 1, 8, 8]]]])
rpn_boxes_2_np = np.array(
[[[[0, 0, 10.01, 10.01]]], [[[2, 2, 4.5, 4.5]]]])
rpn_boxes = {
'2': tf.constant(rpn_boxes_1_np, dtype=tf.float32),
'3': tf.constant(rpn_boxes_2_np, dtype=tf.float32),
}
rpn_scores_1_np = np.array(
[[[[0.6], [0.9]], [[0.2], [0.3]]], [[[0.1], [0.8]], [[0.3], [0.5]]]])
rpn_scores_2_np = np.array([[[[0.95]]], [[[0.99]]]])
rpn_scores = {
'2': tf.constant(rpn_scores_1_np, dtype=tf.float32),
'3': tf.constant(rpn_scores_2_np, dtype=tf.float32),
}
anchor_boxes_1_np = np.array(
[[[[0, 0, 10, 10], [0.01, 0.01, 9.99, 9.99]],
[[5, 5, 10, 10], [2, 2, 8, 8]]],
[[[2, 2, 2.5, 2.5], [3, 3, 6, 6]],
[[3.1, 3.1, 6.1, 6.1], [1, 1, 8, 8]]]])
anchor_boxes_2_np = np.array(
[[[[0, 0, 10.01, 10.01]]], [[[2, 2, 4.5, 4.5]]]])
anchor_boxes = {
'2': tf.constant(anchor_boxes_1_np, dtype=tf.float32),
'3': tf.constant(anchor_boxes_2_np, dtype=tf.float32),
}
image_shape = tf.constant([[20, 20], [20, 20]], dtype=tf.int32)
selected_rois_np = np.array(
[[[0, 0, 10.01, 10.01], [0.01, 0.01, 9.99, 9.99]],
[[2, 2, 4.5, 4.5], [3, 3, 6, 6]]])
selected_roi_scores_np = np.array([[0.95, 0.9], [0.99, 0.8]])
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
selected_rois_tpu, selected_roi_scores_tpu = (
roi_generator._multilevel_propose_rois(
rpn_boxes,
rpn_scores,
anchor_boxes=anchor_boxes,
image_shape=image_shape,
pre_nms_top_k=4,
pre_nms_score_threshold=0.0,
pre_nms_min_size_threshold=0.0,
nms_iou_threshold=0.5,
num_proposals=2,
use_batched_nms=False,
decode_boxes=False,
clip_boxes=False,
apply_sigmoid_to_score=False))
# Runs on CPU.
selected_rois_cpu, selected_roi_scores_cpu = (
roi_generator._multilevel_propose_rois(
rpn_boxes,
rpn_scores,
anchor_boxes=anchor_boxes,
image_shape=image_shape,
pre_nms_top_k=4,
pre_nms_score_threshold=0.0,
pre_nms_min_size_threshold=0.0,
nms_iou_threshold=0.5,
num_proposals=2,
use_batched_nms=False,
decode_boxes=False,
clip_boxes=False,
apply_sigmoid_to_score=False))
self.assertNDArrayNear(
selected_rois_tpu.numpy(), selected_rois_cpu.numpy(), 1e-5)
self.assertNDArrayNear(
selected_roi_scores_tpu.numpy(), selected_roi_scores_cpu.numpy(), 1e-5)
self.assertNDArrayNear(
selected_rois_tpu.numpy(), selected_rois_np, 1e-5)
self.assertNDArrayNear(
selected_roi_scores_tpu.numpy(), selected_roi_scores_np, 1e-5)
class MultilevelROIGeneratorTest(tf.test.TestCase):
def test_serialize_deserialize(self):
kwargs = dict(
pre_nms_top_k=2000,
pre_nms_score_threshold=0.0,
pre_nms_min_size_threshold=0.0,
nms_iou_threshold=0.7,
num_proposals=1000,
test_pre_nms_top_k=1000,
test_pre_nms_score_threshold=0.0,
test_pre_nms_min_size_threshold=0.0,
test_nms_iou_threshold=0.7,
test_num_proposals=1000,
use_batched_nms=False,
)
generator = roi_generator.MultilevelROIGenerator(**kwargs)
expected_config = dict(kwargs)
self.assertEqual(generator.get_config(), expected_config)
new_generator = roi_generator.MultilevelROIGenerator.from_config(
generator.get_config())
self.assertAllEqual(generator.get_config(), new_generator.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for roi_sampler.py."""
# Import libraries
import numpy as np
import tensorflow as tf
from official.vision.beta.modeling.layers import roi_sampler
class ROISamplerTest(tf.test.TestCase):
def test_roi_sampler(self):
boxes_np = np.array(
[[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5],
[5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
gt_boxes_np = np.array(
[[[10, 10, 15, 15], [2.5, 2.5, 7.5, 7.5],
[-1, -1, -1, -1]]])
gt_boxes = tf.constant(gt_boxes_np, dtype=tf.float32)
gt_classes_np = np.array([[2, 10, -1]])
gt_classes = tf.constant(gt_classes_np, dtype=tf.int32)
generator = roi_sampler.ROISampler(
mix_gt_boxes=True,
num_sampled_rois=2,
foreground_fraction=0.5,
foreground_iou_threshold=0.5,
background_iou_high_threshold=0.5,
background_iou_low_threshold=0.0)
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
_ = generator(boxes, gt_boxes, gt_classes)
# Runs on CPU.
_ = generator(boxes, gt_boxes, gt_classes)
def test_serialize_deserialize(self):
kwargs = dict(
mix_gt_boxes=True,
num_sampled_rois=512,
foreground_fraction=0.25,
foreground_iou_threshold=0.5,
background_iou_high_threshold=0.5,
background_iou_low_threshold=0.5,
)
generator = roi_sampler.ROISampler(**kwargs)
expected_config = dict(kwargs)
self.assertEqual(generator.get_config(), expected_config)
new_generator = roi_sampler.ROISampler.from_config(
generator.get_config())
self.assertAllEqual(generator.get_config(), new_generator.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for box_ops.py."""
# Import libraries
import numpy as np
import tensorflow as tf
from official.vision.beta.ops import box_ops
def _transform_boxes_on_tpu_and_cpu(transform_fn, boxes, *args):
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
transformed_op_tpu = transform_fn(boxes, *args)
transfomred_boxes_tpu = tf.nest.map_structure(lambda x: x.numpy(),
transformed_op_tpu)
# Runs on CPU.
transfomred_op_cpu = transform_fn(boxes, *args)
transfomred_boxes_cpu = tf.nest.map_structure(lambda x: x.numpy(),
transfomred_op_cpu)
return transfomred_boxes_tpu, transfomred_boxes_cpu
class ConvertBoxesTest(tf.test.TestCase):
def testConvertBoxes(self):
# y1, x1, y2, x2.
boxes = np.array([[0, 0, 1, 2], [0.2, 0.1, 1.2, 1.1]])
# x1, y1, width, height
target = np.array([[0, 0, 2, 1], [0.1, 0.2, 1, 1]])
outboxes = box_ops.yxyx_to_xywh(boxes)
self.assertNDArrayNear(outboxes, target, 1e-7)
class JitterBoxesTest(tf.test.TestCase):
def testJitterBoxes(self):
boxes_data = [[0, 0, 1, 1], [0, 0.1, 1, 1.1], [0, 0.3, 1, 1.3],
[0, 0.5, 1, 1.5], [0, 0.7, 1, 1.7], [0, 1.9, 1, 1.9]]
boxes_np = np.array(boxes_data, dtype=np.float32)
max_size = max(
np.amax(boxes_np[:, 3] - boxes_np[:, 1]),
np.amax(boxes_np[:, 2] - boxes_np[:, 0]))
noise_scale = 0.025
boxes = tf.constant(boxes_np)
def jitter_fn(input_boxes, arg_noise_scale):
return box_ops.jitter_boxes(input_boxes, arg_noise_scale)
jittered_boxes_tpu, jittered_boxes_cpu = _transform_boxes_on_tpu_and_cpu(
jitter_fn, boxes, noise_scale)
# Test that the jittered box is within 10 stds from the inputs.
self.assertNDArrayNear(jittered_boxes_tpu, boxes_np,
noise_scale * max_size * 10)
self.assertNDArrayNear(jittered_boxes_cpu, boxes_np,
noise_scale * max_size * 10)
class NormalizeBoxesTest(tf.test.TestCase):
def testNormalizeBoxes1DWithImageShapeAsList(self):
boxes = tf.constant([10, 30, 40, 90], tf.float32)
image_shape = [50, 100]
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.normalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu, [0.2, 0.3, 0.8, 0.9], 1e-5)
def testNormalizeBoxes1DWithImageShapeAsTensor(self):
boxes = tf.constant([10, 30, 40, 90], tf.float32)
image_shape = tf.constant([50, 100], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.normalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu, [0.2, 0.3, 0.8, 0.9], 1e-5)
def testNormalizeBoxes2DWithImageShapeAsList(self):
boxes = tf.constant([[10, 30, 40, 90], [30, 10, 40, 50]], tf.float32)
image_shape = [50, 100]
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.normalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]], 1e-5)
def testNormalizeBoxes2DWithImageShapeAsVector(self):
boxes = tf.constant([[10, 30, 40, 90], [30, 10, 40, 50]], tf.float32)
image_shape = tf.constant([50, 100], dtype=tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.normalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]], 1e-5)
def testNormalizeBoxes2DWithImageShapeAsBroadcastableTensor(self):
boxes = tf.constant([[10, 30, 40, 90], [30, 10, 40, 50]], tf.float32)
image_shape = tf.constant([[50, 100]], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.normalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]], 1e-5)
def testNormalizeBoxes2DWithImageShapeAsSameShapeTensor(self):
boxes = tf.constant([[10, 30, 40, 90], [30, 10, 40, 50]], tf.float32)
image_shape = tf.constant([[50, 100], [50, 100]], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.normalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]], 1e-5)
def testNormalizeBoxes3DWithImageShapeAsList(self):
boxes = tf.constant([[[10, 30, 40, 90], [30, 10, 40, 50]],
[[20, 40, 50, 80], [30, 50, 40, 90]]], tf.float32)
image_shape = [50, 100]
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.normalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
[[0.4, 0.4, 1.0, 0.8], [0.6, 0.5, 0.8, 0.9]]], 1e-5)
def testNormalizeBoxes3DWithImageShapeAsVector(self):
boxes = tf.constant([[[10, 30, 40, 90], [30, 10, 40, 50]],
[[20, 40, 50, 80], [30, 50, 40, 90]]], tf.float32)
image_shape = tf.constant([50, 100], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.normalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
[[0.4, 0.4, 1.0, 0.8], [0.6, 0.5, 0.8, 0.9]]], 1e-5)
def testNormalizeBoxes3DWithImageShapeAsBroadcastableTensor(self):
boxes = tf.constant([[[10, 30, 40, 90], [30, 10, 40, 50]],
[[20, 40, 50, 80], [30, 50, 40, 90]]], tf.float32)
image_shape = tf.constant([[[50, 100]], [[500, 1000]]], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.normalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(
normalized_boxes_tpu,
[[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
[[0.04, 0.04, 0.1, 0.08], [0.06, 0.05, 0.08, 0.09]]], 1e-5)
def testNormalizeBoxes3DWithImageShapeAsSameShapeTensor(self):
boxes = tf.constant([[[10, 30, 40, 90], [30, 10, 40, 50]],
[[20, 40, 50, 80], [30, 50, 40, 90]]], tf.float32)
image_shape = tf.constant(
[[[50, 100], [50, 100]], [[500, 1000], [500, 1000]]], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.normalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(
normalized_boxes_tpu,
[[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
[[0.04, 0.04, 0.1, 0.08], [0.06, 0.05, 0.08, 0.09]]], 1e-5)
class DenormalizeBoxesTest(tf.test.TestCase):
def testDenormalizeBoxes1DWithImageShapeAsList(self):
boxes = tf.constant([0.2, 0.3, 0.8, 0.9], tf.float32)
image_shape = [50, 100]
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.denormalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu, [10, 30, 40, 90], 1e-5)
def testDenormalizeBoxes1DWithImageShapeAsTensor(self):
boxes = tf.constant([0.2, 0.3, 0.8, 0.9], tf.float32)
image_shape = tf.constant([50, 100], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.denormalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu, [10, 30, 40, 90], 1e-5)
def testDenormalizeBoxes2DWithImageShapeAsList(self):
boxes = tf.constant([[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
tf.float32)
image_shape = [50, 100]
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.denormalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[10, 30, 40, 90], [30, 10, 40, 50]], 1e-5)
def testDenormalizeBoxes2DWithImageShapeAsVector(self):
boxes = tf.constant([[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
tf.float32)
image_shape = tf.constant([50, 100], dtype=tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.denormalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[10, 30, 40, 90], [30, 10, 40, 50]], 1e-5)
def testDenormalizeBoxes2DWithImageShapeAsBroadcastableTensor(self):
boxes = tf.constant([[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
tf.float32)
image_shape = tf.constant([[50, 100]], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.denormalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[10, 30, 40, 90], [30, 10, 40, 50]], 1e-5)
def testDenormalizeBoxes2DWithImageShapeAsSameShapeTensor(self):
boxes = tf.constant([[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
tf.float32)
image_shape = tf.constant([[50, 100], [50, 100]], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.denormalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[10, 30, 40, 90], [30, 10, 40, 50]], 1e-5)
def testDenormalizeBoxes3DWithImageShapeAsList(self):
boxes = tf.constant([[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
[[0.4, 0.4, 1.0, 0.8], [0.6, 0.5, 0.8, 0.9]]],
tf.float32)
image_shape = [50, 100]
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.denormalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[[10, 30, 40, 90], [30, 10, 40, 50]],
[[20, 40, 50, 80], [30, 50, 40, 90]]], 1e-5)
def testDenormalizeBoxes3DWithImageShapeAsVector(self):
boxes = tf.constant([[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
[[0.4, 0.4, 1.0, 0.8], [0.6, 0.5, 0.8, 0.9]]],
tf.float32)
image_shape = tf.constant([50, 100], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.denormalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[[10, 30, 40, 90], [30, 10, 40, 50]],
[[20, 40, 50, 80], [30, 50, 40, 90]]], 1e-5)
def testDenormalizeBoxes3DWithImageShapeAsBroadcastableTensor(self):
boxes = tf.constant([[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
[[0.04, 0.04, 0.1, 0.08], [0.06, 0.05, 0.08, 0.09]]],
tf.float32)
image_shape = tf.constant([[[50, 100]], [[500, 1000]]], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.denormalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[[10, 30, 40, 90], [30, 10, 40, 50]],
[[20, 40, 50, 80], [30, 50, 40, 90]]], 1e-5)
def testDenormalizeBoxes3DWithImageShapeAsSameShapeTensor(self):
boxes = tf.constant([[[0.2, 0.3, 0.8, 0.9], [0.6, 0.1, 0.8, 0.5]],
[[0.04, 0.04, 0.1, 0.08], [0.06, 0.05, 0.08, 0.09]]],
tf.float32)
image_shape = tf.constant(
[[[50, 100], [50, 100]], [[500, 1000], [500, 1000]]], tf.int32)
normalized_boxes_tpu, normalized_boxes_cpu = (
_transform_boxes_on_tpu_and_cpu(
box_ops.denormalize_boxes, boxes, image_shape))
self.assertNDArrayNear(normalized_boxes_tpu, normalized_boxes_cpu, 1e-5)
self.assertNDArrayNear(normalized_boxes_tpu,
[[[10, 30, 40, 90], [30, 10, 40, 50]],
[[20, 40, 50, 80], [30, 50, 40, 90]]], 1e-5)
class ClipBoxesTest(tf.test.TestCase):
def testClipBoxesImageShapeAsList(self):
boxes_data = [[0, 0, 1, 1], [0, 0.1, 1, 1.1], [0, 0.3, 1, 1.3],
[0, 0.5, 1, 1.5], [0, 0.7, 1, 1.7], [0, 1.9, 1, 1.9]]
image_shape = [3, 3]
boxes = tf.constant(boxes_data)
clipped_boxes_tpu, clipped_boxes_cpu = _transform_boxes_on_tpu_and_cpu(
box_ops.clip_boxes, boxes, image_shape)
self.assertAllClose(clipped_boxes_tpu, clipped_boxes_cpu)
self.assertAllClose(clipped_boxes_tpu,
[[0, 0, 1, 1], [0, 0.1, 1, 1.1], [0, 0.3, 1, 1.3],
[0, 0.5, 1, 1.5], [0, 0.7, 1, 1.7], [0, 1.9, 1, 1.9]])
def testClipBoxesImageShapeAsVector(self):
boxes_data = [[0, 0, 1, 1], [0, 0.1, 1, 1.1], [0, 0.3, 1, 1.3],
[0, 0.5, 1, 1.5], [0, 0.7, 1, 1.7], [0, 1.9, 1, 1.9]]
boxes = tf.constant(boxes_data)
image_shape = np.array([3, 3], dtype=np.float32)
clipped_boxes_tpu, clipped_boxes_cpu = _transform_boxes_on_tpu_and_cpu(
box_ops.clip_boxes, boxes, image_shape)
self.assertAllClose(clipped_boxes_tpu, clipped_boxes_cpu)
self.assertAllClose(clipped_boxes_tpu,
[[0, 0, 1, 1], [0, 0.1, 1, 1.1], [0, 0.3, 1, 1.3],
[0, 0.5, 1, 1.5], [0, 0.7, 1, 1.7], [0, 1.9, 1, 1.9]])
def testClipBoxesImageShapeAsTensor(self):
boxes_data = [[0, 0, 1, 1], [0, 0.1, 1, 1.1], [0, 0.3, 1, 1.3],
[0, 0.5, 1, 1.5], [0, 0.7, 1, 1.7], [0, 1.9, 1, 1.9]]
boxes = tf.constant(boxes_data)
image_shape = tf.constant([[3, 3], [3, 3], [3, 3], [3, 3], [3, 3], [3, 3]],
dtype=tf.float32)
clipped_boxes_tpu, clipped_boxes_cpu = _transform_boxes_on_tpu_and_cpu(
box_ops.clip_boxes, boxes, image_shape)
self.assertAllClose(clipped_boxes_tpu, clipped_boxes_cpu)
self.assertAllClose(clipped_boxes_tpu,
[[0, 0, 1, 1], [0, 0.1, 1, 1.1], [0, 0.3, 1, 1.3],
[0, 0.5, 1, 1.5], [0, 0.7, 1, 1.7], [0, 1.9, 1, 1.9]])
class EncodeDecodeBoxesTest(tf.test.TestCase):
def test_encode_decode_boxes(self):
boxes_np = np.array([[[1.0, 2.0, 3.0, 4.0], [2.0, 3.0, 4.0, 5.0]],
[[4.0, 5.0, 6.0, 7.0], [5.0, 6.0, 7.0, 8.0]]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
anchors = tf.constant([[[1.5, 2.5, 3.5, 4.5], [2.5, 3.5, 4.5, 5.5]],
[[1.5, 2.5, 3.5, 4.5], [2.5, 3.5, 4.5, 5.5]]],
dtype=tf.float32)
weights = [1.0, 1.0, 1.0, 1.0]
def test_fn(boxes, anchors):
encoded_boxes = box_ops.encode_boxes(boxes, anchors, weights)
decoded_boxes = box_ops.decode_boxes(encoded_boxes, anchors, weights)
return decoded_boxes
decoded_boxes_tpu, decoded_boxes_cpu = _transform_boxes_on_tpu_and_cpu(
test_fn, boxes, anchors)
self.assertNDArrayNear(decoded_boxes_tpu, decoded_boxes_cpu, 1e-5)
self.assertNDArrayNear(decoded_boxes_tpu, boxes_np, 1e-5)
def test_encode_decode_boxes_batch_broadcast(self):
boxes_np = np.array([[[1.0, 2.0, 3.0, 4.0], [2.0, 3.0, 4.0, 5.0]],
[[4.0, 5.0, 6.0, 7.0], [5.0, 6.0, 7.0, 8.0]]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
anchors = tf.constant([[[1.5, 2.5, 3.5, 4.5], [2.5, 3.5, 4.5, 5.5]]],
dtype=tf.float32)
weights = [1.0, 1.0, 1.0, 1.0]
def test_fn(boxes, anchors):
encoded_boxes = box_ops.encode_boxes(boxes, anchors, weights)
decoded_boxes = box_ops.decode_boxes(encoded_boxes, anchors, weights)
return decoded_boxes
decoded_boxes_tpu, decoded_boxes_cpu = _transform_boxes_on_tpu_and_cpu(
test_fn, boxes, anchors)
self.assertNDArrayNear(decoded_boxes_tpu, decoded_boxes_cpu, 1e-5)
self.assertNDArrayNear(decoded_boxes_tpu, boxes_np, 1e-5)
class FilterBoxesTest(tf.test.TestCase):
def test_filter_boxes_batch(self):
# boxes -> [[small, good, outside], [outside, small, good]]
boxes_np = np.array([[[1.0, 2.0, 1.5, 2.5], [2.0, 3.0, 4.5, 5.5],
[7.0, 4.0, 9.5, 6.5]],
[[-2.0, 5.0, 0.0, 7.5], [5.0, 6.0, 5.1, 6.0],
[4.0, 1.0, 7.0, 4.0]]])
filtered_boxes_np = np.array([[[0.0, 0.0, 0.0, 0.0], [2.0, 3.0, 4.5, 5.5],
[0.0, 0.0, 0.0, 0.0]],
[[0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0],
[4.0, 1.0, 7.0, 4.0]]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
scores_np = np.array([[0.9, 0.7, 0.5], [0.11, 0.22, 0.33]])
filtered_scores_np = np.array([[0.0, 0.7, 0.0], [0.0, 0.0, 0.33]])
scores = tf.constant(scores_np, dtype=tf.float32)
image_shape = tf.expand_dims(
tf.constant([[8, 8], [8, 8]], dtype=tf.int32), axis=1)
min_size_threshold = 2.0
def test_fn(boxes, scores, image_shape):
filtered_boxes, filtered_scores = box_ops.filter_boxes(
boxes, scores, image_shape, min_size_threshold)
return filtered_boxes, filtered_scores
filtered_results_tpu, filtered_results_cpu = (
_transform_boxes_on_tpu_and_cpu(
test_fn, boxes, scores, image_shape))
filtered_boxes_tpu, filtered_scores_tpu = filtered_results_tpu
filtered_boxes_cpu, filtered_scores_cpu = filtered_results_cpu
self.assertNDArrayNear(filtered_boxes_tpu, filtered_boxes_cpu, 1e-5)
self.assertNDArrayNear(filtered_scores_tpu, filtered_scores_cpu, 1e-5)
self.assertNDArrayNear(filtered_boxes_tpu, filtered_boxes_np, 1e-5)
self.assertNDArrayNear(filtered_scores_tpu, filtered_scores_np, 1e-5)
class FilterBoxesByScoresTest(tf.test.TestCase):
def test_filter_boxes_by_scores_batch(self):
# boxes -> [[small, good, outside], [outside, small, good]]
boxes_np = np.array([[[1.0, 2.0, 1.5, 2.5], [2.0, 3.0, 4.5, 5.5],
[7.0, 4.0, 9.5, 6.5]],
[[-2.0, 5.0, 0.0, 7.5], [5.0, 6.0, 5.1, 6.0],
[4.0, 1.0, 7.0, 4.0]]])
filtered_boxes_np = np.array([[[0.0, 0.0, 0.0, 0.0], [2.0, 3.0, 4.5, 5.5],
[7.0, 4.0, 9.5, 6.5]],
[[0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0],
[4.0, 1.0, 7.0, 4.0]]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
scores_np = np.array([[0.1, 0.7, 0.6], [0.11, 0.22, 0.53]])
filtered_scores_np = np.array([[-1.0, 0.7, 0.6], [-1.0, -1.0, 0.53]])
scores = tf.constant(scores_np, dtype=tf.float32)
min_score_threshold = 0.5
def test_fn(boxes, scores):
filtered_boxes, filtered_scores = box_ops.filter_boxes_by_scores(
boxes, scores, min_score_threshold)
return filtered_boxes, filtered_scores
filtered_results_tpu, filtered_results_cpu = _transform_boxes_on_tpu_and_cpu(
test_fn, boxes, scores)
filtered_boxes_tpu, filtered_scores_tpu = filtered_results_tpu
filtered_boxes_cpu, filtered_scores_cpu = filtered_results_cpu
self.assertNDArrayNear(filtered_boxes_tpu, filtered_boxes_cpu, 1e-5)
self.assertNDArrayNear(filtered_scores_tpu, filtered_scores_cpu, 1e-5)
self.assertNDArrayNear(filtered_boxes_tpu, filtered_boxes_np, 1e-5)
self.assertNDArrayNear(filtered_scores_tpu, filtered_scores_np, 1e-5)
class GatherInstancesTest(tf.test.TestCase):
def test_gather_instances(self):
boxes_np = np.array([[[1.0, 2.0, 1.5, 2.5], [2.0, 3.0, 4.5, 5.5],
[7.0, 4.0, 9.5, 6.5]],
[[-2.0, 5.0, 0.0, 7.5], [5.0, 6.0, 5.1, 6.0],
[4.0, 1.0, 7.0, 4.0]]])
indices_np = np.array([[2, 0], [0, 1]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
indices = tf.constant(indices_np, dtype=tf.int32)
selected_boxes = box_ops.gather_instances(indices, boxes)
expected_selected_boxes = np.array(
[[[7.0, 4.0, 9.5, 6.5], [1.0, 2.0, 1.5, 2.5]],
[[-2.0, 5.0, 0.0, 7.5], [5.0, 6.0, 5.1, 6.0]]])
self.assertNDArrayNear(expected_selected_boxes, selected_boxes, 1e-5)
def test_gather_instances_with_multiple_inputs(self):
boxes_np = np.array([[[1.0, 2.0, 1.5, 2.5], [2.0, 3.0, 4.5, 5.5],
[7.0, 4.0, 9.5, 6.5]],
[[-2.0, 5.0, 0.0, 7.5], [5.0, 6.0, 5.1, 6.0],
[4.0, 1.0, 7.0, 4.0]]])
classes_np = np.array([[1, 2, 3], [20, 30, 40]])
indices_np = np.array([[2, 0], [0, 1]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
classes = tf.constant(classes_np, dtype=tf.int32)
indices = tf.constant(indices_np, dtype=tf.int32)
selected_boxes, selected_classes = box_ops.gather_instances(
indices, boxes, classes)
expected_selected_boxes = np.array(
[[[7.0, 4.0, 9.5, 6.5], [1.0, 2.0, 1.5, 2.5]],
[[-2.0, 5.0, 0.0, 7.5], [5.0, 6.0, 5.1, 6.0]]])
expected_selected_classes = np.array(
[[3, 1], [20, 30]])
self.assertNDArrayNear(expected_selected_boxes, selected_boxes, 1e-5)
self.assertAllEqual(expected_selected_classes, selected_classes)
class TopKBoxesTest(tf.test.TestCase):
def test_top_k_boxes_batch1(self):
boxes_np = np.array([[[1.0, 2.0, 1.5, 2.5], [2.0, 3.0, 4.5, 5.5],
[7.0, 4.0, 9.5, 6.5]]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
scores_np = np.array([[0.9, 0.5, 0.7]])
scores = tf.constant(scores_np, dtype=tf.float32)
top_k_boxes_np = np.array([[[1.0, 2.0, 1.5, 2.5], [7.0, 4.0, 9.5, 6.5]]])
top_k_scores_np = np.array([[0.9, 0.7]])
def test_fn(boxes, scores):
top_k_boxes, top_k_scores = box_ops.top_k_boxes(boxes, scores, k=2)
return top_k_boxes, top_k_scores
top_k_results_tpu, top_k_results_cpu = _transform_boxes_on_tpu_and_cpu(
test_fn, boxes, scores)
top_k_boxes_tpu, top_k_scores_tpu = top_k_results_tpu
top_k_boxes_cpu, top_k_scores_cpu = top_k_results_cpu
self.assertNDArrayNear(top_k_boxes_tpu, top_k_boxes_cpu, 1e-5)
self.assertNDArrayNear(top_k_scores_tpu, top_k_scores_cpu, 1e-5)
self.assertNDArrayNear(top_k_boxes_tpu, top_k_boxes_np, 1e-5)
self.assertNDArrayNear(top_k_scores_tpu, top_k_scores_np, 1e-5)
def test_top_k_boxes_batch2(self):
boxes_np = np.array([[[1.0, 2.0, 1.5, 2.5], [2.0, 3.0, 4.5, 5.5],
[7.0, 4.0, 9.5, 6.5]],
[[-2.0, 5.0, 0.0, 7.5], [5.0, 6.0, 5.1, 6.0],
[4.0, 1.0, 7.0, 4.0]]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
scores_np = np.array([[0.9, 0.7, 0.5], [0.11, 0.22, 0.33]])
scores = tf.constant(scores_np, dtype=tf.float32)
top_k_boxes_np = np.array([[[1.0, 2.0, 1.5, 2.5], [2.0, 3.0, 4.5, 5.5]],
[[4.0, 1.0, 7.0, 4.0], [5.0, 6.0, 5.1, 6.0]]])
top_k_scores_np = np.array([[0.9, 0.7], [0.33, 0.22]])
def test_fn(boxes, scores):
top_k_boxes, top_k_scores = box_ops.top_k_boxes(boxes, scores, k=2)
return top_k_boxes, top_k_scores
top_k_results_tpu, top_k_results_cpu = _transform_boxes_on_tpu_and_cpu(
test_fn, boxes, scores)
top_k_boxes_tpu, top_k_scores_tpu = top_k_results_tpu
top_k_boxes_cpu, top_k_scores_cpu = top_k_results_cpu
self.assertNDArrayNear(top_k_boxes_tpu, top_k_boxes_cpu, 1e-5)
self.assertNDArrayNear(top_k_scores_tpu, top_k_scores_cpu, 1e-5)
self.assertNDArrayNear(top_k_boxes_tpu, top_k_boxes_np, 1e-5)
self.assertNDArrayNear(top_k_scores_tpu, top_k_scores_np, 1e-5)
class BboxeOverlapTest(tf.test.TestCase):
def testBBoxeOverlapOpCorrectness(self):
boxes_data = [[[0, 0, 0.1, 1], [0, 0.2, 0.2, 1.2], [0, 0.3, 0.3, 1.3],
[0, 0.5, 0.4, 1.5], [0, 0.7, 0.5, 1.7], [0, 0.9, 0.6, 1.9],
[0, 0.1, 0.1, 1.1], [0, 0.3, 0.7, 1.3], [0, 0.9, 2, 1.9]],
[[0, 0, 1, 0.2], [0, 0.2, 0.5, 1.2], [0, 0.4, 0.9, 1.4],
[0, 0.6, 1.1, 1.6], [0, 0.8, 1.2, 1.8], [0, 1, 1.5, 2],
[0, 0.5, 1, 1], [0.5, 0.8, 1, 1.8], [-1, -1, -1, -1]]]
boxes_np = np.array(boxes_data, dtype=np.float32)
gt_boxes_data = [[[0, 0.1, 0.1, 1.1], [0, 0.3, 0.7, 1.3], [0, 0.9, 2, 1.9]],
[[0, 0.5, 1, 1], [0.5, 0.8, 1, 1.8], [-1, -1, -1, -1]]]
gt_boxes_np = np.array(gt_boxes_data, dtype=np.float32)
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
boxes = tf.constant(boxes_np)
gt_boxes = tf.constant(gt_boxes_np)
iou = box_ops.bbox_overlap(boxes=boxes, gt_boxes=gt_boxes)
iou = iou.numpy()
self.assertEqual(iou.shape, (2, 9, 3))
self.assertAllEqual(
np.argmax(iou, axis=2),
[[0, 0, 1, 1, 1, 2, 0, 1, 2], [0, 0, 0, 0, 1, 1, 0, 1, 0]])
def testBBoxeOverlapOpCheckShape(self):
batch_size = 2
rpn_post_nms_topn = 2000
gt_max_instances = 100
boxes_np = np.random.rand(batch_size, rpn_post_nms_topn,
4).astype(np.float32)
gt_boxes_np = np.random.rand(batch_size, gt_max_instances,
4).astype(np.float32)
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
boxes = tf.constant(boxes_np)
gt_boxes = tf.constant(gt_boxes_np)
iou = box_ops.bbox_overlap(boxes=boxes, gt_boxes=gt_boxes)
iou = iou.numpy()
self.assertEqual(iou.shape,
(batch_size, (rpn_post_nms_topn), gt_max_instances))
def testBBoxeOverlapOpCorrectnessWithNegativeData(self):
boxes_data = [[[0, -0.01, 0.1, 1.1], [0, 0.2, 0.2, 5.0],
[0, -0.01, 0.1, 1.], [-1, -1, -1, -1]]]
boxes_np = np.array(boxes_data, dtype=np.float32)
gt_boxes_np = boxes_np
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
boxes = tf.constant(boxes_np)
gt_boxes = tf.constant(gt_boxes_np)
iou = box_ops.bbox_overlap(boxes=boxes, gt_boxes=gt_boxes)
iou = iou.numpy()
expected = np.array([[[0.99999994, 0.0917431, 0.9099099, -1.],
[0.0917431, 1., 0.08154944, -1.],
[0.9099099, 0.08154944, 1., -1.],
[-1., -1., -1., -1.]]])
self.assertAllClose(expected, iou)
class BoxMatchingTest(tf.test.TestCase):
def test_box_matching_single(self):
boxes_np = np.array(
[[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5],
[5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
gt_boxes_np = np.array(
[[[10, 10, 15, 15], [2.5, 2.5, 7.5, 7.5],
[-1, -1, -1, -1]]])
gt_boxes = tf.constant(gt_boxes_np, dtype=tf.float32)
gt_classes_np = np.array([[2, 10, -1]])
gt_classes = tf.constant(gt_classes_np, dtype=tf.int32)
matched_gt_boxes_np = np.array(
[[[2.5, 2.5, 7.5, 7.5],
[2.5, 2.5, 7.5, 7.5],
[2.5, 2.5, 7.5, 7.5],
[10, 10, 15, 15]]])
matched_gt_classes_np = np.array([[10, 10, 10, 2]])
matched_gt_indices_np = np.array([[1, 1, 1, 0]])
matched_iou_np = np.array(
[[0.142857142857, 1.0, 0.142857142857, 0.142857142857]])
iou_np = np.array(
[[[0, 0.142857142857, -1.0],
[0, 1.0, -1.0],
[0, 0.142857142857, -1.0],
[0.142857142857, 0, -1.0]]])
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
(matched_gt_boxes_tpu, matched_gt_classes_tpu,
matched_gt_indices_tpu, matched_iou_tpu, iou_tpu) = (
box_ops.box_matching(boxes, gt_boxes, gt_classes))
# Runs on CPU.
(matched_gt_boxes_cpu, matched_gt_classes_cpu,
matched_gt_indices_cpu, matched_iou_cpu, iou_cpu) = (
box_ops.box_matching(boxes, gt_boxes, gt_classes))
# consistency.
self.assertNDArrayNear(
matched_gt_boxes_tpu.numpy(), matched_gt_boxes_cpu.numpy(), 1e-5)
self.assertAllEqual(
matched_gt_classes_tpu.numpy(), matched_gt_classes_cpu.numpy())
self.assertAllEqual(
matched_gt_indices_tpu.numpy(), matched_gt_indices_cpu.numpy())
self.assertNDArrayNear(
matched_iou_tpu.numpy(), matched_iou_cpu.numpy(), 1e-5)
self.assertNDArrayNear(
iou_tpu.numpy(), iou_cpu.numpy(), 1e-5)
# correctness.
self.assertNDArrayNear(
matched_gt_boxes_tpu.numpy(), matched_gt_boxes_np, 1e-5)
self.assertAllEqual(
matched_gt_classes_tpu.numpy(), matched_gt_classes_np)
self.assertAllEqual(
matched_gt_indices_tpu.numpy(), matched_gt_indices_np)
self.assertNDArrayNear(
matched_iou_tpu.numpy(), matched_iou_np, 1e-5)
self.assertNDArrayNear(
iou_tpu.numpy(), iou_np, 1e-5)
def test_box_matching_single_no_gt(self):
boxes_np = np.array(
[[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5],
[5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
gt_boxes_np = np.array(
[[[-1, -1, -1, -1],
[-1, -1, -1, -1],
[-1, -1, -1, -1]]])
gt_boxes = tf.constant(gt_boxes_np, dtype=tf.float32)
gt_classes_np = np.array([[-1, -1, -1]])
gt_classes = tf.constant(gt_classes_np, dtype=tf.int32)
matched_gt_boxes_np = np.array(
[[[0, 0, 0, 0],
[0, 0, 0, 0],
[0, 0, 0, 0],
[0, 0, 0, 0]]])
matched_gt_classes_np = np.array([[0, 0, 0, 0]])
matched_gt_indices_np = np.array([[-1, -1, -1, -1]])
matched_iou_np = np.array([[-1, -1, -1, -1]])
iou_np = np.array(
[[[-1, -1, -1],
[-1, -1, -1],
[-1, -1, -1],
[-1, -1, -1]]])
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
(matched_gt_boxes_tpu, matched_gt_classes_tpu,
matched_gt_indices_tpu, matched_iou_tpu, iou_tpu) = (
box_ops.box_matching(boxes, gt_boxes, gt_classes))
# Runs on CPU.
(matched_gt_boxes_cpu, matched_gt_classes_cpu,
matched_gt_indices_cpu, matched_iou_cpu, iou_cpu) = (
box_ops.box_matching(boxes, gt_boxes, gt_classes))
# consistency.
self.assertNDArrayNear(
matched_gt_boxes_tpu.numpy(), matched_gt_boxes_cpu.numpy(), 1e-5)
self.assertAllEqual(
matched_gt_classes_tpu.numpy(), matched_gt_classes_cpu.numpy())
self.assertAllEqual(
matched_gt_indices_tpu.numpy(), matched_gt_indices_cpu.numpy())
self.assertNDArrayNear(
matched_iou_tpu.numpy(), matched_iou_cpu.numpy(), 1e-5)
self.assertNDArrayNear(
iou_tpu.numpy(), iou_cpu.numpy(), 1e-5)
# correctness.
self.assertNDArrayNear(
matched_gt_boxes_tpu.numpy(), matched_gt_boxes_np, 1e-5)
self.assertAllEqual(
matched_gt_classes_tpu.numpy(), matched_gt_classes_np)
self.assertAllEqual(
matched_gt_indices_tpu.numpy(), matched_gt_indices_np)
self.assertNDArrayNear(
matched_iou_tpu.numpy(), matched_iou_np, 1e-5)
self.assertNDArrayNear(
iou_tpu.numpy(), iou_np, 1e-5)
def test_box_matching_batch(self):
boxes_np = np.array(
[[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5],
[5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]],
[[0, 0, 5, 5], [2.5, 2.5, 7.5, 7.5],
[5, 5, 10, 10], [7.5, 7.5, 12.5, 12.5]]])
boxes = tf.constant(boxes_np, dtype=tf.float32)
gt_boxes_np = np.array(
[[[10, 10, 15, 15], [2.5, 2.5, 7.5, 7.5], [-1, -1, -1, -1]],
[[-1, -1, -1, -1], [-1, -1, -1, -1], [-1, -1, -1, -1]]])
gt_boxes = tf.constant(gt_boxes_np, dtype=tf.float32)
gt_classes_np = np.array([[2, 10, -1], [-1, -1, -1]])
gt_classes = tf.constant(gt_classes_np, dtype=tf.int32)
matched_gt_boxes_np = np.array(
[[[2.5, 2.5, 7.5, 7.5],
[2.5, 2.5, 7.5, 7.5],
[2.5, 2.5, 7.5, 7.5],
[10, 10, 15, 15]],
[[0, 0, 0, 0],
[0, 0, 0, 0],
[0, 0, 0, 0],
[0, 0, 0, 0]]])
matched_gt_classes_np = np.array(
[[10, 10, 10, 2],
[0, 0, 0, 0]])
matched_gt_indices_np = np.array(
[[1, 1, 1, 0],
[-1, -1, -1, -1]])
matched_iou_np = np.array(
[[0.142857142857, 1.0, 0.142857142857, 0.142857142857],
[-1, -1, -1, -1]])
iou_np = np.array(
[[[0, 0.142857142857, -1.0],
[0, 1.0, -1.0],
[0, 0.142857142857, -1.0],
[0.142857142857, 0, -1.0]],
[[-1, -1, -1],
[-1, -1, -1],
[-1, -1, -1],
[-1, -1, -1]]])
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
(matched_gt_boxes_tpu, matched_gt_classes_tpu,
matched_gt_indices_tpu, matched_iou_tpu, iou_tpu) = (
box_ops.box_matching(boxes, gt_boxes, gt_classes))
# Runs on CPU.
(matched_gt_boxes_cpu, matched_gt_classes_cpu,
matched_gt_indices_cpu, matched_iou_cpu, iou_cpu) = (
box_ops.box_matching(boxes, gt_boxes, gt_classes))
# consistency.
self.assertNDArrayNear(
matched_gt_boxes_tpu.numpy(), matched_gt_boxes_cpu.numpy(), 1e-5)
self.assertAllEqual(
matched_gt_classes_tpu.numpy(), matched_gt_classes_cpu.numpy())
self.assertAllEqual(
matched_gt_indices_tpu.numpy(), matched_gt_indices_cpu.numpy())
self.assertNDArrayNear(
matched_iou_tpu.numpy(), matched_iou_cpu.numpy(), 1e-5)
self.assertNDArrayNear(
iou_tpu.numpy(), iou_cpu.numpy(), 1e-5)
# correctness.
self.assertNDArrayNear(
matched_gt_boxes_tpu.numpy(), matched_gt_boxes_np, 1e-5)
self.assertAllEqual(
matched_gt_classes_tpu.numpy(), matched_gt_classes_np)
self.assertAllEqual(
matched_gt_indices_tpu.numpy(), matched_gt_indices_np)
self.assertNDArrayNear(
matched_iou_tpu.numpy(), matched_iou_np, 1e-5)
self.assertNDArrayNear(
iou_tpu.numpy(), iou_np, 1e-5)
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for nms.py."""
# Import libraries
import numpy as np
import tensorflow as tf
from official.vision.beta.ops import nms
class SortedNonMaxSuppressionTest(tf.test.TestCase):
def setUp(self):
super(SortedNonMaxSuppressionTest, self).setUp()
self.boxes_data = [[[0, 0, 1, 1], [0, 0.2, 1, 1.2], [0, 0.4, 1, 1.4],
[0, 0.6, 1, 1.6], [0, 0.8, 1, 1.8], [0, 2, 1, 2]],
[[0, 2, 1, 2], [0, 0.8, 1, 1.8], [0, 0.6, 1, 1.6],
[0, 0.4, 1, 1.4], [0, 0.2, 1, 1.2], [0, 0, 1, 1]]]
self.scores_data = [[0.9, 0.7, 0.6, 0.5, 0.4, 0.3],
[0.8, 0.7, 0.6, 0.5, 0.4, 0.3]]
self.max_output_size = 6
self.iou_threshold = 0.5
def testSortedNonMaxSuppressionOnTPU(self):
boxes_np = np.array(self.boxes_data, dtype=np.float32)
scores_np = np.array(self.scores_data, dtype=np.float32)
iou_threshold_np = np.array(self.iou_threshold, dtype=np.float32)
boxes = tf.constant(boxes_np)
scores = tf.constant(scores_np)
iou_threshold = tf.constant(iou_threshold_np)
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
scores_tpu, boxes_tpu = nms.sorted_non_max_suppression_padded(
boxes=boxes,
scores=scores,
max_output_size=self.max_output_size,
iou_threshold=iou_threshold)
self.assertEqual(boxes_tpu.numpy().shape, (2, self.max_output_size, 4))
self.assertAllClose(scores_tpu.numpy(),
[[0.9, 0.6, 0.4, 0.3, 0., 0.],
[0.8, 0.7, 0.5, 0.3, 0., 0.]])
def testSortedNonMaxSuppressionOnCPU(self):
boxes_np = np.array(self.boxes_data, dtype=np.float32)
scores_np = np.array(self.scores_data, dtype=np.float32)
iou_threshold_np = np.array(self.iou_threshold, dtype=np.float32)
boxes = tf.constant(boxes_np)
scores = tf.constant(scores_np)
iou_threshold = tf.constant(iou_threshold_np)
# Runs on CPU.
scores_cpu, boxes_cpu = nms.sorted_non_max_suppression_padded(
boxes=boxes,
scores=scores,
max_output_size=self.max_output_size,
iou_threshold=iou_threshold)
self.assertEqual(boxes_cpu.numpy().shape, (2, self.max_output_size, 4))
self.assertAllClose(scores_cpu.numpy(),
[[0.9, 0.6, 0.4, 0.3, 0., 0.],
[0.8, 0.7, 0.5, 0.3, 0., 0.]])
def testSortedNonMaxSuppressionOnTPUSpeed(self):
boxes_np = np.random.rand(2, 12000, 4).astype(np.float32)
scores_np = np.random.rand(2, 12000).astype(np.float32)
iou_threshold_np = np.array(0.7, dtype=np.float32)
boxes = tf.constant(boxes_np)
scores = tf.constant(scores_np)
iou_threshold = tf.constant(iou_threshold_np)
# Runs on TPU.
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
scores_tpu, boxes_tpu = nms.sorted_non_max_suppression_padded(
boxes=boxes,
scores=scores,
max_output_size=2000,
iou_threshold=iou_threshold)
self.assertEqual(scores_tpu.numpy().shape, (2, 2000))
self.assertEqual(boxes_tpu.numpy().shape, (2, 2000, 4))
if __name__ == '__main__':
tf.test.main()
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for spatial_transform_ops.py."""
# Import libraries
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.vision.beta.ops import spatial_transform_ops
class MultiLevelCropAndResizeTest(tf.test.TestCase):
def test_multilevel_crop_and_resize_square(self):
"""Example test case.
Input =
[
[0, 1, 2, 3],
[4, 5, 6, 7],
[8, 9, 10, 11],
[12, 13, 14, 15],
]
output_size = 2x2
box =
[
[[0, 0, 2, 2]]
]
Gathered data =
[
[0, 1, 1, 2],
[4, 5, 5, 6],
[4, 5, 5, 6],
[8, 9, 9, 10],
]
Interpolation kernel =
[
[1, 1, 1, 1],
[1, 1, 1, 1],
[1, 1, 1, 1],
[1, 1, 1, 1],
]
Output =
[
[2.5, 3.5],
[6.5, 7.5]
]
"""
input_size = 4
min_level = 0
max_level = 0
batch_size = 1
output_size = 2
num_filters = 1
features = {}
for level in range(min_level, max_level + 1):
feat_size = int(input_size / 2**level)
features[str(level)] = tf.range(
batch_size * feat_size * feat_size * num_filters, dtype=tf.float32)
features[str(level)] = tf.reshape(
features[str(level)], [batch_size, feat_size, feat_size, num_filters])
boxes = tf.constant([
[[0, 0, 2, 2]],
], dtype=tf.float32)
tf_roi_features = spatial_transform_ops.multilevel_crop_and_resize(
features, boxes, output_size)
roi_features = tf_roi_features.numpy()
self.assertAllClose(
roi_features,
np.array([[2.5, 3.5],
[6.5,
7.5]]).reshape([batch_size, 1, output_size, output_size, 1]))
def test_multilevel_crop_and_resize_rectangle(self):
"""Example test case.
Input =
[
[0, 1, 2, 3],
[4, 5, 6, 7],
[8, 9, 10, 11],
[12, 13, 14, 15],
]
output_size = 2x2
box =
[
[[0, 0, 2, 3]]
]
Box vertices =
[
[[0.5, 0.75], [0.5, 2.25]],
[[1.5, 0.75], [1.5, 2.25]],
]
Gathered data =
[
[0, 1, 2, 3],
[4, 5, 6, 7],
[4, 5, 6, 7],
[8, 9, 10, 11],
]
Interpolation kernel =
[
[0.5 1.5 1.5 0.5],
[0.5 1.5 1.5 0.5],
[0.5 1.5 1.5 0.5],
[0.5 1.5 1.5 0.5],
]
Output =
[
[2.75, 4.25],
[6.75, 8.25]
]
"""
input_size = 4
min_level = 0
max_level = 0
batch_size = 1
output_size = 2
num_filters = 1
features = {}
for level in range(min_level, max_level + 1):
feat_size = int(input_size / 2**level)
features[str(level)] = tf.range(
batch_size * feat_size * feat_size * num_filters, dtype=tf.float32)
features[str(level)] = tf.reshape(
features[str(level)], [batch_size, feat_size, feat_size, num_filters])
boxes = tf.constant([
[[0, 0, 2, 3]],
], dtype=tf.float32)
tf_roi_features = spatial_transform_ops.multilevel_crop_and_resize(
features, boxes, output_size)
roi_features = tf_roi_features.numpy()
self.assertAllClose(
roi_features,
np.array([[2.75, 4.25],
[6.75,
8.25]]).reshape([batch_size, 1, output_size, output_size,
1]))
def test_multilevel_crop_and_resize_two_boxes(self):
"""Test two boxes."""
input_size = 4
min_level = 0
max_level = 0
batch_size = 1
output_size = 2
num_filters = 1
features = {}
for level in range(min_level, max_level + 1):
feat_size = int(input_size / 2**level)
features[str(level)] = tf.range(
batch_size * feat_size * feat_size * num_filters, dtype=tf.float32)
features[str(level)] = tf.reshape(
features[str(level)], [batch_size, feat_size, feat_size, num_filters])
boxes = tf.constant([
[[0, 0, 2, 2], [0, 0, 2, 3]],
], dtype=tf.float32)
tf_roi_features = spatial_transform_ops.multilevel_crop_and_resize(
features, boxes, output_size)
roi_features = tf_roi_features.numpy()
self.assertAllClose(
roi_features,
np.array([[[2.5, 3.5], [6.5, 7.5]], [[2.75, 4.25], [6.75, 8.25]]
]).reshape([batch_size, 2, output_size, output_size, 1]))
def test_multilevel_crop_and_resize_feature_level_assignment(self):
"""Test feature level assignment."""
input_size = 640
min_level = 2
max_level = 5
batch_size = 1
output_size = 2
num_filters = 1
features = {}
for level in range(min_level, max_level + 1):
feat_size = int(input_size / 2**level)
features[str(level)] = float(level) * tf.ones(
[batch_size, feat_size, feat_size, num_filters], dtype=tf.float32)
boxes = tf.constant(
[
[
[0, 0, 111, 111], # Level 2.
[0, 0, 113, 113], # Level 3.
[0, 0, 223, 223], # Level 3.
[0, 0, 225, 225], # Level 4.
[0, 0, 449, 449]
], # Level 5.
],
dtype=tf.float32)
tf_roi_features = spatial_transform_ops.multilevel_crop_and_resize(
features, boxes, output_size)
roi_features = tf_roi_features.numpy()
self.assertAllClose(roi_features[0, 0], 2 * np.ones((2, 2, 1)))
self.assertAllClose(roi_features[0, 1], 3 * np.ones((2, 2, 1)))
self.assertAllClose(roi_features[0, 2], 3 * np.ones((2, 2, 1)))
self.assertAllClose(roi_features[0, 3], 4 * np.ones((2, 2, 1)))
self.assertAllClose(roi_features[0, 4], 5 * np.ones((2, 2, 1)))
def test_multilevel_crop_and_resize_large_input(self):
"""Test 512 boxes on TPU."""
input_size = 1408
min_level = 2
max_level = 6
batch_size = 2
num_boxes = 512
num_filters = 256
output_size = 7
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
features = {}
for level in range(min_level, max_level + 1):
feat_size = int(input_size / 2**level)
features[str(level)] = tf.constant(
np.reshape(
np.arange(
batch_size * feat_size * feat_size * num_filters,
dtype=np.float32),
[batch_size, feat_size, feat_size, num_filters]),
dtype=tf.bfloat16)
boxes = np.array([
[[0, 0, 256, 256]]*num_boxes,
], dtype=np.float32)
boxes = np.tile(boxes, [batch_size, 1, 1])
tf_boxes = tf.constant(boxes, dtype=tf.float32)
tf_roi_features = spatial_transform_ops.multilevel_crop_and_resize(
features, tf_boxes)
roi_features = tf_roi_features.numpy()
self.assertEqual(
roi_features.shape,
(batch_size, num_boxes, output_size, output_size, num_filters))
class CropMaskInTargetBoxTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(False),
(True),
)
def test_crop_mask_in_target_box(self, use_einsum):
batch_size = 1
num_masks = 2
height = 2
width = 2
output_size = 2
strategy = tf.distribute.experimental.TPUStrategy()
with strategy.scope():
masks = tf.ones([batch_size, num_masks, height, width])
boxes = tf.constant(
[[0., 0., 1., 1.],
[0., 0., 1., 1.]])
target_boxes = tf.constant(
[[0., 0., 1., 1.],
[-1., -1., 1., 1.]])
expected_outputs = np.array([
[[[1., 1.],
[1., 1.]],
[[0., 0.],
[0., 1.]]]])
boxes = tf.reshape(boxes, [batch_size, num_masks, 4])
target_boxes = tf.reshape(target_boxes, [batch_size, num_masks, 4])
tf_cropped_masks = spatial_transform_ops.crop_mask_in_target_box(
masks, boxes, target_boxes, output_size, use_einsum=use_einsum)
cropped_masks = tf_cropped_masks.numpy()
self.assertAllEqual(cropped_masks, expected_outputs)
if __name__ == '__main__':
tf.test.main()
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for image classification task."""
# pylint: disable=unused-import
from absl.testing import parameterized
import orbit
import tensorflow as tf
from official.core import exp_factory
from official.modeling import optimization
from official.vision import beta
from official.vision.beta.tasks import image_classification as img_cls_task
class ImageClassificationTaskTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.parameters(('resnet_imagenet'),
('revnet_imagenet'))
def test_task(self, config_name):
config = exp_factory.get_exp_config(config_name)
config.task.train_data.global_batch_size = 2
task = img_cls_task.ImageClassificationTask(config.task)
model = task.build_model()
metrics = task.build_metrics()
strategy = tf.distribute.get_strategy()
dataset = orbit.utils.make_distributed_dataset(strategy, task.build_inputs,
config.task.train_data)
iterator = iter(dataset)
opt_factory = optimization.OptimizerFactory(config.trainer.optimizer_config)
optimizer = opt_factory.build_optimizer(opt_factory.build_learning_rate())
logs = task.train_step(next(iterator), model, optimizer, metrics=metrics)
self.assertIn('loss', logs)
self.assertIn('accuracy', logs)
self.assertIn('top_5_accuracy', logs)
logs = task.validation_step(next(iterator), model, metrics=metrics)
self.assertIn('loss', logs)
self.assertIn('accuracy', logs)
self.assertIn('top_5_accuracy', logs)
if __name__ == '__main__':
tf.test.main()
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for MaskRCNN task."""
# pylint: disable=unused-import
from absl.testing import parameterized
import orbit
import tensorflow as tf
from official.core import exp_factory
from official.modeling import optimization
from official.vision import beta
from official.vision.beta.tasks import maskrcnn
class RetinaNetTaskTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
("fasterrcnn_resnetfpn_coco", True),
("fasterrcnn_resnetfpn_coco", False),
("maskrcnn_resnetfpn_coco", True),
("maskrcnn_resnetfpn_coco", False),
)
def test_retinanet_task_train(self, test_config, is_training):
"""RetinaNet task test for training and val using toy configs."""
config = exp_factory.get_exp_config(test_config)
tf.keras.mixed_precision.experimental.Policy("mixed_bfloat16")
# modify config to suit local testing
config.trainer.steps_per_loop = 1
config.task.train_data.global_batch_size = 2
config.task.model.input_size = [384, 384, 3]
config.train_steps = 2
config.task.train_data.shuffle_buffer_size = 10
config.task.train_data.input_path = "coco/train-00000-of-00256.tfrecord"
config.task.validation_data.global_batch_size = 2
config.task.validation_data.input_path = "coco/val-00000-of-00032.tfrecord"
task = maskrcnn.MaskRCNNTask(config.task)
model = task.build_model()
metrics = task.build_metrics(training=is_training)
strategy = tf.distribute.get_strategy()
data_config = config.task.train_data if is_training else config.task.validation_data
dataset = orbit.utils.make_distributed_dataset(strategy, task.build_inputs,
data_config)
iterator = iter(dataset)
opt_factory = optimization.OptimizerFactory(config.trainer.optimizer_config)
optimizer = opt_factory.build_optimizer(opt_factory.build_learning_rate())
if is_training:
task.train_step(next(iterator), model, optimizer, metrics=metrics)
else:
task.validation_step(next(iterator), model, metrics=metrics)
if __name__ == "__main__":
tf.test.main()
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for RetinaNet task."""
# pylint: disable=unused-import
from absl.testing import parameterized
import orbit
import tensorflow as tf
from official.core import exp_factory
from official.modeling import optimization
from official.vision import beta
from official.vision.beta.tasks import retinanet
class RetinaNetTaskTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
("retinanet_resnetfpn_coco", True),
("retinanet_spinenet_coco", True),
)
def test_retinanet_task_train(self, test_config, is_training):
"""RetinaNet task test for training and val using toy configs."""
config = exp_factory.get_exp_config(test_config)
# modify config to suit local testing
config.trainer.steps_per_loop = 1
config.task.train_data.global_batch_size = 2
config.task.validation_data.global_batch_size = 2
config.task.train_data.shuffle_buffer_size = 4
config.task.validation_data.shuffle_buffer_size = 4
config.train_steps = 2
task = retinanet.RetinaNetTask(config.task)
model = task.build_model()
metrics = task.build_metrics()
strategy = tf.distribute.get_strategy()
data_config = config.task.train_data if is_training else config.task.validation_data
dataset = orbit.utils.make_distributed_dataset(strategy, task.build_inputs,
data_config)
iterator = iter(dataset)
opt_factory = optimization.OptimizerFactory(config.trainer.optimizer_config)
optimizer = opt_factory.build_optimizer(opt_factory.build_learning_rate())
if is_training:
task.train_step(next(iterator), model, optimizer, metrics=metrics)
else:
task.validation_step(next(iterator), model, metrics=metrics)
if __name__ == "__main__":
tf.test.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment