Commit 8836ad40 authored by Jiageng Zhang's avatar Jiageng Zhang Committed by A. Unique TensorFlower
Browse files

Internal change

PiperOrigin-RevId: 464104309
parent 0feded5e
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for panoptic deeplab config."""
# pylint: disable=unused-import
from absl.testing import parameterized
import tensorflow as tf
from official.core import config_definitions as cfg
from official.core import exp_factory
from official.vision.beta.projects.panoptic_maskrcnn.configs import panoptic_deeplab as exp_cfg
class PanopticMaskRCNNConfigTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.parameters(
('panoptic_deeplab_resnet_coco', 'dilated_resnet'),
('panoptic_deeplab_mobilenetv3_large_coco', 'mobilenet'),
)
def test_panoptic_deeplab_configs(self, config_name, backbone_type):
config = exp_factory.get_exp_config(config_name)
self.assertIsInstance(config, cfg.ExperimentConfig)
self.assertIsInstance(config.task, exp_cfg.PanopticDeeplabTask)
self.assertIsInstance(config.task.model, exp_cfg.PanopticDeeplab)
self.assertIsInstance(config.task.train_data, exp_cfg.DataConfig)
self.assertEqual(config.task.model.backbone.type, backbone_type)
config.validate()
config.task.train_data.is_training = None
with self.assertRaisesRegex(KeyError, 'Found inconsistncy between key'):
config.validate()
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for panoptic maskrcnn config."""
# pylint: disable=unused-import
from absl.testing import parameterized
import tensorflow as tf
from official.core import config_definitions as cfg
from official.core import exp_factory
from official.vision.beta.projects.panoptic_maskrcnn.configs import panoptic_maskrcnn as exp_cfg
class PanopticMaskRCNNConfigTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.parameters(
('panoptic_fpn_coco',),
)
def test_panoptic_maskrcnn_configs(self, config_name):
config = exp_factory.get_exp_config(config_name)
self.assertIsInstance(config, cfg.ExperimentConfig)
self.assertIsInstance(config.task, exp_cfg.PanopticMaskRCNNTask)
self.assertIsInstance(config.task.model, exp_cfg.PanopticMaskRCNN)
self.assertIsInstance(config.task.train_data, exp_cfg.DataConfig)
config.validate()
config.task.train_data.is_training = None
with self.assertRaisesRegex(KeyError, 'Found inconsistncy between key'):
config.validate()
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for panoptic_deeplab_heads.py."""
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.vision.beta.projects.panoptic_maskrcnn.modeling.heads import panoptic_deeplab_heads
class PanopticDeeplabHeadsTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(2, (2,), (48,)),
(3, (2,), (48,)),
(2, (2,), (48,)),
(2, (2,), (48,)),
(3, (2,), (48,)),
(3, (2,), (48,)),
(4, (4, 3), (64, 32)),
(4, (3, 2), (64, 32)))
def test_forward(self, level, low_level, low_level_num_filters):
backbone_features = {
'3': np.random.rand(2, 128, 128, 16),
'4': np.random.rand(2, 64, 64, 16),
'5': np.random.rand(2, 32, 32, 16),
}
decoder_features = {
'3': np.random.rand(2, 128, 128, 64),
'4': np.random.rand(2, 64, 64, 64),
'5': np.random.rand(2, 32, 32, 64),
'6': np.random.rand(2, 16, 16, 64),
}
backbone_features['2'] = np.random.rand(2, 256, 256, 16)
decoder_features['2'] = np.random.rand(2, 256, 256, 64)
num_classes = 10
semantic_head = panoptic_deeplab_heads.SemanticHead(
num_classes=num_classes,
level=level,
low_level=low_level,
low_level_num_filters=low_level_num_filters)
instance_head = panoptic_deeplab_heads.InstanceHead(
level=level,
low_level=low_level,
low_level_num_filters=low_level_num_filters)
semantic_outputs = semantic_head((backbone_features, decoder_features))
instance_outputs = instance_head((backbone_features, decoder_features))
if str(level) in decoder_features:
h, w = decoder_features[str(low_level[-1])].shape[1:3]
self.assertAllEqual(
semantic_outputs.numpy().shape,
[2, h, w, num_classes])
self.assertAllEqual(
instance_outputs['instance_centers_heatmap'].numpy().shape,
[2, h, w, 1])
self.assertAllEqual(
instance_outputs['instance_centers_offset'].numpy().shape,
[2, h, w, 2])
def test_serialize_deserialize(self):
semantic_head = panoptic_deeplab_heads.SemanticHead(num_classes=2, level=3)
instance_head = panoptic_deeplab_heads.InstanceHead(level=3)
semantic_head_config = semantic_head.get_config()
instance_head_config = instance_head.get_config()
new_semantic_head = panoptic_deeplab_heads.SemanticHead.from_config(
semantic_head_config)
new_instance_head = panoptic_deeplab_heads.InstanceHead.from_config(
instance_head_config)
self.assertAllEqual(semantic_head.get_config(),
new_semantic_head.get_config())
self.assertAllEqual(instance_head.get_config(),
new_instance_head.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test for panoptic_deeplab_merge.py.
Note that the tests are branched from
https://raw.githubusercontent.com/google-research/deeplab2/main/model/post_processor/panoptic_deeplab_test.py
"""
import numpy as np
import tensorflow as tf
from official.vision.beta.projects.panoptic_maskrcnn.modeling.layers import panoptic_deeplab_merge
class PostProcessingTest(tf.test.TestCase):
def test_py_func_merge_semantic_and_instance_maps_can_run(self):
batch = 1
height = 5
width = 5
semantic_prediction = tf.random.uniform((batch, height, width),
minval=0,
maxval=20,
dtype=tf.int32)
instance_maps = tf.random.uniform((batch, height, width),
minval=0,
maxval=3,
dtype=tf.int32)
thing_class_ids = tf.convert_to_tensor([1, 2, 3])
label_divisor = 256
stuff_area_limit = 3
void_label = 255
panoptic_prediction = panoptic_deeplab_merge._merge_semantic_and_instance_maps(
semantic_prediction, instance_maps, thing_class_ids, label_divisor,
stuff_area_limit, void_label)
self.assertListEqual(semantic_prediction.get_shape().as_list(),
panoptic_prediction.get_shape().as_list())
def test_merge_semantic_and_instance_maps_with_a_simple_example(self):
semantic_prediction = tf.convert_to_tensor(
[[[0, 0, 0, 0],
[0, 1, 1, 0],
[0, 2, 2, 0],
[2, 2, 3, 3]]], dtype=tf.int32)
instance_maps = tf.convert_to_tensor(
[[[0, 0, 0, 0],
[0, 0, 0, 0],
[0, 1, 1, 0],
[2, 2, 3, 3]]], dtype=tf.int32)
thing_class_ids = tf.convert_to_tensor([2, 3])
label_divisor = 256
stuff_area_limit = 3
void_label = 255
# The expected_panoptic_prediction is computed as follows.
# For `thing` segmentation, instance 1, 2, and 3 are kept, but instance 3
# will have a new instance ID 1, since it is the first instance in its
# own semantic label.
# For `stuff` segmentation, class-0 region is kept, while class-1 region
# is re-labeled as `void_label * label_divisor` since its area is smaller
# than stuff_area_limit.
expected_panoptic_prediction = tf.convert_to_tensor(
[[[0, 0, 0, 0],
[0, void_label * label_divisor, void_label * label_divisor, 0],
[0, 2 * label_divisor + 1, 2 * label_divisor + 1, 0],
[2 * label_divisor + 2, 2 * label_divisor + 2, 3 * label_divisor + 1,
3 * label_divisor + 1]]], dtype=tf.int32)
panoptic_prediction = panoptic_deeplab_merge._merge_semantic_and_instance_maps(
semantic_prediction, instance_maps, thing_class_ids, label_divisor,
stuff_area_limit, void_label)
self.assertAllClose(expected_panoptic_prediction,
panoptic_prediction)
def test_gets_panoptic_predictions_with_score(self):
batch = 1
height = 5
width = 5
classes = 3
semantic_logits = tf.random.uniform((batch, 1, 1, classes))
semantic_logits = tf.tile(semantic_logits, (1, height, width, 1))
center_heatmap = tf.convert_to_tensor([
[1.0, 0.0, 0.0, 0.0, 0.0],
[0.8, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.1, 0.7],
[0.0, 0.0, 0.0, 0.0, 0.2],
], dtype=tf.float32)
center_heatmap = tf.expand_dims(center_heatmap, 0)
center_heatmap = tf.expand_dims(center_heatmap, 3)
center_offsets = tf.zeros((batch, height, width, 2))
center_threshold = 0.0
thing_class_ids = tf.range(classes) # No "stuff" classes.
label_divisor = 256
stuff_area_limit = 16
void_label = classes
nms_kernel_size = 3
keep_k_centers = 2
result = panoptic_deeplab_merge._get_panoptic_predictions(
semantic_logits, center_heatmap, center_offsets, center_threshold,
thing_class_ids, label_divisor, stuff_area_limit, void_label,
nms_kernel_size, keep_k_centers)
instance_maps = result[3].numpy()
instance_scores = result[2].numpy()
self.assertSequenceEqual(instance_maps.shape, (batch, height, width))
expected_instances = [[
[1, 1, 1, 1, 2],
[1, 1, 1, 2, 2],
[1, 1, 2, 2, 2],
[1, 2, 2, 2, 2],
[1, 2, 2, 2, 2],
]]
np.testing.assert_array_equal(instance_maps, expected_instances)
self.assertSequenceEqual(instance_scores.shape, (batch, height, width))
expected_instance_scores = [[
[1.0, 1.0, 1.0, 1.0, 0.7],
[1.0, 1.0, 1.0, 0.7, 0.7],
[1.0, 1.0, 0.7, 0.7, 0.7],
[1.0, 0.7, 0.7, 0.7, 0.7],
[1.0, 0.7, 0.7, 0.7, 0.7],
]]
self.assertAllClose(result[2],
tf.constant(expected_instance_scores))
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for panoptic_segmentation_generator.py."""
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import strategy_combinations
from official.vision.beta.projects.panoptic_maskrcnn.modeling.layers import panoptic_segmentation_generator
PANOPTIC_SEGMENTATION_GENERATOR = panoptic_segmentation_generator.PanopticSegmentationGenerator
class PanopticSegmentationGeneratorTest(
parameterized.TestCase, tf.test.TestCase):
def test_serialize_deserialize(self):
config = {
'output_size': [640, 640],
'max_num_detections': 100,
'stuff_classes_offset': 90,
'mask_binarize_threshold': 0.5,
'score_threshold': 0.005,
'things_class_label': 1,
'void_class_label': 0,
'void_instance_id': -1,
'rescale_predictions': False,
}
generator = PANOPTIC_SEGMENTATION_GENERATOR(**config)
expected_config = dict(config)
self.assertEqual(generator.get_config(), expected_config)
new_generator = PANOPTIC_SEGMENTATION_GENERATOR.from_config(
generator.get_config())
self.assertAllEqual(generator.get_config(), new_generator.get_config())
@combinations.generate(
combinations.combine(
strategy=[
strategy_combinations.default_strategy,
strategy_combinations.one_device_strategy_gpu,
]))
def test_outputs(self, strategy):
# 0 represents the void class label
thing_class_ids = [0, 1, 2, 3, 4]
stuff_class_ids = [0, 5, 6, 7, 8, 9, 10]
all_class_ids = set(thing_class_ids + stuff_class_ids)
num_thing_classes = len(thing_class_ids)
num_stuff_classes = len(stuff_class_ids)
num_classes_for_segmentation = num_stuff_classes + 1
# all thing classes are mapped to class_id=1, stuff class ids are offset
# such that the stuff class_ids start from 2, this means the semantic
# segmentation head will have ground truths with class_ids belonging to
# [0, 1, 2, 3, 4, 5, 6, 7]
config = {
'output_size': [640, 640],
'max_num_detections': 100,
'stuff_classes_offset': 3,
'mask_binarize_threshold': 0.5,
'score_threshold': 0.005,
'things_class_label': 1,
'void_class_label': 0,
'void_instance_id': -1,
'rescale_predictions': False,
}
generator = PANOPTIC_SEGMENTATION_GENERATOR(**config)
crop_height = 112
crop_width = 112
boxes = tf.constant([[
[167, 398, 342, 619],
[192, 171, 363, 449],
[211, 1, 382, 74]
]])
num_detections = boxes.get_shape().as_list()[1]
scores = tf.random.uniform([1, num_detections], 0, 1)
classes = tf.random.uniform(
[1, num_detections],
1, num_thing_classes, dtype=tf.int32)
masks = tf.random.normal(
[1, num_detections, crop_height, crop_width])
segmentation_mask = tf.random.uniform(
[1, *config['output_size']],
0, num_classes_for_segmentation, dtype=tf.int32)
segmentation_mask_one_hot = tf.one_hot(
segmentation_mask, depth=num_stuff_classes + 1)
inputs = {
'detection_boxes': boxes,
'detection_scores': scores,
'detection_classes': classes,
'detection_masks': masks,
'num_detections': tf.constant([num_detections]),
'segmentation_outputs': segmentation_mask_one_hot
}
def _run(inputs):
return generator(inputs=inputs)
@tf.function
def _distributed_run(inputs):
outputs = strategy.run(_run, args=((inputs,)))
return strategy.gather(outputs, axis=0)
outputs = _distributed_run(inputs)
self.assertIn('category_mask', outputs)
self.assertIn('instance_mask', outputs)
self.assertAllEqual(
outputs['category_mask'][0].get_shape().as_list(),
config['output_size'])
self.assertAllEqual(
outputs['instance_mask'][0].get_shape().as_list(),
config['output_size'])
for category_id in np.unique(outputs['category_mask']):
self.assertIn(category_id, all_class_ids)
if __name__ == '__main__':
tf.test.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for Panoptic Deeplab network."""
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import test_util
from official.vision.beta.projects.panoptic_maskrcnn.modeling import panoptic_deeplab_model
from official.vision.beta.projects.panoptic_maskrcnn.modeling.heads import panoptic_deeplab_heads
from official.vision.beta.projects.panoptic_maskrcnn.modeling.layers import panoptic_deeplab_merge
from official.vision.modeling import backbones
from official.vision.modeling.decoders import aspp
class PanopticDeeplabNetworkTest(parameterized.TestCase, tf.test.TestCase):
@combinations.generate(
combinations.combine(
level=[2, 3, 4],
input_size=[256, 512],
low_level=[[4, 3], [3, 2]],
shared_decoder=[True, False],
training=[True, False]))
def test_panoptic_deeplab_network_creation(
self, input_size, level, low_level, shared_decoder, training):
"""Test for creation of a panoptic deeplab network."""
batch_size = 2 if training else 1
num_classes = 10
inputs = np.random.rand(batch_size, input_size, input_size, 3)
image_info = tf.convert_to_tensor(
[[[input_size, input_size], [input_size, input_size], [1, 1], [0, 0]]])
image_info = tf.tile(image_info, [batch_size, 1, 1])
tf.keras.backend.set_image_data_format('channels_last')
backbone = backbones.ResNet(model_id=50)
semantic_decoder = aspp.ASPP(
level=level, dilation_rates=[6, 12, 18])
if shared_decoder:
instance_decoder = semantic_decoder
else:
instance_decoder = aspp.ASPP(
level=level, dilation_rates=[6, 12, 18])
semantic_head = panoptic_deeplab_heads.SemanticHead(
num_classes,
level=level,
low_level=low_level,
low_level_num_filters=(64, 32))
instance_head = panoptic_deeplab_heads.InstanceHead(
level=level,
low_level=low_level,
low_level_num_filters=(64, 32))
post_processor = panoptic_deeplab_merge.PostProcessor(
output_size=[input_size, input_size],
center_score_threshold=0.1,
thing_class_ids=[1, 2, 3, 4],
label_divisor=[256],
stuff_area_limit=4096,
ignore_label=0,
nms_kernel=41,
keep_k_centers=41,
rescale_predictions=True)
model = panoptic_deeplab_model.PanopticDeeplabModel(
backbone=backbone,
semantic_decoder=semantic_decoder,
instance_decoder=instance_decoder,
semantic_head=semantic_head,
instance_head=instance_head,
post_processor=post_processor)
outputs = model(
inputs=inputs,
image_info=image_info,
training=training)
if training:
self.assertIn('segmentation_outputs', outputs)
self.assertIn('instance_centers_heatmap', outputs)
self.assertIn('instance_centers_offset', outputs)
self.assertAllEqual(
[2, input_size // (2**low_level[-1]),
input_size //(2**low_level[-1]),
num_classes],
outputs['segmentation_outputs'].numpy().shape)
self.assertAllEqual(
[2, input_size // (2**low_level[-1]),
input_size // (2**low_level[-1]),
1],
outputs['instance_centers_heatmap'].numpy().shape)
self.assertAllEqual(
[2, input_size // (2**low_level[-1]),
input_size // (2**low_level[-1]),
2],
outputs['instance_centers_offset'].numpy().shape)
else:
self.assertIn('panoptic_outputs', outputs)
self.assertIn('category_mask', outputs)
self.assertIn('instance_mask', outputs)
self.assertIn('instance_centers', outputs)
self.assertIn('instance_scores', outputs)
self.assertIn('segmentation_outputs', outputs)
@combinations.generate(
combinations.combine(
level=[2, 3, 4],
low_level=[(4, 3), (3, 2)],
shared_decoder=[True, False]))
def test_serialize_deserialize(self, level, low_level, shared_decoder):
"""Validate the network can be serialized and deserialized."""
num_classes = 10
backbone = backbones.ResNet(model_id=50)
semantic_decoder = aspp.ASPP(
level=level, dilation_rates=[6, 12, 18])
if shared_decoder:
instance_decoder = semantic_decoder
else:
instance_decoder = aspp.ASPP(
level=level, dilation_rates=[6, 12, 18])
semantic_head = panoptic_deeplab_heads.SemanticHead(
num_classes,
level=level,
low_level=low_level,
low_level_num_filters=(64, 32))
instance_head = panoptic_deeplab_heads.InstanceHead(
level=level,
low_level=low_level,
low_level_num_filters=(64, 32))
post_processor = panoptic_deeplab_merge.PostProcessor(
output_size=[640, 640],
center_score_threshold=0.1,
thing_class_ids=[1, 2, 3, 4],
label_divisor=[256],
stuff_area_limit=4096,
ignore_label=0,
nms_kernel=41,
keep_k_centers=41,
rescale_predictions=True)
model = panoptic_deeplab_model.PanopticDeeplabModel(
backbone=backbone,
semantic_decoder=semantic_decoder,
instance_decoder=instance_decoder,
semantic_head=semantic_head,
instance_head=instance_head,
post_processor=post_processor)
config = model.get_config()
new_model = panoptic_deeplab_model.PanopticDeeplabModel.from_config(config)
# Validate that the config can be forced to JSON.
_ = new_model.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(model.get_config(), new_model.get_config())
if __name__ == '__main__':
test_util.main()
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for panoptic_maskrcnn_model.py."""
import os
from absl.testing import parameterized
import tensorflow as tf
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import strategy_combinations
from official.vision.beta.projects.panoptic_maskrcnn.modeling import panoptic_maskrcnn_model
from official.vision.beta.projects.panoptic_maskrcnn.modeling.layers import panoptic_segmentation_generator
from official.vision.modeling.backbones import resnet
from official.vision.modeling.decoders import aspp
from official.vision.modeling.decoders import fpn
from official.vision.modeling.heads import dense_prediction_heads
from official.vision.modeling.heads import instance_heads
from official.vision.modeling.heads import segmentation_heads
from official.vision.modeling.layers import detection_generator
from official.vision.modeling.layers import mask_sampler
from official.vision.modeling.layers import roi_aligner
from official.vision.modeling.layers import roi_generator
from official.vision.modeling.layers import roi_sampler
from official.vision.ops import anchor
class PanopticMaskRCNNModelTest(parameterized.TestCase, tf.test.TestCase):
@combinations.generate(
combinations.combine(
use_separable_conv=[True, False],
build_anchor_boxes=[True, False],
shared_backbone=[True, False],
shared_decoder=[True, False],
is_training=[True,]))
def test_build_model(self,
use_separable_conv,
build_anchor_boxes,
shared_backbone,
shared_decoder,
is_training=True):
num_classes = 3
min_level = 2
max_level = 6
num_scales = 3
aspect_ratios = [1.0]
anchor_size = 3
resnet_model_id = 50
segmentation_resnet_model_id = 50
aspp_dilation_rates = [6, 12, 18]
aspp_decoder_level = 2
fpn_decoder_level = 2
num_anchors_per_location = num_scales * len(aspect_ratios)
image_size = 128
images = tf.random.normal([2, image_size, image_size, 3])
image_info = tf.convert_to_tensor(
[[[image_size, image_size], [image_size, image_size], [1, 1], [0, 0]],
[[image_size, image_size], [image_size, image_size], [1, 1], [0, 0]]])
shared_decoder = shared_decoder and shared_backbone
if build_anchor_boxes or not is_training:
anchor_boxes = anchor.Anchor(
min_level=min_level,
max_level=max_level,
num_scales=num_scales,
aspect_ratios=aspect_ratios,
anchor_size=3,
image_size=(image_size, image_size)).multilevel_boxes
for l in anchor_boxes:
anchor_boxes[l] = tf.tile(
tf.expand_dims(anchor_boxes[l], axis=0), [2, 1, 1, 1])
else:
anchor_boxes = None
backbone = resnet.ResNet(model_id=resnet_model_id)
decoder = fpn.FPN(
input_specs=backbone.output_specs,
min_level=min_level,
max_level=max_level,
use_separable_conv=use_separable_conv)
rpn_head = dense_prediction_heads.RPNHead(
min_level=min_level,
max_level=max_level,
num_anchors_per_location=num_anchors_per_location,
num_convs=1)
detection_head = instance_heads.DetectionHead(num_classes=num_classes)
roi_generator_obj = roi_generator.MultilevelROIGenerator()
roi_sampler_obj = roi_sampler.ROISampler()
roi_aligner_obj = roi_aligner.MultilevelROIAligner()
detection_generator_obj = detection_generator.DetectionGenerator()
panoptic_segmentation_generator_obj = panoptic_segmentation_generator.PanopticSegmentationGenerator(
output_size=[image_size, image_size],
max_num_detections=100,
stuff_classes_offset=90)
mask_head = instance_heads.MaskHead(
num_classes=num_classes, upsample_factor=2)
mask_sampler_obj = mask_sampler.MaskSampler(
mask_target_size=28, num_sampled_masks=1)
mask_roi_aligner_obj = roi_aligner.MultilevelROIAligner(crop_size=14)
if shared_backbone:
segmentation_backbone = None
else:
segmentation_backbone = resnet.ResNet(
model_id=segmentation_resnet_model_id)
if not shared_decoder:
feature_fusion = 'deeplabv3plus'
level = aspp_decoder_level
segmentation_decoder = aspp.ASPP(
level=level, dilation_rates=aspp_dilation_rates)
else:
feature_fusion = 'panoptic_fpn_fusion'
level = fpn_decoder_level
segmentation_decoder = None
segmentation_head = segmentation_heads.SegmentationHead(
num_classes=2, # stuff and common class for things,
level=level,
feature_fusion=feature_fusion,
decoder_min_level=min_level,
decoder_max_level=max_level,
num_convs=2)
model = panoptic_maskrcnn_model.PanopticMaskRCNNModel(
backbone,
decoder,
rpn_head,
detection_head,
roi_generator_obj,
roi_sampler_obj,
roi_aligner_obj,
detection_generator_obj,
panoptic_segmentation_generator_obj,
mask_head,
mask_sampler_obj,
mask_roi_aligner_obj,
segmentation_backbone=segmentation_backbone,
segmentation_decoder=segmentation_decoder,
segmentation_head=segmentation_head,
min_level=min_level,
max_level=max_level,
num_scales=num_scales,
aspect_ratios=aspect_ratios,
anchor_size=anchor_size)
gt_boxes = tf.convert_to_tensor(
[[[10, 10, 15, 15], [2.5, 2.5, 7.5, 7.5], [-1, -1, -1, -1]],
[[100, 100, 150, 150], [-1, -1, -1, -1], [-1, -1, -1, -1]]],
dtype=tf.float32)
gt_classes = tf.convert_to_tensor([[2, 1, -1], [1, -1, -1]], dtype=tf.int32)
gt_masks = tf.ones((2, 3, 100, 100))
# Results will be checked in test_forward.
_ = model(
images,
image_info,
anchor_boxes,
gt_boxes,
gt_classes,
gt_masks,
training=is_training)
@combinations.generate(
combinations.combine(
strategy=[
strategy_combinations.one_device_strategy,
strategy_combinations.one_device_strategy_gpu,
],
shared_backbone=[True, False],
shared_decoder=[True, False],
training=[True, False],
generate_panoptic_masks=[True, False]))
def test_forward(self, strategy, training,
shared_backbone, shared_decoder,
generate_panoptic_masks):
num_classes = 3
min_level = 2
max_level = 6
num_scales = 3
aspect_ratios = [1.0]
anchor_size = 3
segmentation_resnet_model_id = 101
aspp_dilation_rates = [6, 12, 18]
aspp_decoder_level = 2
fpn_decoder_level = 2
class_agnostic_bbox_pred = False
cascade_class_ensemble = False
image_size = (256, 256)
images = tf.random.normal([2, image_size[0], image_size[1], 3])
image_info = tf.convert_to_tensor(
[[[224, 100], [224, 100], [1, 1], [0, 0]],
[[224, 100], [224, 100], [1, 1], [0, 0]]])
shared_decoder = shared_decoder and shared_backbone
with strategy.scope():
anchor_boxes = anchor.Anchor(
min_level=min_level,
max_level=max_level,
num_scales=num_scales,
aspect_ratios=aspect_ratios,
anchor_size=anchor_size,
image_size=image_size).multilevel_boxes
num_anchors_per_location = len(aspect_ratios) * num_scales
input_specs = tf.keras.layers.InputSpec(shape=[None, None, None, 3])
backbone = resnet.ResNet(model_id=50, input_specs=input_specs)
decoder = fpn.FPN(
min_level=min_level,
max_level=max_level,
input_specs=backbone.output_specs)
rpn_head = dense_prediction_heads.RPNHead(
min_level=min_level,
max_level=max_level,
num_anchors_per_location=num_anchors_per_location)
detection_head = instance_heads.DetectionHead(
num_classes=num_classes,
class_agnostic_bbox_pred=class_agnostic_bbox_pred)
roi_generator_obj = roi_generator.MultilevelROIGenerator()
roi_sampler_cascade = []
roi_sampler_obj = roi_sampler.ROISampler()
roi_sampler_cascade.append(roi_sampler_obj)
roi_aligner_obj = roi_aligner.MultilevelROIAligner()
detection_generator_obj = detection_generator.DetectionGenerator()
if generate_panoptic_masks:
panoptic_segmentation_generator_obj = panoptic_segmentation_generator.PanopticSegmentationGenerator(
output_size=list(image_size),
max_num_detections=100,
stuff_classes_offset=90)
else:
panoptic_segmentation_generator_obj = None
mask_head = instance_heads.MaskHead(
num_classes=num_classes, upsample_factor=2)
mask_sampler_obj = mask_sampler.MaskSampler(
mask_target_size=28, num_sampled_masks=1)
mask_roi_aligner_obj = roi_aligner.MultilevelROIAligner(crop_size=14)
if shared_backbone:
segmentation_backbone = None
else:
segmentation_backbone = resnet.ResNet(
model_id=segmentation_resnet_model_id)
if not shared_decoder:
feature_fusion = 'deeplabv3plus'
level = aspp_decoder_level
segmentation_decoder = aspp.ASPP(
level=level, dilation_rates=aspp_dilation_rates)
else:
feature_fusion = 'panoptic_fpn_fusion'
level = fpn_decoder_level
segmentation_decoder = None
segmentation_head = segmentation_heads.SegmentationHead(
num_classes=2, # stuff and common class for things,
level=level,
feature_fusion=feature_fusion,
decoder_min_level=min_level,
decoder_max_level=max_level,
num_convs=2)
model = panoptic_maskrcnn_model.PanopticMaskRCNNModel(
backbone,
decoder,
rpn_head,
detection_head,
roi_generator_obj,
roi_sampler_obj,
roi_aligner_obj,
detection_generator_obj,
panoptic_segmentation_generator_obj,
mask_head,
mask_sampler_obj,
mask_roi_aligner_obj,
segmentation_backbone=segmentation_backbone,
segmentation_decoder=segmentation_decoder,
segmentation_head=segmentation_head,
class_agnostic_bbox_pred=class_agnostic_bbox_pred,
cascade_class_ensemble=cascade_class_ensemble,
min_level=min_level,
max_level=max_level,
num_scales=num_scales,
aspect_ratios=aspect_ratios,
anchor_size=anchor_size)
gt_boxes = tf.convert_to_tensor(
[[[10, 10, 15, 15], [2.5, 2.5, 7.5, 7.5], [-1, -1, -1, -1]],
[[100, 100, 150, 150], [-1, -1, -1, -1], [-1, -1, -1, -1]]],
dtype=tf.float32)
gt_classes = tf.convert_to_tensor(
[[2, 1, -1], [1, -1, -1]], dtype=tf.int32)
gt_masks = tf.ones((2, 3, 100, 100))
results = model(
images,
image_info,
anchor_boxes,
gt_boxes,
gt_classes,
gt_masks,
training=training)
self.assertIn('rpn_boxes', results)
self.assertIn('rpn_scores', results)
if training:
self.assertIn('class_targets', results)
self.assertIn('box_targets', results)
self.assertIn('class_outputs', results)
self.assertIn('box_outputs', results)
self.assertIn('mask_outputs', results)
else:
self.assertIn('detection_boxes', results)
self.assertIn('detection_scores', results)
self.assertIn('detection_classes', results)
self.assertIn('num_detections', results)
self.assertIn('detection_masks', results)
self.assertIn('segmentation_outputs', results)
self.assertAllEqual(
[2, image_size[0] // (2**level), image_size[1] // (2**level), 2],
results['segmentation_outputs'].numpy().shape)
if generate_panoptic_masks:
self.assertIn('panoptic_outputs', results)
self.assertIn('category_mask', results['panoptic_outputs'])
self.assertIn('instance_mask', results['panoptic_outputs'])
self.assertAllEqual(
[2, image_size[0], image_size[1]],
results['panoptic_outputs']['category_mask'].numpy().shape)
self.assertAllEqual(
[2, image_size[0], image_size[1]],
results['panoptic_outputs']['instance_mask'].numpy().shape)
else:
self.assertNotIn('panoptic_outputs', results)
@combinations.generate(
combinations.combine(
shared_backbone=[True, False], shared_decoder=[True, False]))
def test_serialize_deserialize(self, shared_backbone, shared_decoder):
input_specs = tf.keras.layers.InputSpec(shape=[None, None, None, 3])
backbone = resnet.ResNet(model_id=50, input_specs=input_specs)
decoder = fpn.FPN(
min_level=3, max_level=7, input_specs=backbone.output_specs)
rpn_head = dense_prediction_heads.RPNHead(
min_level=3, max_level=7, num_anchors_per_location=3)
detection_head = instance_heads.DetectionHead(num_classes=2)
roi_generator_obj = roi_generator.MultilevelROIGenerator()
roi_sampler_obj = roi_sampler.ROISampler()
roi_aligner_obj = roi_aligner.MultilevelROIAligner()
detection_generator_obj = detection_generator.DetectionGenerator()
panoptic_segmentation_generator_obj = panoptic_segmentation_generator.PanopticSegmentationGenerator(
output_size=[None, None],
max_num_detections=100,
stuff_classes_offset=90)
segmentation_resnet_model_id = 101
aspp_dilation_rates = [6, 12, 18]
min_level = 2
max_level = 6
aspp_decoder_level = 2
fpn_decoder_level = 2
shared_decoder = shared_decoder and shared_backbone
mask_head = instance_heads.MaskHead(num_classes=2, upsample_factor=2)
mask_sampler_obj = mask_sampler.MaskSampler(
mask_target_size=28, num_sampled_masks=1)
mask_roi_aligner_obj = roi_aligner.MultilevelROIAligner(crop_size=14)
if shared_backbone:
segmentation_backbone = None
else:
segmentation_backbone = resnet.ResNet(
model_id=segmentation_resnet_model_id)
if not shared_decoder:
feature_fusion = 'deeplabv3plus'
level = aspp_decoder_level
segmentation_decoder = aspp.ASPP(
level=level, dilation_rates=aspp_dilation_rates)
else:
feature_fusion = 'panoptic_fpn_fusion'
level = fpn_decoder_level
segmentation_decoder = None
segmentation_head = segmentation_heads.SegmentationHead(
num_classes=2, # stuff and common class for things,
level=level,
feature_fusion=feature_fusion,
decoder_min_level=min_level,
decoder_max_level=max_level,
num_convs=2)
model = panoptic_maskrcnn_model.PanopticMaskRCNNModel(
backbone,
decoder,
rpn_head,
detection_head,
roi_generator_obj,
roi_sampler_obj,
roi_aligner_obj,
detection_generator_obj,
panoptic_segmentation_generator_obj,
mask_head,
mask_sampler_obj,
mask_roi_aligner_obj,
segmentation_backbone=segmentation_backbone,
segmentation_decoder=segmentation_decoder,
segmentation_head=segmentation_head,
min_level=min_level,
max_level=max_level,
num_scales=3,
aspect_ratios=[1.0],
anchor_size=3)
config = model.get_config()
new_model = panoptic_maskrcnn_model.PanopticMaskRCNNModel.from_config(
config)
# Validate that the config can be forced to JSON.
_ = new_model.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(model.get_config(), new_model.get_config())
@combinations.generate(
combinations.combine(
shared_backbone=[True, False], shared_decoder=[True, False]))
def test_checkpoint(self, shared_backbone, shared_decoder):
input_specs = tf.keras.layers.InputSpec(shape=[None, None, None, 3])
backbone = resnet.ResNet(model_id=50, input_specs=input_specs)
decoder = fpn.FPN(
min_level=3, max_level=7, input_specs=backbone.output_specs)
rpn_head = dense_prediction_heads.RPNHead(
min_level=3, max_level=7, num_anchors_per_location=3)
detection_head = instance_heads.DetectionHead(num_classes=2)
roi_generator_obj = roi_generator.MultilevelROIGenerator()
roi_sampler_obj = roi_sampler.ROISampler()
roi_aligner_obj = roi_aligner.MultilevelROIAligner()
detection_generator_obj = detection_generator.DetectionGenerator()
panoptic_segmentation_generator_obj = panoptic_segmentation_generator.PanopticSegmentationGenerator(
output_size=[None, None],
max_num_detections=100,
stuff_classes_offset=90)
segmentation_resnet_model_id = 101
aspp_dilation_rates = [6, 12, 18]
min_level = 2
max_level = 6
aspp_decoder_level = 2
fpn_decoder_level = 2
shared_decoder = shared_decoder and shared_backbone
mask_head = instance_heads.MaskHead(num_classes=2, upsample_factor=2)
mask_sampler_obj = mask_sampler.MaskSampler(
mask_target_size=28, num_sampled_masks=1)
mask_roi_aligner_obj = roi_aligner.MultilevelROIAligner(crop_size=14)
if shared_backbone:
segmentation_backbone = None
else:
segmentation_backbone = resnet.ResNet(
model_id=segmentation_resnet_model_id)
if not shared_decoder:
feature_fusion = 'deeplabv3plus'
level = aspp_decoder_level
segmentation_decoder = aspp.ASPP(
level=level, dilation_rates=aspp_dilation_rates)
else:
feature_fusion = 'panoptic_fpn_fusion'
level = fpn_decoder_level
segmentation_decoder = None
segmentation_head = segmentation_heads.SegmentationHead(
num_classes=2, # stuff and common class for things,
level=level,
feature_fusion=feature_fusion,
decoder_min_level=min_level,
decoder_max_level=max_level,
num_convs=2)
model = panoptic_maskrcnn_model.PanopticMaskRCNNModel(
backbone,
decoder,
rpn_head,
detection_head,
roi_generator_obj,
roi_sampler_obj,
roi_aligner_obj,
detection_generator_obj,
panoptic_segmentation_generator_obj,
mask_head,
mask_sampler_obj,
mask_roi_aligner_obj,
segmentation_backbone=segmentation_backbone,
segmentation_decoder=segmentation_decoder,
segmentation_head=segmentation_head,
min_level=max_level,
max_level=max_level,
num_scales=3,
aspect_ratios=[1.0],
anchor_size=3)
expect_checkpoint_items = dict(
backbone=backbone,
decoder=decoder,
rpn_head=rpn_head,
detection_head=[detection_head])
expect_checkpoint_items['mask_head'] = mask_head
if not shared_backbone:
expect_checkpoint_items['segmentation_backbone'] = segmentation_backbone
if not shared_decoder:
expect_checkpoint_items['segmentation_decoder'] = segmentation_decoder
expect_checkpoint_items['segmentation_head'] = segmentation_head
self.assertAllEqual(expect_checkpoint_items, model.checkpoint_items)
# Test save and load checkpoints.
ckpt = tf.train.Checkpoint(model=model, **model.checkpoint_items)
save_dir = self.create_tempdir().full_path
ckpt.save(os.path.join(save_dir, 'ckpt'))
partial_ckpt = tf.train.Checkpoint(backbone=backbone)
partial_ckpt.read(tf.train.latest_checkpoint(
save_dir)).expect_partial().assert_existing_objects_matched()
partial_ckpt_mask = tf.train.Checkpoint(
backbone=backbone, mask_head=mask_head)
partial_ckpt_mask.restore(tf.train.latest_checkpoint(
save_dir)).expect_partial().assert_existing_objects_matched()
if not shared_backbone:
partial_ckpt_segmentation = tf.train.Checkpoint(
segmentation_backbone=segmentation_backbone,
segmentation_decoder=segmentation_decoder,
segmentation_head=segmentation_head)
elif not shared_decoder:
partial_ckpt_segmentation = tf.train.Checkpoint(
segmentation_decoder=segmentation_decoder,
segmentation_head=segmentation_head)
else:
partial_ckpt_segmentation = tf.train.Checkpoint(
segmentation_head=segmentation_head)
partial_ckpt_segmentation.restore(tf.train.latest_checkpoint(
save_dir)).expect_partial().assert_existing_objects_matched()
if __name__ == '__main__':
tf.test.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment