Commit 47545935 authored by vishnubanna's avatar vishnubanna

Load tests into one file and inherit from the ImageNet task and the functioning TFDS decoder for ImageNet
parent 48b412c3
import tensorflow as tf
import tensorflow.keras as ks
@tf.keras.utils.register_keras_serializable(package='Text')
def mish(x):
"""Mish: A Self Regularized Non-Monotonic Activation Function
This activation is far smoother than ReLU.
Original paper: https://arxiv.org/abs/1908.08681
Args:
x: float Tensor to perform activation.
Returns:
`x` with the MISH activation applied.
"""
return x * tf.math.tanh(ks.activations.softplus(x))
\ No newline at end of file
import tensorflow as tf
import tensorflow.keras as ks
class mish(ks.layers.Layer):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def call(self, x):
return x * tf.math.tanh(ks.activations.softplus(x))
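A minimal usage sketch of the two mish variants above (assuming the mish layer class defined immediately above is in scope); the sample tensor is illustrative only:
import tensorflow as tf
import tensorflow.keras as ks

x = tf.constant([-1.0, 0.0, 1.0])
# functional form: x * tanh(softplus(x))
y_fn = x * tf.math.tanh(ks.activations.softplus(x))
# layer form: instantiate the layer, then call it like any other Keras layer
y_layer = mish()(x)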
from .nn_blocks import Identity, CSPTiny, CSPDownSample, CSPConnect, DarkTiny, DarkResidual, DarkConv
\ No newline at end of file
import tensorflow as tf
import tensorflow.keras as ks
import numpy as np
from absl.testing import parameterized
from official.vision.beta.projects.yolo.modeling import layers as nn_blocks
class CSPConnect(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 64, 2))
def test_pass_through(self, width, height, filters, mod):
x = ks.Input(shape=(width, height, filters))
test_layer = nn_blocks.CSPDownSample(filters=filters, filter_reduce=mod)
test_layer2 = nn_blocks.CSPConnect(filters=filters, filter_reduce=mod)
outx, px = test_layer(x)
outx = test_layer2([outx, px])
print(outx)
print(outx.shape.as_list())
self.assertAllEqual(
outx.shape.as_list(),
[None, np.ceil(width // 2),
np.ceil(height // 2), (filters)])
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 128, 2))
def test_gradient_pass_though(self, filters, width, height, mod):
loss = ks.losses.MeanSquaredError()
optimizer = ks.optimizers.SGD()
test_layer = nn_blocks.CSPDownSample(filters, filter_reduce=mod)
path_layer = nn_blocks.CSPConnect(filters, filter_reduce=mod)
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y = tf.Variable(initial_value=init(shape=(1, int(np.ceil(width // 2)),
int(np.ceil(height // 2)),
filters),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat, x_prev = test_layer(x)
x_hat = path_layer([x_hat, x_prev])
grad_loss = loss(x_hat, y)
grad = tape.gradient(grad_loss, test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
if __name__ == "__main__":
tf.test.main()
import tensorflow as tf
import tensorflow.keras as ks
import numpy as np
from absl.testing import parameterized
from official.vision.beta.projects.yolo.modeling import layers as nn_blocks
class CSPDownSample(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 64, 2))
def test_pass_through(self, width, height, filters, mod):
x = ks.Input(shape=(width, height, filters))
test_layer = nn_blocks.CSPDownSample(filters=filters, filter_reduce=mod)
outx, px = test_layer(x)
print(outx)
print(outx.shape.as_list())
self.assertAllEqual(
outx.shape.as_list(),
[None, np.ceil(width // 2),
np.ceil(height // 2), (filters / mod)])
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 128, 2))
def test_gradient_pass_though(self, filters, width, height, mod):
loss = ks.losses.MeanSquaredError()
optimizer = ks.optimizers.SGD()
test_layer = nn_blocks.CSPDownSample(filters, filter_reduce=mod)
path_layer = nn_blocks.CSPConnect(filters, filter_reduce=mod)
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y = tf.Variable(initial_value=init(shape=(1, int(np.ceil(width // 2)),
int(np.ceil(height // 2)),
filters),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat, x_prev = test_layer(x)
x_hat = path_layer([x_hat, x_prev])
grad_loss = loss(x_hat, y)
grad = tape.gradient(grad_loss, test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
if __name__ == "__main__":
tf.test.main()
import tensorflow as tf
import tensorflow.keras as ks
import tensorflow_datasets as tfds
from absl.testing import parameterized
from official.vision.beta.projects.yolo.modeling import layers as nn_blocks
class DarkConvTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(
("valid", (3, 3), "valid", (1, 1)), ("same", (3, 3), "same", (1, 1)),
("downsample", (3, 3), "same", (2, 2)), ("test", (1, 1), "valid", (1, 1)))
def test_pass_through(self, kernel_size, padding, strides):
if padding == "same":
pad_const = 1
else:
pad_const = 0
x = ks.Input(shape=(224, 224, 3))
test_layer = nn_blocks.DarkConv(filters=64,
kernel_size=kernel_size,
padding=padding,
strides=strides,
trainable=False)
outx = test_layer(x)
print(outx.shape.as_list())
test = [
None,
int((224 - kernel_size[0] + (2 * pad_const)) / strides[0] + 1),
int((224 - kernel_size[1] + (2 * pad_const)) / strides[1] + 1), 64
]
print(test)
self.assertAllEqual(outx.shape.as_list(), test)
@parameterized.named_parameters(("filters", 3))
def test_gradient_pass_though(self, filters):
loss = ks.losses.MeanSquaredError()
optimizer = ks.optimizers.SGD()
with tf.device("/CPU:0"):
test_layer = nn_blocks.DarkConv(filters, kernel_size=(3, 3), padding="same")
init = tf.random_normal_initializer()
x = tf.Variable(initial_value=init(shape=(1, 224, 224,
3), dtype=tf.float32))
y = tf.Variable(
initial_value=init(shape=(1, 224, 224, filters), dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat = test_layer(x)
grad_loss = loss(x_hat, y)
grad = tape.gradient(grad_loss, test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
if __name__ == "__main__":
tf.test.main()
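The expected shapes asserted in DarkConvTest above follow the standard convolution output-size arithmetic; a quick sanity check of that formula (not part of the original test file, values chosen to match the parameterized cases):
import math

def conv_out_size(in_size, kernel, pad, stride):
  # standard convolution arithmetic: floor((in - k + 2*p) / s) + 1
  return math.floor((in_size - kernel + 2 * pad) / stride) + 1

assert conv_out_size(224, 3, 1, 1) == 224  # "same" padding, stride 1
assert conv_out_size(224, 3, 1, 2) == 112  # "same" padding, stride 2
assert conv_out_size(224, 3, 0, 1) == 222  # "valid" padding, stride 1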
import tensorflow as tf
import tensorflow.keras as ks
import numpy as np
from absl.testing import parameterized
from official.vision.beta.projects.yolo.modeling import layers as nn_blocks
class DarkResidualTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(("same", 224, 224, 64, False),
("downsample", 223, 223, 32, True),
("oddball", 223, 223, 32, False))
def test_pass_through(self, width, height, filters, downsample):
mod = 1
if downsample:
mod = 2
x = ks.Input(shape=(width, height, filters))
test_layer = nn_blocks.DarkResidual(filters=filters, downsample=downsample)
outx = test_layer(x)
print(outx)
print(outx.shape.as_list())
self.assertAllEqual(
outx.shape.as_list(),
[None, np.ceil(width / mod),
np.ceil(height / mod), filters])
@parameterized.named_parameters(("same", 64, 224, 224, False),
("downsample", 32, 223, 223, True),
("oddball", 32, 223, 223, False))
def test_gradient_pass_though(self, filters, width, height, downsample):
loss = ks.losses.MeanSquaredError()
optimizer = ks.optimizers.SGD()
test_layer = nn_blocks.DarkResidual(filters, downsample=downsample)
if downsample:
mod = 2
else:
mod = 1
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y = tf.Variable(initial_value=init(shape=(1, int(np.ceil(width / mod)),
int(np.ceil(height / mod)),
filters),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat = test_layer(x)
grad_loss = loss(x_hat, y)
grad = tape.gradient(grad_loss, test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
if __name__ == "__main__":
tf.test.main()
import tensorflow as tf
import tensorflow.keras as ks
import numpy as np
from absl.testing import parameterized
from official.vision.beta.projects.yolo.modeling import layers as nn_blocks
class DarkTinyTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(("middle", 224, 224, 64, 2),
("last", 224, 224, 1024, 1))
def test_pass_through(self, width, height, filters, strides):
x = ks.Input(shape=(width, height, filters))
test_layer = nn_blocks.DarkTiny(filters=filters, strides=strides)
outx = test_layer(x)
self.assertEqual(width % strides, 0, msg="width % strides != 0")
self.assertEqual(height % strides, 0, msg="height % strides != 0")
self.assertAllEqual(outx.shape.as_list(),
[None, width // strides, height // strides, filters])
@parameterized.named_parameters(("middle", 224, 224, 64, 2),
("last", 224, 224, 1024, 1))
def test_gradient_pass_though(self, width, height, filters, strides):
loss = ks.losses.MeanSquaredError()
optimizer = ks.optimizers.SGD()
test_layer = nn_blocks.DarkTiny(filters=filters, strides=strides)
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y = tf.Variable(initial_value=init(shape=(1, width // strides,
height // strides, filters),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat = test_layer(x)
grad_loss = loss(x_hat, y)
grad = tape.gradient(grad_loss, test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
if __name__ == "__main__":
tf.test.main()
......@@ -3,7 +3,7 @@ from functools import partial
import tensorflow as tf
import tensorflow.keras as ks
import tensorflow.keras.backend as K
from official.vision.beta.projects.yolo.modeling.functions.mish_activation import mish
from official.vision.beta.projects.yolo.modeling.activations.mish import mish
......@@ -18,7 +18,40 @@ class Identity(ks.layers.Layer):
@ks.utils.register_keras_serializable(package='yolo')
class DarkConv(ks.layers.Layer):
'''
Modified convolution layer to match that of the DarkNet library. The layer is a standard combination of Conv, BatchNorm, and Activation; however, the use of bias in the conv is determined by the use of batch normalization. The layer also allows for the feature grouping suggested in the CSPNet paper.
Cross Stage Partial networks (CSPNets) were proposed in:
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu, Ping-Yang Chen, Jun-Wei Hsieh
CSPNet: A New Backbone that can Enhance Learning Capability of CNN. arXiv:1911.11929
Args:
filters: integer for output depth, or the number of features to learn
kernel_size: integer or tuple for the shape of the weight matrix or kernel to learn
strides: integer or tuple for how much to move the kernel after each kernel use
padding: string, 'valid' or 'same'; if 'same', pad the image, else do not
dilation_rate: tuple to indicate how much to dilate the kernel, i.e. how many pixels in a feature map to skip
use_bias: boolean to indicate whether to use bias in the convolution layer
kernel_initializer: string to indicate which function to use to initialize weights
bias_initializer: string to indicate which function to use to initialize bias
kernel_regularizer: string to indicate which function to use to regularize weights
bias_regularizer: string to indicate which function to use to regularize bias
group_id: integer for which group of features to pass through the conv.
groups: integer for how many splits there should be in the convolution feature stack input
grouping_only: boolean to skip the convolution and only return the group of features indicated by group_id
use_bn: boolean for whether to use batch normalization
use_sync_bn: boolean for whether to sync the batch normalization statistics of all batch norm layers to the model's global statistics (across all input batches)
norm_moment: float for the momentum to use for batch normalization
norm_epsilon: float for the batch normalization epsilon
activation: string or None for the activation function to use in the layer; if None, the activation is replaced by linear
leaky_alpha: float to use as alpha if the activation function is leaky
**kwargs: Keyword Arguments
'''
def __init__(
self,
filters=1,
......@@ -27,6 +60,9 @@ class DarkConv(ks.layers.Layer):
padding='same',
dilation_rate=(1, 1),
use_bias=True,
groups = 1,
group_id = 0,
grouping_only = False,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
bias_regularizer=None,
......@@ -38,30 +74,7 @@ class DarkConv(ks.layers.Layer):
activation='leaky',
leaky_alpha=0.1,
**kwargs):
'''
Modified convolution layer to match that of the DarkNet library.
Args:
filters: integer for output depth, or the number of features to learn
kernel_size: integer or tuple for the shape of the weight matrix or kernel to learn
strides: integer or tuple for how much to move the kernel after each kernel use
padding: string, 'valid' or 'same'; if 'same', pad the image, else do not
dilation_rate: tuple to indicate how much to dilate the kernel, i.e. how many pixels in a feature map to skip
use_bias: boolean to indicate whether to use bias in the convolution layer
kernel_initializer: string to indicate which function to use to initialize weights
bias_initializer: string to indicate which function to use to initialize bias
l2_regularization: float to use as a constant for weight regularization
use_bn: boolean for whether to use batch normalization
use_sync_bn: boolean for whether to sync the batch normalization statistics of all batch norm layers to the model's global statistics (across all input batches)
norm_moment: float for the momentum to use for batch normalization
norm_epsilon: float for the batch normalization epsilon
activation: string or None for the activation function to use in the layer; if None, the activation is replaced by linear
leaky_alpha: float to use as alpha if the activation function is leaky
**kwargs: Keyword Arguments
'''
# convolution params
self._filters = filters
......@@ -70,6 +83,9 @@ class DarkConv(ks.layers.Layer):
self._padding = padding
self._dilation_rate = dilation_rate
self._use_bias = use_bias
self._groups = groups
self._group_id = group_id
self._grouping_only = grouping_only
self._kernel_initializer = kernel_initializer
self._bias_initializer = bias_initializer
self._kernel_regularizer = kernel_regularizer
......@@ -100,53 +116,58 @@ class DarkConv(ks.layers.Layer):
super(DarkConv, self).__init__(**kwargs)
def build(self, input_shape):
kernel_size = self._kernel_size if type(
self._kernel_size) == int else self._kernel_size[0]
if self._padding == "same" and kernel_size != 1:
self._zeropad = ks.layers.ZeroPadding2D(
((1, 1), (1, 1))) # symmetric padding
else:
self._zeropad = Identity()
self.conv = ks.layers.Conv2D(
filters=self._filters,
kernel_size=self._kernel_size,
strides=self._strides,
padding="valid",
dilation_rate=self._dilation_rate,
use_bias=self._use_bias,
kernel_initializer=self._kernel_initializer,
bias_initializer=self._bias_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)
#self.conv =tf.nn.convolution(filters=self._filters, strides=self._strides, padding=self._padding
if self._use_bn:
if self._use_sync_bn:
self.bn = tf.keras.layers.experimental.SyncBatchNormalization(
momentum=self._norm_moment,
epsilon=self._norm_epsilon,
axis=self._bn_axis)
if not self._grouping_only:
kernel_size = self._kernel_size if type(
self._kernel_size) == int else self._kernel_size[0]
if self._padding == "same" and kernel_size != 1:
self._zeropad = ks.layers.ZeroPadding2D(
((1, 1), (1, 1))) # symmetric padding
else:
self.bn = ks.layers.BatchNormalization(momentum=self._norm_moment,
epsilon=self._norm_epsilon,
axis=self._bn_axis)
else:
self.bn = Identity()
if self._activation == 'leaky':
alpha = {"alpha": self._leaky_alpha}
self._activation_fn = partial(tf.nn.leaky_relu, **alpha)
elif self._activation == 'mish':
self._activation_fn = mish()
else:
self._activation_fn = ks.layers.Activation(activation=self._activation)
self._zeropad = Identity()
self.conv = ks.layers.Conv2D(
filters=self._filters,
kernel_size=self._kernel_size,
strides=self._strides,
padding="valid",
dilation_rate=self._dilation_rate,
use_bias=self._use_bias,
kernel_initializer=self._kernel_initializer,
bias_initializer=self._bias_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer)
#self.conv =tf.nn.convolution(filters=self._filters, strides=self._strides, padding=self._padding
if self._use_bn:
if self._use_sync_bn:
self.bn = tf.keras.layers.experimental.SyncBatchNormalization(
momentum=self._norm_moment,
epsilon=self._norm_epsilon,
axis=self._bn_axis)
else:
self.bn = ks.layers.BatchNormalization(momentum=self._norm_moment,
epsilon=self._norm_epsilon,
axis=self._bn_axis)
else:
self.bn = Identity()
def call(self, inputs):
x = self._zeropad(inputs)
x = self.conv(x)
x = self.bn(x)
x = self._activation_fn(x)
if self._activation == 'leaky':
alpha = {"alpha": self._leaky_alpha}
self._activation_fn = partial(tf.nn.leaky_relu, **alpha)
elif self._activation == 'mish':
self._activation_fn = mish
else:
self._activation_fn = ks.layers.Activation(activation=self._activation)
def call(self, x):
if self._groups != 1:
x = tf.split(x, self._groups, axis=-1)
x = x[self._group_id] # grouping
if not self._grouping_only:
x = self._zeropad(x)
x = self.conv(x)
x = self.bn(x)
x = self._activation_fn(x)
return x
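A small sketch of what the grouping branch of call() above does on its own, using plain TensorFlow ops on a dummy feature map (shapes are illustrative):
import tensorflow as tf

x = tf.ones([1, 8, 8, 64])             # NHWC feature map
groups, group_id = 2, 1
splits = tf.split(x, groups, axis=-1)  # two groups of 32 channels each
x_group = splits[group_id]             # keep only the selected group
print(x_group.shape)                   # (1, 8, 8, 32)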
def get_config(self):
......@@ -158,6 +179,9 @@ class DarkConv(ks.layers.Layer):
"padding": self._padding,
"dilation_rate": self._dilation_rate,
"use_bias": self._use_bias,
"groups": self._groups,
"group_id": self._group_id,
"grouping_only": self._grouping_only,
"kernel_initializer": self._kernel_initializer,
"bias_initializer": self._bias_initializer,
"bias_regularizer": self._bias_regularizer,
......@@ -178,7 +202,27 @@ class DarkConv(ks.layers.Layer):
@ks.utils.register_keras_serializable(package='yolo')
class DarkTiny(ks.layers.Layer):
"""
An automatic max-pool downsampling convolution layer, created to make routing easier.
Args:
filters: integer for output depth, or the number of features to learn
use_bias: boolean to indicate whether to use bias in the convolution layer
kernel_initializer: string to indicate which function to use to initialize weights
bias_initializer: string to indicate which function to use to initialize bias
kernel_regularizer: string to indicate which function to use to regularize weights
bias_regularizer: string to indicate which function to use to regularize bias
use_bn: boolean for whether to use batch normalization
use_sync_bn: boolean for whether to sync the batch normalization statistics of all batch norm layers to the model's global statistics (across all input batches)
group_id: integer for which group of features to pass through the csp tiny stack.
groups: integer for how many splits there should be in the convolution feature stack output
norm_momentum: float for the momentum to use for batch normalization
norm_epsilon: float for the batch normalization epsilon
activation: string or None for the activation function to use in the layer; if None, the activation is replaced by linear
**kwargs: Keyword Arguments
"""
def __init__(
self,
filters=1,
......@@ -193,8 +237,6 @@ class DarkTiny(ks.layers.Layer):
norm_momentum=0.99,
norm_epsilon=0.001,
activation='leaky',
leaky_alpha=0.1,
sc_activation='linear',
**kwargs):
# darkconv params
......@@ -214,8 +256,6 @@ class DarkTiny(ks.layers.Layer):
# activation params
self._conv_activation = activation
self._leaky_alpha = leaky_alpha
self._sc_activation = sc_activation
super().__init__(**kwargs)
......@@ -238,8 +278,7 @@ class DarkTiny(ks.layers.Layer):
use_sync_bn=self._use_sync_bn,
norm_momentum=self._norm_moment,
norm_epsilon=self._norm_epsilon,
activation=self._conv_activation,
leaky_alpha=self._leaky_alpha)
activation=self._conv_activation)
super().build(input_shape)
......@@ -270,7 +309,30 @@ class DarkTiny(ks.layers.Layer):
@ks.utils.register_keras_serializable(package='yolo')
class DarkResidual(ks.layers.Layer):
'''
DarkNet block with a residual connection for the YOLO v3 backbone.
Args:
filters: integer for output depth, or the number of features to learn
use_bias: boolean to indicate whether to use bias in the convolution layer
kernel_initializer: string to indicate which function to use to initialize weights
bias_initializer: string to indicate which function to use to initialize bias
kernel_regularizer: string to indicate which function to use to regularize weights
bias_regularizer: string to indicate which function to use to regularize bias
use_bn: boolean for whether to use batch normalization
use_sync_bn: boolean for whether to sync the batch normalization statistics of all batch norm layers to the model's global statistics (across all input batches)
norm_moment: float for the momentum to use for batch normalization
norm_epsilon: float for the batch normalization epsilon
conv_activation: string or None for the activation function to use in the layer; if None, the activation is replaced by linear
leaky_alpha: float to use as alpha if the activation function is leaky
sc_activation: string for the activation function to use in the layer
downsample: boolean; if the input image is larger than the layer output, set downsample to True so the dimensions are forced to match
**kwargs: Keyword Arguments
'''
def __init__(self,
filters=1,
filter_scale=2,
......@@ -288,28 +350,7 @@ class DarkResidual(ks.layers.Layer):
sc_activation='linear',
downsample=False,
**kwargs):
'''
DarkNet block with a residual connection for the YOLO v3 backbone.
Args:
filters: integer for output depth, or the number of features to learn
use_bias: boolean to indicate whether to use bias in the convolution layer
kernel_initializer: string to indicate which function to use to initialize weights
bias_initializer: string to indicate which function to use to initialize bias
use_bn: boolean for whether to use batch normalization
use_sync_bn: boolean for whether to sync the batch normalization statistics of all batch norm layers to the model's global statistics (across all input batches)
norm_moment: float for the momentum to use for batch normalization
norm_epsilon: float for the batch normalization epsilon
conv_activation: string or None for the activation function to use in the layer; if None, the activation is replaced by linear
leaky_alpha: float to use as alpha if the activation function is leaky
sc_activation: string for the activation function to use in the layer
downsample: boolean; if the input image is larger than the layer output, set downsample to True so the dimensions are forced to match
**kwargs: Keyword Arguments
'''
# downsample
self._downsample = downsample
......@@ -421,7 +462,36 @@ class DarkResidual(ks.layers.Layer):
@ks.utils.register_keras_serializable(package='yolo')
class CSPTiny(ks.layers.Layer):
"""
A small convolution block proposed in CSPNet. The layer uses shortcuts, routing (concatenation), and feature grouping in order to improve gradient variability and allow for highly efficient, low-power residual learning in small networks.
Cross Stage Partial networks (CSPNets) were proposed in:
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu, Ping-Yang Chen, Jun-Wei Hsieh
CSPNet: A New Backbone that can Enhance Learning Capability of CNN. arXiv:1911.11929
Args:
filters: integer for output depth, or the number of features to learn
use_bias: boolean to indicate whether to use bias in the convolution layer
kernel_initializer: string to indicate which function to use to initialize weights
bias_initializer: string to indicate which function to use to initialize bias
use_bn: boolean for whether to use batch normalization
kernel_regularizer: string to indicate which function to use to regularize weights
bias_regularizer: string to indicate which function to use to regularize bias
use_sync_bn: boolean for whether to sync the batch normalization statistics of all batch norm layers to the model's global statistics (across all input batches)
group_id: integer for which group of features to pass through the csp tiny stack.
groups: integer for how many splits there should be in the convolution feature stack output
norm_moment: float for the momentum to use for batch normalization
norm_epsilon: float for the batch normalization epsilon
conv_activation: string or None for the activation function to use in the layer; if None, the activation is replaced by linear
leaky_alpha: float to use as alpha if the activation function is leaky
sc_activation: string for the activation function to use in the layer
downsample: boolean; if the input image is larger than the layer output, set downsample to True so the dimensions are forced to match
**kwargs: Keyword Arguments
"""
def __init__(
self,
filters=1,
......@@ -486,6 +556,8 @@ class CSPTiny(ks.layers.Layer):
strides=(1, 1),
padding='same',
use_bias=self._use_bias,
groups = self._groups,
group_id = self._group_id,
kernel_initializer=self._kernel_initializer,
bias_initializer=self._bias_initializer,
bias_regularizer=self._bias_regularizer,
......@@ -538,15 +610,14 @@ class CSPTiny(ks.layers.Layer):
def call(self, inputs):
x1 = self._convlayer1(inputs)
x2 = tf.split(x1, self._groups, axis=-1)
x3 = self._convlayer2(x2[self._group_id])
x4 = self._convlayer3(x3)
x5 = tf.concat([x4, x3], axis=-1)
x6 = self._convlayer4(x5)
x = tf.concat([x1, x6], axis=-1)
x2 = self._convlayer2(x1) # grouping
x3 = self._convlayer3(x2)
x4 = tf.concat([x3, x2], axis=-1) # csp partial using grouping
x5 = self._convlayer4(x4)
x = tf.concat([x1, x5], axis=-1) # csp connect
if self._downsample:
x = self._maxpool(x)
return x, x6
return x, x5
def get_config(self):
# used to store/share parameters to reconstruct the model
......@@ -571,7 +642,36 @@ class CSPTiny(ks.layers.Layer):
@ks.utils.register_keras_serializable(package='yolo')
class CSPDownSample(ks.layers.Layer):
"""
Downsampling layer that takes the place of the downsampling done in residual networks. This is the first of two layers needed to convert any residual network model to a CSPNet. At the start of a new level, this CSPDownSample layer creates a learned identity that acts as a cross-stage connection used to inform the inputs of the next stage. It is called cross stage partial because the number of filters required in every intermediate residual layer is reduced by half. The sister layer takes the partial generated by this layer and concatenates it with the output of the final residual layer in the stack to create a full feature-level output. This concatenation merges the partial blocks of two levels as input to the next, allowing the gradients of each level to be more unique and reducing the number of parameters required by each level by 50% while keeping accuracy consistent.
Cross Stage Partial networks (CSPNets) were proposed in:
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu, Ping-Yang Chen, Jun-Wei Hsieh
CSPNet: A New Backbone that can Enhance Learning Capability of CNN. arXiv:1911.11929
Args:
filters: integer for output depth, or the number of features to learn
filter_reduce: integer dictating (filters//2), or the number of filters in the partial feature stack
activation: string for the activation function to use in the layer
kernel_initializer: string to indicate which function to use to initialize weights
bias_initializer: string to indicate which function to use to initialize bias
kernel_regularizer: string to indicate which function to use to regularize weights
bias_regularizer: string to indicate which function to use to regularize bias
use_bn: boolean for whether to use batch normalization
use_sync_bn: boolean for whether to sync the batch normalization statistics of all batch norm layers to the model's global statistics (across all input batches)
norm_moment: float for the momentum to use for batch normalization
norm_epsilon: float for the batch normalization epsilon
**kwargs: Keyword Arguments
"""
def __init__(
self,
filters,
......@@ -651,7 +751,29 @@ class CSPDownSample(ks.layers.Layer):
@ks.utils.register_keras_serializable(package='yolo')
class CSPConnect(ks.layers.Layer):
"""
Sister layer to the CSPDownSample layer. Merges the partial feature stack generated by the CSPDownSample layer with the final output of the residual stack. Suggested in the CSPNet paper.
Cross Stage Partial networks (CSPNets) were proposed in:
[1] Chien-Yao Wang, Hong-Yuan Mark Liao, I-Hau Yeh, Yueh-Hua Wu, Ping-Yang Chen, Jun-Wei Hsieh
CSPNet: A New Backbone that can Enhance Learning Capability of CNN. arXiv:1911.11929
Args:
filters: integer for output depth, or the number of features to learn
filter_reduce: integer dictating (filters//2), or the number of filters in the partial feature stack
activation: string for the activation function to use in the layer
kernel_initializer: string to indicate which function to use to initialize weights
bias_initializer: string to indicate which function to use to initialize bias
kernel_regularizer: string to indicate which function to use to regularize weights
bias_regularizer: string to indicate which function to use to regularize bias
use_bn: boolean for whether to use batch normalization
use_sync_bn: boolean for whether to sync the batch normalization statistics of all batch norm layers to the model's global statistics (across all input batches)
norm_moment: float for the momentum to use for batch normalization
norm_epsilon: float for the batch normalization epsilon
**kwargs: Keyword Arguments
"""
def __init__(
self,
filters,
......
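As the docstrings above describe and the tests below exercise, CSPDownSample and CSPConnect are used as a pair: the first emits the main path plus the cross-stage partial, and the second merges them back. A minimal pairing sketch (the residual blocks that would normally sit between the two layers are omitted):
import tensorflow.keras as ks
from official.vision.beta.projects.yolo.modeling.layers import nn_blocks

inputs = ks.Input(shape=(224, 224, 64))
down = nn_blocks.CSPDownSample(filters=64, filter_reduce=2)
connect = nn_blocks.CSPConnect(filters=64, filter_reduce=2)
x, x_partial = down(inputs)    # downsampled main path and cross-stage partial
# ... the stage's residual blocks would normally run on `x` here ...
out = connect([x, x_partial])  # merge the partial back into the main path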
import tensorflow as tf
import tensorflow.keras as ks
import numpy as np
from absl.testing import parameterized
from official.vision.beta.projects.yolo.modeling.layers import nn_blocks
class CSPConnect(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 64, 2))
def test_pass_through(self, width, height, filters, mod):
x = ks.Input(shape=(width, height, filters))
test_layer = nn_blocks.CSPDownSample(filters=filters, filter_reduce=mod)
test_layer2 = nn_blocks.CSPConnect(filters=filters, filter_reduce=mod)
outx, px = test_layer(x)
outx = test_layer2([outx, px])
print(outx)
print(outx.shape.as_list())
self.assertAllEqual(
outx.shape.as_list(),
[None, np.ceil(width // 2),
np.ceil(height // 2), (filters)])
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 128, 2))
def test_gradient_pass_though(self, filters, width, height, mod):
loss = ks.losses.MeanSquaredError()
optimizer = ks.optimizers.SGD()
test_layer = nn_blocks.CSPDownSample(filters, filter_reduce=mod)
path_layer = nn_blocks.CSPConnect(filters, filter_reduce=mod)
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y = tf.Variable(initial_value=init(shape=(1, int(np.ceil(width // 2)),
int(np.ceil(height // 2)),
filters),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat, x_prev = test_layer(x)
x_hat = path_layer([x_hat, x_prev])
grad_loss = loss(x_hat, y)
grad = tape.gradient(grad_loss, test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
class CSPDownSample(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 64, 2))
def test_pass_through(self, width, height, filters, mod):
x = ks.Input(shape=(width, height, filters))
test_layer = nn_blocks.CSPDownSample(filters=filters, filter_reduce=mod)
outx, px = test_layer(x)
print(outx)
print(outx.shape.as_list())
self.assertAllEqual(
outx.shape.as_list(),
[None, np.ceil(width // 2),
np.ceil(height // 2), (filters / mod)])
@parameterized.named_parameters(("same", 224, 224, 64, 1),
("downsample", 224, 224, 128, 2))
def test_gradient_pass_though(self, filters, width, height, mod):
loss = ks.losses.MeanSquaredError()
optimizer = ks.optimizers.SGD()
test_layer = nn_blocks.CSPDownSample(filters, filter_reduce=mod)
path_layer = nn_blocks.CSPConnect(filters, filter_reduce=mod)
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y = tf.Variable(initial_value=init(shape=(1, int(np.ceil(width // 2)),
int(np.ceil(height // 2)),
filters),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat, x_prev = test_layer(x)
x_hat = path_layer([x_hat, x_prev])
grad_loss = loss(x_hat, y)
grad = tape.gradient(grad_loss, test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
class DarkConvTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(
("valid", (3, 3), "valid", (1, 1)), ("same", (3, 3), "same", (1, 1)),
("downsample", (3, 3), "same", (2, 2)), ("test", (1, 1), "valid", (1, 1)))
def test_pass_through(self, kernel_size, padding, strides):
if padding == "same":
pad_const = 1
else:
pad_const = 0
x = ks.Input(shape=(224, 224, 3))
test_layer = nn_blocks.DarkConv(filters=64,
kernel_size=kernel_size,
padding=padding,
strides=strides,
trainable=False)
outx = test_layer(x)
print(outx.shape.as_list())
test = [
None,
int((224 - kernel_size[0] + (2 * pad_const)) / strides[0] + 1),
int((224 - kernel_size[1] + (2 * pad_const)) / strides[1] + 1), 64
]
print(test)
self.assertAllEqual(outx.shape.as_list(), test)
@parameterized.named_parameters(("filters", 3))
def test_gradient_pass_though(self, filters):
loss = ks.losses.MeanSquaredError()
optimizer = ks.optimizers.SGD()
with tf.device("/CPU:0"):
test_layer = nn_blocks.DarkConv(filters, kernel_size=(3, 3), padding="same")
init = tf.random_normal_initializer()
x = tf.Variable(initial_value=init(shape=(1, 224, 224,
3), dtype=tf.float32))
y = tf.Variable(
initial_value=init(shape=(1, 224, 224, filters), dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat = test_layer(x)
grad_loss = loss(x_hat, y)
grad = tape.gradient(grad_loss, test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
class DarkResidualTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(("same", 224, 224, 64, False),
("downsample", 223, 223, 32, True),
("oddball", 223, 223, 32, False))
def test_pass_through(self, width, height, filters, downsample):
mod = 1
if downsample:
mod = 2
x = ks.Input(shape=(width, height, filters))
test_layer = nn_blocks.DarkResidual(filters=filters, downsample=downsample)
outx = test_layer(x)
print(outx)
print(outx.shape.as_list())
self.assertAllEqual(
outx.shape.as_list(),
[None, np.ceil(width / mod),
np.ceil(height / mod), filters])
@parameterized.named_parameters(("same", 64, 224, 224, False),
("downsample", 32, 223, 223, True),
("oddball", 32, 223, 223, False))
def test_gradient_pass_though(self, filters, width, height, downsample):
loss = ks.losses.MeanSquaredError()
optimizer = ks.optimizers.SGD()
test_layer = nn_blocks.DarkResidual(filters, downsample=downsample)
if downsample:
mod = 2
else:
mod = 1
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y = tf.Variable(initial_value=init(shape=(1, int(np.ceil(width / mod)),
int(np.ceil(height / mod)),
filters),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat = test_layer(x)
grad_loss = loss(x_hat, y)
grad = tape.gradient(grad_loss, test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
class DarkTinyTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(("middle", 224, 224, 64, 2),
("last", 224, 224, 1024, 1))
def test_pass_through(self, width, height, filters, strides):
x = ks.Input(shape=(width, height, filters))
test_layer = nn_blocks.DarkTiny(filters=filters, strides=strides)
outx = test_layer(x)
self.assertEqual(width % strides, 0, msg="width % strides != 0")
self.assertEqual(height % strides, 0, msg="height % strides != 0")
self.assertAllEqual(outx.shape.as_list(),
[None, width // strides, height // strides, filters])
@parameterized.named_parameters(("middle", 224, 224, 64, 2),
("last", 224, 224, 1024, 1))
def test_gradient_pass_though(self, width, height, filters, strides):
loss = ks.losses.MeanSquaredError()
optimizer = ks.optimizers.SGD()
test_layer = nn_blocks.DarkTiny(filters=filters, strides=strides)
init = tf.random_normal_initializer()
x = tf.Variable(
initial_value=init(shape=(1, width, height, filters), dtype=tf.float32))
y = tf.Variable(initial_value=init(shape=(1, width // strides,
height // strides, filters),
dtype=tf.float32))
with tf.GradientTape() as tape:
x_hat = test_layer(x)
grad_loss = loss(x_hat, y)
grad = tape.gradient(grad_loss, test_layer.trainable_variables)
optimizer.apply_gradients(zip(grad, test_layer.trainable_variables))
self.assertNotIn(None, grad)
if __name__ == "__main__":
tf.test.main()
......@@ -23,30 +23,12 @@ from official.vision.beta.projects.yolo.configs import darknet_classification as
from official.vision.beta.projects.yolo.dataloaders import classification_input as cli
from official.vision.beta.dataloaders import classification_input
from official.vision.beta.modeling import factory
from official.vision.beta.tasks import image_classification
@task_factory.register_task_cls(exp_cfg.ImageClassificationTask)
class ImageClassificationTask(base_task.Task):
class ImageClassificationTask(image_classification.ImageClassificationTask):
"""A task for image classification."""
def build_model(self):
"""Builds classification model."""
input_specs = tf.keras.layers.InputSpec(
shape=[None] + self.task_config.model.input_size)
l2_weight_decay = self.task_config.losses.l2_weight_decay
# Divide weight decay by 2.0 to match the implementation of tf.nn.l2_loss.
# (https://www.tensorflow.org/api_docs/python/tf/keras/regularizers/l2)
# (https://www.tensorflow.org/api_docs/python/tf/nn/l2_loss)
l2_regularizer = (tf.keras.regularizers.l2(
l2_weight_decay / 2.0) if l2_weight_decay else None)
model = factory.build_classification_model(
input_specs=input_specs,
model_config=self.task_config.model,
l2_regularizer=l2_regularizer)
return model
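The factor of 2.0 in the removed build_model above reflects the differing conventions of the two regularizers; a quick numeric check (weights and decay value are illustrative):
import tensorflow as tf

w = tf.constant([1.0, 2.0, 3.0])
l2_weight_decay = 0.1
# tf.nn.l2_loss computes sum(w**2) / 2
ref = l2_weight_decay * tf.nn.l2_loss(w)  # 0.1 * 7.0 = 0.7
# tf.keras.regularizers.l2(l) computes l * sum(w**2), so halve l to match
keras_reg = tf.keras.regularizers.l2(l2_weight_decay / 2.0)
assert abs(float(keras_reg(w)) - float(ref)) < 1e-6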
def build_inputs(self, params, input_context=None):
"""Builds classification input."""
......@@ -70,142 +52,6 @@ class ImageClassificationTask(base_task.Task):
parser_fn=parser.parse_fn(params.is_training))
dataset = reader.read(input_context=input_context)
return dataset
def build_losses(self, labels, model_outputs, aux_losses=None):
"""Sparse categorical cross entropy loss.
Args:
labels: labels.
model_outputs: Output logits of the classifier.
aux_losses: auxiliary loss tensors, i.e. `losses` in keras.Model.
Returns:
The total loss tensor.
"""
losses_config = self.task_config.losses
if losses_config.one_hot:
total_loss = tf.keras.losses.categorical_crossentropy(
labels,
model_outputs,
from_logits=True,
label_smoothing=losses_config.label_smoothing)
else:
total_loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, model_outputs, from_logits=True)
total_loss = tf_utils.safe_mean(total_loss)
if aux_losses:
total_loss += tf.add_n(aux_losses)
return total_loss
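A short sketch of the two loss paths selected by losses_config.one_hot, with toy logits and labels (values are illustrative; with zero label smoothing the two paths agree):
import tensorflow as tf

logits = tf.constant([[2.0, 0.5, 0.1]])
sparse_labels = tf.constant([0])
one_hot_labels = tf.one_hot(sparse_labels, depth=3)

# one_hot=True path (supports label smoothing)
loss_oh = tf.keras.losses.categorical_crossentropy(
    one_hot_labels, logits, from_logits=True, label_smoothing=0.0)
# one_hot=False path
loss_sp = tf.keras.losses.sparse_categorical_crossentropy(
    sparse_labels, logits, from_logits=True)
tf.debugging.assert_near(loss_oh, loss_sp)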
def build_metrics(self, training=True):
"""Gets streaming metrics for training/validation."""
if self.task_config.losses.one_hot:
metrics = [
tf.keras.metrics.CategoricalAccuracy(name='accuracy'),
tf.keras.metrics.TopKCategoricalAccuracy(k=5, name='top_5_accuracy')]
else:
metrics = [
tf.keras.metrics.SparseCategoricalAccuracy(name='accuracy'),
tf.keras.metrics.SparseTopKCategoricalAccuracy(
k=5, name='top_5_accuracy')]
return metrics
def train_step(self, inputs, model, optimizer, metrics=None):
"""Does forward and backward.
Args:
inputs: a dictionary of input tensors.
model: the model, forward pass definition.
optimizer: the optimizer for this training step.
metrics: a nested structure of metrics objects.
Returns:
A dictionary of logs.
"""
features, labels = inputs
if self.task_config.losses.one_hot:
labels = tf.one_hot(labels, self.task_config.model.num_classes)
num_replicas = tf.distribute.get_strategy().num_replicas_in_sync
with tf.GradientTape() as tape:
outputs = model(features, training=True)
# Casting output layer as float32 is necessary when mixed_precision is
# mixed_float16 or mixed_bfloat16 to ensure output is cast as float32.
outputs = tf.nest.map_structure(
lambda x: tf.cast(x, tf.float32), outputs)
# Computes per-replica loss.
loss = self.build_losses(
model_outputs=outputs, labels=labels, aux_losses=model.losses)
#Scales loss as the default gradients allreduce performs sum inside the
# optimizer.
scaled_loss = loss / num_replicas
# For mixed_precision policy, when LossScaleOptimizer is used, loss is
# scaled for numerical stability.
if isinstance(
optimizer, tf.keras.mixed_precision.experimental.LossScaleOptimizer):
scaled_loss = optimizer.get_scaled_loss(scaled_loss)
tf.print("batch loss: ", loss, end = "\r")
tvars = model.trainable_variables
grads = tape.gradient(scaled_loss, tvars)
# Scales back gradient before apply_gradients when LossScaleOptimizer is
# used.
if isinstance(
optimizer, tf.keras.mixed_precision.experimental.LossScaleOptimizer):
grads = optimizer.get_unscaled_gradients(grads)
# Apply gradient clipping.
if self.task_config.gradient_clip_norm > 0:
grads, _ = tf.clip_by_global_norm(
grads, self.task_config.gradient_clip_norm)
optimizer.apply_gradients(list(zip(grads, tvars)))
logs = {self.loss: loss}
if metrics:
self.process_metrics(metrics, labels, outputs)
logs.update({m.name: m.result() for m in metrics})
elif model.compiled_metrics:
self.process_compiled_metrics(model.compiled_metrics, labels, outputs)
logs.update({m.name: m.result() for m in model.metrics})
return logs
def validation_step(self, inputs, model, metrics=None):
"""Validatation step.
Args:
inputs: a dictionary of input tensors.
model: the keras.Model.
metrics: a nested structure of metrics objects.
Returns:
A dictionary of logs.
"""
features, labels = inputs
if self.task_config.losses.one_hot:
labels = tf.one_hot(labels, self.task_config.model.num_classes)
outputs = self.inference_step(features, model)
outputs = tf.nest.map_structure(lambda x: tf.cast(x, tf.float32), outputs)
loss = self.build_losses(model_outputs=outputs, labels=labels,
aux_losses=model.losses)
logs = {self.loss: loss}
if metrics:
self.process_metrics(metrics, labels, outputs)
logs.update({m.name: m.result() for m in metrics})
elif model.compiled_metrics:
self.process_compiled_metrics(model.compiled_metrics, labels, outputs)
logs.update({m.name: m.result() for m in model.metrics})
return logs
def inference_step(self, inputs, model):
"""Performs the forward step."""
return model(inputs, training=False)
......@@ -33,15 +33,10 @@ from official.modeling import performance
FLAGS = flags.FLAGS
'''
python3 -m official.vision.beta.projects.yolo.train --mode=train_and_eval --experiment=darknet_classification --model_dir=training_dir --config_file=official/vision/beta/projects/yolo/configs/experiments/darknet53.yaml
python3 -m official.vision.beta.projects.yolo.train --mode=train_and_eval --experiment=darknet_classification --model_dir=training_dir --config_file=official/vision/beta/projects/yolo/configs/experiments/darknet53_tfds.yaml
'''
def import_overrides():
print(sys.modules["official.vision.beta.configs.backbones"])
return
def main(_):
import_overrides()
gin.parse_config_files_and_bindings(FLAGS.gin_file, FLAGS.gin_params)
print(FLAGS.experiment)
params = train_utils.parse_configuration(FLAGS)
......
......@@ -6,10 +6,10 @@ runtime:
distribution_strategy: mirrored
enable_xla: false
gpu_thread_mode: null
loss_scale: dynamic
mixed_precision_dtype: float16
loss_scale: null
mixed_precision_dtype: float32
num_cores_per_replica: 1
num_gpus: 2
num_gpus: 0
num_packs: 1
per_gpu_thread_count: 0
run_eagerly: false
......@@ -46,19 +46,19 @@ task:
drop_remainder: true
dtype: float16
enable_tf_data_service: false
global_batch_size: 16
input_path: ''
global_batch_size: 128
input_path: imagenet-2012-tfrecord/train*
is_training: true
sharding: true
shuffle_buffer_size: 100
shuffle_buffer_size: 10000
tf_data_service_address: null
tf_data_service_job_name: null
tfds_as_supervised: false
tfds_data_dir: ~/tensorflow_datasets/
tfds_download: true
tfds_name: imagenet2012
tfds_data_dir: ''
tfds_download: false
tfds_name: ''
tfds_skip_decoding_feature: ''
tfds_split: train
tfds_split: ''
validation_data:
block_length: 1
cache: false
......@@ -67,19 +67,19 @@ task:
drop_remainder: false
dtype: float16
enable_tf_data_service: false
global_batch_size: 16
input_path: ''
global_batch_size: 128
input_path: imagenet-2012-tfrecord/valid*
is_training: true
sharding: true
shuffle_buffer_size: 100
shuffle_buffer_size: 10000
tf_data_service_address: null
tf_data_service_job_name: null
tfds_as_supervised: false
tfds_data_dir: ~/tensorflow_datasets/
tfds_download: true
tfds_name: imagenet2012
tfds_data_dir: ''
tfds_download: false
tfds_name: ''
tfds_skip_decoding_feature: ''
tfds_split: validation
tfds_split: ''
trainer:
allow_tpu_summary: false
best_checkpoint_eval_metric: ''
......@@ -94,9 +94,9 @@ trainer:
learning_rate:
polynomial:
cycle: false
decay_steps: 6392000
end_learning_rate: 1.25e-05
initial_learning_rate: 0.0125
decay_steps: 799000
end_learning_rate: 0.0001
initial_learning_rate: 0.1
name: PolynomialDecay
power: 4.0
type: polynomial
......@@ -113,12 +113,12 @@ trainer:
linear:
name: linear
warmup_learning_rate: 0
warmup_steps: 8000
warmup_steps: 1000
type: linear
steps_per_loop: 10000
summary_interval: 10000
train_steps: 6400000
train_steps: 800000
train_tf_function: true
train_tf_while_loop: true
validation_interval: 10000
validation_steps: 3200
validation_steps: 400
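A sketch of the learning-rate schedule the updated trainer values describe, assuming the standard Keras PolynomialDecay (the 1000-step linear warmup is applied separately by the trainer):
import tensorflow as tf

schedule = tf.keras.optimizers.schedules.PolynomialDecay(
    initial_learning_rate=0.1,
    decay_steps=799000,
    end_learning_rate=0.0001,
    power=4.0,
    cycle=False)
print(float(schedule(0)), float(schedule(799000)))  # 0.1 at step 0, 0.0001 after decay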