Commit 588d6da4 authored by Jaeyoun Kim's avatar Jaeyoun Kim Committed by A. Unique TensorFlower
Browse files

Copybara import of the project:

--
63719f08

 by Anirudh Vegesana <anirudh.vegesana@gmail.com>:

YOLO Family: Updated model (#9923)

* Update YOLO model

* Fix some docstrings

* Fix docstrings

* Address some of Dr. Davis' changes

* Give descriptive names to the test cases

* Fix bugs

* Fix YOLO head imports

* docstring and variable name updates

* docstring and variable name updates

* docstring and variable name updates
Co-authored-by: default avatarvishnubanna <banna3vishnu@gmail.com>
Co-authored-by: default avatarVishnu Banna <43182884+vishnubanna@users.noreply.github.com>
--
725b8c8c

 by Anirudh Vegesana <anirudh.vegesana@gmail.com>:

disclaimer (#10020)
Co-authored-by: default avatarVishnu Banna <43182884+vishnubanna@users.noreply.github.com>
--
404d24b0

 by Anirudh Vegesana <anirudh.vegesana@gmail.com>:

YOLO Family: Linting (#10027)

* YOLO Family: Updated model (#9923)

* Update YOLO model

* Fix some docstrings

* Fix docstrings

* Address some of Dr. Davis' changes

* Give descriptive names to the test cases

* Fix bugs

* Fix YOLO head imports

* docstring and variable name updates

* docstring and variable name updates

* docstring and variable name updates
Co-authored-by: default avatarvishnubanna <banna3vishnu@gmail.com>
Co-authored-by: default avatarVishnu Banna <43182884+vishnubanna@users.noreply.github.com>

* disclaimer

* Fix some PyLint errors
Co-authored-by: default avatarvishnubanna <banna3vishnu@gmail.com>
Co-authored-by: default avatarVishnu Banna <43182884+vishnubanna@users.noreply.github.com>
COPYBARA_INTEGRATE_REVIEW=https://github.com/tensorflow/models/pull/10021 from tensorflow:purdue-yolo 404d24b0
PiperOrigin-RevId: 379372162
parent e15c0aec
DISCLAIMER: this YOLO implementation is still under development. No support will
be provided during the development phase.
# YOLO Object Detectors, You Only Look Once
[![Paper](http://img.shields.io/badge/Paper-arXiv.1804.02767-B3181B?logo=arXiv)](https://arxiv.org/abs/1804.02767)
......@@ -74,3 +77,5 @@ head could be connected to a new, more powerful backbone if a person chose to.
[![TensorFlow 2.2](https://img.shields.io/badge/TensorFlow-2.2-FF6F00?logo=tensorflow)](https://github.com/tensorflow/tensorflow/releases/tag/v2.2.0)
[![Python 3.8](https://img.shields.io/badge/Python-3.8-3776AB)](https://www.python.org/downloads/release/python-380/)
......@@ -24,11 +24,14 @@ from official.vision.beta.configs import backbones
@dataclasses.dataclass
class DarkNet(hyperparams.Config):
"""DarkNet config."""
model_id: str = "darknet53"
class Darknet(hyperparams.Config):
"""Darknet config."""
model_id: str = 'darknet53'
width_scale: float = 1.0
depth_scale: float = 1.0
dilate: bool = False
@dataclasses.dataclass
class Backbone(backbones.Backbone):
darknet: DarkNet = DarkNet()
darknet: Darknet = Darknet()
......@@ -32,7 +32,7 @@ class ImageClassificationModel(hyperparams.Config):
num_classes: int = 0
input_size: List[int] = dataclasses.field(default_factory=list)
backbone: backbones.Backbone = backbones.Backbone(
type='darknet', resnet=backbones.DarkNet())
type='darknet', darknet=backbones.Darknet())
dropout_rate: float = 0.0
norm_activation: common.NormActivation = common.NormActivation()
# Adds a BatchNormalization layer pre-GlobalAveragePooling in classification
......
......@@ -13,7 +13,6 @@
# limitations under the License.
# Lint as: python3
"""Contains definitions of Darknet Backbone Networks.
The models are inspired by ResNet, and CSPNet
......@@ -29,15 +28,15 @@ Cross Stage Partial networks (CSPNets) were proposed in:
arXiv:1911.11929
DarkNets Are used mainly for Object detection in:
Darknets are used mainly for object detection in:
[1] Joseph Redmon, Ali Farhadi
YOLOv3: An Incremental Improvement. arXiv:1804.02767
[2] Alexey Bochkovskiy, Chien-Yao Wang, Hong-Yuan Mark Liao
YOLOv4: Optimal Speed and Accuracy of Object Detection. arXiv:2004.10934
"""
import collections
import collections
import tensorflow as tf
from official.modeling import hyperparams
......@@ -45,28 +44,32 @@ from official.vision.beta.modeling.backbones import factory
from official.vision.beta.projects.yolo.modeling.layers import nn_blocks
class BlockConfig(object):
"""Get layer config to make code more readable.
Args:
layer: string layer name
stack: the type of layer ordering to use for this specific level
repetitions: integer for the number of times to repeat block
bottelneck: boolean for does this stack have a bottle neck layer
filters: integer for the output depth of the level
pool_size: integer the pool_size of max pool layers
kernel_size: optional integer, for convolution kernel size
strides: integer or tuple to indicate convolution strides
padding: the padding to apply to layers in this stack
activation: string for the activation to use for this stack
route: integer for what level to route from to get the next input
output_name: the name to use for this output
is_output: is this layer an output in the default model
"""
class BlockConfig:
"""Class to store layer config to make code more readable."""
def __init__(self, layer, stack, reps, bottleneck, filters, pool_size,
kernel_size, strides, padding, activation, route, output_name,
is_output):
kernel_size, strides, padding, activation, route, dilation_rate,
output_name, is_output):
"""Initializing method for BlockConfig.
Args:
layer: A `str` for layer name.
stack: A `str` for the type of layer ordering to use for this specific
level.
reps: An `int` for the number of times to repeat block.
bottleneck: A `bool` for whether this stack has a bottle neck layer.
filters: An `int` for the output depth of the level.
pool_size: An `int` for the pool_size of max pool layers.
kernel_size: An `int` for convolution kernel size.
strides: A `Union[int, tuple]` that indicates convolution strides.
padding: An `int` for the padding to apply to layers in this stack.
activation: A `str` for the activation to use for this stack.
route: An `int` for the level to route from to get the next input.
dilation_rate: An `int` for the scale used in dialated Darknet.
output_name: A `str` for the name to use for this output.
is_output: A `bool` for whether this layer is an output in the default
model.
"""
self.layer = layer
self.stack = stack
self.repetitions = reps
......@@ -78,6 +81,7 @@ class BlockConfig(object):
self.padding = padding
self.activation = activation
self.route = route
self.dilation_rate = dilation_rate
self.output_name = output_name
self.is_output = is_output
......@@ -89,41 +93,41 @@ def build_block_specs(config):
return specs
class LayerFactory(object):
"""Class for quick look up of default layers.
class LayerBuilder:
"""Layer builder class.
Used by darknet to connect, introduce or exit a level. Used in place of an if
condition or switch to make adding new layers easier and to reduce redundant
code.
Class for quick look up of default layers used by darknet to
connect, introduce or exit a level. Used in place of an if condition
or switch to make adding new layers easier and to reduce redundant code.
"""
def __init__(self):
self._layer_dict = {
"ConvBN": (nn_blocks.ConvBN, self.conv_bn_config_todict),
"MaxPool": (tf.keras.layers.MaxPool2D, self.maxpool_config_todict)
'ConvBN': (nn_blocks.ConvBN, self.conv_bn_config_todict),
'MaxPool': (tf.keras.layers.MaxPool2D, self.maxpool_config_todict)
}
def conv_bn_config_todict(self, config, kwargs):
dictvals = {
"filters": config.filters,
"kernel_size": config.kernel_size,
"strides": config.strides,
"padding": config.padding
'filters': config.filters,
'kernel_size': config.kernel_size,
'strides': config.strides,
'padding': config.padding
}
dictvals.update(kwargs)
return dictvals
def darktiny_config_todict(self, config, kwargs):
dictvals = {"filters": config.filters, "strides": config.strides}
dictvals = {'filters': config.filters, 'strides': config.strides}
dictvals.update(kwargs)
return dictvals
def maxpool_config_todict(self, config, kwargs):
return {
"pool_size": config.pool_size,
"strides": config.strides,
"padding": config.padding,
"name": kwargs["name"]
'pool_size': config.pool_size,
'strides': config.strides,
'padding': config.padding,
'name': kwargs['name']
}
def __call__(self, config, kwargs):
......@@ -134,90 +138,259 @@ class LayerFactory(object):
# model configs
LISTNAMES = [
"default_layer_name", "level_type", "number_of_layers_in_level",
"bottleneck", "filters", "kernal_size", "pool_size", "strides", "padding",
"default_activation", "route", "level/name", "is_output"
'default_layer_name', 'level_type', 'number_of_layers_in_level',
'bottleneck', 'filters', 'kernal_size', 'pool_size', 'strides', 'padding',
'default_activation', 'route', 'dilation', 'level/name', 'is_output'
]
# pylint: disable=line-too-long
CSPDARKNET53 = {
"list_names": LISTNAMES,
"splits": {"backbone_split": 106,
"neck_split": 138},
"backbone": [
["ConvBN", None, 1, False, 32, None, 3, 1, "same", "mish", -1, 0, False],
["DarkRes", "csp", 1, True, 64, None, None, None, None, "mish", -1, 1, False],
["DarkRes", "csp", 2, False, 128, None, None, None, None, "mish", -1, 2, False],
["DarkRes", "csp", 8, False, 256, None, None, None, None, "mish", -1, 3, True],
["DarkRes", "csp", 8, False, 512, None, None, None, None, "mish", -1, 4, True],
["DarkRes", "csp", 4, False, 1024, None, None, None, None, "mish", -1, 5, True],
'list_names':
LISTNAMES,
'splits': {
'backbone_split': 106,
'neck_split': 132
},
'backbone': [
[
'ConvBN', None, 1, False, 32, None, 3, 1, 'same', 'mish', -1, 1, 0,
False
],
[
'DarkRes', 'csp', 1, True, 64, None, None, None, None, 'mish', -1,
1, 1, False
],
[
'DarkRes', 'csp', 2, False, 128, None, None, None, None, 'mish', -1,
1, 2, False
],
[
'DarkRes', 'csp', 8, False, 256, None, None, None, None, 'mish', -1,
1, 3, True
],
[
'DarkRes', 'csp', 8, False, 512, None, None, None, None, 'mish', -1,
2, 4, True
],
[
'DarkRes', 'csp', 4, False, 1024, None, None, None, None, 'mish',
-1, 4, 5, True
],
]
}
CSPADARKNET53 = {
'list_names':
LISTNAMES,
'splits': {
'backbone_split': 100,
'neck_split': 135
},
'backbone': [
[
'ConvBN', None, 1, False, 32, None, 3, 1, 'same', 'mish', -1, 1, 0,
False
],
[
'DarkRes', 'residual', 1, True, 64, None, None, None, None, 'mish',
-1, 1, 1, False
],
[
'DarkRes', 'csp', 2, False, 128, None, None, None, None, 'mish', -1,
1, 2, False
],
[
'DarkRes', 'csp', 8, False, 256, None, None, None, None, 'mish', -1,
1, 3, True
],
[
'DarkRes', 'csp', 8, False, 512, None, None, None, None, 'mish', -1,
2, 4, True
],
[
'DarkRes', 'csp', 4, False, 1024, None, None, None, None, 'mish',
-1, 4, 5, True
],
]
}
LARGECSP53 = {
'list_names':
LISTNAMES,
'splits': {
'backbone_split': 100,
'neck_split': 135
},
'backbone': [
[
'ConvBN', None, 1, False, 32, None, 3, 1, 'same', 'mish', -1, 1, 0,
False
],
[
'DarkRes', 'csp', 1, True, 64, None, None, None, None, 'mish', -1,
1, 1, False
],
[
'DarkRes', 'csp', 3, False, 128, None, None, None, None, 'mish', -1,
1, 2, False
],
[
'DarkRes', 'csp', 15, False, 256, None, None, None, None, 'mish',
-1, 1, 3, True
],
[
'DarkRes', 'csp', 15, False, 512, None, None, None, None, 'mish',
-1, 2, 4, True
],
[
'DarkRes', 'csp', 7, False, 1024, None, None, None, None, 'mish',
-1, 4, 5, True
],
[
'DarkRes', 'csp', 7, False, 1024, None, None, None, None, 'mish',
-1, 8, 6, True
],
[
'DarkRes', 'csp', 7, False, 1024, None, None, None, None, 'mish',
-1, 16, 7, True
],
]
}
DARKNET53 = {
"list_names": LISTNAMES,
"splits": {"backbone_split": 76},
"backbone": [
["ConvBN", None, 1, False, 32, None, 3, 1, "same", "leaky", -1, 0, False],
["DarkRes", "residual", 1, True, 64, None, None, None, None, "leaky", -1, 1, False],
["DarkRes", "residual", 2, False, 128, None, None, None, None, "leaky", -1, 2, False],
["DarkRes", "residual", 8, False, 256, None, None, None, None, "leaky", -1, 3, True],
["DarkRes", "residual", 8, False, 512, None, None, None, None, "leaky", -1, 4, True],
["DarkRes", "residual", 4, False, 1024, None, None, None, None, "leaky", -1, 5, True],
'list_names':
LISTNAMES,
'splits': {
'backbone_split': 76
},
'backbone': [
[
'ConvBN', None, 1, False, 32, None, 3, 1, 'same', 'leaky', -1, 1, 0,
False
],
[
'DarkRes', 'residual', 1, True, 64, None, None, None, None, 'leaky',
-1, 1, 1, False
],
[
'DarkRes', 'residual', 2, False, 128, None, None, None, None,
'leaky', -1, 1, 2, False
],
[
'DarkRes', 'residual', 8, False, 256, None, None, None, None,
'leaky', -1, 1, 3, True
],
[
'DarkRes', 'residual', 8, False, 512, None, None, None, None,
'leaky', -1, 2, 4, True
],
[
'DarkRes', 'residual', 4, False, 1024, None, None, None, None,
'leaky', -1, 4, 5, True
],
]
}
CSPDARKNETTINY = {
"list_names": LISTNAMES,
"splits": {"backbone_split": 28},
"backbone": [
["ConvBN", None, 1, False, 32, None, 3, 2, "same", "leaky", -1, 0, False],
["ConvBN", None, 1, False, 64, None, 3, 2, "same", "leaky", -1, 1, False],
["CSPTiny", "csp_tiny", 1, False, 64, None, 3, 2, "same", "leaky", -1, 2, False],
["CSPTiny", "csp_tiny", 1, False, 128, None, 3, 2, "same", "leaky", -1, 3, False],
["CSPTiny", "csp_tiny", 1, False, 256, None, 3, 2, "same", "leaky", -1, 4, True],
["ConvBN", None, 1, False, 512, None, 3, 1, "same", "leaky", -1, 5, True],
'list_names':
LISTNAMES,
'splits': {
'backbone_split': 28
},
'backbone': [
[
'ConvBN', None, 1, False, 32, None, 3, 2, 'same', 'leaky', -1, 1, 0,
False
],
[
'ConvBN', None, 1, False, 64, None, 3, 2, 'same', 'leaky', -1, 1, 1,
False
],
[
'CSPTiny', 'csp_tiny', 1, False, 64, None, 3, 2, 'same', 'leaky',
-1, 1, 2, False
],
[
'CSPTiny', 'csp_tiny', 1, False, 128, None, 3, 2, 'same', 'leaky',
-1, 1, 3, False
],
[
'CSPTiny', 'csp_tiny', 1, False, 256, None, 3, 2, 'same', 'leaky',
-1, 1, 4, True
],
[
'ConvBN', None, 1, False, 512, None, 3, 1, 'same', 'leaky', -1, 1,
5, True
],
]
}
DARKNETTINY = {
"list_names": LISTNAMES,
"splits": {"backbone_split": 14},
"backbone": [
["ConvBN", None, 1, False, 16, None, 3, 1, "same", "leaky", -1, 0, False],
["DarkTiny", "tiny", 1, True, 32, None, 3, 2, "same", "leaky", -1, 1, False],
["DarkTiny", "tiny", 1, True, 64, None, 3, 2, "same", "leaky", -1, 2, False],
["DarkTiny", "tiny", 1, False, 128, None, 3, 2, "same", "leaky", -1, 3, False],
["DarkTiny", "tiny", 1, False, 256, None, 3, 2, "same", "leaky", -1, 4, True],
["DarkTiny", "tiny", 1, False, 512, None, 3, 2, "same", "leaky", -1, 5, False],
["DarkTiny", "tiny", 1, False, 1024, None, 3, 1, "same", "leaky", -1, 5, True],
'list_names':
LISTNAMES,
'splits': {
'backbone_split': 14
},
'backbone': [
[
'ConvBN', None, 1, False, 16, None, 3, 1, 'same', 'leaky', -1, 1, 0,
False
],
[
'DarkTiny', 'tiny', 1, True, 32, None, 3, 2, 'same', 'leaky', -1, 1,
1, False
],
[
'DarkTiny', 'tiny', 1, True, 64, None, 3, 2, 'same', 'leaky', -1, 1,
2, False
],
[
'DarkTiny', 'tiny', 1, False, 128, None, 3, 2, 'same', 'leaky', -1,
1, 3, False
],
[
'DarkTiny', 'tiny', 1, False, 256, None, 3, 2, 'same', 'leaky', -1,
1, 4, True
],
[
'DarkTiny', 'tiny', 1, False, 512, None, 3, 2, 'same', 'leaky', -1,
1, 5, False
],
[
'DarkTiny', 'tiny', 1, False, 1024, None, 3, 1, 'same', 'leaky', -1,
1, 5, True
],
]
}
# pylint: enable=line-too-long
BACKBONES = {
"darknettiny": DARKNETTINY,
"darknet53": DARKNET53,
"cspdarknet53": CSPDARKNET53,
"cspdarknettiny": CSPDARKNETTINY
'darknettiny': DARKNETTINY,
'darknet53': DARKNET53,
'cspdarknet53': CSPDARKNET53,
'altered_cspdarknet53': CSPADARKNET53,
'cspdarknettiny': CSPDARKNETTINY,
'csp-large': LARGECSP53,
}
@tf.keras.utils.register_keras_serializable(package="yolo")
@tf.keras.utils.register_keras_serializable(package='yolo')
class Darknet(tf.keras.Model):
"""Darknet backbone."""
"""The Darknet backbone architecture."""
def __init__(
self,
model_id="darknet53",
model_id='darknet53',
input_specs=tf.keras.layers.InputSpec(shape=[None, None, None, 3]),
min_level=None,
max_level=5,
width_scale=1.0,
depth_scale=1.0,
csp_level_mod=(),
activation=None,
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer="glorot_uniform",
dilate=False,
kernel_initializer='glorot_uniform',
kernel_regularizer=None,
bias_regularizer=None,
**kwargs):
......@@ -227,12 +400,13 @@ class Darknet(tf.keras.Model):
self._model_name = model_id
self._splits = splits
self._input_shape = input_specs
self._registry = LayerFactory()
self._registry = LayerBuilder()
# default layer look up
self._min_size = min_level
self._max_size = max_level
self._output_specs = None
self._csp_level_mod = set(csp_level_mod)
self._kernel_initializer = kernel_initializer
self._bias_regularizer = bias_regularizer
......@@ -241,16 +415,20 @@ class Darknet(tf.keras.Model):
self._use_sync_bn = use_sync_bn
self._activation = activation
self._kernel_regularizer = kernel_regularizer
self._dilate = dilate
self._width_scale = width_scale
self._depth_scale = depth_scale
self._default_dict = {
"kernel_initializer": self._kernel_initializer,
"kernel_regularizer": self._kernel_regularizer,
"bias_regularizer": self._bias_regularizer,
"norm_momentum": self._norm_momentum,
"norm_epsilon": self._norm_epislon,
"use_sync_bn": self._use_sync_bn,
"activation": self._activation,
"name": None
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'bias_regularizer': self._bias_regularizer,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epislon,
'use_sync_bn': self._use_sync_bn,
'activation': self._activation,
'dilation_rate': 1,
'name': None
}
inputs = tf.keras.layers.Input(shape=self._input_shape.shape[1:])
......@@ -273,33 +451,39 @@ class Darknet(tf.keras.Model):
endpoints = collections.OrderedDict()
stack_outputs = [inputs]
for i, config in enumerate(net):
if config.output_name > self._max_size:
break
if config.output_name in self._csp_level_mod:
config.stack = 'residual'
config.filters = int(config.filters * self._width_scale)
config.repetitions = int(config.repetitions * self._depth_scale)
if config.stack is None:
x = self._build_block(stack_outputs[config.route],
config,
name=f"{config.layer}_{i}")
x = self._build_block(
stack_outputs[config.route], config, name=f'{config.layer}_{i}')
stack_outputs.append(x)
elif config.stack == "residual":
x = self._residual_stack(stack_outputs[config.route],
config,
name=f"{config.layer}_{i}")
elif config.stack == 'residual':
x = self._residual_stack(
stack_outputs[config.route], config, name=f'{config.layer}_{i}')
stack_outputs.append(x)
elif config.stack == "csp":
x = self._csp_stack(stack_outputs[config.route],
config,
name=f"{config.layer}_{i}")
elif config.stack == 'csp':
x = self._csp_stack(
stack_outputs[config.route], config, name=f'{config.layer}_{i}')
stack_outputs.append(x)
elif config.stack == "csp_tiny":
x_pass, x = self._csp_tiny_stack(stack_outputs[config.route],
config, name=f"{config.layer}_{i}")
elif config.stack == 'csp_tiny':
x_pass, x = self._csp_tiny_stack(
stack_outputs[config.route], config, name=f'{config.layer}_{i}')
stack_outputs.append(x_pass)
elif config.stack == "tiny":
x = self._tiny_stack(stack_outputs[config.route],
config,
name=f"{config.layer}_{i}")
elif config.stack == 'tiny':
x = self._tiny_stack(
stack_outputs[config.route], config, name=f'{config.layer}_{i}')
stack_outputs.append(x)
if (config.is_output and self._min_size is None):
endpoints[str(config.output_name)] = x
elif self._min_size is not None and config.output_name >= self._min_size and config.output_name <= self._max_size:
elif (self._min_size is not None and
config.output_name >= self._min_size and
config.output_name <= self._max_size):
endpoints[str(config.output_name)] = x
self._output_specs = {l: endpoints[l].get_shape() for l in endpoints.keys()}
......@@ -308,8 +492,7 @@ class Darknet(tf.keras.Model):
def _get_activation(self, activation):
if self._activation is None:
return activation
else:
return self._activation
return self._activation
def _csp_stack(self, inputs, config, name):
if config.bottleneck:
......@@ -320,86 +503,135 @@ class Darknet(tf.keras.Model):
csp_filter_scale = 2
residual_filter_scale = 1
scale_filters = 2
self._default_dict["activation"] = self._get_activation(config.activation)
self._default_dict["name"] = f"{name}_csp_down"
x, x_route = nn_blocks.CSPRoute(filters=config.filters,
filter_scale=csp_filter_scale,
downsample=True,
**self._default_dict)(inputs)
for i in range(config.repetitions):
self._default_dict["name"] = f"{name}_{i}"
x = nn_blocks.DarkResidual(filters=config.filters // scale_filters,
filter_scale=residual_filter_scale,
**self._default_dict)(x)
self._default_dict["name"] = f"{name}_csp_connect"
output = nn_blocks.CSPConnect(filters=config.filters,
filter_scale=csp_filter_scale,
**self._default_dict)([x, x_route])
self._default_dict["activation"] = self._activation
self._default_dict["name"] = None
self._default_dict['activation'] = self._get_activation(config.activation)
self._default_dict['name'] = f'{name}_csp_down'
if self._dilate:
self._default_dict['dilation_rate'] = config.dilation_rate
else:
self._default_dict['dilation_rate'] = 1
# swap/add dilation
x, x_route = nn_blocks.CSPRoute(
filters=config.filters,
filter_scale=csp_filter_scale,
downsample=True,
**self._default_dict)(
inputs)
dilated_reps = config.repetitions - self._default_dict['dilation_rate'] // 2
for i in range(dilated_reps):
self._default_dict['name'] = f'{name}_{i}'
x = nn_blocks.DarkResidual(
filters=config.filters // scale_filters,
filter_scale=residual_filter_scale,
**self._default_dict)(
x)
for i in range(dilated_reps, config.repetitions):
self._default_dict[
'dilation_rate'] = self._default_dict['dilation_rate'] // 2
self._default_dict[
'name'] = f"{name}_{i}_degridded_{self._default_dict['dilation_rate']}"
x = nn_blocks.DarkResidual(
filters=config.filters // scale_filters,
filter_scale=residual_filter_scale,
**self._default_dict)(
x)
self._default_dict['name'] = f'{name}_csp_connect'
output = nn_blocks.CSPConnect(
filters=config.filters,
filter_scale=csp_filter_scale,
**self._default_dict)([x, x_route])
self._default_dict['activation'] = self._activation
self._default_dict['name'] = None
return output
def _csp_tiny_stack(self, inputs, config, name):
self._default_dict["activation"] = self._get_activation(config.activation)
self._default_dict["name"] = f"{name}_csp_tiny"
x, x_route = nn_blocks.CSPTiny(filters=config.filters,
**self._default_dict)(inputs)
self._default_dict["activation"] = self._activation
self._default_dict["name"] = None
self._default_dict['activation'] = self._get_activation(config.activation)
self._default_dict['name'] = f'{name}_csp_tiny'
x, x_route = nn_blocks.CSPTiny(
filters=config.filters, **self._default_dict)(
inputs)
self._default_dict['activation'] = self._activation
self._default_dict['name'] = None
return x, x_route
def _tiny_stack(self, inputs, config, name):
x = tf.keras.layers.MaxPool2D(pool_size=2,
strides=config.strides,
padding="same",
data_format=None,
name=f"{name}_tiny/pool")(inputs)
self._default_dict["activation"] = self._get_activation(config.activation)
self._default_dict["name"] = f"{name}_tiny/conv"
x = tf.keras.layers.MaxPool2D(
pool_size=2,
strides=config.strides,
padding='same',
data_format=None,
name=f'{name}_tiny/pool')(
inputs)
self._default_dict['activation'] = self._get_activation(config.activation)
self._default_dict['name'] = f'{name}_tiny/conv'
x = nn_blocks.ConvBN(
filters=config.filters,
kernel_size=(3, 3),
strides=(1, 1),
padding="same",
padding='same',
**self._default_dict)(
x)
self._default_dict["activation"] = self._activation
self._default_dict["name"] = None
self._default_dict['activation'] = self._activation
self._default_dict['name'] = None
return x
def _residual_stack(self, inputs, config, name):
self._default_dict["activation"] = self._get_activation(config.activation)
self._default_dict["name"] = f"{name}_residual_down"
x = nn_blocks.DarkResidual(filters=config.filters,
downsample=True,
**self._default_dict)(inputs)
for i in range(config.repetitions - 1):
self._default_dict["name"] = f"{name}_{i}"
x = nn_blocks.DarkResidual(filters=config.filters,
**self._default_dict)(x)
self._default_dict["activation"] = self._activation
self._default_dict["name"] = None
self._default_dict['activation'] = self._get_activation(config.activation)
self._default_dict['name'] = f'{name}_residual_down'
if self._dilate:
self._default_dict['dilation_rate'] = config.dilation_rate
if config.repetitions < 8:
config.repetitions += 2
else:
self._default_dict['dilation_rate'] = 1
x = nn_blocks.DarkResidual(
filters=config.filters, downsample=True, **self._default_dict)(
inputs)
dilated_reps = config.repetitions - (
self._default_dict['dilation_rate'] // 2) - 1
for i in range(dilated_reps):
self._default_dict['name'] = f'{name}_{i}'
x = nn_blocks.DarkResidual(
filters=config.filters, **self._default_dict)(
x)
for i in range(dilated_reps, config.repetitions - 1):
self._default_dict[
'dilation_rate'] = self._default_dict['dilation_rate'] // 2
self._default_dict[
'name'] = f"{name}_{i}_degridded_{self._default_dict['dilation_rate']}"
x = nn_blocks.DarkResidual(
filters=config.filters, **self._default_dict)(
x)
self._default_dict['activation'] = self._activation
self._default_dict['name'] = None
self._default_dict['dilation_rate'] = 1
return x
def _build_block(self, inputs, config, name):
x = inputs
i = 0
self._default_dict["activation"] = self._get_activation(config.activation)
self._default_dict['activation'] = self._get_activation(config.activation)
while i < config.repetitions:
self._default_dict["name"] = f"{name}_{i}"
self._default_dict['name'] = f'{name}_{i}'
layer = self._registry(config, self._default_dict)
x = layer(x)
i += 1
self._default_dict["activation"] = self._activation
self._default_dict["name"] = None
self._default_dict['activation'] = self._activation
self._default_dict['name'] = None
return x
@staticmethod
def get_model_config(name):
name = name.lower()
backbone = BACKBONES[name]["backbone"]
splits = BACKBONES[name]["splits"]
backbone = BACKBONES[name]['backbone']
splits = BACKBONES[name]['splits']
return build_block_specs(backbone), splits
@property
......@@ -412,35 +644,41 @@ class Darknet(tf.keras.Model):
def get_config(self):
layer_config = {
"model_id": self._model_name,
"min_level": self._min_size,
"max_level": self._max_size,
"kernel_initializer": self._kernel_initializer,
"kernel_regularizer": self._kernel_regularizer,
"bias_regularizer": self._bias_regularizer,
"norm_momentum": self._norm_momentum,
"norm_epsilon": self._norm_epislon,
"use_sync_bn": self._use_sync_bn,
"activation": self._activation
'model_id': self._model_name,
'min_level': self._min_size,
'max_level': self._max_size,
'kernel_initializer': self._kernel_initializer,
'kernel_regularizer': self._kernel_regularizer,
'bias_regularizer': self._bias_regularizer,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epislon,
'use_sync_bn': self._use_sync_bn,
'activation': self._activation,
}
return layer_config
@factory.register_backbone_builder("darknet")
@factory.register_backbone_builder('darknet')
def build_darknet(
input_specs: tf.keras.layers.InputSpec,
backbone_config: hyperparams.Config,
norm_activation_config: hyperparams.Config,
l2_regularizer: tf.keras.regularizers.Regularizer = None) -> tf.keras.Model:
"""Builds darknet backbone."""
"""Builds darknet."""
backbone_cfg = backbone_config.get()
model = Darknet(
model_id=backbone_cfg.model_id,
input_shape=input_specs,
min_level=backbone_cfg.min_level,
max_level=backbone_cfg.max_level,
input_specs=input_specs,
dilate=backbone_cfg.dilate,
width_scale=backbone_cfg.width_scale,
depth_scale=backbone_cfg.depth_scale,
activation=norm_activation_config.activation,
use_sync_bn=norm_activation_config.use_sync_bn,
norm_momentum=norm_activation_config.norm_momentum,
norm_epsilon=norm_activation_config.norm_epsilon,
kernel_regularizer=l2_regularizer)
model.summary()
return model
......@@ -13,7 +13,7 @@
# limitations under the License.
# Lint as: python3
"""Tests for resnet."""
"""Tests for yolo."""
from absl.testing import parameterized
import numpy as np
......@@ -24,35 +24,48 @@ from tensorflow.python.distribute import strategy_combinations
from official.vision.beta.projects.yolo.modeling.backbones import darknet
class DarkNetTest(parameterized.TestCase, tf.test.TestCase):
class DarknetTest(parameterized.TestCase, tf.test.TestCase):
@parameterized.parameters(
(224, "darknet53", 2, 1),
(224, "darknettiny", 1, 2),
(224, "cspdarknettiny", 1, 1),
(224, "cspdarknet53", 2, 1),
(224, 'darknet53', 2, 1, True),
(224, 'darknettiny', 1, 2, False),
(224, 'cspdarknettiny', 1, 1, False),
(224, 'cspdarknet53', 2, 1, True),
)
def test_network_creation(self, input_size, model_id,
endpoint_filter_scale, scale_final):
def test_network_creation(self, input_size, model_id, endpoint_filter_scale,
scale_final, dilate):
"""Test creation of ResNet family models."""
tf.keras.backend.set_image_data_format("channels_last")
tf.keras.backend.set_image_data_format('channels_last')
network = darknet.Darknet(model_id=model_id, min_level=3, max_level=5)
network = darknet.Darknet(
model_id=model_id, min_level=3, max_level=5, dilate=dilate)
self.assertEqual(network.model_id, model_id)
inputs = tf.keras.Input(shape=(input_size, input_size, 3), batch_size=1)
endpoints = network(inputs)
self.assertAllEqual(
[1, input_size / 2**3, input_size / 2**3, 128 * endpoint_filter_scale],
endpoints["3"].shape.as_list())
self.assertAllEqual(
[1, input_size / 2**4, input_size / 2**4, 256 * endpoint_filter_scale],
endpoints["4"].shape.as_list())
self.assertAllEqual([
1, input_size / 2**5, input_size / 2**5,
512 * endpoint_filter_scale * scale_final
], endpoints["5"].shape.as_list())
if dilate:
self.assertAllEqual([
1, input_size / 2**3, input_size / 2**3, 128 * endpoint_filter_scale
], endpoints['3'].shape.as_list())
self.assertAllEqual([
1, input_size / 2**3, input_size / 2**3, 256 * endpoint_filter_scale
], endpoints['4'].shape.as_list())
self.assertAllEqual([
1, input_size / 2**3, input_size / 2**3,
512 * endpoint_filter_scale * scale_final
], endpoints['5'].shape.as_list())
else:
self.assertAllEqual([
1, input_size / 2**3, input_size / 2**3, 128 * endpoint_filter_scale
], endpoints['3'].shape.as_list())
self.assertAllEqual([
1, input_size / 2**4, input_size / 2**4, 256 * endpoint_filter_scale
], endpoints['4'].shape.as_list())
self.assertAllEqual([
1, input_size / 2**5, input_size / 2**5,
512 * endpoint_filter_scale * scale_final
], endpoints['5'].shape.as_list())
@combinations.generate(
combinations.combine(
......@@ -66,20 +79,20 @@ class DarkNetTest(parameterized.TestCase, tf.test.TestCase):
"""Test for sync bn on TPU and GPU devices."""
inputs = np.random.rand(1, 224, 224, 3)
tf.keras.backend.set_image_data_format("channels_last")
tf.keras.backend.set_image_data_format('channels_last')
with strategy.scope():
network = darknet.Darknet(model_id="darknet53", min_size=3, max_size=5)
network = darknet.Darknet(model_id='darknet53', min_size=3, max_size=5)
_ = network(inputs)
@parameterized.parameters(1, 3, 4)
def test_input_specs(self, input_dim):
"""Test different input feature dimensions."""
tf.keras.backend.set_image_data_format("channels_last")
tf.keras.backend.set_image_data_format('channels_last')
input_specs = tf.keras.layers.InputSpec(shape=[None, None, None, input_dim])
network = darknet.Darknet(
model_id="darknet53", min_level=3, max_level=5, input_specs=input_specs)
model_id='darknet53', min_level=3, max_level=5, input_specs=input_specs)
inputs = tf.keras.Input(shape=(224, 224, input_dim), batch_size=1)
_ = network(inputs)
......@@ -87,14 +100,14 @@ class DarkNetTest(parameterized.TestCase, tf.test.TestCase):
def test_serialize_deserialize(self):
# Create a network object that sets all of its config options.
kwargs = dict(
model_id="darknet53",
model_id='darknet53',
min_level=3,
max_level=5,
use_sync_bn=False,
activation="relu",
activation='relu',
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer="VarianceScaling",
kernel_initializer='VarianceScaling',
kernel_regularizer=None,
bias_regularizer=None,
)
......@@ -113,5 +126,5 @@ class DarkNetTest(parameterized.TestCase, tf.test.TestCase):
self.assertAllEqual(network.get_config(), new_network.get_config())
if __name__ == "__main__":
if __name__ == '__main__':
tf.test.main()
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Feature Pyramid Network and Path Aggregation variants used in YOLO."""
import tensorflow as tf
from official.vision.beta.projects.yolo.modeling.layers import nn_blocks
@tf.keras.utils.register_keras_serializable(package='yolo')
class _IdentityRoute(tf.keras.layers.Layer):
def call(self, inputs):
return None, inputs
@tf.keras.utils.register_keras_serializable(package='yolo')
class YoloFPN(tf.keras.layers.Layer):
"""YOLO Feature pyramid network."""
def __init__(self,
fpn_depth=4,
use_spatial_attention=False,
csp_stack=False,
activation='leaky',
fpn_filter_scale=1,
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='glorot_uniform',
kernel_regularizer=None,
bias_regularizer=None,
**kwargs):
"""Yolo FPN initialization function (Yolo V4).
Args:
fpn_depth: `int`, number of layers to use in each FPN path
if you choose to use an FPN.
use_spatial_attention: `bool`, use the spatial attention module.
csp_stack: `bool`, CSPize the FPN.
activation: `str`, the activation function to use typically leaky or mish.
fpn_filter_scale: `int`, scaling factor for the FPN filters.
use_sync_bn: if True, use synchronized batch normalization.
norm_momentum: `float`, normalization momentum for the moving average.
norm_epsilon: `float`, small float added to variance to avoid dividing by
zero.
kernel_initializer: kernel_initializer for convolutional layers.
kernel_regularizer: tf.keras.regularizers.Regularizer object for Conv2D.
bias_regularizer: tf.keras.regularizers.Regularizer object for Conv2D.
**kwargs: keyword arguments to be passed.
"""
super().__init__(**kwargs)
self._fpn_depth = fpn_depth
self._activation = activation
self._use_sync_bn = use_sync_bn
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
self._use_spatial_attention = use_spatial_attention
self._filter_scale = fpn_filter_scale
self._csp_stack = csp_stack
self._base_config = dict(
activation=self._activation,
use_sync_bn=self._use_sync_bn,
kernel_regularizer=self._kernel_regularizer,
kernel_initializer=self._kernel_initializer,
bias_regularizer=self._bias_regularizer,
norm_epsilon=self._norm_epsilon,
norm_momentum=self._norm_momentum)
def get_raw_depths(self, minimum_depth, inputs):
"""Calculates the unscaled depths of the FPN branches.
Args:
minimum_depth (int): depth of the smallest branch of the FPN.
inputs (dict): dictionary of the shape of input args as a dictionary of
lists.
Returns:
The unscaled depths of the FPN branches.
"""
depths = []
for i in range(self._min_level, self._max_level + 1):
depths.append(inputs[str(i)][-1] / self._filter_scale)
return list(reversed(depths))
def build(self, inputs):
"""Use config dictionary to generate all important attributes for head.
Args:
inputs: dictionary of the shape of input args as a dictionary of lists.
"""
keys = [int(key) for key in inputs.keys()]
self._min_level = min(keys)
self._max_level = max(keys)
self._min_depth = inputs[str(self._min_level)][-1]
self._depths = self.get_raw_depths(self._min_depth, inputs)
# directly connect to an input path and process it
self.preprocessors = dict()
# resample an input and merge it with the output of another path
# inorder to aggregate backbone outputs
self.resamples = dict()
# set of convoltion layers and upsample layers that are used to
# prepare the FPN processors for output
for level, depth in zip(
reversed(range(self._min_level, self._max_level + 1)), self._depths):
if level == self._min_level:
self.resamples[str(level)] = nn_blocks.PathAggregationBlock(
filters=depth // 2,
inverted=True,
upsample=True,
drop_final=self._csp_stack == 0,
upsample_size=2,
**self._base_config)
self.preprocessors[str(level)] = _IdentityRoute()
elif level != self._max_level:
self.resamples[str(level)] = nn_blocks.PathAggregationBlock(
filters=depth // 2,
inverted=True,
upsample=True,
drop_final=False,
upsample_size=2,
**self._base_config)
self.preprocessors[str(level)] = nn_blocks.DarkRouteProcess(
filters=depth,
repetitions=self._fpn_depth - int(level == self._min_level),
block_invert=True,
insert_spp=False,
csp_stack=self._csp_stack,
**self._base_config)
else:
self.preprocessors[str(level)] = nn_blocks.DarkRouteProcess(
filters=depth,
repetitions=self._fpn_depth + 1 * int(self._csp_stack == 0),
insert_spp=True,
block_invert=False,
csp_stack=self._csp_stack,
**self._base_config)
def call(self, inputs):
outputs = dict()
layer_in = inputs[str(self._max_level)]
for level in reversed(range(self._min_level, self._max_level + 1)):
_, x = self.preprocessors[str(level)](layer_in)
outputs[str(level)] = x
if level > self._min_level:
x_next = inputs[str(level - 1)]
_, layer_in = self.resamples[str(level - 1)]([x_next, x])
return outputs
@tf.keras.utils.register_keras_serializable(package='yolo')
class YoloPAN(tf.keras.layers.Layer):
"""YOLO Path Aggregation Network."""
def __init__(self,
path_process_len=6,
max_level_process_len=None,
embed_spp=False,
use_spatial_attention=False,
csp_stack=False,
activation='leaky',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='glorot_uniform',
kernel_regularizer=None,
bias_regularizer=None,
fpn_input=True,
fpn_filter_scale=1.0,
**kwargs):
"""Yolo Path Aggregation Network initialization function (Yolo V3 and V4).
Args:
path_process_len: `int`, number of layers ot use in each Decoder path.
max_level_process_len: `int`, number of layers ot use in the largest
processing path, or the backbones largest output if it is different.
embed_spp: `bool`, use the SPP found in the YoloV3 and V4 model.
use_spatial_attention: `bool`, use the spatial attention module.
csp_stack: `bool`, CSPize the FPN.
activation: `str`, the activation function to use typically leaky or mish.
use_sync_bn: if True, use synchronized batch normalization.
norm_momentum: `float`, normalization omentum for the moving average.
norm_epsilon: `float`, small float added to variance to avoid dividing
by zero.
kernel_initializer: kernel_initializer for convolutional layers.
kernel_regularizer: tf.keras.regularizers.Regularizer object for Conv2D.
bias_regularizer: tf.keras.regularizers.Regularizer object for Conv2d.
fpn_input: `bool`, for whether the input into this fucntion is an FPN or
a backbone.
fpn_filter_scale: `int`, scaling factor for the FPN filters.
**kwargs: keyword arguments to be passed.
"""
super().__init__(**kwargs)
self._path_process_len = path_process_len
self._embed_spp = embed_spp
self._use_spatial_attention = use_spatial_attention
self._activation = activation
self._use_sync_bn = use_sync_bn
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
self._fpn_input = fpn_input
self._max_level_process_len = max_level_process_len
self._csp_stack = csp_stack
self._fpn_filter_scale = fpn_filter_scale
if max_level_process_len is None:
self._max_level_process_len = path_process_len
self._base_config = dict(
activation=self._activation,
use_sync_bn=self._use_sync_bn,
kernel_regularizer=self._kernel_regularizer,
kernel_initializer=self._kernel_initializer,
bias_regularizer=self._bias_regularizer,
norm_epsilon=self._norm_epsilon,
norm_momentum=self._norm_momentum)
def build(self, inputs):
"""Use config dictionary to generate all important attributes for head.
Args:
inputs: dictionary of the shape of input args as a dictionary of lists.
"""
# define the key order
keys = [int(key) for key in inputs.keys()]
self._min_level = min(keys)
self._max_level = max(keys)
self._min_depth = inputs[str(self._min_level)][-1]
self._depths = self.get_raw_depths(self._min_depth, inputs)
# directly connect to an input path and process it
self.preprocessors = dict()
# resample an input and merge it with the output of another path
# inorder to aggregate backbone outputs
self.resamples = dict()
# FPN will reverse the key process order for the backbone, so we need
# adjust the order that objects are created and processed to adjust for
# this. not using an FPN will directly connect the decoder to the backbone
# therefore the object creation order needs to be done from the largest
# to smallest level.
if self._fpn_input:
# process order {... 3, 4, 5}
self._iterator = range(self._min_level, self._max_level + 1)
self._check = lambda x: x < self._max_level
self._key_shift = lambda x: x + 1
self._input = self._min_level
downsample = True
upsample = False
else:
# process order {5, 4, 3, ...}
self._iterator = list(
reversed(range(self._min_level, self._max_level + 1)))
self._check = lambda x: x > self._min_level
self._key_shift = lambda x: x - 1
self._input = self._max_level
downsample = False
upsample = True
if self._csp_stack == 0:
proc_filters = lambda x: x
resample_filters = lambda x: x // 2
else:
proc_filters = lambda x: x * 2
resample_filters = lambda x: x
for level, depth in zip(self._iterator, self._depths):
if level == self._input:
self.preprocessors[str(level)] = nn_blocks.DarkRouteProcess(
filters=proc_filters(depth),
repetitions=self._max_level_process_len,
insert_spp=self._embed_spp,
block_invert=False,
insert_sam=self._use_spatial_attention,
csp_stack=self._csp_stack,
**self._base_config)
else:
self.resamples[str(level)] = nn_blocks.PathAggregationBlock(
filters=resample_filters(depth),
upsample=upsample,
downsample=downsample,
inverted=False,
drop_final=self._csp_stack == 0,
**self._base_config)
self.preprocessors[str(level)] = nn_blocks.DarkRouteProcess(
filters=proc_filters(depth),
repetitions=self._path_process_len,
insert_spp=False,
insert_sam=self._use_spatial_attention,
csp_stack=self._csp_stack,
**self._base_config)
def get_raw_depths(self, minimum_depth, inputs):
"""Calculates the unscaled depths of the FPN branches.
Args:
minimum_depth: `int` depth of the smallest branch of the FPN.
inputs: `dict[str, tf.InputSpec]` of the shape of input args as a
dictionary of lists.
Returns:
The unscaled depths of the FPN branches.
"""
depths = []
if len(inputs.keys()) > 3 or self._fpn_filter_scale > 1:
for i in range(self._min_level, self._max_level + 1):
depths.append(inputs[str(i)][-1] * 2)
else:
for _ in range(self._min_level, self._max_level + 1):
depths.append(minimum_depth)
minimum_depth *= 2
if self._fpn_input:
return depths
return list(reversed(depths))
def call(self, inputs):
outputs = dict()
layer_in = inputs[str(self._input)]
for level in self._iterator:
x_route, x = self.preprocessors[str(level)](layer_in)
outputs[str(level)] = x
if self._check(level):
x_next = inputs[str(self._key_shift(level))]
_, layer_in = self.resamples[str(
self._key_shift(level))]([x_route, x_next])
return outputs
@tf.keras.utils.register_keras_serializable(package='yolo')
class YoloDecoder(tf.keras.Model):
"""Darknet Backbone Decoder."""
def __init__(self,
input_specs,
use_fpn=False,
use_spatial_attention=False,
csp_stack=False,
fpn_depth=4,
fpn_filter_scale=1,
path_process_len=6,
max_level_process_len=None,
embed_spp=False,
activation='leaky',
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='glorot_uniform',
kernel_regularizer=None,
bias_regularizer=None,
**kwargs):
"""Yolo Decoder initialization function.
A unified model that ties all decoder components into a conditionally build
YOLO decoder.
Args:
input_specs: `dict[str, tf.InputSpec]`: input specs of each of the inputs
to the heads.
use_fpn: `bool`, use the FPN found in the YoloV4 model.
use_spatial_attention: `bool`, use the spatial attention module.
csp_stack: `bool`, CSPize the FPN.
fpn_depth: `int`, number of layers ot use in each FPN path
if you choose to use an FPN.
fpn_filter_scale: `int`, scaling factor for the FPN filters.
path_process_len: `int`, number of layers ot use in each Decoder path.
max_level_process_len: `int`, number of layers ot use in the largest
processing path, or the backbones largest output if it is different.
embed_spp: `bool`, use the SPP found in the YoloV3 and V4 model.
activation: `str`, the activation function to use typically leaky or mish.
use_sync_bn: if True, use synchronized batch normalization.
norm_momentum: `float`, normalization omentum for the moving average.
norm_epsilon: `float`, small float added to variance to avoid dividing by
zero.
kernel_initializer: kernel_initializer for convolutional layers.
kernel_regularizer: tf.keras.regularizers.Regularizer object for Conv2D.
bias_regularizer: tf.keras.regularizers.Regularizer object for Conv2D.
**kwargs: keyword arguments to be passed.
"""
self._input_specs = input_specs
self._use_fpn = use_fpn
self._fpn_depth = fpn_depth
self._path_process_len = path_process_len
self._max_level_process_len = max_level_process_len
self._embed_spp = embed_spp
self._activation = activation
self._use_sync_bn = use_sync_bn
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
self._kernel_initializer = kernel_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
self._base_config = dict(
use_spatial_attention=use_spatial_attention,
csp_stack=csp_stack,
activation=self._activation,
use_sync_bn=self._use_sync_bn,
fpn_filter_scale=fpn_filter_scale,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon,
kernel_initializer=self._kernel_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)
self._decoder_config = dict(
path_process_len=self._path_process_len,
max_level_process_len=self._max_level_process_len,
embed_spp=self._embed_spp,
fpn_input=self._use_fpn,
**self._base_config)
inputs = {
key: tf.keras.layers.Input(shape=value[1:])
for key, value in input_specs.items()
}
if self._use_fpn:
inter_outs = YoloFPN(
fpn_depth=self._fpn_depth, **self._base_config)(
inputs)
outputs = YoloPAN(**self._decoder_config)(inter_outs)
else:
inter_outs = None
outputs = YoloPAN(**self._decoder_config)(inputs)
self._output_specs = {key: value.shape for key, value in outputs.items()}
super().__init__(inputs=inputs, outputs=outputs, name='YoloDecoder')
@property
def use_fpn(self):
return self._use_fpn
@property
def output_specs(self):
return self._output_specs
def get_config(self):
config = dict(
input_specs=self._input_specs,
use_fpn=self._use_fpn,
fpn_depth=self._fpn_depth,
**self._decoder_config)
return config
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Tests for YOLO."""
# Import libraries
from absl.testing import parameterized
import tensorflow as tf
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import strategy_combinations
from official.vision.beta.projects.yolo.modeling.decoders import yolo_decoder as decoders
class YoloDecoderTest(parameterized.TestCase, tf.test.TestCase):
def _build_yolo_decoder(self, input_specs, name='1'):
# Builds 4 different arbitrary decoders.
if name == '1':
model = decoders.YoloDecoder(
input_specs=input_specs,
embed_spp=False,
use_fpn=False,
max_level_process_len=2,
path_process_len=1,
activation='mish')
elif name == '6spp':
model = decoders.YoloDecoder(
input_specs=input_specs,
embed_spp=True,
use_fpn=False,
max_level_process_len=None,
path_process_len=6,
activation='mish')
elif name == '6sppfpn':
model = decoders.YoloDecoder(
input_specs=input_specs,
embed_spp=True,
use_fpn=True,
max_level_process_len=None,
path_process_len=6,
activation='mish')
elif name == '6':
model = decoders.YoloDecoder(
input_specs=input_specs,
embed_spp=False,
use_fpn=False,
max_level_process_len=None,
path_process_len=6,
activation='mish')
else:
raise NotImplementedError(f'YOLO decoder test {type} not implemented.')
return model
@parameterized.parameters('1', '6spp', '6sppfpn', '6')
def test_network_creation(self, version):
"""Test creation of ResNet family models."""
tf.keras.backend.set_image_data_format('channels_last')
input_shape = {
'3': [1, 52, 52, 256],
'4': [1, 26, 26, 512],
'5': [1, 13, 13, 1024]
}
decoder = self._build_yolo_decoder(input_shape, version)
inputs = {}
for key in input_shape:
inputs[key] = tf.ones(input_shape[key], dtype=tf.float32)
endpoints = decoder.call(inputs)
for key in endpoints.keys():
self.assertAllEqual(endpoints[key].shape.as_list(), input_shape[key])
@combinations.generate(
combinations.combine(
strategy=[
strategy_combinations.cloud_tpu_strategy,
strategy_combinations.one_device_strategy_gpu,
],
use_sync_bn=[False, True],
))
def test_sync_bn_multiple_devices(self, strategy, use_sync_bn):
"""Test for sync bn on TPU and GPU devices."""
tf.keras.backend.set_image_data_format('channels_last')
with strategy.scope():
input_shape = {
'3': [1, 52, 52, 256],
'4': [1, 26, 26, 512],
'5': [1, 13, 13, 1024]
}
decoder = self._build_yolo_decoder(input_shape, '6')
inputs = {}
for key in input_shape:
inputs[key] = tf.ones(input_shape[key], dtype=tf.float32)
_ = decoder.call(inputs)
@parameterized.parameters(1, 3, 4)
def test_input_specs(self, input_dim):
"""Test different input feature dimensions."""
tf.keras.backend.set_image_data_format('channels_last')
input_shape = {
'3': [1, 52, 52, 256],
'4': [1, 26, 26, 512],
'5': [1, 13, 13, 1024]
}
decoder = self._build_yolo_decoder(input_shape, '6')
inputs = {}
for key in input_shape:
inputs[key] = tf.ones(input_shape[key], dtype=tf.float32)
_ = decoder(inputs)
def test_serialize_deserialize(self):
"""Create a network object that sets all of its config options."""
tf.keras.backend.set_image_data_format('channels_last')
input_shape = {
'3': [1, 52, 52, 256],
'4': [1, 26, 26, 512],
'5': [1, 13, 13, 1024]
}
decoder = self._build_yolo_decoder(input_shape, '6')
inputs = {}
for key in input_shape:
inputs[key] = tf.ones(input_shape[key], dtype=tf.float32)
_ = decoder(inputs)
config = decoder.get_config()
decoder_from_config = decoders.YoloDecoder.from_config(config)
self.assertAllEqual(decoder.get_config(), decoder_from_config.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Yolo heads."""
import tensorflow as tf
from official.vision.beta.projects.yolo.modeling.layers import nn_blocks
class YoloHead(tf.keras.layers.Layer):
"""YOLO Prediction Head."""
def __init__(self,
min_level,
max_level,
classes=80,
boxes_per_level=3,
output_extras=0,
norm_momentum=0.99,
norm_epsilon=0.001,
kernel_initializer='glorot_uniform',
kernel_regularizer=None,
bias_regularizer=None,
activation=None,
**kwargs):
"""Yolo Prediction Head initialization function.
Args:
min_level: `int`, the minimum backbone output level.
max_level: `int`, the maximum backbone output level.
classes: `int`, number of classes per category.
boxes_per_level: `int`, number of boxes to predict per level.
output_extras: `int`, number of additional output channels that the head.
should predict for non-object detection and non-image classification
tasks.
norm_momentum: `float`, normalization momentum for the moving average.
norm_epsilon: `float`, small float added to variance to avoid dividing by
zero.
kernel_initializer: kernel_initializer for convolutional layers.
kernel_regularizer: tf.keras.regularizers.Regularizer object for Conv2D.
bias_regularizer: tf.keras.regularizers.Regularizer object for Conv2d.
activation: `str`, the activation function to use typically leaky or mish.
**kwargs: keyword arguments to be passed.
"""
super().__init__(**kwargs)
self._min_level = min_level
self._max_level = max_level
self._key_list = [
str(key) for key in range(self._min_level, self._max_level + 1)
]
self._classes = classes
self._boxes_per_level = boxes_per_level
self._output_extras = output_extras
self._output_conv = (classes + output_extras + 5) * boxes_per_level
self._base_config = dict(
activation=activation,
norm_momentum=norm_momentum,
norm_epsilon=norm_epsilon,
kernel_initializer=kernel_initializer,
kernel_regularizer=kernel_regularizer,
bias_regularizer=bias_regularizer)
self._conv_config = dict(
filters=self._output_conv,
kernel_size=(1, 1),
strides=(1, 1),
padding='same',
use_bn=False,
**self._base_config)
def build(self, input_shape):
self._head = dict()
for key in self._key_list:
self._head[key] = nn_blocks.ConvBN(**self._conv_config)
def call(self, inputs):
outputs = dict()
for key in self._key_list:
outputs[key] = self._head[key](inputs[key])
return outputs
@property
def output_depth(self):
return (self._classes + self._output_extras + 5) * self._boxes_per_level
@property
def num_boxes(self):
if self._min_level is None or self._max_level is None:
raise Exception(
'Model has to be built before number of boxes can be determined.')
return (self._max_level - self._min_level + 1) * self._boxes_per_level
def get_config(self):
config = dict(
min_level=self._min_level,
max_level=self._max_level,
classes=self._classes,
boxes_per_level=self._boxes_per_level,
output_extras=self._output_extras,
**self._base_config)
return config
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Tests for yolo heads."""
# Import libraries
from absl.testing import parameterized
import tensorflow as tf
from official.vision.beta.projects.yolo.modeling.heads import yolo_head as heads
class YoloDecoderTest(parameterized.TestCase, tf.test.TestCase):
def test_network_creation(self):
"""Test creation of YOLO family models."""
tf.keras.backend.set_image_data_format('channels_last')
input_shape = {
'3': [1, 52, 52, 256],
'4': [1, 26, 26, 512],
'5': [1, 13, 13, 1024]
}
classes = 100
bps = 3
head = heads.YoloHead(3, 5, classes=classes, boxes_per_level=bps)
inputs = {}
for key in input_shape:
inputs[key] = tf.ones(input_shape[key], dtype=tf.float32)
endpoints = head(inputs)
# print(endpoints)
for key in endpoints.keys():
expected_input_shape = input_shape[key]
expected_input_shape[-1] = (classes + 5) * bps
self.assertAllEqual(endpoints[key].shape.as_list(), expected_input_shape)
def test_serialize_deserialize(self):
# Create a network object that sets all of its config options.
tf.keras.backend.set_image_data_format('channels_last')
input_shape = {
'3': [1, 52, 52, 256],
'4': [1, 26, 26, 512],
'5': [1, 13, 13, 1024]
}
classes = 100
bps = 3
head = heads.YoloHead(3, 5, classes=classes, boxes_per_level=bps)
inputs = {}
for key in input_shape:
inputs[key] = tf.ones(input_shape[key], dtype=tf.float32)
_ = head(inputs)
configs = head.get_config()
head_from_config = heads.YoloHead.from_config(configs)
self.assertAllEqual(head.get_config(), head_from_config.get_config())
if __name__ == '__main__':
tf.test.main()
......@@ -13,81 +13,85 @@
# limitations under the License.
# Lint as: python3
"""Contains common building blocks for yolo neural networks."""
from typing import Callable, List
import tensorflow as tf
from official.modeling import tf_utils
from official.vision.beta.ops import spatial_transform_ops
@tf.keras.utils.register_keras_serializable(package="yolo")
@tf.keras.utils.register_keras_serializable(package='yolo')
class Identity(tf.keras.layers.Layer):
def call(self, inputs):
return inputs
@tf.keras.utils.register_keras_serializable(package="yolo")
@tf.keras.utils.register_keras_serializable(package='yolo')
class ConvBN(tf.keras.layers.Layer):
"""Modified Convolution layer to match that of the DarkNet Library.
"""ConvBN block.
Modified Convolution layer to match that of the Darknet Library.
The Layer is a standards combination of Conv BatchNorm Activation,
however, the use of bias in the conv is determined by the use of batch norm.
however, the use of bias in the conv is determined by the use of batch
normalization.
Cross Stage Partial networks (CSPNets) were proposed in:
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu, Ping-Yang
Chen, Jun-Wei Hsieh.
CSPNet: A New Backbone that can Enhance Learning Capability of CNN.
arXiv:1911.11929
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu,
Ping-Yang Chen, Jun-Wei Hsieh
CSPNet: A New Backbone that can Enhance Learning Capability of CNN.
arXiv:1911.11929
"""
def __init__(self,
filters=1,
kernel_size=(1, 1),
strides=(1, 1),
padding="same",
padding='same',
dilation_rate=(1, 1),
kernel_initializer="glorot_uniform",
bias_initializer="zeros",
kernel_regularizer=None,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
bias_regularizer=None,
kernel_regularizer=None,
use_bn=True,
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
activation="leaky",
activation='leaky',
leaky_alpha=0.1,
**kwargs):
"""Initializes ConvBN layer.
"""ConvBN initializer.
Args:
filters: integer for output depth, or the number of features to learn
filters: integer for output depth, or the number of features to learn.
kernel_size: integer or tuple for the shape of the weight matrix or kernel
to learn.
strides: integer of tuple how much to move the kernel after each kernel
use padding: string 'valid' or 'same', if same, then pad the image, else
do not.
padding: `str`, padding method for conv layers.
use.
padding: string 'valid' or 'same', if same, then pad the image, else do
not.
dilation_rate: tuple to indicate how much to modulate kernel weights and
how many pixels in a feature map to skip.
how many pixels in a feature map to skip.
kernel_initializer: string to indicate which function to use to initialize
weights.
bias_initializer: string to indicate which function to use to initialize
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights.
bias_regularizer: string to indicate which function to use to regularizer
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights.
use_bn: boolean for whether to use batch normalization.
use_sync_bn: boolean for whether sync batch normalization.
norm_momentum: float for moment to use for batch normalization
norm_epsilon: float for batch normalization epsilon
use_sync_bn: boolean for whether sync batch normalization statistics
of all batch norm layers to the models global statistics
(across all input batches).
norm_momentum: float for moment to use for batch normalization.
norm_epsilon: float for batch normalization epsilon.
activation: string or None for activation function to use in layer,
if None activation is replaced by linear.
if None activation is replaced by linear.
leaky_alpha: float to use as alpha if activation function is leaky.
**kwargs: Keyword Arguments
**kwargs: Keyword Arguments.
"""
# convolution params
self._filters = filters
self._kernel_size = kernel_size
......@@ -97,15 +101,16 @@ class ConvBN(tf.keras.layers.Layer):
self._kernel_initializer = kernel_initializer
self._bias_initializer = bias_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
# batch normalization params
self._use_bn = use_bn
self._use_sync_bn = use_sync_bn
self._norm_moment = norm_momentum
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
if tf.keras.backend.image_data_format() == "channels_last":
if tf.keras.backend.image_data_format() == 'channels_last':
# format: (batch_size, height, width, channels)
self._bn_axis = -1
else:
......@@ -116,7 +121,7 @@ class ConvBN(tf.keras.layers.Layer):
self._activation = activation
self._leaky_alpha = leaky_alpha
super(ConvBN, self).__init__(**kwargs)
super().__init__(**kwargs)
def build(self, input_shape):
use_bias = not self._use_bn
......@@ -136,101 +141,103 @@ class ConvBN(tf.keras.layers.Layer):
if self._use_bn:
if self._use_sync_bn:
self.bn = tf.keras.layers.experimental.SyncBatchNormalization(
momentum=self._norm_moment,
momentum=self._norm_momentum,
epsilon=self._norm_epsilon,
axis=self._bn_axis)
else:
self.bn = tf.keras.layers.BatchNormalization(
momentum=self._norm_moment,
momentum=self._norm_momentum,
epsilon=self._norm_epsilon,
axis=self._bn_axis)
else:
self.bn = Identity()
if self._activation == "leaky":
if self._activation == 'leaky':
self._activation_fn = tf.keras.layers.LeakyReLU(alpha=self._leaky_alpha)
elif self._activation == "mish":
elif self._activation == 'mish':
self._activation_fn = lambda x: x * tf.math.tanh(tf.math.softplus(x))
else:
self._activation_fn = tf_utils.get_activation(self._activation)
def call(self, x):
x = self.conv(x)
x = self.bn(x)
if self._use_bn:
x = self.bn(x)
x = self._activation_fn(x)
return x
def get_config(self):
# used to store/share parameters to reconstruct the model
layer_config = {
"filters": self._filters,
"kernel_size": self._kernel_size,
"strides": self._strides,
"padding": self._padding,
"dilation_rate": self._dilation_rate,
"kernel_initializer": self._kernel_initializer,
"bias_initializer": self._bias_initializer,
"bias_regularizer": self._bias_regularizer,
"kernel_regularizer": self._kernel_regularizer,
"use_bn": self._use_bn,
"use_sync_bn": self._use_sync_bn,
"norm_moment": self._norm_moment,
"norm_epsilon": self._norm_epsilon,
"activation": self._activation,
"leaky_alpha": self._leaky_alpha
'filters': self._filters,
'kernel_size': self._kernel_size,
'strides': self._strides,
'padding': self._padding,
'dilation_rate': self._dilation_rate,
'kernel_initializer': self._kernel_initializer,
'bias_initializer': self._bias_initializer,
'bias_regularizer': self._bias_regularizer,
'kernel_regularizer': self._kernel_regularizer,
'use_bn': self._use_bn,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'activation': self._activation,
'leaky_alpha': self._leaky_alpha
}
layer_config.update(super(ConvBN, self).get_config())
layer_config.update(super().get_config())
return layer_config
def __repr__(self):
return repr(self.get_config())
@tf.keras.utils.register_keras_serializable(package="yolo")
@tf.keras.utils.register_keras_serializable(package='yolo')
class DarkResidual(tf.keras.layers.Layer):
"""DarkNet block with Residual connection for Yolo v3 Backbone.
"""
"""Darknet block with Residual connection for Yolo v3 Backbone."""
def __init__(self,
filters=1,
filter_scale=2,
kernel_initializer="glorot_uniform",
bias_initializer="zeros",
dilation_rate=1,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
kernel_regularizer=None,
bias_regularizer=None,
use_bn=True,
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
activation="leaky",
activation='leaky',
leaky_alpha=0.1,
sc_activation="linear",
sc_activation='linear',
downsample=False,
**kwargs):
"""Initializes DarkResidual.
"""Dark Residual initializer.
Args:
filters: integer for output depth, or the number of features to learn.
filter_scale: `int`, scale factor for number of filters.
filter_scale: `int` for filter scale.
dilation_rate: tuple to indicate how much to modulate kernel weights and
how many pixels in a feature map to skip.
kernel_initializer: string to indicate which function to use to initialize
weights
weights.
bias_initializer: string to indicate which function to use to initialize
bias
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights
regularizer weights.
bias_regularizer: string to indicate which function to use to regularizer
bias
use_bn: boolean for whether to use batch normalization
use_sync_bn: boolean for whether sync batch normalization.
norm_momentum: float for moment to use for batch normalization
norm_epsilon: float for batch normalization epsilon
activation: string for activation function to use in conv layers.
leaky_alpha: float to use as alpha if activation function is leaky
sc_activation: string for activation function to use in layer
bias.
use_bn: boolean for whether to use batch normalization.
use_sync_bn: boolean for whether sync batch normalization statistics.
of all batch norm layers to the models global statistics
(across all input batches).
norm_momentum: float for moment to use for batch normalization.
norm_epsilon: float for batch normalization epsilon.
activation: string or None for activation function to use in layer,
if None activation is replaced by linear.
leaky_alpha: float to use as alpha if activation function is leaky.
sc_activation: string for activation function to use in layer.
downsample: boolean for if image input is larger than layer output, set
downsample to True so the dimensions are forced to match
**kwargs: Keyword Arguments
downsample to True so the dimensions are forced to match.
**kwargs: Keyword Arguments.
"""
# downsample
self._downsample = downsample
......@@ -245,8 +252,10 @@ class DarkResidual(tf.keras.layers.Layer):
self._kernel_regularizer = kernel_regularizer
# normal params
self._norm_moment = norm_momentum
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
self._dilation_rate = dilation_rate if isinstance(dilation_rate,
int) else dilation_rate[0]
# activation params
self._conv_activation = activation
......@@ -256,138 +265,152 @@ class DarkResidual(tf.keras.layers.Layer):
super().__init__(**kwargs)
def build(self, input_shape):
self._dark_conv_args = {
"kernel_initializer": self._kernel_initializer,
"bias_initializer": self._bias_initializer,
"bias_regularizer": self._bias_regularizer,
"use_bn": self._use_bn,
"use_sync_bn": self._use_sync_bn,
"norm_momentum": self._norm_moment,
"norm_epsilon": self._norm_epsilon,
"activation": self._conv_activation,
"kernel_regularizer": self._kernel_regularizer,
"leaky_alpha": self._leaky_alpha
dark_conv_args = {
'kernel_initializer': self._kernel_initializer,
'bias_initializer': self._bias_initializer,
'bias_regularizer': self._bias_regularizer,
'use_bn': self._use_bn,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'activation': self._conv_activation,
'kernel_regularizer': self._kernel_regularizer,
'leaky_alpha': self._leaky_alpha
}
if self._downsample:
if self._dilation_rate > 1:
dilation_rate = 1
if self._dilation_rate // 2 > 0:
dilation_rate = self._dilation_rate // 2
down_stride = 1
else:
dilation_rate = 1
down_stride = 2
self._dconv = ConvBN(
filters=self._filters,
kernel_size=(3, 3),
strides=(2, 2),
padding="same",
**self._dark_conv_args)
else:
self._dconv = Identity()
strides=down_stride,
dilation_rate=dilation_rate,
padding='same',
**dark_conv_args)
self._conv1 = ConvBN(
filters=self._filters // self._filter_scale,
kernel_size=(1, 1),
strides=(1, 1),
padding="same",
**self._dark_conv_args)
padding='same',
**dark_conv_args)
self._conv2 = ConvBN(
filters=self._filters,
kernel_size=(3, 3),
strides=(1, 1),
padding="same",
**self._dark_conv_args)
dilation_rate=self._dilation_rate,
padding='same',
**dark_conv_args)
self._shortcut = tf.keras.layers.Add()
if self._sc_activation == "leaky":
self._activation_fn = tf.keras.layers.LeakyReLU(
alpha=self._leaky_alpha)
elif self._sc_activation == "mish":
if self._sc_activation == 'leaky':
self._activation_fn = tf.keras.layers.LeakyReLU(alpha=self._leaky_alpha)
elif self._sc_activation == 'mish':
self._activation_fn = lambda x: x * tf.math.tanh(tf.math.softplus(x))
else:
self._activation_fn = tf_utils.get_activation(self._sc_activation)
self._activation_fn = tf_utils.get_activation(
self._sc_activation
)
super().build(input_shape)
def call(self, inputs):
shortcut = self._dconv(inputs)
x = self._conv1(shortcut)
def call(self, inputs, training=None):
if self._downsample:
inputs = self._dconv(inputs)
x = self._conv1(inputs)
x = self._conv2(x)
x = self._shortcut([x, shortcut])
x = self._shortcut([x, inputs])
return self._activation_fn(x)
def get_config(self):
# used to store/share parameters to reconstruct the model
layer_config = {
"filters": self._filters,
"kernel_initializer": self._kernel_initializer,
"bias_initializer": self._bias_initializer,
"kernel_regularizer": self._kernel_regularizer,
"use_bn": self._use_bn,
"use_sync_bn": self._use_sync_bn,
"norm_moment": self._norm_moment,
"norm_epsilon": self._norm_epsilon,
"activation": self._conv_activation,
"leaky_alpha": self._leaky_alpha,
"sc_activation": self._sc_activation,
"downsample": self._downsample
'filters': self._filters,
'kernel_initializer': self._kernel_initializer,
'bias_initializer': self._bias_initializer,
'kernel_regularizer': self._kernel_regularizer,
'dilation_rate': self._dilation_rate,
'use_bn': self._use_bn,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'activation': self._conv_activation,
'leaky_alpha': self._leaky_alpha,
'sc_activation': self._sc_activation,
'downsample': self._downsample,
}
layer_config.update(super().get_config())
return layer_config
@tf.keras.utils.register_keras_serializable(package="yolo")
@tf.keras.utils.register_keras_serializable(package='yolo')
class CSPTiny(tf.keras.layers.Layer):
"""A Small size convolution block proposed in the CSPNet.
The layer uses shortcuts, routing(concatnation), and feature grouping
in order to improve gradient variablity and allow for high efficency, low
power residual learning for small networtf.keras.
"""CSP Tiny layer.
A Small size convolution block proposed in the CSPNet. The layer uses
shortcuts, routing(concatnation), and feature grouping in order to improve
gradient variablity and allow for high efficency, low power residual learning
for small networtf.keras.
Cross Stage Partial networks (CSPNets) were proposed in:
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu, Ping-Yang
Chen, Jun-Wei Hsieh
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu,
Ping-Yang Chen, Jun-Wei Hsieh
CSPNet: A New Backbone that can Enhance Learning Capability of CNN.
arXiv:1911.11929
arXiv:1911.11929
"""
def __init__(self,
filters=1,
kernel_initializer="glorot_uniform",
bias_initializer="zeros",
kernel_regularizer=None,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
bias_regularizer=None,
kernel_regularizer=None,
use_bn=True,
dilation_rate=1,
use_sync_bn=False,
group_id=1,
groups=2,
norm_momentum=0.99,
norm_epsilon=0.001,
activation="leaky",
activation='leaky',
downsample=True,
leaky_alpha=0.1,
**kwargs):
"""Initializes CSPTiny.
"""Initializer for CSPTiny block.
Args:
filters: integer for output depth, or the number of features to learn
filters: integer for output depth, or the number of features to learn.
kernel_initializer: string to indicate which function to use to initialize
weights
weights.
bias_initializer: string to indicate which function to use to initialize
bias
kernel_regularizer: string to indicate which function to use to
regularizer weights
bias.
bias_regularizer: string to indicate which function to use to regularizer
bias
use_bn: boolean for whether to use batch normalization
use_sync_bn: boolean for whether sync batch normalization statistics of
all batch norm layers to the models global statistics (across all input
batches)
group_id: integer for which group of features to pass through the csp tiny
stack.
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights.
use_bn: boolean for whether to use batch normalization.
dilation_rate: `int`, dilation rate for conv layers.
use_sync_bn: boolean for whether sync batch normalization statistics
of all batch norm layers to the models global statistics
(across all input batches).
group_id: integer for which group of features to pass through the csp
tiny stack.
groups: integer for how many splits there should be in the convolution
feature stack output
norm_momentum: float for moment to use for batch normalization
norm_epsilon: float for batch normalization epsilon
feature stack output.
norm_momentum: float for moment to use for batch normalization.
norm_epsilon: float for batch normalization epsilon.
activation: string or None for activation function to use in layer,
if None activation is replaced by linear
if None activation is replaced by linear.
downsample: boolean for if image input is larger than layer output, set
downsample to True so the dimensions are forced to match
leaky_alpha: float to use as alpha if activation function is leaky
**kwargs: Keyword Arguments
downsample to True so the dimensions are forced to match.
leaky_alpha: float to use as alpha if activation function is leaky.
**kwargs: Keyword Arguments.
"""
# ConvBN params
......@@ -396,6 +419,7 @@ class CSPTiny(tf.keras.layers.Layer):
self._bias_initializer = bias_initializer
self._bias_regularizer = bias_regularizer
self._use_bn = use_bn
self._dilation_rate = dilation_rate
self._use_sync_bn = use_sync_bn
self._kernel_regularizer = kernel_regularizer
self._groups = groups
......@@ -403,7 +427,7 @@ class CSPTiny(tf.keras.layers.Layer):
self._downsample = downsample
# normal params
self._norm_moment = norm_momentum
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
# activation params
......@@ -413,37 +437,37 @@ class CSPTiny(tf.keras.layers.Layer):
super().__init__(**kwargs)
def build(self, input_shape):
self._dark_conv_args = {
"kernel_initializer": self._kernel_initializer,
"bias_initializer": self._bias_initializer,
"bias_regularizer": self._bias_regularizer,
"use_bn": self._use_bn,
"use_sync_bn": self._use_sync_bn,
"norm_momentum": self._norm_moment,
"norm_epsilon": self._norm_epsilon,
"activation": self._conv_activation,
"kernel_regularizer": self._kernel_regularizer,
"leaky_alpha": self._leaky_alpha
dark_conv_args = {
'kernel_initializer': self._kernel_initializer,
'bias_initializer': self._bias_initializer,
'bias_regularizer': self._bias_regularizer,
'use_bn': self._use_bn,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'activation': self._conv_activation,
'kernel_regularizer': self._kernel_regularizer,
'leaky_alpha': self._leaky_alpha
}
self._convlayer1 = ConvBN(
filters=self._filters,
kernel_size=(3, 3),
strides=(1, 1),
padding="same",
**self._dark_conv_args)
padding='same',
**dark_conv_args)
self._convlayer2 = ConvBN(
filters=self._filters // 2,
kernel_size=(3, 3),
strides=(1, 1),
padding="same",
padding='same',
kernel_initializer=self._kernel_initializer,
bias_initializer=self._bias_initializer,
bias_regularizer=self._bias_regularizer,
kernel_regularizer=self._kernel_regularizer,
use_bn=self._use_bn,
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_moment,
norm_momentum=self._norm_momentum,
norm_epsilon=self._norm_epsilon,
activation=self._conv_activation,
leaky_alpha=self._leaky_alpha)
......@@ -452,22 +476,23 @@ class CSPTiny(tf.keras.layers.Layer):
filters=self._filters // 2,
kernel_size=(3, 3),
strides=(1, 1),
padding="same",
**self._dark_conv_args)
padding='same',
**dark_conv_args)
self._convlayer4 = ConvBN(
filters=self._filters,
kernel_size=(1, 1),
strides=(1, 1),
padding="same",
**self._dark_conv_args)
padding='same',
**dark_conv_args)
self._maxpool = tf.keras.layers.MaxPool2D(
pool_size=2, strides=2, padding="same", data_format=None)
if self._downsample:
self._maxpool = tf.keras.layers.MaxPool2D(
pool_size=2, strides=2, padding='same', data_format=None)
super().build(input_shape)
def call(self, inputs):
def call(self, inputs, training=None):
x1 = self._convlayer1(inputs)
x1_group = tf.split(x1, self._groups, axis=-1)[self._group_id]
x2 = self._convlayer2(x1_group) # grouping
......@@ -479,276 +504,303 @@ class CSPTiny(tf.keras.layers.Layer):
x = self._maxpool(x)
return x, x5
def get_config(self):
# used to store/share parameters to reconsturct the model
layer_config = {
"filters": self._filters,
"strides": self._strides,
"kernel_initializer": self._kernel_initializer,
"bias_initializer": self._bias_initializer,
"kernel_regularizer": self._kernel_regularizer,
"use_bn": self._use_bn,
"use_sync_bn": self._use_sync_bn,
"norm_moment": self._norm_moment,
"norm_epsilon": self._norm_epsilon,
"activation": self._conv_activation,
"leaky_alpha": self._leaky_alpha,
"sc_activation": self._sc_activation,
}
layer_config.update(super().get_config())
return layer_config
@tf.keras.utils.register_keras_serializable(package="yolo")
@tf.keras.utils.register_keras_serializable(package='yolo')
class CSPRoute(tf.keras.layers.Layer):
"""Down sampling layer to take the place of down sampleing.
It is applied in Residual networks. This is the first of 2 layers needed to
convert any Residual Network model to a CSPNet. At the start of a new level
change, this CSPRoute layer creates a learned identity that will act as a
cross stage connection, that is used to inform the inputs to the next stage.
It is called cross stage partial because the number of filters required in
every intermitent Residual layer is reduced by half. The sister layer will
take the partial generated by this layer and concatnate it with the output of
the final residual layer in the stack to create a fully feature level output.
This concatnation merges the partial blocks of 2 levels as input to the next
allowing the gradients of each level to be more unique, and reducing the
number of parameters required by each level by 50% while keeping accuracy
consistent.
"""CSPRoute block.
Down sampling layer to take the place of down sampleing done in Residual
networks. This is the first of 2 layers needed to convert any Residual Network
model to a CSPNet. At the start of a new level change, this CSPRoute layer
creates a learned identity that will act as a cross stage connection,
that is used to inform the inputs to the next stage. It is called cross stage
partial because the number of filters required in every intermitent Residual
layer is reduced by half. The sister layer will take the partial generated by
this layer and concatnate it with the output of the final residual layer in
the stack to create a fully feature level output. This concatnation merges the
partial blocks of 2 levels as input to the next allowing the gradients of each
level to be more unique, and reducing the number of parameters required by
each level by 50% while keeping accuracy consistent.
Cross Stage Partial networks (CSPNets) were proposed in:
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu, Ping-Yang
Chen, Jun-Wei Hsieh.
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu,
Ping-Yang Chen, Jun-Wei Hsieh
CSPNet: A New Backbone that can Enhance Learning Capability of CNN.
arXiv:1911.11929
arXiv:1911.11929
"""
def __init__(self,
filters,
filter_scale=2,
activation="mish",
downsample=True,
kernel_initializer="glorot_uniform",
bias_initializer="zeros",
kernel_regularizer=None,
activation='mish',
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
bias_regularizer=None,
kernel_regularizer=None,
dilation_rate=1,
use_bn=True,
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
downsample=True,
leaky_alpha=0.1,
**kwargs):
"""Initializes CSPRoute.
"""CSPRoute layer initializer.
Args:
filters: integer for output depth, or the number of features to learn
filter_scale: integer dicating (filters//2) or the number of filters in
the partial feature stack.
activation: string for activation function to use in layer
downsample: down_sample the input.
kernel_initializer: string to indicate which function to use to initialize
weights.
activation: string for activation function to use in layer.
kernel_initializer: string to indicate which function to use to
initialize weights.
bias_initializer: string to indicate which function to use to initialize
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights.
bias_regularizer: string to indicate which function to use to regularizer
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights.
dilation_rate: dilation rate for conv layers.
use_bn: boolean for whether to use batch normalization.
use_sync_bn: boolean for whether sync batch normalization.
norm_momentum: float for moment to use for batch normalization
norm_epsilon: float for batch normalization epsilon
**kwargs: Keyword Arguments
use_sync_bn: boolean for whether sync batch normalization statistics
of all batch norm layers to the models global statistics
(across all input batches).
norm_momentum: float for moment to use for batch normalization.
norm_epsilon: float for batch normalization epsilon.
downsample: down_sample the input.
leaky_alpha: `float`, for leaky alpha value.
**kwargs: Keyword Arguments.
"""
super().__init__(**kwargs)
# Layer params.
# layer params
self._filters = filters
self._filter_scale = filter_scale
self._activation = activation
# Convoultion params.
# convoultion params
self._kernel_initializer = kernel_initializer
self._bias_initializer = bias_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
self._dilation_rate = dilation_rate
self._use_bn = use_bn
self._use_sync_bn = use_sync_bn
self._norm_moment = norm_momentum
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
self._downsample = downsample
self._leaky_alpha = leaky_alpha
def build(self, input_shape):
self._dark_conv_args = {
"kernel_initializer": self._kernel_initializer,
"bias_initializer": self._bias_initializer,
"bias_regularizer": self._bias_regularizer,
"use_bn": self._use_bn,
"use_sync_bn": self._use_sync_bn,
"norm_momentum": self._norm_moment,
"norm_epsilon": self._norm_epsilon,
"activation": self._activation,
"kernel_regularizer": self._kernel_regularizer,
dark_conv_args = {
'kernel_initializer': self._kernel_initializer,
'bias_initializer': self._bias_initializer,
'bias_regularizer': self._bias_regularizer,
'use_bn': self._use_bn,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'activation': self._activation,
'kernel_regularizer': self._kernel_regularizer,
'leaky_alpha': self._leaky_alpha,
}
if self._downsample:
self._conv1 = ConvBN(filters=self._filters,
kernel_size=(3, 3),
strides=(2, 2),
**self._dark_conv_args)
else:
self._conv1 = ConvBN(filters=self._filters,
kernel_size=(3, 3),
strides=(1, 1),
**self._dark_conv_args)
self._conv2 = ConvBN(filters=self._filters // self._filter_scale,
kernel_size=(1, 1),
strides=(1, 1),
**self._dark_conv_args)
self._conv3 = ConvBN(filters=self._filters // self._filter_scale,
kernel_size=(1, 1),
strides=(1, 1),
**self._dark_conv_args)
if self._dilation_rate > 1:
dilation_rate = 1
if self._dilation_rate // 2 > 0:
dilation_rate = self._dilation_rate // 2
down_stride = 1
else:
dilation_rate = 1
down_stride = 2
def call(self, inputs):
x = self._conv1(inputs)
y = self._conv2(x)
x = self._conv3(x)
self._conv1 = ConvBN(
filters=self._filters,
kernel_size=(3, 3),
strides=down_stride,
dilation_rate=dilation_rate,
**dark_conv_args)
self._conv2 = ConvBN(
filters=self._filters // self._filter_scale,
kernel_size=(1, 1),
strides=(1, 1),
**dark_conv_args)
self._conv3 = ConvBN(
filters=self._filters // self._filter_scale,
kernel_size=(1, 1),
strides=(1, 1),
**dark_conv_args)
def call(self, inputs, training=None):
if self._downsample:
inputs = self._conv1(inputs)
y = self._conv2(inputs)
x = self._conv3(inputs)
return (x, y)
@tf.keras.utils.register_keras_serializable(package="yolo")
@tf.keras.utils.register_keras_serializable(package='yolo')
class CSPConnect(tf.keras.layers.Layer):
"""Sister Layer to the CSPRoute layer.
Merges the partial feature stacks generated by the CSPDownsampling layer,
and the finaly output of the residual stack. Suggested in the CSPNet paper.
"""CSPConnect block.
Sister Layer to the CSPRoute layer. Merges the partial feature stacks
generated by the CSPDownsampling layer, and the finaly output of the
residual stack. Suggested in the CSPNet paper.
Cross Stage Partial networks (CSPNets) were proposed in:
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu, Ping-Yang
Chen, Jun-Wei Hsieh.
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu,
Ping-Yang Chen, Jun-Wei Hsieh
CSPNet: A New Backbone that can Enhance Learning Capability of CNN.
arXiv:1911.11929
arXiv:1911.11929
"""
def __init__(self,
filters,
filter_scale=2,
activation="mish",
kernel_initializer="glorot_uniform",
bias_initializer="zeros",
kernel_regularizer=None,
drop_final=False,
drop_first=False,
activation='mish',
kernel_size=(1, 1),
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
bias_regularizer=None,
kernel_regularizer=None,
dilation_rate=1,
use_bn=True,
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
leaky_alpha=0.1,
**kwargs):
"""Initializes CSPConnect.
"""Initializer for CSPConnect block.
Args:
filters: integer for output depth, or the number of features to learn.
filters: integer for output depth, or the number of features to learn
filter_scale: integer dicating (filters//2) or the number of filters in
the partial feature stack.
drop_final: `bool`, whether to drop final conv layer.
drop_first: `bool`, whether to drop first conv layer.
activation: string for activation function to use in layer.
kernel_size: `Tuple`, kernel size for conv layers.
kernel_initializer: string to indicate which function to use to initialize
weights.
bias_initializer: string to indicate which function to use to initialize
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights.
bias_regularizer: string to indicate which function to use to regularizer
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights.
dilation_rate: `int`, dilation rate for conv layers.
use_bn: boolean for whether to use batch normalization.
use_sync_bn: boolean for whether sync batch normalization.
norm_momentum: float for moment to use for batch normalization
norm_epsilon: float for batch normalization epsilon
**kwargs: Keyword Arguments
use_sync_bn: boolean for whether sync batch normalization statistics
of all batch norm layers to the models global
statistics (across all input batches).
norm_momentum: float for moment to use for batch normalization.
norm_epsilon: float for batch normalization epsilon.
leaky_alpha: `float`, for leaky alpha value.
**kwargs: Keyword Arguments.
"""
super().__init__(**kwargs)
# layer params.
# layer params
self._filters = filters
self._filter_scale = filter_scale
self._activation = activation
# Convoultion params.
# convoultion params
self._kernel_size = kernel_size
self._kernel_initializer = kernel_initializer
self._bias_initializer = bias_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
self._use_bn = use_bn
self._use_sync_bn = use_sync_bn
self._norm_moment = norm_momentum
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
self._drop_final = drop_final
self._drop_first = drop_first
self._leaky_alpha = leaky_alpha
def build(self, input_shape):
self._dark_conv_args = {
"kernel_initializer": self._kernel_initializer,
"bias_initializer": self._bias_initializer,
"bias_regularizer": self._bias_regularizer,
"use_bn": self._use_bn,
"use_sync_bn": self._use_sync_bn,
"norm_momentum": self._norm_moment,
"norm_epsilon": self._norm_epsilon,
"activation": self._activation,
"kernel_regularizer": self._kernel_regularizer,
dark_conv_args = {
'kernel_initializer': self._kernel_initializer,
'bias_initializer': self._bias_initializer,
'bias_regularizer': self._bias_regularizer,
'use_bn': self._use_bn,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'activation': self._activation,
'kernel_regularizer': self._kernel_regularizer,
'leaky_alpha': self._leaky_alpha,
}
self._conv1 = ConvBN(filters=self._filters // self._filter_scale,
kernel_size=(1, 1),
strides=(1, 1),
**self._dark_conv_args)
if not self._drop_first:
self._conv1 = ConvBN(
filters=self._filters // self._filter_scale,
kernel_size=self._kernel_size,
strides=(1, 1),
**dark_conv_args)
self._concat = tf.keras.layers.Concatenate(axis=-1)
self._conv2 = ConvBN(filters=self._filters,
kernel_size=(1, 1),
strides=(1, 1),
**self._dark_conv_args)
def call(self, inputs):
if not self._drop_final:
self._conv2 = ConvBN(
filters=self._filters,
kernel_size=(1, 1),
strides=(1, 1),
**dark_conv_args)
def call(self, inputs, training=None):
x_prev, x_csp = inputs
x = self._conv1(x_prev)
x = self._concat([x, x_csp])
x = self._conv2(x)
if not self._drop_first:
x_prev = self._conv1(x_prev)
x = self._concat([x_prev, x_csp])
# skipped if drop final is true
if not self._drop_final:
x = self._conv2(x)
return x
class CSPStack(tf.keras.layers.Layer):
"""CSP full stack.
Combines the route and the connect in case you dont want to just quickly wrap
an existing callable or list of layers to make it a cross stage partial.
Added for ease of use. you should be able to wrap any layer stack with a CSP
independent of wether it belongs to the Darknet family. if filter_scale = 2,
then the blocks in the stack passed into the the CSP stack should also have
filters = filters/filter_scale.
"""CSP Stack layer.
CSP full stack, combines the route and the connect in case you dont want to
jsut quickly wrap an existing callable or list of layers to
make it a cross stage partial. Added for ease of use. you should be able
to wrap any layer stack with a CSP independent of wether it belongs
to the Darknet family. if filter_scale = 2, then the blocks in the stack
passed into the the CSP stack should also have filters = filters/filter_scale
Cross Stage Partial networks (CSPNets) were proposed in:
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu, Ping-Yang
Chen, Jun-Wei Hsieh
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu,
Ping-Yang Chen, Jun-Wei Hsieh
CSPNet: A New Backbone that can Enhance Learning Capability of CNN.
arXiv:1911.11929
arXiv:1911.11929
"""
def __init__(self,
filters,
model_to_wrap=None,
filter_scale=2,
activation="mish",
kernel_initializer="glorot_uniform",
bias_initializer="zeros",
kernel_regularizer=None,
activation='mish',
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
bias_regularizer=None,
kernel_regularizer=None,
downsample=True,
use_bn=True,
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
**kwargs):
"""Initializes CSPStack.
"""CSPStack layer initializer.
Args:
filters: integer for output depth, or the number of features to learn.
model_to_wrap: callable Model or a list of callable objects that will
process the output of CSPRoute, and be input into CSPConnect. List will
be called sequentially.
process the output of CSPRoute, and be input into CSPConnect.
list will be called sequentially.
filter_scale: integer dicating (filters//2) or the number of filters in
the partial feature stack.
activation: string for activation function to use in layer.
......@@ -756,66 +808,829 @@ class CSPStack(tf.keras.layers.Layer):
weights.
bias_initializer: string to indicate which function to use to initialize
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights.
bias_regularizer: string to indicate which function to use to regularizer
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights.
downsample: down_sample the input.
use_bn: boolean for whether to use batch normalization
use_sync_bn: boolean for whether sync batch normalization.
norm_momentum: float for moment to use for batch normalization
norm_epsilon: float for batch normalization epsilon
**kwargs: Keyword Arguments
use_bn: boolean for whether to use batch normalization.
use_sync_bn: boolean for whether sync batch normalization statistics
of all batch norm layers to the models global statistics
(across all input batches).
norm_momentum: float for moment to use for batch normalization.
norm_epsilon: float for batch normalization epsilon.
**kwargs: Keyword Arguments.
Raises:
TypeError: model_to_wrap is not a layer or a list of layers
"""
super().__init__(**kwargs)
# Layer params.
# layer params
self._filters = filters
self._filter_scale = filter_scale
self._activation = activation
self._downsample = downsample
# Convoultion params.
# convoultion params
self._kernel_initializer = kernel_initializer
self._bias_initializer = bias_initializer
self._kernel_regularizer = kernel_regularizer
self._bias_regularizer = bias_regularizer
self._use_bn = use_bn
self._use_sync_bn = use_sync_bn
self._norm_moment = norm_momentum
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
if model_to_wrap is not None:
if isinstance(model_to_wrap, Callable):
self._model_to_wrap = [model_to_wrap]
elif isinstance(model_to_wrap, List):
self._model_to_wrap = model_to_wrap
else:
raise ValueError("The input to the CSPStack must be a list of layers"
"that we can iterate through, or \n a callable")
else:
if model_to_wrap is None:
self._model_to_wrap = []
elif isinstance(model_to_wrap, Callable):
self._model_to_wrap = [model_to_wrap]
elif isinstance(model_to_wrap, List):
self._model_to_wrap = model_to_wrap
else:
raise TypeError(
'the input to the CSPStack must be a list of layers that we can' +
'iterate through, or \n a callable')
def build(self, input_shape):
self._dark_conv_args = {
"filters": self._filters,
"filter_scale": self._filter_scale,
"activation": self._activation,
"kernel_initializer": self._kernel_initializer,
"bias_initializer": self._bias_initializer,
"bias_regularizer": self._bias_regularizer,
"use_bn": self._use_bn,
"use_sync_bn": self._use_sync_bn,
"norm_momentum": self._norm_moment,
"norm_epsilon": self._norm_epsilon,
"kernel_regularizer": self._kernel_regularizer,
dark_conv_args = {
'filters': self._filters,
'filter_scale': self._filter_scale,
'activation': self._activation,
'kernel_initializer': self._kernel_initializer,
'bias_initializer': self._bias_initializer,
'bias_regularizer': self._bias_regularizer,
'use_bn': self._use_bn,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'kernel_regularizer': self._kernel_regularizer,
}
self._route = CSPRoute(downsample=self._downsample, **self._dark_conv_args)
self._connect = CSPConnect(**self._dark_conv_args)
return
self._route = CSPRoute(downsample=self._downsample, **dark_conv_args)
self._connect = CSPConnect(**dark_conv_args)
def call(self, inputs):
def call(self, inputs, training=None):
x, x_route = self._route(inputs)
for layer in self._model_to_wrap:
x = layer(x)
x = self._connect([x, x_route])
return x
@tf.keras.utils.register_keras_serializable(package='yolo')
class PathAggregationBlock(tf.keras.layers.Layer):
"""Path Aggregation block."""
def __init__(self,
filters=1,
drop_final=True,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
bias_regularizer=None,
kernel_regularizer=None,
use_bn=True,
use_sync_bn=False,
inverted=False,
norm_momentum=0.99,
norm_epsilon=0.001,
activation='leaky',
leaky_alpha=0.1,
downsample=False,
upsample=False,
upsample_size=2,
**kwargs):
"""Initializer for path aggregation block.
Args:
filters: integer for output depth, or the number of features to learn.
drop_final: do not create the last convolution block.
kernel_initializer: string to indicate which function to use to initialize
weights.
bias_initializer: string to indicate which function to use to initialize
bias.
bias_regularizer: string to indicate which function to use to regularizer
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights.
use_bn: boolean for whether to use batch normalization.
use_sync_bn: boolean for whether sync batch normalization statistics
of all batch norm layers to the models global statistics
(across all input batches).
inverted: boolean for inverting the order of the convolutions.
norm_momentum: float for moment to use for batch normalization.
norm_epsilon: float for batch normalization epsilon.
activation: string or None for activation function to use in layer,
if None activation is replaced by linear.
leaky_alpha: float to use as alpha if activation function is leaky.
downsample: `bool` for whehter to downwample and merge.
upsample: `bool` for whehter to upsample and merge.
upsample_size: `int` how much to upsample in order to match shapes.
**kwargs: Keyword Arguments.
"""
# Darkconv params
self._filters = filters
self._kernel_initializer = kernel_initializer
self._bias_initializer = bias_initializer
self._bias_regularizer = bias_regularizer
self._kernel_regularizer = kernel_regularizer
self._use_bn = use_bn
self._use_sync_bn = use_sync_bn
# Normal params
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
# Activation params
self._conv_activation = activation
self._leaky_alpha = leaky_alpha
self._downsample = downsample
self._upsample = upsample
self._upsample_size = upsample_size
self._drop_final = drop_final
# Block params
self._inverted = inverted
super().__init__(**kwargs)
def _build_regular(self, input_shape, kwargs):
if self._downsample:
self._conv = ConvBN(
filters=self._filters,
kernel_size=(3, 3),
strides=(2, 2),
padding='same',
**kwargs)
else:
self._conv = ConvBN(
filters=self._filters,
kernel_size=(1, 1),
strides=(1, 1),
padding='same',
**kwargs)
if not self._drop_final:
self._conv_concat = ConvBN(
filters=self._filters,
kernel_size=(1, 1),
strides=(1, 1),
padding='same',
**kwargs)
def _build_reversed(self, input_shape, kwargs):
if self._downsample:
self._conv_prev = ConvBN(
filters=self._filters,
kernel_size=(3, 3),
strides=(2, 2),
padding='same',
**kwargs)
else:
self._conv_prev = ConvBN(
filters=self._filters,
kernel_size=(1, 1),
strides=(1, 1),
padding='same',
**kwargs)
self._conv_route = ConvBN(
filters=self._filters,
kernel_size=(1, 1),
strides=(1, 1),
padding='same',
**kwargs)
if not self._drop_final:
self._conv_sync = ConvBN(
filters=self._filters,
kernel_size=(1, 1),
strides=(1, 1),
padding='same',
**kwargs)
def build(self, input_shape):
dark_conv_args = {
'kernel_initializer': self._kernel_initializer,
'bias_initializer': self._bias_initializer,
'bias_regularizer': self._bias_regularizer,
'use_bn': self._use_bn,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'activation': self._conv_activation,
'kernel_regularizer': self._kernel_regularizer,
'leaky_alpha': self._leaky_alpha,
}
if self._inverted:
self._build_reversed(input_shape, dark_conv_args)
else:
self._build_regular(input_shape, dark_conv_args)
self._concat = tf.keras.layers.Concatenate()
super().build(input_shape)
def _call_regular(self, inputs, training=None):
input_to_convolve, input_to_concat = inputs
x_prev = self._conv(input_to_convolve)
if self._upsample:
x_prev = spatial_transform_ops.nearest_upsampling(x_prev,
self._upsample_size)
x = self._concat([x_prev, input_to_concat])
# used in csp conversion
if not self._drop_final:
x = self._conv_concat(x)
return x_prev, x
def _call_reversed(self, inputs, training=None):
x_route, x_prev = inputs
x_prev = self._conv_prev(x_prev)
if self._upsample:
x_prev = spatial_transform_ops.nearest_upsampling(x_prev,
self._upsample_size)
x_route = self._conv_route(x_route)
x = self._concat([x_route, x_prev])
if not self._drop_final:
x = self._conv_sync(x)
return x_prev, x
def call(self, inputs, training=None):
# done this way to prevent confusion in the auto graph
if self._inverted:
return self._call_reversed(inputs, training=training)
else:
return self._call_regular(inputs, training=training)
@tf.keras.utils.register_keras_serializable(package='yolo')
class SPP(tf.keras.layers.Layer):
"""Spatial Pyramid Pooling.
A non-agregated SPP layer that uses Pooling.
"""
def __init__(self, sizes, **kwargs):
self._sizes = list(reversed(sizes))
if not sizes:
raise ValueError('More than one maxpool should be specified in SSP block')
super().__init__(**kwargs)
def build(self, input_shape):
maxpools = []
for size in self._sizes:
maxpools.append(
tf.keras.layers.MaxPool2D(
pool_size=(size, size),
strides=(1, 1),
padding='same',
data_format=None))
self._maxpools = maxpools
super().build(input_shape)
def call(self, inputs, training=None):
outputs = []
for maxpool in self._maxpools:
outputs.append(maxpool(inputs))
outputs.append(inputs)
concat_output = tf.keras.layers.concatenate(outputs)
return concat_output
def get_config(self):
layer_config = {'sizes': self._sizes}
layer_config.update(super().get_config())
return layer_config
class SAM(tf.keras.layers.Layer):
"""Spatial Attention Model.
[1] Sanghyun Woo, Jongchan Park, Joon-Young Lee, In So Kweon
CBAM: Convolutional Block Attention Module. arXiv:1807.06521
implementation of the Spatial Attention Model (SAM)
"""
def __init__(self,
use_pooling=False,
filter_match=False,
filters=1,
kernel_size=(1, 1),
strides=(1, 1),
padding='same',
dilation_rate=(1, 1),
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
bias_regularizer=None,
kernel_regularizer=None,
use_bn=True,
use_sync_bn=True,
norm_momentum=0.99,
norm_epsilon=0.001,
activation='sigmoid',
output_activation=None,
leaky_alpha=0.1,
**kwargs):
# use_pooling
self._use_pooling = use_pooling
self._filters = filters
self._output_activation = output_activation
self._leaky_alpha = leaky_alpha
self.dark_conv_args = {
'kernel_size': kernel_size,
'strides': strides,
'padding': padding,
'dilation_rate': dilation_rate,
'kernel_initializer': kernel_initializer,
'bias_initializer': bias_initializer,
'bias_regularizer': bias_regularizer,
'use_bn': use_bn,
'use_sync_bn': use_sync_bn,
'norm_momentum': norm_momentum,
'norm_epsilon': norm_epsilon,
'activation': activation,
'kernel_regularizer': kernel_regularizer,
'leaky_alpha': leaky_alpha
}
super().__init__(**kwargs)
def build(self, input_shape):
if self._filters == -1:
self._filters = input_shape[-1]
self._conv = ConvBN(filters=self._filters, **self.dark_conv_args)
if self._output_activation == 'leaky':
self._activation_fn = tf.keras.layers.LeakyReLU(alpha=self._leaky_alpha)
elif self._output_activation == 'mish':
self._activation_fn = lambda x: x * tf.math.tanh(tf.math.softplus(x))
else:
self._activation_fn = tf_utils.get_activation(self._output_activation)
def call(self, inputs, training=None):
if self._use_pooling:
depth_max = tf.reduce_max(inputs, axis=-1, keepdims=True)
depth_avg = tf.reduce_mean(inputs, axis=-1, keepdims=True)
input_maps = tf.concat([depth_avg, depth_max], axis=-1)
else:
input_maps = inputs
attention_mask = self._conv(input_maps)
return self._activation_fn(inputs * attention_mask)
class CAM(tf.keras.layers.Layer):
"""Channel Attention Model.
[1] Sanghyun Woo, Jongchan Park, Joon-Young Lee, In So Kweon
CBAM: Convolutional Block Attention Module. arXiv:1807.06521
Implementation of the Channel Attention Model (CAM)
"""
def __init__(self,
reduction_ratio=1.0,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
bias_regularizer=None,
kernel_regularizer=None,
use_bn=False,
use_sync_bn=False,
use_bias=False,
norm_momentum=0.99,
norm_epsilon=0.001,
mlp_activation='linear',
activation='sigmoid',
leaky_alpha=0.1,
**kwargs):
self._reduction_ratio = reduction_ratio
# use_pooling
if use_sync_bn:
self._bn = tf.keras.layers.experimental.SyncBatchNormalization
else:
self._bn = tf.keras.layers.BatchNormalization
if not use_bn:
self._bn = Identity
self._bn_args = {}
else:
self._bn_args = {
'momentum': norm_momentum,
'epsilon': norm_epsilon,
}
self._mlp_args = {
'use_bias': use_bias,
'kernel_initializer': kernel_initializer,
'bias_initializer': bias_initializer,
'bias_regularizer': bias_regularizer,
'activation': mlp_activation,
'kernel_regularizer': kernel_regularizer,
}
self._leaky_alpha = leaky_alpha
self._activation = activation
super().__init__(**kwargs)
def build(self, input_shape):
self._filters = input_shape[-1]
self._mlp = tf.keras.Sequential([
tf.keras.layers.Dense(self._filters, **self._mlp_args),
self._bn(**self._bn_args),
tf.keras.layers.Dense(
int(self._filters * self._reduction_ratio), **self._mlp_args),
self._bn(**self._bn_args),
tf.keras.layers.Dense(self._filters, **self._mlp_args),
self._bn(**self._bn_args),
])
if self._activation == 'leaky':
self._activation_fn = tf.keras.layers.LeakyReLU(alpha=self._leaky_alpha)
elif self._activation == 'mish':
self._activation_fn = lambda x: x * tf.math.tanh(tf.math.softplus(x))
else:
self._activation_fn = tf_utils.get_activation(self._activation)
def call(self, inputs, training=None):
depth_max = self._mlp(tf.reduce_max(inputs, axis=(1, 2)))
depth_avg = self._mlp(tf.reduce_mean(inputs, axis=(1, 2)))
channel_mask = self._activation_fn(depth_avg + depth_max)
channel_mask = tf.expand_dims(channel_mask, axis=1)
attention_mask = tf.expand_dims(channel_mask, axis=1)
return inputs * attention_mask
class CBAM(tf.keras.layers.Layer):
"""Convolutional Block Attention Module.
[1] Sanghyun Woo, Jongchan Park, Joon-Young Lee, In So Kweon
CBAM: Convolutional Block Attention Module. arXiv:1807.06521
implementation of the Convolution Block Attention Module (CBAM)
"""
def __init__(self,
use_pooling=False,
filters=1,
reduction_ratio=1.0,
kernel_size=(1, 1),
strides=(1, 1),
padding='same',
dilation_rate=(1, 1),
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
bias_regularizer=None,
kernel_regularizer=None,
use_bn=True,
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
mlp_activation=None,
activation='sigmoid',
leaky_alpha=0.1,
**kwargs):
# use_pooling
self._sam_args = {
'use_pooling': use_pooling,
'filters': filters,
'kernel_size': kernel_size,
'strides': strides,
'padding': padding,
'dilation_rate': dilation_rate,
}
self._cam_args = {
'reduction_ratio': reduction_ratio,
'mlp_activation': mlp_activation
}
self._common_args = {
'kernel_initializer': kernel_initializer,
'bias_initializer': bias_initializer,
'bias_regularizer': bias_regularizer,
'use_bn': use_bn,
'use_sync_bn': use_sync_bn,
'norm_momentum': norm_momentum,
'norm_epsilon': norm_epsilon,
'activation': activation,
'kernel_regularizer': kernel_regularizer,
'leaky_alpha': leaky_alpha
}
self._cam_args.update(self._common_args)
self._sam_args.update(self._common_args)
super().__init__(**kwargs)
def build(self, input_shape):
self._cam = CAM(**self._cam_args)
self._sam = SAM(**self._sam_args)
def call(self, inputs, training=None):
return self._sam(self._cam(inputs))
@tf.keras.utils.register_keras_serializable(package='yolo')
class DarkRouteProcess(tf.keras.layers.Layer):
"""Dark Route Process block.
Process darknet outputs and connect back bone to head more generalizably
Abstracts repetition of DarkConv objects that is common in YOLO.
It is used like the following:
x = ConvBN(1024, (3, 3), (1, 1))(x)
proc = DarkRouteProcess(filters = 1024,
repetitions = 3,
insert_spp = False)(x)
"""
def __init__(
self,
filters=2,
repetitions=2,
insert_spp=False,
insert_sam=False,
insert_cbam=False,
csp_stack=0,
csp_scale=2,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
bias_regularizer=None,
kernel_regularizer=None,
use_sync_bn=False,
norm_momentum=0.99,
norm_epsilon=0.001,
block_invert=False,
activation='leaky',
leaky_alpha=0.1,
spp_keys=None,
**kwargs):
"""DarkRouteProcess initializer.
Args:
filters: the number of filters to be used in all subsequent layers
filters should be the depth of the tensor input into this layer,
as no downsampling can be done within this layer object.
repetitions: number of times to repeat the processign nodes.
for tiny: 1 repition, no spp allowed.
for spp: insert_spp = True, and allow for 6 repetitions.
for regular: insert_spp = False, and allow for 6 repetitions.
insert_spp: bool if true add the spatial pyramid pooling layer.
insert_sam: bool if true add spatial attention module to path.
insert_cbam: bool if true add convolutional block attention
module to path.
csp_stack: int for the number of sequential layers from 0
to <value> you would like to convert into a Cross Stage
Partial(csp) type.
csp_scale: int for how much to down scale the number of filters
only for the csp layers in the csp section of the processing
path. A value 2 indicates that each layer that is int eh CSP
stack will have filters = filters/2.
kernel_initializer: method to use to initialize kernel weights.
bias_initializer: method to use to initialize the bias of the conv
layers.
bias_regularizer: string to indicate which function to use to regularizer
bias.
kernel_regularizer: string to indicate which function to use to
regularizer weights.
use_sync_bn: bool if true use the sync batch normalization.
norm_momentum: batch norm parameter see Tensorflow documentation.
norm_epsilon: batch norm parameter see Tensorflow documentation.
block_invert: bool use for switching between the even and odd
repretions of layers. usually the repetition is based on a
3x3 conv with filters, followed by a 1x1 with filters/2 with
an even number of repetitions to ensure each 3x3 gets a 1x1
sqeeze. block invert swaps the 3x3/1 1x1/2 to a 1x1/2 3x3/1
ordering typically used when the model requires an odd number
of repetiitions. All other peramters maintain their affects
activation: activation function to use in processing.
leaky_alpha: if leaky acitivation function, the alpha to use in
processing the relu input.
spp_keys: List[int] of the sampling levels to be applied by
the Spatial Pyramid Pooling Layer. By default it is
[5, 9, 13] inidicating a 5x5 pooling followed by 9x9
followed by 13x13 then followed by the standard concatnation
and convolution.
**kwargs: Keyword Arguments.
"""
super().__init__(**kwargs)
# darkconv params
self._filters = filters
self._use_sync_bn = use_sync_bn
self._kernel_initializer = kernel_initializer
self._bias_initializer = bias_initializer
self._bias_regularizer = bias_regularizer
self._kernel_regularizer = kernel_regularizer
# normal params
self._norm_momentum = norm_momentum
self._norm_epsilon = norm_epsilon
# activation params
self._activation = activation
self._leaky_alpha = leaky_alpha
repetitions += (2 * int(insert_spp))
if repetitions == 1:
block_invert = True
self._repetitions = repetitions
self.layer_list, self.outputs = self._get_base_layers()
if csp_stack > 0:
self._csp_scale = csp_scale
csp_stack += (2 * int(insert_spp))
self._csp_filters = lambda x: x // csp_scale
self._convert_csp(self.layer_list, self.outputs, csp_stack)
block_invert = False
self._csp_stack = csp_stack
if block_invert:
self._conv1_filters = lambda x: x
self._conv2_filters = lambda x: x // 2
self._conv1_kernel = (3, 3)
self._conv2_kernel = (1, 1)
else:
self._conv1_filters = lambda x: x // 2
self._conv2_filters = lambda x: x
self._conv1_kernel = (1, 1)
self._conv2_kernel = (3, 3)
# insert SPP will always add to the total nuber of layer, never replace
if insert_spp:
self._spp_keys = spp_keys if spp_keys is not None else [5, 9, 13]
self.layer_list = self._insert_spp(self.layer_list)
if repetitions > 1:
self.outputs[-2] = True
if insert_sam:
self.layer_list = self._insert_sam(self.layer_list, self.outputs)
self._repetitions += 1
self.outputs[-1] = True
def _get_base_layers(self):
layer_list = []
outputs = []
for i in range(self._repetitions):
layers = ['conv1'] * ((i + 1) % 2) + ['conv2'] * (i % 2)
layer_list.extend(layers)
outputs = [False] + outputs
return layer_list, outputs
def _insert_spp(self, layer_list):
if len(layer_list) <= 3:
layer_list[1] = 'spp'
else:
layer_list[3] = 'spp'
return layer_list
def _convert_csp(self, layer_list, outputs, csp_stack_size):
layer_list[0] = 'csp_route'
layer_list.insert(csp_stack_size - 1, 'csp_connect')
outputs.insert(csp_stack_size - 1, False)
return layer_list, outputs
def _insert_sam(self, layer_list, outputs):
if len(layer_list) >= 2 and layer_list[-2] != 'spp':
layer_list.insert(-2, 'sam')
outputs.insert(-1, True)
else:
layer_list.insert(-1, 'sam')
outputs.insert(-1, False)
return layer_list
def _conv1(self, filters, kwargs, csp=False):
if csp:
filters_ = self._csp_filters
else:
filters_ = self._conv1_filters
x1 = ConvBN(
filters=filters_(filters),
kernel_size=self._conv1_kernel,
strides=(1, 1),
padding='same',
use_bn=True,
**kwargs)
return x1
def _conv2(self, filters, kwargs, csp=False):
if csp:
filters_ = self._csp_filters
else:
filters_ = self._conv2_filters
x1 = ConvBN(
filters=filters_(filters),
kernel_size=self._conv2_kernel,
strides=(1, 1),
padding='same',
use_bn=True,
**kwargs)
return x1
def _csp_route(self, filters, kwargs):
x1 = CSPRoute(
filters=filters,
filter_scale=self._csp_scale,
downsample=False,
**kwargs)
return x1
def _csp_connect(self, filters, kwargs):
x1 = CSPConnect(filters=filters, drop_final=True, drop_first=True, **kwargs)
return x1
def _spp(self, filters, kwargs):
x1 = SPP(self._spp_keys)
return x1
def _sam(self, filters, kwargs):
x1 = SAM(filters=-1, use_pooling=False, use_bn=True, **kwargs)
return x1
def build(self, input_shape):
dark_conv_args = {
'activation': self._activation,
'kernel_initializer': self._kernel_initializer,
'bias_initializer': self._bias_initializer,
'bias_regularizer': self._bias_regularizer,
'use_sync_bn': self._use_sync_bn,
'norm_momentum': self._norm_momentum,
'norm_epsilon': self._norm_epsilon,
'kernel_regularizer': self._kernel_regularizer,
'leaky_alpha': self._leaky_alpha,
}
csp = False
self.layers = []
for layer in self.layer_list:
if layer == 'csp_route':
self.layers.append(self._csp_route(self._filters, dark_conv_args))
csp = True
elif layer == 'csp_connect':
self.layers.append(self._csp_connect(self._filters, dark_conv_args))
csp = False
elif layer == 'conv1':
self.layers.append(self._conv1(self._filters, dark_conv_args, csp=csp))
elif layer == 'conv2':
self.layers.append(self._conv2(self._filters, dark_conv_args, csp=csp))
elif layer == 'spp':
self.layers.append(self._spp(self._filters, dark_conv_args))
elif layer == 'sam':
self.layers.append(self._sam(-1, dark_conv_args))
self._lim = len(self.layers)
super().build(input_shape)
def _call_regular(self, inputs, training=None):
# check efficiency
x = inputs
x_prev = x
output_prev = True
for (layer, output) in zip(self.layers, self.outputs):
if output_prev:
x_prev = x
x = layer(x)
output_prev = output
return x_prev, x
def _call_csp(self, inputs, training=None):
# check efficiency
x = inputs
x_prev = x
output_prev = True
x_route = None
for i, (layer, output) in enumerate(zip(self.layers, self.outputs)):
if output_prev:
x_prev = x
if i == 0:
x, x_route = layer(x)
elif i == self._csp_stack - 1:
x = layer([x, x_route])
else:
x = layer(x)
output_prev = output
return x_prev, x
def call(self, inputs, training=None):
if self._csp_stack > 0:
return self._call_csp(inputs, training=training)
else:
return self._call_regular(inputs)
......@@ -13,7 +13,6 @@
# limitations under the License.
# Lint as: python3
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
......@@ -23,8 +22,8 @@ from official.vision.beta.projects.yolo.modeling.layers import nn_blocks
class CSPConnectTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 64, 2))
@parameterized.named_parameters(('same', 224, 224, 64, 1),
('downsample', 224, 224, 64, 2))
def test_pass_through(self, width, height, filters, mod):
x = tf.keras.Input(shape=(width, height, filters))
test_layer = nn_blocks.CSPRoute(filters=filters, filter_scale=mod)
......@@ -38,8 +37,8 @@ class CSPConnectTest(tf.test.TestCase, parameterized.TestCase):
[None, np.ceil(width // 2),
np.ceil(height // 2), (filters)])
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 128, 2))
@parameterized.named_parameters(('same', 224, 224, 64, 1),
('downsample', 224, 224, 128, 2))
def test_gradient_pass_though(self, filters, width, height, mod):
loss = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.SGD()
......@@ -49,10 +48,11 @@ class CSPConnectTest(tf.test.TestCase, parameterized.TestCase):
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y = tf.Variable(initial_value=init(shape=(1, int(np.ceil(width // 2)),
int(np.ceil(height // 2)),
filters),
dtype=tf.float32))
y = tf.Variable(
initial_value=init(
shape=(1, int(np.ceil(width // 2)), int(np.ceil(height // 2)),
filters),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat, x_prev = test_layer(x)
......@@ -66,8 +66,8 @@ class CSPConnectTest(tf.test.TestCase, parameterized.TestCase):
class CSPRouteTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 64, 2))
@parameterized.named_parameters(('same', 224, 224, 64, 1),
('downsample', 224, 224, 64, 2))
def test_pass_through(self, width, height, filters, mod):
x = tf.keras.Input(shape=(width, height, filters))
test_layer = nn_blocks.CSPRoute(filters=filters, filter_scale=mod)
......@@ -79,8 +79,8 @@ class CSPRouteTest(tf.test.TestCase, parameterized.TestCase):
[None, np.ceil(width // 2),
np.ceil(height // 2), (filters / mod)])
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 128, 2))
@parameterized.named_parameters(('same', 224, 224, 64, 1),
('downsample', 224, 224, 128, 2))
def test_gradient_pass_though(self, filters, width, height, mod):
loss = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.SGD()
......@@ -90,10 +90,11 @@ class CSPRouteTest(tf.test.TestCase, parameterized.TestCase):
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y = tf.Variable(initial_value=init(shape=(1, int(np.ceil(width // 2)),
int(np.ceil(height // 2)),
filters),
dtype=tf.float32))
y = tf.Variable(
initial_value=init(
shape=(1, int(np.ceil(width // 2)), int(np.ceil(height // 2)),
filters),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat, x_prev = test_layer(x)
......@@ -107,11 +108,11 @@ class CSPRouteTest(tf.test.TestCase, parameterized.TestCase):
class CSPStackTest(tf.test.TestCase, parameterized.TestCase):
def build_layer(
self, layer_type, filters, filter_scale, count, stack_type, downsample):
def build_layer(self, layer_type, filters, filter_scale, count, stack_type,
downsample):
if stack_type is not None:
layers = []
if layer_type == "residual":
if layer_type == 'residual':
for _ in range(count):
layers.append(
nn_blocks.DarkResidual(
......@@ -120,7 +121,7 @@ class CSPStackTest(tf.test.TestCase, parameterized.TestCase):
for _ in range(count):
layers.append(nn_blocks.ConvBN(filters=filters))
if stack_type == "model":
if stack_type == 'model':
layers = tf.keras.Sequential(layers=layers)
else:
layers = None
......@@ -133,10 +134,10 @@ class CSPStackTest(tf.test.TestCase, parameterized.TestCase):
return stack
@parameterized.named_parameters(
("no_stack", 224, 224, 64, 2, "residual", None, 0, True),
("residual_stack", 224, 224, 64, 2, "residual", "list", 2, True),
("conv_stack", 224, 224, 64, 2, "conv", "list", 3, False),
("callable_no_scale", 224, 224, 64, 1, "residual", "model", 5, False))
('no_stack', 224, 224, 64, 2, 'residual', None, 0, True),
('residual_stack', 224, 224, 64, 2, 'residual', 'list', 2, True),
('conv_stack', 224, 224, 64, 2, 'conv', 'list', 3, False),
('callable_no_scale', 224, 224, 64, 1, 'residual', 'model', 5, False))
def test_pass_through(self, width, height, filters, mod, layer_type,
stack_type, count, downsample):
x = tf.keras.Input(shape=(width, height, filters))
......@@ -152,10 +153,10 @@ class CSPStackTest(tf.test.TestCase, parameterized.TestCase):
self.assertAllEqual(outx.shape.as_list(), [None, width, height, filters])
@parameterized.named_parameters(
("no_stack", 224, 224, 64, 2, "residual", None, 0, True),
("residual_stack", 224, 224, 64, 2, "residual", "list", 2, True),
("conv_stack", 224, 224, 64, 2, "conv", "list", 3, False),
("callable_no_scale", 224, 224, 64, 1, "residual", "model", 5, False))
('no_stack', 224, 224, 64, 2, 'residual', None, 0, True),
('residual_stack', 224, 224, 64, 2, 'residual', 'list', 2, True),
('conv_stack', 224, 224, 64, 2, 'conv', 'list', 3, False),
('callable_no_scale', 224, 224, 64, 1, 'residual', 'model', 5, False))
def test_gradient_pass_though(self, width, height, filters, mod, layer_type,
stack_type, count, downsample):
loss = tf.keras.losses.MeanSquaredError()
......@@ -188,10 +189,10 @@ class CSPStackTest(tf.test.TestCase, parameterized.TestCase):
class ConvBNTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(
("valid", (3, 3), "valid", (1, 1)), ("same", (3, 3), "same", (1, 1)),
("downsample", (3, 3), "same", (2, 2)), ("test", (1, 1), "valid", (1, 1)))
('valid', (3, 3), 'valid', (1, 1)), ('same', (3, 3), 'same', (1, 1)),
('downsample', (3, 3), 'same', (2, 2)), ('test', (1, 1), 'valid', (1, 1)))
def test_pass_through(self, kernel_size, padding, strides):
if padding == "same":
if padding == 'same':
pad_const = 1
else:
pad_const = 0
......@@ -212,16 +213,16 @@ class ConvBNTest(tf.test.TestCase, parameterized.TestCase):
print(test)
self.assertAllEqual(outx.shape.as_list(), test)
@parameterized.named_parameters(("filters", 3))
@parameterized.named_parameters(('filters', 3))
def test_gradient_pass_though(self, filters):
loss = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.SGD()
with tf.device("/CPU:0"):
test_layer = nn_blocks.ConvBN(filters, kernel_size=(3, 3), padding="same")
with tf.device('/CPU:0'):
test_layer = nn_blocks.ConvBN(filters, kernel_size=(3, 3), padding='same')
init = tf.random_normal_initializer()
x = tf.Variable(initial_value=init(shape=(1, 224, 224,
3), dtype=tf.float32))
x = tf.Variable(
initial_value=init(shape=(1, 224, 224, 3), dtype=tf.float32))
y = tf.Variable(
initial_value=init(shape=(1, 224, 224, filters), dtype=tf.float32))
......@@ -235,9 +236,9 @@ class ConvBNTest(tf.test.TestCase, parameterized.TestCase):
class DarkResidualTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(("same", 224, 224, 64, False),
("downsample", 223, 223, 32, True),
("oddball", 223, 223, 32, False))
@parameterized.named_parameters(('same', 224, 224, 64, False),
('downsample', 223, 223, 32, True),
('oddball', 223, 223, 32, False))
def test_pass_through(self, width, height, filters, downsample):
mod = 1
if downsample:
......@@ -252,9 +253,9 @@ class DarkResidualTest(tf.test.TestCase, parameterized.TestCase):
[None, np.ceil(width / mod),
np.ceil(height / mod), filters])
@parameterized.named_parameters(("same", 64, 224, 224, False),
("downsample", 32, 223, 223, True),
("oddball", 32, 223, 223, False))
@parameterized.named_parameters(('same', 64, 224, 224, False),
('downsample', 32, 223, 223, True),
('oddball', 32, 223, 223, False))
def test_gradient_pass_though(self, filters, width, height, downsample):
loss = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.SGD()
......@@ -268,10 +269,11 @@ class DarkResidualTest(tf.test.TestCase, parameterized.TestCase):
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y = tf.Variable(initial_value=init(shape=(1, int(np.ceil(width / mod)),
int(np.ceil(height / mod)),
filters),
dtype=tf.float32))
y = tf.Variable(
initial_value=init(
shape=(1, int(np.ceil(width / mod)), int(np.ceil(height / mod)),
filters),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat = test_layer(x)
......@@ -281,5 +283,104 @@ class DarkResidualTest(tf.test.TestCase, parameterized.TestCase):
self.assertNotIn(None, grad)
if __name__ == "__main__":
class DarkSppTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(('RouteProcessSpp', 224, 224, 3, [5, 9, 13]),
('test1', 300, 300, 10, [2, 3, 4, 5]),
('test2', 256, 256, 5, [10]))
def test_pass_through(self, width, height, channels, sizes):
x = tf.keras.Input(shape=(width, height, channels))
test_layer = nn_blocks.SPP(sizes=sizes)
outx = test_layer(x)
self.assertAllEqual(outx.shape.as_list(),
[None, width, height, channels * (len(sizes) + 1)])
return
@parameterized.named_parameters(('RouteProcessSpp', 224, 224, 3, [5, 9, 13]),
('test1', 300, 300, 10, [2, 3, 4, 5]),
('test2', 256, 256, 5, [10]))
def test_gradient_pass_though(self, width, height, channels, sizes):
loss = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.SGD()
test_layer = nn_blocks.SPP(sizes=sizes)
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(
shape=(1, width, height, channels), dtype=tf.float32))
y = tf.Variable(
initial_value=init(
shape=(1, width, height, channels * (len(sizes) + 1)),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat = test_layer(x)
grad_loss = loss(x_hat, y)
grad = tape.gradient(grad_loss, test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
return
class DarkRouteProcessTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(
('test1', 224, 224, 64, 7, False), ('test2', 223, 223, 32, 3, False),
('tiny', 223, 223, 16, 1, False), ('spp', 224, 224, 64, 7, False))
def test_pass_through(self, width, height, filters, repetitions, spp):
x = tf.keras.Input(shape=(width, height, filters))
test_layer = nn_blocks.DarkRouteProcess(
filters=filters, repetitions=repetitions, insert_spp=spp)
outx = test_layer(x)
self.assertLen(outx, 2, msg='len(outx) != 2')
if repetitions == 1:
filter_y1 = filters
else:
filter_y1 = filters // 2
self.assertAllEqual(
outx[1].shape.as_list(), [None, width, height, filter_y1])
self.assertAllEqual(
filters % 2,
0,
msg='Output of a DarkRouteProcess layer has an odd number of filters')
self.assertAllEqual(outx[0].shape.as_list(), [None, width, height, filters])
@parameterized.named_parameters(
('test1', 224, 224, 64, 7, False), ('test2', 223, 223, 32, 3, False),
('tiny', 223, 223, 16, 1, False), ('spp', 224, 224, 64, 7, False))
def test_gradient_pass_though(self, width, height, filters, repetitions, spp):
loss = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.SGD()
test_layer = nn_blocks.DarkRouteProcess(
filters=filters, repetitions=repetitions, insert_spp=spp)
if repetitions == 1:
filter_y1 = filters
else:
filter_y1 = filters // 2
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y_0 = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y_1 = tf.Variable(
initial_value=init(
shape=(1, width, height, filter_y1), dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat_0, x_hat_1 = test_layer(x)
grad_loss_0 = loss(x_hat_0, y_0)
grad_loss_1 = loss(x_hat_1, y_1)
grad = tape.gradient([grad_loss_0, grad_loss_1],
test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
return
if __name__ == '__main__':
tf.test.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment