Commit c8e6faf7 authored by A. Unique TensorFlower

Internal change

PiperOrigin-RevId: 431756117
parent 13a5e4fb
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for tf_example_decoder.py."""
# Import libraries
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.vision.dataloaders import tf_example_decoder
from official.vision.dataloaders import tfexample_utils
class TfExampleDecoderTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.parameters(
(100, 100, 0, True),
(100, 100, 1, True),
(100, 100, 2, True),
(100, 100, 0, False),
(100, 100, 1, False),
(100, 100, 2, False),
)
def test_result_shape(self,
image_height,
image_width,
num_instances,
regenerate_source_id):
decoder = tf_example_decoder.TfExampleDecoder(
include_mask=True, regenerate_source_id=regenerate_source_id)
serialized_example = tfexample_utils.create_detection_test_example(
image_height=image_height,
image_width=image_width,
image_channel=3,
num_instances=num_instances).SerializeToString()
decoded_tensors = decoder.decode(
tf.convert_to_tensor(value=serialized_example))
results = tf.nest.map_structure(lambda x: x.numpy(), decoded_tensors)
self.assertAllEqual(
(image_height, image_width, 3), results['image'].shape)
if not regenerate_source_id:
self.assertEqual(tfexample_utils.DUMP_SOURCE_ID, results['source_id'])
self.assertEqual(image_height, results['height'])
self.assertEqual(image_width, results['width'])
self.assertAllEqual(
(num_instances,), results['groundtruth_classes'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_is_crowd'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_area'].shape)
self.assertAllEqual(
(num_instances, 4), results['groundtruth_boxes'].shape)
self.assertAllEqual(
(num_instances, image_height, image_width),
results['groundtruth_instance_masks'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_instance_masks_png'].shape)
def test_result_content(self):
decoder = tf_example_decoder.TfExampleDecoder(include_mask=True)
image_content = [[[0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0]],
[[0, 0, 0], [255, 255, 255], [255, 255, 255], [0, 0, 0]],
[[0, 0, 0], [255, 255, 255], [255, 255, 255], [0, 0, 0]],
[[0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0]]]
image = tfexample_utils.encode_image(np.uint8(image_content), fmt='PNG')
image_height = 4
image_width = 4
num_instances = 2
xmins = [0, 0.25]
xmaxs = [0.5, 1.0]
ymins = [0, 0]
ymaxs = [0.5, 1.0]
labels = [3, 1]
areas = [
0.25 * image_height * image_width, 0.75 * image_height * image_width
]
is_crowds = [1, 0]
mask_content = [[[255, 255, 0, 0],
[255, 255, 0, 0],
[0, 0, 0, 0],
[0, 0, 0, 0]],
[[0, 255, 255, 255],
[0, 255, 255, 255],
[0, 255, 255, 255],
[0, 255, 255, 255]]]
masks = [
tfexample_utils.encode_image(np.uint8(m), fmt='PNG')
for m in list(mask_content)
]
serialized_example = tf.train.Example(
features=tf.train.Features(
feature={
'image/encoded': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[image]))),
'image/source_id': (tf.train.Feature(
bytes_list=tf.train.BytesList(
value=[tfexample_utils.DUMP_SOURCE_ID]))),
'image/height': (tf.train.Feature(
int64_list=tf.train.Int64List(value=[image_height]))),
'image/width': (tf.train.Feature(
int64_list=tf.train.Int64List(value=[image_width]))),
'image/object/bbox/xmin': (tf.train.Feature(
float_list=tf.train.FloatList(value=xmins))),
'image/object/bbox/xmax': (tf.train.Feature(
float_list=tf.train.FloatList(value=xmaxs))),
'image/object/bbox/ymin': (tf.train.Feature(
float_list=tf.train.FloatList(value=ymins))),
'image/object/bbox/ymax': (tf.train.Feature(
float_list=tf.train.FloatList(value=ymaxs))),
'image/object/class/label': (tf.train.Feature(
int64_list=tf.train.Int64List(value=labels))),
'image/object/is_crowd': (tf.train.Feature(
int64_list=tf.train.Int64List(value=is_crowds))),
'image/object/area': (tf.train.Feature(
float_list=tf.train.FloatList(value=areas))),
'image/object/mask': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=masks))),
})).SerializeToString()
decoded_tensors = decoder.decode(
tf.convert_to_tensor(value=serialized_example))
results = tf.nest.map_structure(lambda x: x.numpy(), decoded_tensors)
self.assertAllEqual(
(image_height, image_width, 3), results['image'].shape)
self.assertAllEqual(image_content, results['image'])
self.assertEqual(tfexample_utils.DUMP_SOURCE_ID, results['source_id'])
self.assertEqual(image_height, results['height'])
self.assertEqual(image_width, results['width'])
self.assertAllEqual(
(num_instances,), results['groundtruth_classes'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_is_crowd'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_area'].shape)
self.assertAllEqual(
(num_instances, 4), results['groundtruth_boxes'].shape)
self.assertAllEqual(
(num_instances, image_height, image_width),
results['groundtruth_instance_masks'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_instance_masks_png'].shape)
self.assertAllEqual(
[3, 1], results['groundtruth_classes'])
self.assertAllEqual(
[True, False], results['groundtruth_is_crowd'])
self.assertNDArrayNear(
[0.25 * image_height * image_width, 0.75 * image_height * image_width],
results['groundtruth_area'], 1e-4)
self.assertNDArrayNear(
[[0, 0, 0.5, 0.5], [0, 0.25, 1.0, 1.0]],
results['groundtruth_boxes'], 1e-4)
self.assertNDArrayNear(
mask_content, results['groundtruth_instance_masks'], 1e-4)
self.assertAllEqual(
masks, results['groundtruth_instance_masks_png'])
def test_handling_missing_fields(self):
decoder = tf_example_decoder.TfExampleDecoder(include_mask=True)
image_content = [[[0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0]],
[[0, 0, 0], [255, 255, 255], [255, 255, 255], [0, 0, 0]],
[[0, 0, 0], [255, 255, 255], [255, 255, 255], [0, 0, 0]],
[[0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0]]]
image = tfexample_utils.encode_image(np.uint8(image_content), fmt='PNG')
image_height = 4
image_width = 4
num_instances = 2
xmins = [0, 0.25]
xmaxs = [0.5, 1.0]
ymins = [0, 0]
ymaxs = [0.5, 1.0]
labels = [3, 1]
mask_content = [[[255, 255, 0, 0],
[255, 255, 0, 0],
[0, 0, 0, 0],
[0, 0, 0, 0]],
[[0, 255, 255, 255],
[0, 255, 255, 255],
[0, 255, 255, 255],
[0, 255, 255, 255]]]
masks = [
tfexample_utils.encode_image(np.uint8(m), fmt='PNG')
for m in list(mask_content)
]
serialized_example = tf.train.Example(
features=tf.train.Features(
feature={
'image/encoded': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[image]))),
'image/source_id': (tf.train.Feature(
bytes_list=tf.train.BytesList(
value=[tfexample_utils.DUMP_SOURCE_ID]))),
'image/height': (tf.train.Feature(
int64_list=tf.train.Int64List(value=[image_height]))),
'image/width': (tf.train.Feature(
int64_list=tf.train.Int64List(value=[image_width]))),
'image/object/bbox/xmin': (tf.train.Feature(
float_list=tf.train.FloatList(value=xmins))),
'image/object/bbox/xmax': (tf.train.Feature(
float_list=tf.train.FloatList(value=xmaxs))),
'image/object/bbox/ymin': (tf.train.Feature(
float_list=tf.train.FloatList(value=ymins))),
'image/object/bbox/ymax': (tf.train.Feature(
float_list=tf.train.FloatList(value=ymaxs))),
'image/object/class/label': (tf.train.Feature(
int64_list=tf.train.Int64List(value=labels))),
'image/object/mask': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=masks))),
})).SerializeToString()
decoded_tensors = decoder.decode(
tf.convert_to_tensor(serialized_example))
results = tf.nest.map_structure(lambda x: x.numpy(), decoded_tensors)
self.assertAllEqual(
(image_height, image_width, 3), results['image'].shape)
self.assertAllEqual(image_content, results['image'])
self.assertEqual(tfexample_utils.DUMP_SOURCE_ID, results['source_id'])
self.assertEqual(image_height, results['height'])
self.assertEqual(image_width, results['width'])
self.assertAllEqual(
(num_instances,), results['groundtruth_classes'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_is_crowd'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_area'].shape)
self.assertAllEqual(
(num_instances, 4), results['groundtruth_boxes'].shape)
self.assertAllEqual(
(num_instances, image_height, image_width),
results['groundtruth_instance_masks'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_instance_masks_png'].shape)
self.assertAllEqual(
[3, 1], results['groundtruth_classes'])
self.assertAllEqual(
[False, False], results['groundtruth_is_crowd'])
self.assertNDArrayNear(
[0.25 * image_height * image_width, 0.75 * image_height * image_width],
results['groundtruth_area'], 1e-4)
self.assertNDArrayNear(
[[0, 0, 0.5, 0.5], [0, 0.25, 1.0, 1.0]],
results['groundtruth_boxes'], 1e-4)
self.assertNDArrayNear(
mask_content, results['groundtruth_instance_masks'], 1e-4)
self.assertAllEqual(
masks, results['groundtruth_instance_masks_png'])
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tensorflow Example proto decoder for object detection.
A decoder to decode string tensors containing serialized tensorflow.Example
protos for object detection.
"""
import csv
# Import libraries
import tensorflow as tf
from official.vision.dataloaders import tf_example_decoder
class TfExampleDecoderLabelMap(tf_example_decoder.TfExampleDecoder):
"""Tensorflow Example proto decoder."""
def __init__(self, label_map, include_mask=False, regenerate_source_id=False,
mask_binarize_threshold=None):
super(TfExampleDecoderLabelMap, self).__init__(
include_mask=include_mask, regenerate_source_id=regenerate_source_id,
mask_binarize_threshold=mask_binarize_threshold)
self._keys_to_features.update({
'image/object/class/text': tf.io.VarLenFeature(tf.string),
})
name_to_id = self._process_label_map(label_map)
self._name_to_id_table = tf.lookup.StaticHashTable(
tf.lookup.KeyValueTensorInitializer(
keys=tf.constant(list(name_to_id.keys()), dtype=tf.string),
values=tf.constant(list(name_to_id.values()), dtype=tf.int64)),
default_value=-1)
def _process_label_map(self, label_map):
if label_map.endswith('.csv'):
name_to_id = self._process_csv(label_map)
else:
raise ValueError('The label map file must be in `.csv` format.')
return name_to_id
def _process_csv(self, label_map):
name_to_id = {}
with tf.io.gfile.GFile(label_map, 'r') as f:
reader = csv.reader(f, delimiter=',')
for row in reader:
if len(row) != 2:
raise ValueError('Each row of the csv label map file must be in '
'`id,name` format. length = {}'.format(len(row)))
id_index = int(row[0])
name = row[1]
name_to_id[name] = id_index
return name_to_id
def _decode_classes(self, parsed_tensors):
return self._name_to_id_table.lookup(
parsed_tensors['image/object/class/text'])
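For reference, a minimal usage sketch of the label-map decoder follows; the file path and sizes are illustrative, and the flow mirrors the tests further below.

```python
import tensorflow as tf

from official.vision.dataloaders import tf_example_label_map_decoder
from official.vision.dataloaders import tfexample_utils

# Write a small `id,name` CSV label map, the only format the decoder accepts.
label_map_path = '/tmp/label_map.csv'
with tf.io.gfile.GFile(label_map_path, 'w') as f:
  f.write('0,class_0\n1,class_1\n2,class_2')

decoder = tf_example_label_map_decoder.TfExampleDecoderLabelMap(
    label_map_path, include_mask=True)
serialized = tfexample_utils.create_detection_test_example(
    image_height=64, image_width=64, image_channel=3,
    num_instances=2).SerializeToString()
decoded = decoder.decode(tf.convert_to_tensor(serialized))
# 'groundtruth_classes' holds ids looked up from image/object/class/text;
# names missing from the CSV map to the default value -1.
```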
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for tf_example_label_map_decoder.py."""
import os
# Import libraries
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.vision.dataloaders import tf_example_label_map_decoder
from official.vision.dataloaders import tfexample_utils
LABEL_MAP_CSV_CONTENT = '0,class_0\n1,class_1\n2,class_2'
class TfExampleDecoderLabelMapTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.parameters(
(100, 100, 0),
(100, 100, 1),
(100, 100, 2),
(100, 100, 0),
(100, 100, 1),
(100, 100, 2),
)
def test_result_shape(self, image_height, image_width, num_instances):
label_map_dir = self.get_temp_dir()
label_map_name = 'label_map.csv'
label_map_path = os.path.join(label_map_dir, label_map_name)
with open(label_map_path, 'w') as f:
f.write(LABEL_MAP_CSV_CONTENT)
decoder = tf_example_label_map_decoder.TfExampleDecoderLabelMap(
label_map_path, include_mask=True)
serialized_example = tfexample_utils.create_detection_test_example(
image_height=image_height,
image_width=image_width,
image_channel=3,
num_instances=num_instances).SerializeToString()
decoded_tensors = decoder.decode(
tf.convert_to_tensor(value=serialized_example))
results = tf.nest.map_structure(lambda x: x.numpy(), decoded_tensors)
self.assertAllEqual(
(image_height, image_width, 3), results['image'].shape)
self.assertEqual(tfexample_utils.DUMP_SOURCE_ID, results['source_id'])
self.assertEqual(image_height, results['height'])
self.assertEqual(image_width, results['width'])
self.assertAllEqual(
(num_instances,), results['groundtruth_classes'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_is_crowd'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_area'].shape)
self.assertAllEqual(
(num_instances, 4), results['groundtruth_boxes'].shape)
self.assertAllEqual(
(num_instances, image_height, image_width),
results['groundtruth_instance_masks'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_instance_masks_png'].shape)
def test_result_content(self):
label_map_dir = self.get_temp_dir()
label_map_name = 'label_map.csv'
label_map_path = os.path.join(label_map_dir, label_map_name)
with open(label_map_path, 'w') as f:
f.write(LABEL_MAP_CSV_CONTENT)
decoder = tf_example_label_map_decoder.TfExampleDecoderLabelMap(
label_map_path, include_mask=True)
image_content = [[[0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0]],
[[0, 0, 0], [255, 255, 255], [255, 255, 255], [0, 0, 0]],
[[0, 0, 0], [255, 255, 255], [255, 255, 255], [0, 0, 0]],
[[0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0]]]
image = tfexample_utils.encode_image(np.uint8(image_content), fmt='PNG')
image_height = 4
image_width = 4
num_instances = 2
xmins = [0, 0.25]
xmaxs = [0.5, 1.0]
ymins = [0, 0]
ymaxs = [0.5, 1.0]
labels = [b'class_2', b'class_0']
areas = [
0.25 * image_height * image_width, 0.75 * image_height * image_width
]
is_crowds = [1, 0]
mask_content = [[[255, 255, 0, 0],
[255, 255, 0, 0],
[0, 0, 0, 0],
[0, 0, 0, 0]],
[[0, 255, 255, 255],
[0, 255, 255, 255],
[0, 255, 255, 255],
[0, 255, 255, 255]]]
masks = [
tfexample_utils.encode_image(np.uint8(m), fmt='PNG')
for m in list(mask_content)
]
serialized_example = tf.train.Example(
features=tf.train.Features(
feature={
'image/encoded': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[image]))),
'image/source_id': (tf.train.Feature(
bytes_list=tf.train.BytesList(
value=[tfexample_utils.DUMP_SOURCE_ID]))),
'image/height': (tf.train.Feature(
int64_list=tf.train.Int64List(value=[image_height]))),
'image/width': (tf.train.Feature(
int64_list=tf.train.Int64List(value=[image_width]))),
'image/object/bbox/xmin': (tf.train.Feature(
float_list=tf.train.FloatList(value=xmins))),
'image/object/bbox/xmax': (tf.train.Feature(
float_list=tf.train.FloatList(value=xmaxs))),
'image/object/bbox/ymin': (tf.train.Feature(
float_list=tf.train.FloatList(value=ymins))),
'image/object/bbox/ymax': (tf.train.Feature(
float_list=tf.train.FloatList(value=ymaxs))),
'image/object/class/text': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=labels))),
'image/object/is_crowd': (tf.train.Feature(
int64_list=tf.train.Int64List(value=is_crowds))),
'image/object/area': (tf.train.Feature(
float_list=tf.train.FloatList(value=areas))),
'image/object/mask': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=masks))),
})).SerializeToString()
decoded_tensors = decoder.decode(
tf.convert_to_tensor(value=serialized_example))
results = tf.nest.map_structure(lambda x: x.numpy(), decoded_tensors)
self.assertAllEqual(
(image_height, image_width, 3), results['image'].shape)
self.assertAllEqual(image_content, results['image'])
self.assertEqual(tfexample_utils.DUMP_SOURCE_ID, results['source_id'])
self.assertEqual(image_height, results['height'])
self.assertEqual(image_width, results['width'])
self.assertAllEqual(
(num_instances,), results['groundtruth_classes'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_is_crowd'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_area'].shape)
self.assertAllEqual(
(num_instances, 4), results['groundtruth_boxes'].shape)
self.assertAllEqual(
(num_instances, image_height, image_width),
results['groundtruth_instance_masks'].shape)
self.assertAllEqual(
(num_instances,), results['groundtruth_instance_masks_png'].shape)
self.assertAllEqual(
[2, 0], results['groundtruth_classes'])
self.assertAllEqual(
[True, False], results['groundtruth_is_crowd'])
self.assertNDArrayNear(
[0.25 * image_height * image_width, 0.75 * image_height * image_width],
results['groundtruth_area'], 1e-4)
self.assertNDArrayNear(
[[0, 0, 0.5, 0.5], [0, 0.25, 1.0, 1.0]],
results['groundtruth_boxes'], 1e-4)
self.assertNDArrayNear(
mask_content, results['groundtruth_instance_masks'], 1e-4)
self.assertAllEqual(
masks, results['groundtruth_instance_masks_png'])
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TFDS Classification decoders."""
import tensorflow as tf
from official.vision.dataloaders import decoder
class ClassificationDecorder(decoder.Decoder):
"""A tf.Example decoder for tfds classification datasets."""
def decode(self, serialized_example):
sample_dict = {
'image/encoded':
tf.io.encode_jpeg(serialized_example['image'], quality=100),
'image/class/label':
serialized_example['label'],
}
return sample_dict
TFDS_ID_TO_DECODER_MAP = {
'cifar10': ClassificationDecorder,
'cifar100': ClassificationDecorder,
'imagenet2012': ClassificationDecorder,
}
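A minimal sketch of feeding this decoder; the feature values below are made up, whereas tfds normally supplies the dict.

```python
import tensorflow as tf

from official.vision.dataloaders import tfds_classification_decoders

decoder = tfds_classification_decoders.ClassificationDecorder()
features = {
    'image': tf.zeros([32, 32, 3], dtype=tf.uint8),
    'label': tf.constant(1, dtype=tf.int64),
}
decoded = decoder.decode(features)
# decoded['image/encoded'] is a JPEG-encoded string tensor and
# decoded['image/class/label'] is the original label.
```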
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TFDS detection decoders."""
import tensorflow as tf
from official.vision.dataloaders import decoder
class MSCOCODecoder(decoder.Decoder):
"""A tf.Example decoder for tfds coco datasets."""
def decode(self, serialized_example):
"""Decode the serialized example.
Args:
serialized_example: a dictionary example produced by tfds.
Returns:
decoded_tensors: a dictionary of tensors with the following fields:
- source_id: a string scalar tensor.
- image: a uint8 tensor of shape [None, None, 3].
- height: an integer scalar tensor.
- width: an integer scalar tensor.
- groundtruth_classes: a int64 tensor of shape [None].
- groundtruth_is_crowd: a bool tensor of shape [None].
- groundtruth_area: a float32 tensor of shape [None].
- groundtruth_boxes: a float32 tensor of shape [None, 4].
"""
decoded_tensors = {
'source_id': tf.strings.as_string(serialized_example['image/id']),
'image': serialized_example['image'],
'height': tf.cast(tf.shape(serialized_example['image'])[0], tf.int64),
'width': tf.cast(tf.shape(serialized_example['image'])[1], tf.int64),
'groundtruth_classes': serialized_example['objects']['label'],
'groundtruth_is_crowd': serialized_example['objects']['is_crowd'],
'groundtruth_area': tf.cast(
serialized_example['objects']['area'], tf.float32),
'groundtruth_boxes': serialized_example['objects']['bbox'],
}
return decoded_tensors
TFDS_ID_TO_DECODER_MAP = {
'coco/2017': MSCOCODecoder,
'coco/2014': MSCOCODecoder,
'coco': MSCOCODecoder
}
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TFDS factory functions."""
from official.vision.dataloaders import decoder as base_decoder
from official.vision.dataloaders import tfds_detection_decoders
from official.vision.dataloaders import tfds_segmentation_decoders
from official.vision.dataloaders import tfds_classification_decoders
def get_classification_decoder(tfds_name: str) -> base_decoder.Decoder:
"""Gets classification decoder.
Args:
tfds_name: `str`, name of the tfds classification decoder.
Returns:
`base_decoder.Decoder` instance.
Raises:
ValueError if the tfds_name doesn't exist in the available decoders.
"""
if tfds_name in tfds_classification_decoders.TFDS_ID_TO_DECODER_MAP:
decoder = tfds_classification_decoders.TFDS_ID_TO_DECODER_MAP[tfds_name]()
else:
raise ValueError(
f'TFDS Classification {tfds_name} is not supported')
return decoder
def get_detection_decoder(tfds_name: str) -> base_decoder.Decoder:
"""Gets detection decoder.
Args:
tfds_name: `str`, name of the tfds detection decoder.
Returns:
`base_decoder.Decoder` instance.
Raises:
ValueError if the tfds_name doesn't exist in the available decoders.
"""
if tfds_name in tfds_detection_decoders.TFDS_ID_TO_DECODER_MAP:
decoder = tfds_detection_decoders.TFDS_ID_TO_DECODER_MAP[tfds_name]()
else:
raise ValueError(f'TFDS Detection {tfds_name} is not supported')
return decoder
def get_segmentation_decoder(tfds_name: str) -> base_decoder.Decoder:
"""Gets segmentation decoder.
Args:
tfds_name: `str`, name of the tfds segmentation decoder.
Returns:
`base_decoder.Decoder` instance.
Raises:
ValueError if the tfds_name doesn't exist in the available decoders.
"""
if tfds_name in tfds_segmentation_decoders.TFDS_ID_TO_DECODER_MAP:
decoder = tfds_segmentation_decoders.TFDS_ID_TO_DECODER_MAP[tfds_name]()
else:
raise ValueError(f'TFDS Segmentation {tfds_name} is not supported')
return decoder
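A quick illustration of the factory; the example dict below is hand-built and only contains the fields the COCO decoder reads.

```python
import tensorflow as tf

from official.vision.dataloaders import tfds_factory

decoder = tfds_factory.get_detection_decoder('coco/2017')
example = {
    'image': tf.zeros([32, 32, 3], dtype=tf.uint8),
    'image/id': tf.constant(1),
    'objects': {
        'label': tf.constant([5], dtype=tf.int64),
        'is_crowd': tf.constant([False]),
        'area': tf.constant([120.0]),
        'bbox': tf.constant([[0.1, 0.1, 0.6, 0.6]]),
    },
}
decoded = decoder.decode(example)  # source_id, image, height, width, groundtruth_*

# Unsupported dataset names raise ValueError.
try:
  tfds_factory.get_detection_decoder('not_a_dataset')
except ValueError as e:
  print(e)
```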
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for tfds factory functions."""
from absl.testing import parameterized
import tensorflow as tf
from official.vision.dataloaders import decoder as base_decoder
from official.vision.dataloaders import tfds_factory
class TFDSFactoryTest(tf.test.TestCase, parameterized.TestCase):
def _create_test_example(self):
serialized_example = {
'image': tf.ones(shape=(100, 100, 3), dtype=tf.uint8),
'label': 1,
'image/id': 0,
'objects': {
'label': 1,
'is_crowd': 0,
'area': 0.5,
'bbox': [0.1, 0.2, 0.3, 0.4]
},
'segmentation_label': tf.ones((100, 100, 1), dtype=tf.uint8),
'image_left': tf.ones(shape=(100, 100, 3), dtype=tf.uint8)
}
return serialized_example
@parameterized.parameters(
('imagenet2012'),
('cifar10'),
('cifar100'),
)
def test_classification_decoder(self, tfds_name):
decoder = tfds_factory.get_classification_decoder(tfds_name)
self.assertIsInstance(decoder, base_decoder.Decoder)
decoded_tensor = decoder.decode(self._create_test_example())
self.assertLen(decoded_tensor, 2)
self.assertIn('image/encoded', decoded_tensor)
self.assertIn('image/class/label', decoded_tensor)
@parameterized.parameters(
('flowers'),
('coco'),
)
def test_doesnt_exit_classification_decoder(self, tfds_name):
with self.assertRaises(ValueError):
_ = tfds_factory.get_classification_decoder(tfds_name)
@parameterized.parameters(
('coco'),
('coco/2014'),
('coco/2017'),
)
def test_detection_decoder(self, tfds_name):
decoder = tfds_factory.get_detection_decoder(tfds_name)
self.assertIsInstance(decoder, base_decoder.Decoder)
decoded_tensor = decoder.decode(self._create_test_example())
self.assertLen(decoded_tensor, 8)
self.assertIn('image', decoded_tensor)
self.assertIn('source_id', decoded_tensor)
self.assertIn('height', decoded_tensor)
self.assertIn('width', decoded_tensor)
self.assertIn('groundtruth_classes', decoded_tensor)
self.assertIn('groundtruth_is_crowd', decoded_tensor)
self.assertIn('groundtruth_area', decoded_tensor)
self.assertIn('groundtruth_boxes', decoded_tensor)
@parameterized.parameters(
('pascal'),
('cityscapes'),
)
def test_doesnt_exit_detection_decoder(self, tfds_name):
with self.assertRaises(ValueError):
_ = tfds_factory.get_detection_decoder(tfds_name)
@parameterized.parameters(
('cityscapes'),
('cityscapes/semantic_segmentation'),
('cityscapes/semantic_segmentation_extra'),
)
def test_segmentation_decoder(self, tfds_name):
decoder = tfds_factory.get_segmentation_decoder(tfds_name)
self.assertIsInstance(decoder, base_decoder.Decoder)
decoded_tensor = decoder.decode(self._create_test_example())
self.assertLen(decoded_tensor, 4)
self.assertIn('image/encoded', decoded_tensor)
self.assertIn('image/segmentation/class/encoded', decoded_tensor)
self.assertIn('image/height', decoded_tensor)
self.assertIn('image/width', decoded_tensor)
@parameterized.parameters(
('coco'),
('imagenet'),
)
def test_doesnt_exit_segmentation_decoder(self, tfds_name):
with self.assertRaises(ValueError):
_ = tfds_factory.get_segmentation_decoder(tfds_name)
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TFDS Semantic Segmentation decoders."""
import tensorflow as tf
from official.vision.dataloaders import decoder
class CityScapesDecorder(decoder.Decoder):
"""A tf.Example decoder for tfds cityscapes datasets."""
def __init__(self):
# Original labels to trainable labels map, 255 is the ignore class.
self._label_map = {
-1: 255,
0: 255,
1: 255,
2: 255,
3: 255,
4: 255,
5: 255,
6: 255,
7: 0,
8: 1,
9: 255,
10: 255,
11: 2,
12: 3,
13: 4,
14: 255,
15: 255,
16: 255,
17: 5,
18: 255,
19: 6,
20: 7,
21: 8,
22: 9,
23: 10,
24: 11,
25: 12,
26: 13,
27: 14,
28: 15,
29: 255,
30: 255,
31: 16,
32: 17,
33: 18,
}
def decode(self, serialized_example):
# Convert labels according to the self._label_map
label = serialized_example['segmentation_label']
for original_label in self._label_map:
label = tf.where(label == original_label,
self._label_map[original_label] * tf.ones_like(label),
label)
sample_dict = {
'image/encoded':
tf.io.encode_jpeg(serialized_example['image_left'], quality=100),
'image/height': serialized_example['image_left'].shape[0],
'image/width': serialized_example['image_left'].shape[1],
'image/segmentation/class/encoded':
tf.io.encode_png(label),
}
return sample_dict
TFDS_ID_TO_DECODER_MAP = {
'cityscapes': CityScapesDecorder,
'cityscapes/semantic_segmentation': CityScapesDecorder,
'cityscapes/semantic_segmentation_extra': CityScapesDecorder,
}
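The per-pixel id remapping loop above can be sketched in isolation; the map below is a hypothetical three-entry subset of the full table.

```python
import tensorflow as tf

label_map = {7: 0, 8: 1, 26: 13}  # raw cityscapes id -> train id (subset)
label = tf.constant([[7, 8], [26, 3]], dtype=tf.int32)
remapped = label
for raw_id, train_id in label_map.items():
  remapped = tf.where(remapped == raw_id,
                      train_id * tf.ones_like(remapped), remapped)
# remapped == [[0, 1], [13, 3]]; ids absent from this subset are left as-is,
# whereas the full table above maps them to the ignore class 255.
```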
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Utility functions to create tf.Example and tf.SequnceExample for test.
Example:video classification end-to-end test
i.e. from reading input file to train and eval.
```python
class FooTrainTest(tf.test.TestCase):
def setUp(self):
super(FooTrainTest, self).setUp()
# Write the fake tf.train.SequenceExample to file for test.
data_dir = os.path.join(self.get_temp_dir(), 'data')
tf.io.gfile.makedirs(data_dir)
self._data_path = os.path.join(data_dir, 'data.tfrecord')
examples = [
tfexample_utils.make_video_test_example(
image_shape=(36, 36, 3),
audio_shape=(20, 128),
label=random.randint(0, 100)) for _ in range(2)
]
tfexample_utils.dump_to_tfrecord(self._data_path, tf_examples=examples)
def test_foo(self):
dataset = tf.data.TFRecordDataset(self._data_path)
...
```
"""
import io
from typing import Sequence, Union
import numpy as np
from PIL import Image
import tensorflow as tf
IMAGE_KEY = 'image/encoded'
CLASSIFICATION_LABEL_KEY = 'image/class/label'
DISTILATION_LABEL_KEY = 'image/class/soft_labels'
LABEL_KEY = 'clip/label/index'
AUDIO_KEY = 'features/audio'
DUMP_SOURCE_ID = b'123'
def encode_image(image_array: np.ndarray, fmt: str) -> bytes:
"""Encodes an image array and returns the bytes in the given format."""
image = Image.fromarray(image_array)
with io.BytesIO() as output:
image.save(output, format=fmt)
return output.getvalue()
def make_image_bytes(shape: Sequence[int], fmt: str = 'JPEG') -> bytes:
"""Generates image and return bytes in specified format."""
random_image = np.random.randint(0, 256, size=shape, dtype=np.uint8)
return encode_image(random_image, fmt=fmt)
def put_int64_to_context(seq_example: tf.train.SequenceExample,
label: int = 0,
key: str = LABEL_KEY):
"""Puts int64 to SequenceExample context with key."""
seq_example.context.feature[key].int64_list.value[:] = [label]
def put_bytes_list_to_feature(seq_example: tf.train.SequenceExample,
raw_image_bytes: bytes,
key: str = IMAGE_KEY,
repeat_num: int = 2):
"""Puts bytes list to SequenceExample context with key."""
for _ in range(repeat_num):
seq_example.feature_lists.feature_list.get_or_create(
key).feature.add().bytes_list.value[:] = [raw_image_bytes]
def put_float_list_to_feature(seq_example: tf.train.SequenceExample,
value: Sequence[Sequence[float]], key: str):
"""Puts float list to SequenceExample context with key."""
for s in value:
seq_example.feature_lists.feature_list.get_or_create(
key).feature.add().float_list.value[:] = s
def make_video_test_example(image_shape: Sequence[int] = (263, 320, 3),
audio_shape: Sequence[int] = (10, 256),
label: int = 42):
"""Generates data for testing video models (inc. RGB, audio, & label)."""
raw_image_bytes = make_image_bytes(shape=image_shape)
random_audio = np.random.normal(size=audio_shape).tolist()
seq_example = tf.train.SequenceExample()
put_int64_to_context(seq_example, label=label, key=LABEL_KEY)
put_bytes_list_to_feature(
seq_example, raw_image_bytes, key=IMAGE_KEY, repeat_num=4)
put_float_list_to_feature(seq_example, value=random_audio, key=AUDIO_KEY)
return seq_example
def dump_to_tfrecord(record_file: str,
tf_examples: Sequence[Union[tf.train.Example,
tf.train.SequenceExample]]):
"""Writes serialized Example to TFRecord file with path."""
with tf.io.TFRecordWriter(record_file) as writer:
for tf_example in tf_examples:
writer.write(tf_example.SerializeToString())
def _encode_image(image_array: np.ndarray, fmt: str) -> bytes:
"""Util function to encode an image."""
image = Image.fromarray(image_array)
with io.BytesIO() as output:
image.save(output, format=fmt)
return output.getvalue()
def create_classification_example(
image_height: int,
image_width: int,
image_format: str = 'JPEG',
is_multilabel: bool = False) -> tf.train.Example:
"""Creates image and labels for image classification input pipeline."""
image = _encode_image(
np.uint8(np.random.rand(image_height, image_width, 3) * 255),
fmt=image_format)
labels = [0, 1] if is_multilabel else [0]
serialized_example = tf.train.Example(
features=tf.train.Features(
feature={
IMAGE_KEY: (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[image]))),
CLASSIFICATION_LABEL_KEY: (tf.train.Feature(
int64_list=tf.train.Int64List(value=labels))),
})).SerializeToString()
return serialized_example
def create_distillation_example(
image_height: int,
image_width: int,
num_labels: int,
image_format: str = 'JPEG') -> tf.train.Example:
"""Creates image and labels for image classification with distillation."""
image = _encode_image(
np.uint8(np.random.rand(image_height, image_width, 3) * 255),
fmt=image_format)
soft_labels = [0.6] * num_labels
serialized_example = tf.train.Example(
features=tf.train.Features(
feature={
IMAGE_KEY: (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[image]))),
DISTILATION_LABEL_KEY: (tf.train.Feature(
float_list=tf.train.FloatList(value=soft_labels))),
})).SerializeToString()
return serialized_example
def create_3d_image_test_example(image_height: int, image_width: int,
image_volume: int,
image_channel: int) -> tf.train.Example:
"""Creates 3D image and label."""
images = np.random.rand(image_height, image_width, image_volume,
image_channel)
images = images.astype(np.float32)
labels = np.random.randint(
low=2, size=(image_height, image_width, image_volume, image_channel))
labels = labels.astype(np.float32)
feature = {
IMAGE_KEY: (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[images.tobytes()]))),
CLASSIFICATION_LABEL_KEY: (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[labels.tobytes()])))
}
return tf.train.Example(features=tf.train.Features(feature=feature))
def create_detection_test_example(image_height: int, image_width: int,
image_channel: int,
num_instances: int) -> tf.train.Example:
"""Creates and returns a test example containing box and mask annotations.
Args:
image_height: The height of test image.
image_width: The width of test image.
image_channel: The channel of test image.
num_instances: The number of object instances per image.
Returns:
A tf.train.Example for testing.
"""
image = make_image_bytes([image_height, image_width, image_channel])
if num_instances == 0:
xmins = []
xmaxs = []
ymins = []
ymaxs = []
labels = []
areas = []
is_crowds = []
masks = []
labels_text = []
else:
xmins = list(np.random.rand(num_instances))
xmaxs = list(np.random.rand(num_instances))
ymins = list(np.random.rand(num_instances))
ymaxs = list(np.random.rand(num_instances))
labels_text = [b'class_1'] * num_instances
labels = list(np.random.randint(100, size=num_instances))
areas = [(xmax - xmin) * (ymax - ymin) * image_height * image_width
for xmin, xmax, ymin, ymax in zip(xmins, xmaxs, ymins, ymaxs)]
is_crowds = [0] * num_instances
masks = []
for _ in range(num_instances):
mask = make_image_bytes([image_height, image_width], fmt='PNG')
masks.append(mask)
return tf.train.Example(
features=tf.train.Features(
feature={
'image/encoded': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[image]))),
'image/source_id': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[DUMP_SOURCE_ID]))),
'image/height': (tf.train.Feature(
int64_list=tf.train.Int64List(value=[image_height]))),
'image/width': (tf.train.Feature(
int64_list=tf.train.Int64List(value=[image_width]))),
'image/object/bbox/xmin': (tf.train.Feature(
float_list=tf.train.FloatList(value=xmins))),
'image/object/bbox/xmax': (tf.train.Feature(
float_list=tf.train.FloatList(value=xmaxs))),
'image/object/bbox/ymin': (tf.train.Feature(
float_list=tf.train.FloatList(value=ymins))),
'image/object/bbox/ymax': (tf.train.Feature(
float_list=tf.train.FloatList(value=ymaxs))),
'image/object/class/label': (tf.train.Feature(
int64_list=tf.train.Int64List(value=labels))),
'image/object/class/text': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=labels_text))),
'image/object/is_crowd': (tf.train.Feature(
int64_list=tf.train.Int64List(value=is_crowds))),
'image/object/area': (tf.train.Feature(
float_list=tf.train.FloatList(value=areas))),
'image/object/mask': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=masks))),
}))
def create_segmentation_test_example(image_height: int, image_width: int,
image_channel: int) -> tf.train.Example:
"""Creates and returns a test example containing mask annotations.
Args:
image_height: The height of test image.
image_width: The width of test image.
image_channel: The channel of test image.
Returns:
A tf.train.Example for testing.
"""
image = make_image_bytes([image_height, image_width, image_channel])
mask = make_image_bytes([image_height, image_width], fmt='PNG')
return tf.train.Example(
features=tf.train.Features(
feature={
'image/encoded': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[image]))),
'image/segmentation/class/encoded': (tf.train.Feature(
bytes_list=tf.train.BytesList(value=[mask]))),
'image/height': (tf.train.Feature(
int64_list=tf.train.Int64List(value=[image_height]))),
'image/width': (tf.train.Feature(
int64_list=tf.train.Int64List(value=[image_width])))
}))
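For completeness, a small sketch pairing these helpers with dump_to_tfrecord; the output path is illustrative.

```python
import os

import tensorflow as tf

from official.vision.dataloaders import tfexample_utils

record_path = os.path.join('/tmp', 'det_test.tfrecord')
examples = [
    tfexample_utils.create_detection_test_example(
        image_height=64, image_width=64, image_channel=3, num_instances=3)
    for _ in range(2)
]
tfexample_utils.dump_to_tfrecord(record_path, tf_examples=examples)
dataset = tf.data.TFRecordDataset(record_path)  # ready for a decoder under test
```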
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Data loader utils."""
from typing import Dict
# Import libraries
import tensorflow as tf
from official.vision.ops import preprocess_ops
def process_source_id(source_id: tf.Tensor) -> tf.Tensor:
"""Processes source_id to the right format.
Args:
source_id: A `tf.Tensor` that contains the source ID. It can be empty.
Returns:
A formatted source ID.
"""
if source_id.dtype == tf.string:
source_id = tf.strings.to_number(source_id, tf.int64)
with tf.control_dependencies([source_id]):
source_id = tf.cond(
pred=tf.equal(tf.size(input=source_id), 0),
true_fn=lambda: tf.cast(tf.constant(-1), tf.int64),
false_fn=lambda: tf.identity(source_id))
return source_id
def pad_groundtruths_to_fixed_size(groundtruths: Dict[str, tf.Tensor],
size: int) -> Dict[str, tf.Tensor]:
"""Pads the first dimension of groundtruths labels to the fixed size.
Args:
groundtruths: A dictionary of {`str`: `tf.Tensor`} that contains groundtruth
annotations of `boxes`, `is_crowds`, `areas` and `classes`.
size: An `int` that specifies the expected size of the first dimension of
padded tensors.
Returns:
A dictionary of the same keys as input and padded tensors as values.
"""
groundtruths['boxes'] = preprocess_ops.clip_or_pad_to_fixed_size(
groundtruths['boxes'], size, -1)
groundtruths['is_crowds'] = preprocess_ops.clip_or_pad_to_fixed_size(
groundtruths['is_crowds'], size, 0)
groundtruths['areas'] = preprocess_ops.clip_or_pad_to_fixed_size(
groundtruths['areas'], size, -1)
groundtruths['classes'] = preprocess_ops.clip_or_pad_to_fixed_size(
groundtruths['classes'], size, -1)
if 'attributes' in groundtruths:
for k, v in groundtruths['attributes'].items():
groundtruths['attributes'][k] = preprocess_ops.clip_or_pad_to_fixed_size(
v, size, -1)
return groundtruths
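A brief sketch of both helpers; the tensor values here are arbitrary.

```python
import tensorflow as tf

from official.vision.dataloaders import utils

# String source ids become int64; an empty id tensor becomes -1.
utils.process_source_id(tf.constant(['123']))              # -> [123]
utils.process_source_id(tf.constant([], dtype=tf.string))  # -> -1

# Pad (or clip) the first dimension of every groundtruth tensor to size 8.
groundtruths = {
    'boxes': tf.constant([[0.1, 0.1, 0.5, 0.5]]),
    'is_crowds': tf.constant([[0]]),
    'areas': tf.constant([[0.16]]),
    'classes': tf.constant([[3]]),
}
padded = utils.pad_groundtruths_to_fixed_size(groundtruths, size=8)
# padded['boxes'].shape == (8, 4); boxes/areas/classes pad with -1, is_crowds with 0.
```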
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for dataloader utils functions."""
# Import libraries
from absl.testing import parameterized
import tensorflow as tf
from official.vision.dataloaders import utils
class UtilsTest(tf.test.TestCase, parameterized.TestCase):
def test_process_empty_source_id(self):
source_id = tf.constant([], dtype=tf.int64)
source_id = tf.strings.as_string(source_id)
self.assertEqual(-1, utils.process_source_id(source_id=source_id))
@parameterized.parameters(
([128, 256], [128, 256]),
([128, 32, 16], [128, 32, 16]),
)
def test_process_source_id(self, source_id, expected_result):
source_id = tf.constant(source_id, dtype=tf.int64)
source_id = tf.strings.as_string(source_id)
self.assertSequenceAlmostEqual(expected_result,
utils.process_source_id(source_id=source_id))
@parameterized.parameters(
([[10, 20, 30, 40]], [[100]], [[0]], 10, None),
([[0.1, 0.2, 0.5, 0.6]], [[0.5]], [[1]], 2, [[1.0, 2.0]]),
)
def test_pad_groundtruths_to_fixed_size(self, boxes, area, classes, size,
attributes):
groundtruths = {}
groundtruths['boxes'] = tf.constant(boxes)
groundtruths['is_crowds'] = tf.constant([[0]])
groundtruths['areas'] = tf.constant(area)
groundtruths['classes'] = tf.constant(classes)
if attributes:
groundtruths['attributes'] = {'depth': tf.constant(attributes)}
actual_result = utils.pad_groundtruths_to_fixed_size(
groundtruths=groundtruths, size=size)
# Check that the first dimension is padded to the expected size.
for key in actual_result:
if key == 'attributes':
for _, v in actual_result[key].items():
pad_shape = v.shape[0]
self.assertEqual(size, pad_shape)
else:
pad_shape = actual_result[key].shape[0]
self.assertEqual(size, pad_shape)
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Parser for video and label datasets."""
from typing import Dict, Optional, Tuple, Union
from absl import logging
import tensorflow as tf
from official.vision.configs import video_classification as exp_cfg
from official.vision.dataloaders import decoder
from official.vision.dataloaders import parser
from official.vision.ops import augment
from official.vision.ops import preprocess_ops_3d
IMAGE_KEY = 'image/encoded'
LABEL_KEY = 'clip/label/index'
def process_image(image: tf.Tensor,
is_training: bool = True,
num_frames: int = 32,
stride: int = 1,
random_stride_range: int = 0,
num_test_clips: int = 1,
min_resize: int = 256,
crop_size: int = 224,
num_crops: int = 1,
zero_centering_image: bool = False,
min_aspect_ratio: float = 0.5,
max_aspect_ratio: float = 2,
min_area_ratio: float = 0.49,
max_area_ratio: float = 1.0,
augmenter: Optional[augment.ImageAugment] = None,
seed: Optional[int] = None) -> tf.Tensor:
"""Processes a serialized image tensor.
Args:
image: Input Tensor of shape [timesteps] and type tf.string of serialized
frames.
is_training: Whether or not in training mode. If True, random sample, crop
and left right flip is used.
num_frames: Number of frames per subclip.
stride: Temporal stride to sample frames.
random_stride_range: An int indicating the min and max bounds to uniformly
sample different strides from the video. E.g., a value of 1 with stride=2
will uniformly sample a stride in {1, 2, 3} for each video in a batch.
Only used during training, for the purposes of frame-rate augmentation.
Defaults to 0, which disables random sampling.
num_test_clips: Number of test clips (1 by default). If more than 1, this
will sample multiple linearly spaced clips within each video at test time.
If 1, then a single clip in the middle of the video is sampled. The clips
are aggregated in the batch dimension.
min_resize: Frames are resized so that min(height, width) is min_resize.
crop_size: Final size of the frame after cropping the resized frames. Both
height and width are the same.
num_crops: Number of crops to perform on the resized frames.
zero_centering_image: If True, frames are normalized to values in [-1, 1].
If False, values in [0, 1].
min_aspect_ratio: The minimum aspect range for cropping.
max_aspect_ratio: The maximum aspect range for cropping.
min_area_ratio: The minimum area range for cropping.
max_area_ratio: The maximum area range for cropping.
augmenter: Image augmenter to distort each image.
seed: A deterministic seed to use when sampling.
Returns:
Processed frames. Tensor of shape
[num_frames * num_test_clips, crop_size, crop_size, 3].
"""
# Validate parameters.
if is_training and num_test_clips != 1:
logging.warning(
'`num_test_clips` %d is ignored since `is_training` is `True`.',
num_test_clips)
if random_stride_range < 0:
raise ValueError('Random stride range should be >= 0, got {}'.format(
random_stride_range))
# Temporal sampler.
if is_training:
if random_stride_range > 0:
# Uniformly sample different frame-rates
stride = tf.random.uniform(
[],
tf.maximum(stride - random_stride_range, 1),
stride + random_stride_range,
dtype=tf.int32)
# Sample random clip.
image = preprocess_ops_3d.sample_sequence(image, num_frames, True, stride,
seed)
elif num_test_clips > 1:
# Sample linspace clips.
image = preprocess_ops_3d.sample_linspace_sequence(image, num_test_clips,
num_frames, stride)
else:
# Sample middle clip.
image = preprocess_ops_3d.sample_sequence(image, num_frames, False, stride)
# Decode JPEG string to tf.uint8.
if image.dtype == tf.string:
image = preprocess_ops_3d.decode_jpeg(image, 3)
if is_training:
# Standard image data augmentation: random resized crop and random flip.
image = preprocess_ops_3d.random_crop_resize(
image, crop_size, crop_size, num_frames, 3,
(min_aspect_ratio, max_aspect_ratio),
(min_area_ratio, max_area_ratio))
image = preprocess_ops_3d.random_flip_left_right(image, seed)
if augmenter is not None:
image = augmenter.distort(image)
else:
# Resize images (resize happens only if necessary to save compute).
image = preprocess_ops_3d.resize_smallest(image, min_resize)
# Crop the frames.
image = preprocess_ops_3d.crop_image(image, crop_size, crop_size, False,
num_crops)
# Cast the frames to float32, normalizing according to zero_centering_image.
return preprocess_ops_3d.normalize_image(image, zero_centering_image)
def postprocess_image(image: tf.Tensor,
is_training: bool = True,
num_frames: int = 32,
num_test_clips: int = 1,
num_test_crops: int = 1) -> tf.Tensor:
"""Processes a batched Tensor of frames.
The same parameters used in process should be used here.
Args:
image: Input Tensor of shape [batch, timesteps, height, width, 3].
is_training: Whether or not in training mode. If True, random sample, crop
and left right flip is used.
num_frames: Number of frames per subclip.
num_test_clips: Number of test clips (1 by default). If more than 1, this
will sample multiple linearly spaced clips within each video at test time.
If 1, then a single clip in the middle of the video is sampled. The clips
are aggregated in the batch dimension.
num_test_crops: Number of test crops (1 by default). If more than 1, there
are multiple crops for each clip at test time. If 1, there is a single
central crop. The crops are aggregated in the batch dimension.
Returns:
Processed frames. Tensor of shape
[batch * num_test_clips * num_test_crops, num_frames, height, width, 3].
"""
num_views = num_test_clips * num_test_crops
if num_views > 1 and not is_training:
# In this case, multiple views are merged together in the batch dimension, which
# will be batch * num_views.
image = tf.reshape(image, [-1, num_frames] + image.shape[2:].as_list())
return image
def process_label(label: tf.Tensor,
one_hot_label: bool = True,
num_classes: Optional[int] = None) -> tf.Tensor:
"""Processes label Tensor."""
# Validate parameters.
if one_hot_label and not num_classes:
raise ValueError(
'`num_classes` should be given when requesting one hot label.')
# Cast to tf.int32.
label = tf.cast(label, dtype=tf.int32)
if one_hot_label:
# Replace label index by one hot representation.
label = tf.one_hot(label, num_classes)
if len(label.shape.as_list()) > 1:
label = tf.reduce_sum(label, axis=0)
if num_classes == 1:
# The trick for single label.
label = 1 - label
return label
class Decoder(decoder.Decoder):
"""A tf.Example decoder for classification task."""
def __init__(self, image_key: str = IMAGE_KEY, label_key: str = LABEL_KEY):
self._context_description = {
# One integer stored in context.
label_key: tf.io.VarLenFeature(tf.int64),
}
self._sequence_description = {
# Each image is a string encoding JPEG.
image_key: tf.io.FixedLenSequenceFeature((), tf.string),
}
def add_feature(self, feature_name: str,
feature_type: Union[tf.io.VarLenFeature,
tf.io.FixedLenFeature,
tf.io.FixedLenSequenceFeature]):
self._sequence_description[feature_name] = feature_type
def add_context(self, feature_name: str,
feature_type: Union[tf.io.VarLenFeature,
tf.io.FixedLenFeature,
tf.io.FixedLenSequenceFeature]):
self._context_description[feature_name] = feature_type
def decode(self, serialized_example):
"""Parses a single tf.Example into image and label tensors."""
result = {}
context, sequences = tf.io.parse_single_sequence_example(
serialized_example, self._context_description,
self._sequence_description)
result.update(context)
result.update(sequences)
for key, value in result.items():
if isinstance(value, tf.SparseTensor):
result[key] = tf.sparse.to_dense(value)
return result
class VideoTfdsDecoder(decoder.Decoder):
"""A tf.SequenceExample decoder for tfds video classification datasets."""
def __init__(self, image_key: str = IMAGE_KEY, label_key: str = LABEL_KEY):
self._image_key = image_key
self._label_key = label_key
def decode(self, features):
"""Decode the TFDS FeatureDict.
Args:
features: features from TFDS video dataset.
See https://www.tensorflow.org/datasets/catalog/ucf101 for example.
Returns:
Dict of tensors.
"""
sample_dict = {
self._image_key: features['video'],
self._label_key: features['label'],
}
return sample_dict
class Parser(parser.Parser):
"""Parses a video and label dataset."""
def __init__(self,
input_params: exp_cfg.DataConfig,
image_key: str = IMAGE_KEY,
label_key: str = LABEL_KEY):
self._num_frames = input_params.feature_shape[0]
self._stride = input_params.temporal_stride
self._random_stride_range = input_params.random_stride_range
self._num_test_clips = input_params.num_test_clips
self._min_resize = input_params.min_image_size
self._crop_size = input_params.feature_shape[1]
self._num_crops = input_params.num_test_crops
self._one_hot_label = input_params.one_hot
self._num_classes = input_params.num_classes
self._image_key = image_key
self._label_key = label_key
self._dtype = tf.dtypes.as_dtype(input_params.dtype)
self._output_audio = input_params.output_audio
self._min_aspect_ratio = input_params.aug_min_aspect_ratio
self._max_aspect_ratio = input_params.aug_max_aspect_ratio
self._min_area_ratio = input_params.aug_min_area_ratio
self._max_area_ratio = input_params.aug_max_area_ratio
if self._output_audio:
self._audio_feature = input_params.audio_feature
self._audio_shape = input_params.audio_feature_shape
self._augmenter = None
if input_params.aug_type is not None:
aug_type = input_params.aug_type
if aug_type == 'autoaug':
logging.info('Using AutoAugment.')
self._augmenter = augment.AutoAugment()
elif aug_type == 'randaug':
logging.info('Using RandAugment.')
self._augmenter = augment.RandAugment()
else:
raise ValueError('Augmentation policy {} is not supported.'.format(
aug_type))
def _parse_train_data(
self, decoded_tensors: Dict[str, tf.Tensor]
) -> Tuple[Dict[str, tf.Tensor], tf.Tensor]:
"""Parses data for training."""
# Process image and label.
image = decoded_tensors[self._image_key]
image = process_image(
image=image,
is_training=True,
num_frames=self._num_frames,
stride=self._stride,
random_stride_range=self._random_stride_range,
num_test_clips=self._num_test_clips,
min_resize=self._min_resize,
crop_size=self._crop_size,
min_aspect_ratio=self._min_aspect_ratio,
max_aspect_ratio=self._max_aspect_ratio,
min_area_ratio=self._min_area_ratio,
max_area_ratio=self._max_area_ratio,
augmenter=self._augmenter)
image = tf.cast(image, dtype=self._dtype)
features = {'image': image}
label = decoded_tensors[self._label_key]
label = process_label(label, self._one_hot_label, self._num_classes)
if self._output_audio:
audio = decoded_tensors[self._audio_feature]
audio = tf.cast(audio, dtype=self._dtype)
# TODO(yeqing): synchronize audio/video sampling. Especially randomness.
audio = preprocess_ops_3d.sample_sequence(
audio, self._audio_shape[0], random=False, stride=1)
audio = tf.ensure_shape(audio, self._audio_shape)
features['audio'] = audio
return features, label
def _parse_eval_data(
self, decoded_tensors: Dict[str, tf.Tensor]
) -> Tuple[Dict[str, tf.Tensor], tf.Tensor]:
"""Parses data for evaluation."""
image = decoded_tensors[self._image_key]
image = process_image(
image=image,
is_training=False,
num_frames=self._num_frames,
stride=self._stride,
num_test_clips=self._num_test_clips,
min_resize=self._min_resize,
crop_size=self._crop_size,
num_crops=self._num_crops)
image = tf.cast(image, dtype=self._dtype)
features = {'image': image}
label = decoded_tensors[self._label_key]
label = process_label(label, self._one_hot_label, self._num_classes)
if self._output_audio:
audio = decoded_tensors[self._audio_feature]
audio = tf.cast(audio, dtype=self._dtype)
audio = preprocess_ops_3d.sample_sequence(
audio, self._audio_shape[0], random=False, stride=1)
audio = tf.ensure_shape(audio, self._audio_shape)
features['audio'] = audio
return features, label
class PostBatchProcessor(object):
"""Processes a video and label dataset which is batched."""
def __init__(self, input_params: exp_cfg.DataConfig):
self._is_training = input_params.is_training
self._num_frames = input_params.feature_shape[0]
self._num_test_clips = input_params.num_test_clips
self._num_test_crops = input_params.num_test_crops
def __call__(self, features: Dict[str, tf.Tensor],
label: tf.Tensor) -> Tuple[Dict[str, tf.Tensor], tf.Tensor]:
"""Parses a single tf.Example into image and label tensors."""
for key in ['image']:
if key in features:
features[key] = postprocess_image(
image=features[key],
is_training=self._is_training,
num_frames=self._num_frames,
num_test_clips=self._num_test_clips,
num_test_crops=self._num_test_crops)
return features, label
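A minimal sketch of the label processing above; the class count and indices are arbitrary.

```python
import tensorflow as tf

from official.vision.dataloaders import video_input

# Multiple label indices are merged into a single multi-hot vector.
label = tf.constant([2, 5])
multi_hot = video_input.process_label(label, one_hot_label=True, num_classes=10)
# multi_hot == [0., 0., 1., 0., 0., 1., 0., 0., 0., 0.]
```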
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
import io
# Import libraries
import numpy as np
from PIL import Image
import tensorflow as tf
import tensorflow_datasets as tfds
from official.vision.configs import video_classification as exp_cfg
from official.vision.dataloaders import video_input
AUDIO_KEY = 'features/audio'
def fake_seq_example():
# Create fake data.
random_image = np.random.randint(0, 256, size=(263, 320, 3), dtype=np.uint8)
random_image = Image.fromarray(random_image)
label = 42
with io.BytesIO() as buffer:
random_image.save(buffer, format='JPEG')
raw_image_bytes = buffer.getvalue()
seq_example = tf.train.SequenceExample()
seq_example.feature_lists.feature_list.get_or_create(
video_input.IMAGE_KEY).feature.add().bytes_list.value[:] = [
raw_image_bytes
]
seq_example.feature_lists.feature_list.get_or_create(
video_input.IMAGE_KEY).feature.add().bytes_list.value[:] = [
raw_image_bytes
]
seq_example.context.feature[video_input.LABEL_KEY].int64_list.value[:] = [
label
]
random_audio = np.random.normal(size=(10, 256)).tolist()
for s in random_audio:
seq_example.feature_lists.feature_list.get_or_create(
AUDIO_KEY).feature.add().float_list.value[:] = s
return seq_example, label
class DecoderTest(tf.test.TestCase):
"""A tf.SequenceExample decoder for the video classification task."""
def test_decoder(self):
decoder = video_input.Decoder()
seq_example, label = fake_seq_example()
serialized_example = seq_example.SerializeToString()
decoded_tensors = decoder.decode(tf.convert_to_tensor(serialized_example))
results = tf.nest.map_structure(lambda x: x.numpy(), decoded_tensors)
self.assertCountEqual([video_input.IMAGE_KEY, video_input.LABEL_KEY],
results.keys())
self.assertEqual(label, results[video_input.LABEL_KEY])
def test_decode_audio(self):
decoder = video_input.Decoder()
decoder.add_feature(AUDIO_KEY, tf.io.VarLenFeature(dtype=tf.float32))
seq_example, label = fake_seq_example()
serialized_example = seq_example.SerializeToString()
decoded_tensors = decoder.decode(tf.convert_to_tensor(serialized_example))
results = tf.nest.map_structure(lambda x: x.numpy(), decoded_tensors)
self.assertCountEqual(
[video_input.IMAGE_KEY, video_input.LABEL_KEY, AUDIO_KEY],
results.keys())
self.assertEqual(label, results[video_input.LABEL_KEY])
self.assertEqual(results[AUDIO_KEY].shape, (10, 256))
def test_tfds_decode(self):
with tfds.testing.mock_data(num_examples=1):
dataset = tfds.load('ucf101', split='train').take(1)
data = next(iter(dataset))
decoder = video_input.VideoTfdsDecoder()
decoded_tensors = decoder.decode(data)
self.assertContainsSubset([video_input.LABEL_KEY, video_input.IMAGE_KEY],
decoded_tensors.keys())
class VideoAndLabelParserTest(tf.test.TestCase):
def test_video_input(self):
params = exp_cfg.kinetics600(is_training=True)
params.feature_shape = (2, 224, 224, 3)
params.min_image_size = 224
decoder = video_input.Decoder()
parser = video_input.Parser(params).parse_fn(params.is_training)
seq_example, label = fake_seq_example()
input_tensor = tf.constant(seq_example.SerializeToString())
decoded_tensors = decoder.decode(input_tensor)
output_tensor = parser(decoded_tensors)
image_features, label = output_tensor
image = image_features['image']
self.assertAllEqual(image.shape, (2, 224, 224, 3))
self.assertAllEqual(label.shape, (600,))
def test_video_audio_input(self):
params = exp_cfg.kinetics600(is_training=True)
params.feature_shape = (2, 224, 224, 3)
params.min_image_size = 224
params.output_audio = True
params.audio_feature = AUDIO_KEY
params.audio_feature_shape = (15, 256)
decoder = video_input.Decoder()
decoder.add_feature(params.audio_feature,
tf.io.VarLenFeature(dtype=tf.float32))
parser = video_input.Parser(params).parse_fn(params.is_training)
seq_example, label = fake_seq_example()
input_tensor = tf.constant(seq_example.SerializeToString())
decoded_tensors = decoder.decode(input_tensor)
output_tensor = parser(decoded_tensors)
features, label = output_tensor
image = features['image']
audio = features['audio']
self.assertAllEqual(image.shape, (2, 224, 224, 3))
self.assertAllEqual(label.shape, (600,))
self.assertEqual(audio.shape, (15, 256))
def test_video_input_random_stride(self):
params = exp_cfg.kinetics600(is_training=True)
params.feature_shape = (2, 224, 224, 3)
params.min_image_size = 224
params.temporal_stride = 2
params.random_stride_range = 1
decoder = video_input.Decoder()
parser = video_input.Parser(params).parse_fn(params.is_training)
seq_example, label = fake_seq_example()
input_tensor = tf.constant(seq_example.SerializeToString())
decoded_tensors = decoder.decode(input_tensor)
output_tensor = parser(decoded_tensors)
image_features, label = output_tensor
image = image_features['image']
self.assertAllEqual(image.shape, (2, 224, 224, 3))
self.assertAllEqual(label.shape, (600,))
def test_video_input_augmentation_returns_shape(self):
params = exp_cfg.kinetics600(is_training=True)
params.feature_shape = (2, 224, 224, 3)
params.min_image_size = 224
params.temporal_stride = 2
params.aug_type = 'autoaug'
decoder = video_input.Decoder()
parser = video_input.Parser(params).parse_fn(params.is_training)
seq_example, label = fake_seq_example()
input_tensor = tf.constant(seq_example.SerializeToString())
decoded_tensors = decoder.decode(input_tensor)
output_tensor = parser(decoded_tensors)
image_features, label = output_tensor
image = image_features['image']
self.assertAllEqual(image.shape, (2, 224, 224, 3))
self.assertAllEqual(label.shape, (600,))
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The COCO-style evaluator.
The following snippet demonstrates the use of interfaces:
evaluator = COCOEvaluator(...)
for _ in range(num_evals):
for _ in range(num_batches_per_eval):
predictions, groundtruths = predictor.predict(...)  # pop a batch.
evaluator.update_state(groundtruths, predictions)
evaluator.result() # finish one full eval and reset states.
See also: https://github.com/cocodataset/cocoapi/
"""
import atexit
import tempfile
# Import libraries
from absl import logging
import numpy as np
from pycocotools import cocoeval
import six
import tensorflow as tf
from official.vision.evaluation import coco_utils
class COCOEvaluator(object):
"""COCO evaluation metric class."""
def __init__(self,
annotation_file,
include_mask,
need_rescale_bboxes=True,
per_category_metrics=False):
"""Constructs COCO evaluation class.
The class provides the interface to COCO metrics_fn. update_state() takes
detections from each image and pushes them to self._predictions. evaluate()
loads a JSON file in COCO annotation format as the groundtruths and runs
COCO evaluation.
Args:
annotation_file: a JSON file that stores annotations of the eval dataset.
If `annotation_file` is None, groundtruth annotations will be loaded
from the dataloader.
include_mask: a boolean to indicate whether or not to include the mask
eval.
need_rescale_bboxes: If true, bboxes in `predictions` will be rescaled back
to absolute values (`image_info` is needed in this case).
per_category_metrics: Whether to return per category metrics.
"""
if annotation_file:
if annotation_file.startswith('gs://'):
_, local_val_json = tempfile.mkstemp(suffix='.json')
tf.io.gfile.remove(local_val_json)
tf.io.gfile.copy(annotation_file, local_val_json)
atexit.register(tf.io.gfile.remove, local_val_json)
else:
local_val_json = annotation_file
self._coco_gt = coco_utils.COCOWrapper(
eval_type=('mask' if include_mask else 'box'),
annotation_file=local_val_json)
self._annotation_file = annotation_file
self._include_mask = include_mask
self._per_category_metrics = per_category_metrics
self._metric_names = [
'AP', 'AP50', 'AP75', 'APs', 'APm', 'APl', 'ARmax1', 'ARmax10',
'ARmax100', 'ARs', 'ARm', 'ARl'
]
self._required_prediction_fields = [
'source_id', 'num_detections', 'detection_classes', 'detection_scores',
'detection_boxes'
]
self._need_rescale_bboxes = need_rescale_bboxes
if self._need_rescale_bboxes:
self._required_prediction_fields.append('image_info')
self._required_groundtruth_fields = [
'source_id', 'height', 'width', 'classes', 'boxes'
]
if self._include_mask:
mask_metric_names = ['mask_' + x for x in self._metric_names]
self._metric_names.extend(mask_metric_names)
self._required_prediction_fields.extend(['detection_masks'])
self._required_groundtruth_fields.extend(['masks'])
self.reset_states()
@property
def name(self):
return 'coco_metric'
def reset_states(self):
"""Resets internal states for a fresh run."""
self._predictions = {}
if not self._annotation_file:
self._groundtruths = {}
def result(self):
"""Evaluates detection results, and reset_states."""
metric_dict = self.evaluate()
# Cleans up the internal variables in order for a fresh eval next time.
self.reset_states()
return metric_dict
def evaluate(self):
"""Evaluates with detections from all images with COCO API.
Returns:
metrics_dict: a dictionary mapping each metric name (box metrics, and mask
metrics when `include_mask` is True) to a float value.
"""
if not self._annotation_file:
logging.info('There is no annotation_file in COCOEvaluator.')
gt_dataset = coco_utils.convert_groundtruths_to_coco_dataset(
self._groundtruths)
coco_gt = coco_utils.COCOWrapper(
eval_type=('mask' if self._include_mask else 'box'),
gt_dataset=gt_dataset)
else:
logging.info('Using annotation file: %s', self._annotation_file)
coco_gt = self._coco_gt
coco_predictions = coco_utils.convert_predictions_to_coco_annotations(
self._predictions)
coco_dt = coco_gt.loadRes(predictions=coco_predictions)
image_ids = [ann['image_id'] for ann in coco_predictions]
coco_eval = cocoeval.COCOeval(coco_gt, coco_dt, iouType='bbox')
coco_eval.params.imgIds = image_ids
coco_eval.evaluate()
coco_eval.accumulate()
coco_eval.summarize()
coco_metrics = coco_eval.stats
if self._include_mask:
mcoco_eval = cocoeval.COCOeval(coco_gt, coco_dt, iouType='segm')
mcoco_eval.params.imgIds = image_ids
mcoco_eval.evaluate()
mcoco_eval.accumulate()
mcoco_eval.summarize()
mask_coco_metrics = mcoco_eval.stats
if self._include_mask:
metrics = np.hstack((coco_metrics, mask_coco_metrics))
else:
metrics = coco_metrics
metrics_dict = {}
for i, name in enumerate(self._metric_names):
metrics_dict[name] = metrics[i].astype(np.float32)
# Adds metrics per category.
if self._per_category_metrics:
metrics_dict.update(self._retrieve_per_category_metrics(coco_eval))
if self._include_mask:
metrics_dict.update(self._retrieve_per_category_metrics(
mcoco_eval, prefix='mask'))
return metrics_dict
def _retrieve_per_category_metrics(self, coco_eval, prefix=''):
"""Retrieves and per-category metrics and retuns them in a dict.
Args:
coco_eval: a cocoeval.COCOeval object containing evaluation data.
prefix: str, A string used to prefix metric names.
Returns:
metrics_dict: A dictionary with per category metrics.
"""
metrics_dict = {}
if prefix:
prefix = prefix + ' '
if hasattr(coco_eval, 'category_stats'):
for category_index, category_id in enumerate(coco_eval.params.catIds):
if self._annotation_file:
coco_category = self._coco_gt.cats[category_id]
# if 'name' is available use it, otherwise use `id`
category_display_name = coco_category.get('name', category_id)
else:
category_display_name = category_id
metrics_dict[prefix + 'Precision mAP ByCategory/{}'.format(
category_display_name
)] = coco_eval.category_stats[0][category_index].astype(np.float32)
metrics_dict[prefix + 'Precision mAP ByCategory@50IoU/{}'.format(
category_display_name
)] = coco_eval.category_stats[1][category_index].astype(np.float32)
metrics_dict[prefix + 'Precision mAP ByCategory@75IoU/{}'.format(
category_display_name
)] = coco_eval.category_stats[2][category_index].astype(np.float32)
metrics_dict[prefix + 'Precision mAP ByCategory (small) /{}'.format(
category_display_name
)] = coco_eval.category_stats[3][category_index].astype(np.float32)
metrics_dict[prefix + 'Precision mAP ByCategory (medium) /{}'.format(
category_display_name
)] = coco_eval.category_stats[4][category_index].astype(np.float32)
metrics_dict[prefix + 'Precision mAP ByCategory (large) /{}'.format(
category_display_name
)] = coco_eval.category_stats[5][category_index].astype(np.float32)
metrics_dict[prefix + 'Recall AR@1 ByCategory/{}'.format(
category_display_name
)] = coco_eval.category_stats[6][category_index].astype(np.float32)
metrics_dict[prefix + 'Recall AR@10 ByCategory/{}'.format(
category_display_name
)] = coco_eval.category_stats[7][category_index].astype(np.float32)
metrics_dict[prefix + 'Recall AR@100 ByCategory/{}'.format(
category_display_name
)] = coco_eval.category_stats[8][category_index].astype(np.float32)
metrics_dict[prefix + 'Recall AR (small) ByCategory/{}'.format(
category_display_name
)] = coco_eval.category_stats[9][category_index].astype(np.float32)
metrics_dict[prefix + 'Recall AR (medium) ByCategory/{}'.format(
category_display_name
)] = coco_eval.category_stats[10][category_index].astype(np.float32)
metrics_dict[prefix + 'Recall AR (large) ByCategory/{}'.format(
category_display_name
)] = coco_eval.category_stats[11][category_index].astype(np.float32)
return metrics_dict
def _process_predictions(self, predictions):
image_scale = np.tile(predictions['image_info'][:, 2:3, :], (1, 1, 2))
predictions['detection_boxes'] = (
predictions['detection_boxes'].astype(np.float32))
predictions['detection_boxes'] /= image_scale
if 'detection_outer_boxes' in predictions:
predictions['detection_outer_boxes'] = (
predictions['detection_outer_boxes'].astype(np.float32))
predictions['detection_outer_boxes'] /= image_scale
def _convert_to_numpy(self, groundtruths, predictions):
"""Converts tesnors to numpy arrays."""
if groundtruths:
labels = tf.nest.map_structure(lambda x: x.numpy(), groundtruths)
numpy_groundtruths = {}
for key, val in labels.items():
if isinstance(val, tuple):
val = np.concatenate(val)
numpy_groundtruths[key] = val
else:
numpy_groundtruths = groundtruths
if predictions:
outputs = tf.nest.map_structure(lambda x: x.numpy(), predictions)
numpy_predictions = {}
for key, val in outputs.items():
if isinstance(val, tuple):
val = np.concatenate(val)
numpy_predictions[key] = val
else:
numpy_predictions = predictions
return numpy_groundtruths, numpy_predictions
def update_state(self, groundtruths, predictions):
"""Update and aggregate detection results and groundtruth data.
Args:
groundtruths: a dictionary of Tensors including the fields below.
See also different parsers under `../dataloader` for more details.
Required fields:
- source_id: a numpy array of int or string of shape [batch_size].
- height: a numpy array of int of shape [batch_size].
- width: a numpy array of int of shape [batch_size].
- num_detections: a numpy array of int of shape [batch_size].
- boxes: a numpy array of float of shape [batch_size, K, 4].
- classes: a numpy array of int of shape [batch_size, K].
Optional fields:
- is_crowds: a numpy array of int of shape [batch_size, K]. If the
field is absent, it is assumed that this instance is not crowd.
- areas: a numpy array of float of shape [batch_size, K]. If the
field is absent, the area is calculated using either boxes or
masks depending on which one is available.
- masks: a numpy array of float of shape
[batch_size, K, mask_height, mask_width].
predictions: a dictionary of tensors including the fields below.
See different parsers under `../dataloader` for more details.
Required fields:
- source_id: a numpy array of int or string of shape [batch_size].
- image_info [if `need_rescale_bboxes` is True]: a numpy array of
float of shape [batch_size, 4, 2].
- num_detections: a numpy array of
int of shape [batch_size].
- detection_boxes: a numpy array of float of shape [batch_size, K, 4].
- detection_classes: a numpy array of int of shape [batch_size, K].
- detection_scores: a numpy array of float of shape [batch_size, K].
Optional fields:
- detection_masks: a numpy array of float of shape
[batch_size, K, mask_height, mask_width].
Raises:
ValueError: if the required prediction or groundtruth fields are not
present in the incoming `predictions` or `groundtruths`.
"""
groundtruths, predictions = self._convert_to_numpy(groundtruths,
predictions)
for k in self._required_prediction_fields:
if k not in predictions:
raise ValueError(
'Missing the required key `{}` in predictions!'.format(k))
if self._need_rescale_bboxes:
self._process_predictions(predictions)
for k, v in six.iteritems(predictions):
if k not in self._predictions:
self._predictions[k] = [v]
else:
self._predictions[k].append(v)
if not self._annotation_file:
assert groundtruths
for k in self._required_groundtruth_fields:
if k not in groundtruths:
raise ValueError(
'Missing the required key `{}` in groundtruths!'.format(k))
for k, v in six.iteritems(groundtruths):
if k not in self._groundtruths:
self._groundtruths[k] = [v]
else:
self._groundtruths[k].append(v)
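# NOTE: The function below is an illustrative sketch added for documentation;
# it is not part of the original module. It mirrors the usage snippet in the
# module docstring. `detection_model` and `eval_dataset` are hypothetical
# stand-ins for a trained detector and a parsed eval dataset whose elements
# carry the fields that `update_state()` requires.
def run_example_coco_eval(detection_model, eval_dataset, annotation_file=None):
  """Runs one full COCO-style evaluation pass (illustrative only)."""
  evaluator = COCOEvaluator(
      annotation_file=annotation_file, include_mask=False)
  for images, groundtruths in eval_dataset:
    predictions = detection_model(images, training=False)
    evaluator.update_state(groundtruths, predictions)
  # result() computes the metrics and resets the internal state, so the same
  # evaluator instance can be reused for the next evaluation round.
  metrics = evaluator.result()
  return metrics  # e.g. metrics['AP'], metrics['AP50'], metrics['ARmax100'].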
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Util functions related to pycocotools and COCO eval."""
import copy
import json
# Import libraries
from absl import logging
import numpy as np
from PIL import Image
from pycocotools import coco
from pycocotools import mask as mask_api
import six
import tensorflow as tf
from official.common import dataset_fn
from official.vision.dataloaders import tf_example_decoder
from official.vision.ops import box_ops
from official.vision.ops import mask_ops
class COCOWrapper(coco.COCO):
"""COCO wrapper class.
This class wraps COCO API object, which provides the following additional
functionalities:
1. Support string type image id.
2. Support loading the groundtruth dataset using the external annotation
dictionary.
3. Support loading the prediction results using the external annotation
dictionary.
"""
def __init__(self, eval_type='box', annotation_file=None, gt_dataset=None):
"""Instantiates a COCO-style API object.
Args:
eval_type: either 'box' or 'mask'.
annotation_file: a JSON file that stores annotations of the eval dataset.
This is required if `gt_dataset` is not provided.
gt_dataset: the groundtruth eval dataset in COCO API format.
"""
if ((annotation_file and gt_dataset) or
((not annotation_file) and (not gt_dataset))):
raise ValueError('One and only one of `annotation_file` and `gt_dataset` '
'needs to be specified.')
if eval_type not in ['box', 'mask']:
raise ValueError('The `eval_type` can only be either `box` or `mask`.')
coco.COCO.__init__(self, annotation_file=annotation_file)
self._eval_type = eval_type
if gt_dataset:
self.dataset = gt_dataset
self.createIndex()
def loadRes(self, predictions):
"""Loads result file and return a result api object.
Args:
predictions: a list of dictionaries, each representing an annotation in COCO
format. The required fields are `image_id`, `category_id`, `score`,
`bbox`, `segmentation`.
Returns:
res: result COCO api object.
Raises:
ValueError: if the set of image ids from predictions is not a subset of
the set of image ids of the groundtruth dataset.
"""
res = coco.COCO()
res.dataset['images'] = copy.deepcopy(self.dataset['images'])
res.dataset['categories'] = copy.deepcopy(self.dataset['categories'])
image_ids = [ann['image_id'] for ann in predictions]
if set(image_ids) != (set(image_ids) & set(self.getImgIds())):
raise ValueError('Results do not correspond to the current dataset!')
for ann in predictions:
x1, x2, y1, y2 = [ann['bbox'][0], ann['bbox'][0] + ann['bbox'][2],
ann['bbox'][1], ann['bbox'][1] + ann['bbox'][3]]
if self._eval_type == 'box':
ann['area'] = ann['bbox'][2] * ann['bbox'][3]
ann['segmentation'] = [
[x1, y1, x1, y2, x2, y2, x2, y1]]
elif self._eval_type == 'mask':
ann['area'] = mask_api.area(ann['segmentation'])
res.dataset['annotations'] = copy.deepcopy(predictions)
res.createIndex()
return res
def convert_predictions_to_coco_annotations(predictions):
"""Converts a batch of predictions to annotations in COCO format.
Args:
predictions: a dictionary of lists of numpy arrays including the following
fields. K below denotes the maximum number of instances per image.
Required fields:
- source_id: a list of numpy arrays of int or string of shape
[batch_size].
- num_detections: a list of numpy arrays of int of shape [batch_size].
- detection_boxes: a list of numpy arrays of float of shape
[batch_size, K, 4], where coordinates are in the original image
space (not the scaled image space).
- detection_classes: a list of numpy arrays of int of shape
[batch_size, K].
- detection_scores: a list of numpy arrays of float of shape
[batch_size, K].
Optional fields:
- detection_masks: a list of numpy arrays of float of shape
[batch_size, K, mask_height, mask_width].
Returns:
coco_predictions: prediction in COCO annotation format.
"""
coco_predictions = []
num_batches = len(predictions['source_id'])
max_num_detections = predictions['detection_classes'][0].shape[1]
use_outer_box = 'detection_outer_boxes' in predictions
for i in range(num_batches):
predictions['detection_boxes'][i] = box_ops.yxyx_to_xywh(
predictions['detection_boxes'][i])
if use_outer_box:
predictions['detection_outer_boxes'][i] = box_ops.yxyx_to_xywh(
predictions['detection_outer_boxes'][i])
mask_boxes = predictions['detection_outer_boxes']
else:
mask_boxes = predictions['detection_boxes']
batch_size = predictions['source_id'][i].shape[0]
for j in range(batch_size):
if 'detection_masks' in predictions:
image_masks = mask_ops.paste_instance_masks(
predictions['detection_masks'][i][j],
mask_boxes[i][j],
int(predictions['image_info'][i][j, 0, 0]),
int(predictions['image_info'][i][j, 0, 1]))
binary_masks = (image_masks > 0.0).astype(np.uint8)
encoded_masks = [
mask_api.encode(np.asfortranarray(binary_mask))
for binary_mask in list(binary_masks)]
for k in range(max_num_detections):
ann = {}
ann['image_id'] = predictions['source_id'][i][j]
ann['category_id'] = predictions['detection_classes'][i][j, k]
ann['bbox'] = predictions['detection_boxes'][i][j, k]
ann['score'] = predictions['detection_scores'][i][j, k]
if 'detection_masks' in predictions:
ann['segmentation'] = encoded_masks[k]
coco_predictions.append(ann)
for i, ann in enumerate(coco_predictions):
ann['id'] = i + 1
return coco_predictions
def convert_groundtruths_to_coco_dataset(groundtruths, label_map=None):
"""Converts groundtruths to the dataset in COCO format.
Args:
groundtruths: a dictionary of lists of numpy arrays including the fields
below. Note that each element in a list holds the values for one batch
of examples. K below denotes the maximum number of instances per image.
Required fields:
- source_id: a list of numpy arrays of int or string of shape
[batch_size].
- height: a list of numpy arrays of int of shape [batch_size].
- width: a list of numpy arrays of int of shape [batch_size].
- num_detections: a list of numpy arrays of int of shape [batch_size].
- boxes: a list of numpy arrays of float of shape [batch_size, K, 4],
where coordinates are in the original image space (not the
normalized coordinates).
- classes: a list of numpy arrays of int of shape [batch_size, K].
Optional fields:
- is_crowds: a list of numpy arrays of int of shape [batch_size, K]. If
the field is absent, it is assumed that this instance is not crowd.
- areas: a list of numpy arrays of float of shape [batch_size, K]. If the
field is absent, the area is calculated using either boxes or
masks depending on which one is available.
- masks: a list of numpy arrays of string of shape [batch_size, K].
label_map: (optional) a dictionary that defines items from the category id
to the category name. If `None`, collects the category mapping from the
`groundtruths`.
Returns:
coco_groundtruths: the groundtruth dataset in COCO format.
"""
source_ids = np.concatenate(groundtruths['source_id'], axis=0)
heights = np.concatenate(groundtruths['height'], axis=0)
widths = np.concatenate(groundtruths['width'], axis=0)
gt_images = [{'id': int(i), 'height': int(h), 'width': int(w)} for i, h, w
in zip(source_ids, heights, widths)]
gt_annotations = []
num_batches = len(groundtruths['source_id'])
for i in range(num_batches):
logging.info(
'convert_groundtruths_to_coco_dataset: Processing annotation %d', i)
max_num_instances = groundtruths['classes'][i].shape[1]
batch_size = groundtruths['source_id'][i].shape[0]
for j in range(batch_size):
num_instances = groundtruths['num_detections'][i][j]
if num_instances > max_num_instances:
logging.warning(
'num_groundtruths is larger than max_num_instances, %d v.s. %d',
num_instances, max_num_instances)
num_instances = max_num_instances
for k in range(int(num_instances)):
ann = {}
ann['image_id'] = int(groundtruths['source_id'][i][j])
if 'is_crowds' in groundtruths:
ann['iscrowd'] = int(groundtruths['is_crowds'][i][j, k])
else:
ann['iscrowd'] = 0
ann['category_id'] = int(groundtruths['classes'][i][j, k])
boxes = groundtruths['boxes'][i]
ann['bbox'] = [
float(boxes[j, k, 1]),
float(boxes[j, k, 0]),
float(boxes[j, k, 3] - boxes[j, k, 1]),
float(boxes[j, k, 2] - boxes[j, k, 0])]
if 'areas' in groundtruths:
ann['area'] = float(groundtruths['areas'][i][j, k])
else:
ann['area'] = float(
(boxes[j, k, 3] - boxes[j, k, 1]) *
(boxes[j, k, 2] - boxes[j, k, 0]))
if 'masks' in groundtruths:
if isinstance(groundtruths['masks'][i][j, k], tf.Tensor):
mask = Image.open(
six.BytesIO(groundtruths['masks'][i][j, k].numpy()))
width, height = mask.size
np_mask = (
np.array(mask.getdata()).reshape(height,
width).astype(np.uint8))
else:
mask = Image.open(
six.BytesIO(groundtruths['masks'][i][j, k]))
width, height = mask.size
np_mask = (
np.array(mask.getdata()).reshape(height,
width).astype(np.uint8))
np_mask[np_mask > 0] = 255
encoded_mask = mask_api.encode(np.asfortranarray(np_mask))
ann['segmentation'] = encoded_mask
# Ensure the content of `counts` is JSON serializable string.
if 'counts' in ann['segmentation']:
ann['segmentation']['counts'] = six.ensure_str(
ann['segmentation']['counts'])
if 'areas' not in groundtruths:
ann['area'] = mask_api.area(encoded_mask)
gt_annotations.append(ann)
for i, ann in enumerate(gt_annotations):
ann['id'] = i + 1
if label_map:
gt_categories = [{'id': i, 'name': label_map[i]} for i in label_map]
else:
category_ids = [gt['category_id'] for gt in gt_annotations]
gt_categories = [{'id': i} for i in set(category_ids)]
gt_dataset = {
'images': gt_images,
'categories': gt_categories,
'annotations': copy.deepcopy(gt_annotations),
}
return gt_dataset
class COCOGroundtruthGenerator:
"""Generates the groundtruth annotations from a single example."""
def __init__(self, file_pattern, file_type, num_examples, include_mask,
regenerate_source_id=False):
self._file_pattern = file_pattern
self._num_examples = num_examples
self._include_mask = include_mask
self._dataset_fn = dataset_fn.pick_dataset_fn(file_type)
self._regenerate_source_id = regenerate_source_id
def _parse_single_example(self, example):
"""Parses a single serialized tf.Example proto.
Args:
example: a serialized tf.Example proto string.
Returns:
A dictionary of groundtruth with the following fields:
source_id: a scalar tensor of int64 representing the image source_id.
height: a scalar tensor of int64 representing the image height.
width: a scalar tensor of int64 representing the image width.
boxes: a float tensor of shape [K, 4], representing the groundtruth
boxes in absolute coordinates with respect to the original image size.
classes: an int64 tensor of shape [K], representing the class label of
each instance.
is_crowds: a bool tensor of shape [K], indicating whether the instance
is crowd.
areas: a float tensor of shape [K], indicating the area of each
instance.
masks: a string tensor of shape [K], containing the bytes of the png
mask of each instance.
"""
decoder = tf_example_decoder.TfExampleDecoder(
include_mask=self._include_mask,
regenerate_source_id=self._regenerate_source_id)
decoded_tensors = decoder.decode(example)
image = decoded_tensors['image']
image_size = tf.shape(image)[0:2]
boxes = box_ops.denormalize_boxes(
decoded_tensors['groundtruth_boxes'], image_size)
source_id = decoded_tensors['source_id']
if source_id.dtype is tf.string:
source_id = tf.strings.to_number(source_id, out_type=tf.int64)
groundtruths = {
'source_id': source_id,
'height': decoded_tensors['height'],
'width': decoded_tensors['width'],
'num_detections': tf.shape(decoded_tensors['groundtruth_classes'])[0],
'boxes': boxes,
'classes': decoded_tensors['groundtruth_classes'],
'is_crowds': decoded_tensors['groundtruth_is_crowd'],
'areas': decoded_tensors['groundtruth_area'],
}
if self._include_mask:
groundtruths.update({
'masks': decoded_tensors['groundtruth_instance_masks_png'],
})
return groundtruths
def _build_pipeline(self):
"""Builds data pipeline to generate groundtruth annotations."""
dataset = tf.data.Dataset.list_files(self._file_pattern, shuffle=False)
dataset = dataset.interleave(
map_func=lambda filename: self._dataset_fn(filename).prefetch(1),
cycle_length=None,
num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.take(self._num_examples)
dataset = dataset.map(self._parse_single_example,
num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.batch(1, drop_remainder=False)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
return dataset
def __call__(self):
return self._build_pipeline()
def scan_and_generator_annotation_file(file_pattern: str,
file_type: str,
num_samples: int,
include_mask: bool,
annotation_file: str,
regenerate_source_id: bool = False):
"""Scans and generate the COCO-style annotation JSON file given a dataset."""
groundtruth_generator = COCOGroundtruthGenerator(
file_pattern, file_type, num_samples, include_mask, regenerate_source_id)
generate_annotation_file(groundtruth_generator, annotation_file)
def generate_annotation_file(groundtruth_generator,
annotation_file):
"""Generates COCO-style annotation JSON file given a groundtruth generator."""
groundtruths = {}
logging.info('Loading groundtruth annotations from dataset to memory...')
for i, groundtruth in enumerate(groundtruth_generator()):
logging.info('generate_annotation_file: Processing annotation %d', i)
for k, v in six.iteritems(groundtruth):
if k not in groundtruths:
groundtruths[k] = [v]
else:
groundtruths[k].append(v)
gt_dataset = convert_groundtruths_to_coco_dataset(groundtruths)
logging.info('Saving groundtruth annotations to the JSON file...')
with tf.io.gfile.GFile(annotation_file, 'w') as f:
f.write(json.dumps(gt_dataset))
logging.info('Done saving the JSON file...')
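# NOTE: The helper below is an illustrative sketch added for documentation; it
# is not part of the original module. It shows an assumed workflow: scan the
# eval TFRecords once to produce a COCO-style annotation JSON, then point the
# evaluator at that file so groundtruths need not be re-collected every run.
# The file pattern and sample count passed in are hypothetical.
def generate_and_reuse_annotation_file_example(file_pattern: str,
                                               num_samples: int,
                                               annotation_file: str):
  """Builds the COCO JSON once and reuses it for eval (illustrative only)."""
  scan_and_generator_annotation_file(
      file_pattern=file_pattern,
      file_type='tfrecord',
      num_samples=num_samples,
      include_mask=False,
      annotation_file=annotation_file)
  # Imported locally to avoid a circular import; the module path is assumed.
  from official.vision.evaluation import coco_evaluator  # pylint: disable=g-import-not-at-top
  return coco_evaluator.COCOEvaluator(
      annotation_file=annotation_file, include_mask=False)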
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for coco_utils."""
import os
import tensorflow as tf
from official.vision.dataloaders import tfexample_utils
from official.vision.evaluation import coco_utils
class CocoUtilsTest(tf.test.TestCase):
def test_scan_and_generator_annotation_file(self):
num_samples = 10
example = tfexample_utils.create_detection_test_example(
image_height=512, image_width=512, image_channel=3, num_instances=10)
tf_examples = [example] * num_samples
data_file = os.path.join(self.create_tempdir(), 'test.tfrecord')
tfexample_utils.dump_to_tfrecord(
record_file=data_file, tf_examples=tf_examples)
annotation_file = os.path.join(self.create_tempdir(), 'annotation.json')
coco_utils.scan_and_generator_annotation_file(
file_pattern=data_file,
file_type='tfrecord',
num_samples=num_samples,
include_mask=True,
annotation_file=annotation_file)
self.assertTrue(
tf.io.gfile.exists(annotation_file),
msg=f'Annotation file {annotation_file} does not exist.')
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""IOU Metrics used for semantic segmentation models."""
import numpy as np
import tensorflow as tf
class PerClassIoU(tf.keras.metrics.Metric):
"""Computes the per-class Intersection-Over-Union metric.
Mean Intersection-Over-Union is a common evaluation metric for semantic image
segmentation, which first computes the IOU for each semantic class.
IOU is defined as follows:
IOU = true_positive / (true_positive + false_positive + false_negative).
The predictions are accumulated in a confusion matrix, weighted by
`sample_weight` and the metric is then calculated from it.
If `sample_weight` is `None`, weights default to 1.
Use `sample_weight` of 0 to mask values.
Example:
>>> # cm = [[1, 1],
>>> # [1, 1]]
>>> # sum_row = [2, 2], sum_col = [2, 2], true_positives = [1, 1]
>>> # iou = true_positives / (sum_row + sum_col - true_positives)
>>> # result = [1 / (2 + 2 - 1), 1 / (2 + 2 - 1)] = [0.33, 0.33]
>>> m = PerClassIoU(num_classes=2)
>>> m.update_state([0, 0, 1, 1], [0, 1, 0, 1])
>>> m.result().numpy()
[0.33333334, 0.33333334]
"""
def __init__(self, num_classes, name=None, dtype=None):
"""Initializes `PerClassIoU`.
Args:
num_classes: The possible number of labels the prediction task can have.
This value must be provided, since a confusion matrix of dimension =
[num_classes, num_classes] will be allocated.
name: (Optional) string name of the metric instance.
dtype: (Optional) data type of the metric result.
"""
super(PerClassIoU, self).__init__(name=name, dtype=dtype)
self.num_classes = num_classes
# Variable to accumulate the predictions in the confusion matrix.
self.total_cm = self.add_weight(
'total_confusion_matrix',
shape=(num_classes, num_classes),
initializer=tf.compat.v1.zeros_initializer)
def update_state(self, y_true, y_pred, sample_weight=None):
"""Accumulates the confusion matrix statistics.
Args:
y_true: The ground truth values.
y_pred: The predicted values.
sample_weight: Optional weighting of each example. Defaults to 1. Can be a
`Tensor` whose rank is either 0, or the same rank as `y_true`, and must
be broadcastable to `y_true`.
Returns:
The updated confusion matrix variable.
"""
y_true = tf.cast(y_true, self._dtype)
y_pred = tf.cast(y_pred, self._dtype)
# Flatten the input if its rank > 1.
if y_pred.shape.ndims > 1:
y_pred = tf.reshape(y_pred, [-1])
if y_true.shape.ndims > 1:
y_true = tf.reshape(y_true, [-1])
if sample_weight is not None:
sample_weight = tf.cast(sample_weight, self._dtype)
if sample_weight.shape.ndims > 1:
sample_weight = tf.reshape(sample_weight, [-1])
# Accumulate the prediction to current confusion matrix.
current_cm = tf.math.confusion_matrix(
y_true,
y_pred,
self.num_classes,
weights=sample_weight,
dtype=self._dtype)
return self.total_cm.assign_add(current_cm)
def result(self):
"""Compute the mean intersection-over-union via the confusion matrix."""
sum_over_row = tf.cast(
tf.reduce_sum(self.total_cm, axis=0), dtype=self._dtype)
sum_over_col = tf.cast(
tf.reduce_sum(self.total_cm, axis=1), dtype=self._dtype)
true_positives = tf.cast(
tf.linalg.tensor_diag_part(self.total_cm), dtype=self._dtype)
# sum_over_row + sum_over_col =
# 2 * true_positives + false_positives + false_negatives.
denominator = sum_over_row + sum_over_col - true_positives
return tf.math.divide_no_nan(true_positives, denominator)
def reset_states(self):
tf.keras.backend.set_value(
self.total_cm, np.zeros((self.num_classes, self.num_classes)))
def get_config(self):
config = {'num_classes': self.num_classes}
base_config = super(PerClassIoU, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
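# NOTE: The helper below is an illustrative sketch added for documentation; it
# is not part of the original module. It accumulates PerClassIoU over a
# hypothetical segmentation model and dataset, masking an assumed ignore
# label via `sample_weight`.
def compute_example_per_class_iou(model, dataset, num_classes,
                                  ignore_label=255):
  """Accumulates per-class IoU over a dataset (illustrative only)."""
  metric = PerClassIoU(num_classes=num_classes)
  for images, labels in dataset:
    logits = model(images, training=False)
    predictions = tf.argmax(logits, axis=-1)
    # Zero-weight ignored pixels and remap their labels into the valid range
    # so the confusion matrix update stays well defined.
    valid = tf.not_equal(labels, ignore_label)
    weights = tf.cast(valid, tf.float32)
    safe_labels = tf.where(valid, labels, tf.zeros_like(labels))
    metric.update_state(safe_labels, predictions, sample_weight=weights)
  per_class_iou = metric.result()  # Shape: [num_classes].
  return per_class_iou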
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for iou metric."""
import tensorflow as tf
from official.vision.evaluation import iou
class PerClassIoUTest(tf.test.TestCase):
def test_config(self):
m_obj = iou.PerClassIoU(num_classes=2, name='per_class_iou')
self.assertEqual(m_obj.name, 'per_class_iou')
self.assertEqual(m_obj.num_classes, 2)
m_obj2 = iou.PerClassIoU.from_config(m_obj.get_config())
self.assertEqual(m_obj2.name, 'per_class_iou')
self.assertEqual(m_obj2.num_classes, 2)
def test_unweighted(self):
y_pred = [0, 1, 0, 1]
y_true = [0, 0, 1, 1]
m_obj = iou.PerClassIoU(num_classes=2)
result = m_obj(y_true, y_pred)
# cm = [[1, 1],
# [1, 1]]
# sum_row = [2, 2], sum_col = [2, 2], true_positives = [1, 1]
# iou = true_positives / (sum_row + sum_col - true_positives)
expected_result = [1 / (2 + 2 - 1), 1 / (2 + 2 - 1)]
self.assertAllClose(expected_result, result, atol=1e-3)
def test_weighted(self):
y_pred = tf.constant([0, 1, 0, 1], dtype=tf.float32)
y_true = tf.constant([0, 0, 1, 1])
sample_weight = tf.constant([0.2, 0.3, 0.4, 0.1])
m_obj = iou.PerClassIoU(num_classes=2)
result = m_obj(y_true, y_pred, sample_weight=sample_weight)
# cm = [[0.2, 0.3],
# [0.4, 0.1]]
# sum_row = [0.6, 0.4], sum_col = [0.5, 0.5], true_positives = [0.2, 0.1]
# iou = true_positives / (sum_row + sum_col - true_positives)
expected_result = [0.2 / (0.6 + 0.5 - 0.2), 0.1 / (0.4 + 0.5 - 0.1)]
self.assertAllClose(expected_result, result, atol=1e-3)
def test_multi_dim_input(self):
y_pred = tf.constant([[0, 1], [0, 1]], dtype=tf.float32)
y_true = tf.constant([[0, 0], [1, 1]])
sample_weight = tf.constant([[0.2, 0.3], [0.4, 0.1]])
m_obj = iou.PerClassIoU(num_classes=2)
result = m_obj(y_true, y_pred, sample_weight=sample_weight)
# cm = [[0.2, 0.3],
# [0.4, 0.1]]
# sum_row = [0.6, 0.4], sum_col = [0.5, 0.5], true_positives = [0.2, 0.1]
# iou = true_positives / (sum_row + sum_col - true_positives)
expected_result = [0.2 / (0.6 + 0.5 - 0.2), 0.1 / (0.4 + 0.5 - 0.1)]
self.assertAllClose(expected_result, result, atol=1e-3)
def test_zero_valid_entries(self):
m_obj = iou.PerClassIoU(num_classes=2)
self.assertAllClose(m_obj.result(), [0, 0], atol=1e-3)
def test_zero_and_non_zero_entries(self):
y_pred = tf.constant([1], dtype=tf.float32)
y_true = tf.constant([1])
m_obj = iou.PerClassIoU(num_classes=2)
result = m_obj(y_true, y_pred)
# cm = [[0, 0],
# [0, 1]]
# sum_row = [0, 1], sum_col = [0, 1], true_positives = [0, 1]
# iou = true_positives / (sum_row + sum_col - true_positives)
expected_result = [0, 1 / (1 + 1 - 1)]
self.assertAllClose(expected_result, result, atol=1e-3)
def test_update_state_and_result(self):
y_pred = [0, 1, 0, 1]
y_true = [0, 0, 1, 1]
m_obj = iou.PerClassIoU(num_classes=2)
m_obj.update_state(y_true, y_pred)
result = m_obj.result()
# cm = [[1, 1],
# [1, 1]]
# sum_row = [2, 2], sum_col = [2, 2], true_positives = [1, 1]
# iou = true_positives / (sum_row + sum_col - true_positives)
expected_result = [1 / (2 + 2 - 1), 1 / (2 + 2 - 1)]
self.assertAllClose(expected_result, result, atol=1e-3)
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of the Panoptic Quality metric.
Panoptic Quality is an instance-based metric for evaluating the task of
image parsing, aka panoptic segmentation.
Please see the paper for details:
"Panoptic Segmentation", Alexander Kirillov, Kaiming He, Ross Girshick,
Carsten Rother and Piotr Dollar. arXiv:1801.00868, 2018.
Note that this metric class is branched from
https://github.com/tensorflow/models/blob/master/research/deeplab/evaluation/panoptic_quality.py
"""
import collections
import numpy as np
_EPSILON = 1e-10
def realdiv_maybe_zero(x, y):
"""Element-wise x / y where y may contain zeros, for those returns 0 too."""
return np.where(
np.less(np.abs(y), _EPSILON), np.zeros_like(x), np.divide(x, y))
def _ids_to_counts(id_array):
"""Given a numpy array, a mapping from each unique entry to its count."""
ids, counts = np.unique(id_array, return_counts=True)
return dict(zip(ids, counts))
class PanopticQuality:
"""Metric class for Panoptic Quality.
"Panoptic Segmentation" by Alexander Kirillov, Kaiming He, Ross Girshick,
Carsten Rother, Piotr Dollar.
https://arxiv.org/abs/1801.00868
"""
def __init__(self, num_categories, ignored_label, max_instances_per_category,
offset):
"""Initialization for PanopticQualityMetric.
Args:
num_categories: The number of segmentation categories (or "classes" in the
dataset.
ignored_label: A category id that is ignored in evaluation, e.g. the void
label as defined in COCO panoptic segmentation dataset.
max_instances_per_category: The maximum number of instances for each
category. Used in ensuring unique instance labels.
offset: The maximum number of unique labels. This is used, by multiplying
the ground-truth labels, to generate unique ids for individual regions
of overlap between groundtruth and predicted segments.
"""
self.num_categories = num_categories
self.ignored_label = ignored_label
self.max_instances_per_category = max_instances_per_category
self.offset = offset
self.reset()
def _naively_combine_labels(self, category_mask, instance_mask):
"""Naively creates a combined label array from categories and instances."""
return (category_mask.astype(np.uint32) * self.max_instances_per_category +
instance_mask.astype(np.uint32))
def compare_and_accumulate(self, groundtruths, predictions):
"""Compares predicted segmentation with groundtruth, accumulates its metric.
It is not assumed that instance ids are unique across different categories.
See for example combine_semantic_and_instance_predictions.py in official
PanopticAPI evaluation code for issues to consider when fusing category
and instance labels.
For instance ids of the ignored category, id 0 means "void" and the
remaining ids are crowd instances.
Args:
groundtruths: A dictionary containing groundtruth labels. It should contain
the following fields.
- category_mask: A 2D numpy uint16 array of groundtruth per-pixel
category labels.
- instance_mask: A 2D numpy uint16 array of groundtruth instance labels.
predictions: A dictionary containing the model outputs. It should contain
the following fields.
- category_mask: A 2D numpy uint16 array of predicted per-pixel
category labels.
- instance_mask: A 2D numpy uint16 array of predicted instance labels.
"""
groundtruth_category_mask = groundtruths['category_mask']
groundtruth_instance_mask = groundtruths['instance_mask']
predicted_category_mask = predictions['category_mask']
predicted_instance_mask = predictions['instance_mask']
# First, combine the category and instance labels so that every unique
# value for (category, instance) is assigned a unique integer label.
pred_segment_id = self._naively_combine_labels(predicted_category_mask,
predicted_instance_mask)
gt_segment_id = self._naively_combine_labels(groundtruth_category_mask,
groundtruth_instance_mask)
# Pre-calculate areas for all groundtruth and predicted segments.
gt_segment_areas = _ids_to_counts(gt_segment_id)
pred_segment_areas = _ids_to_counts(pred_segment_id)
# We assume there is only one void segment and it has instance id = 0.
void_segment_id = self.ignored_label * self.max_instances_per_category
# There may be other ignored groundtruth segments with instance id > 0; find
# those ids using the unique segment ids extracted with the area computation
# above.
ignored_segment_ids = {
gt_segment_id for gt_segment_id in gt_segment_areas
if (gt_segment_id //
self.max_instances_per_category) == self.ignored_label
}
# Next, combine the groundtruth and predicted labels. Dividing up the pixels
# based on which groundtruth segment and which predicted segment they belong
# to, this will assign a different 32-bit integer label to each choice
# of (groundtruth segment, predicted segment), encoded as
# gt_segment_id * offset + pred_segment_id.
intersection_id_array = (
gt_segment_id.astype(np.uint64) * self.offset +
pred_segment_id.astype(np.uint64))
# For every combination of (groundtruth segment, predicted segment) with a
# non-empty intersection, this counts the number of pixels in that
# intersection.
intersection_areas = _ids_to_counts(intersection_id_array)
# Helper function that computes the area of the overlap between a predicted
# segment and the ground-truth void/ignored segment.
def prediction_void_overlap(pred_segment_id):
void_intersection_id = void_segment_id * self.offset + pred_segment_id
return intersection_areas.get(void_intersection_id, 0)
# Compute overall ignored overlap.
def prediction_ignored_overlap(pred_segment_id):
total_ignored_overlap = 0
for ignored_segment_id in ignored_segment_ids:
intersection_id = ignored_segment_id * self.offset + pred_segment_id
total_ignored_overlap += intersection_areas.get(intersection_id, 0)
return total_ignored_overlap
# Sets that are populated with which segments groundtruth/predicted segments
# have been matched with overlapping predicted/groundtruth segments
# respectively.
gt_matched = set()
pred_matched = set()
# Calculate IoU per pair of intersecting segments of the same category.
for intersection_id, intersection_area in intersection_areas.items():
gt_segment_id = int(intersection_id // self.offset)
pred_segment_id = int(intersection_id % self.offset)
gt_category = int(gt_segment_id // self.max_instances_per_category)
pred_category = int(pred_segment_id // self.max_instances_per_category)
if gt_category != pred_category:
continue
# Union between the groundtruth and predicted segments being compared does
# not include the portion of the predicted segment that consists of
# groundtruth "void" pixels.
union = (
gt_segment_areas[gt_segment_id] +
pred_segment_areas[pred_segment_id] - intersection_area -
prediction_void_overlap(pred_segment_id))
iou = intersection_area / union
if iou > 0.5:
self.tp_per_class[gt_category] += 1
self.iou_per_class[gt_category] += iou
gt_matched.add(gt_segment_id)
pred_matched.add(pred_segment_id)
# Count false negatives for each category.
for gt_segment_id in gt_segment_areas:
if gt_segment_id in gt_matched:
continue
category = gt_segment_id // self.max_instances_per_category
# Failing to detect a void segment is not a false negative.
if category == self.ignored_label:
continue
self.fn_per_class[category] += 1
# Count false positives for each category.
for pred_segment_id in pred_segment_areas:
if pred_segment_id in pred_matched:
continue
# A false positive is not penalized if it is mostly ignored in the
# groundtruth.
if (prediction_ignored_overlap(pred_segment_id) /
pred_segment_areas[pred_segment_id]) > 0.5:
continue
category = pred_segment_id // self.max_instances_per_category
self.fp_per_class[category] += 1
def _valid_categories(self):
"""Categories with a "valid" value for the metric, have > 0 instances.
We will ignore the `ignore_label` class and other classes which have
`tp + fn + fp = 0`.
Returns:
Boolean array of shape `[num_categories]`.
"""
valid_categories = np.not_equal(
self.tp_per_class + self.fn_per_class + self.fp_per_class, 0)
if self.ignored_label >= 0 and self.ignored_label < self.num_categories:
valid_categories[self.ignored_label] = False
return valid_categories
def result_per_category(self):
"""For supported metrics, return individual per-category metric values.
Returns:
A dictionary containing all per-class metrics; each metric is a numpy array
of shape `[self.num_categories]`, where index `i` is the metric value
for category `i` only.
"""
sq_per_class = realdiv_maybe_zero(self.iou_per_class, self.tp_per_class)
rq_per_class = realdiv_maybe_zero(
self.tp_per_class,
self.tp_per_class + 0.5 * self.fn_per_class + 0.5 * self.fp_per_class)
return {
'sq_per_class': sq_per_class,
'rq_per_class': rq_per_class,
'pq_per_class': np.multiply(sq_per_class, rq_per_class)
}
def result(self, is_thing=None):
"""Computes and returns the detailed metric results over all comparisons.
Args:
is_thing: A boolean array of length `num_categories`. The entry
`is_thing[category_id]` is True iff that category is a "thing" category
instead of "stuff."
Returns:
A dictionary with a breakdown of metrics and/or metric factors by things,
stuff, and all categories.
"""
results = self.result_per_category()
valid_categories = self._valid_categories()
# If known, break down which categories are valid _and_ things/stuff.
category_sets = collections.OrderedDict()
category_sets['All'] = valid_categories
if is_thing is not None:
category_sets['Things'] = np.logical_and(valid_categories, is_thing)
category_sets['Stuff'] = np.logical_and(valid_categories,
np.logical_not(is_thing))
for category_set_name, in_category_set in category_sets.items():
if np.any(in_category_set):
results.update({
f'{category_set_name}_pq':
np.mean(results['pq_per_class'][in_category_set]),
f'{category_set_name}_sq':
np.mean(results['sq_per_class'][in_category_set]),
f'{category_set_name}_rq':
np.mean(results['rq_per_class'][in_category_set]),
# The number of categories in this subset.
f'{category_set_name}_num_categories':
np.sum(in_category_set.astype(np.int32)),
})
else:
results[category_set_name] = {
f'{category_set_name}_pq': 0.,
f'{category_set_name}_sq': 0.,
f'{category_set_name}_rq': 0.,
f'{category_set_name}_num_categories': 0
}
return results
def reset(self):
"""Resets the accumulation to the metric class's state at initialization."""
self.iou_per_class = np.zeros(self.num_categories, dtype=np.float64)
self.tp_per_class = np.zeros(self.num_categories, dtype=np.float64)
self.fn_per_class = np.zeros(self.num_categories, dtype=np.float64)
self.fp_per_class = np.zeros(self.num_categories, dtype=np.float64)
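# NOTE: The function below is an illustrative sketch added for documentation;
# it is not part of the original module. The parameter choices (three
# categories, ignored_label=0, at most 10 instances per category, offset=256)
# are assumptions chosen only to keep the toy example small.
def run_toy_panoptic_quality_example():
  """Accumulates PanopticQuality on tiny 2x2 masks (illustrative only)."""
  pq_metric = PanopticQuality(
      num_categories=3, ignored_label=0,
      max_instances_per_category=10, offset=256)
  groundtruths = {
      'category_mask': np.array([[1, 1], [2, 2]], dtype=np.uint16),
      'instance_mask': np.array([[1, 1], [1, 1]], dtype=np.uint16),
  }
  predictions = {
      'category_mask': np.array([[1, 1], [2, 1]], dtype=np.uint16),
      'instance_mask': np.array([[1, 1], [1, 1]], dtype=np.uint16),
  }
  pq_metric.compare_and_accumulate(groundtruths, predictions)
  # Mark category 1 as a "thing" class and category 2 as "stuff"; category 0
  # is the ignored label and is excluded from the averages.
  results = pq_metric.result(is_thing=np.array([False, True, False]))
  return results  # e.g. results['All_pq'], results['pq_per_class'].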