Commit 8de66223 authored by maming's avatar maming
Browse files

Initial commit

parents
Pipeline #3358 canceled with stages
from keras.layers import Layer, InputSpec
from keras import initializers, regularizers, constraints
import keras.backend as K
from keras_contrib.utils.test_utils import to_tuple
class PELU(Layer):
    """Parametric Exponential Linear Unit.

    It follows:
    `f(x) = alpha * (exp(x / beta) - 1) for x < 0`,
    `f(x) = (alpha / beta) * x for x >= 0`,
    where `alpha` & `beta` are learned weight arrays broadcastable to x.

    # Input shape
        Arbitrary. Use the keyword argument `input_shape`
        (tuple of integers, does not include the samples axis)
        when using this layer as the first layer in a model.

    # Output shape
        Same shape as the input.

    # Arguments
        alpha_initializer: initialization function for the alpha weights.
        alpha_regularizer: regularizer applied to the alpha weights.
        alpha_constraint: constraint applied to the alpha weights.
        beta_initializer: initialization function for the beta weights.
        beta_regularizer: regularizer applied to the beta weights.
        beta_constraint: constraint applied to the beta weights.
        shared_axes: the axes along which to share learnable
            parameters for the activation function.
            For example, if the incoming feature maps
            are from a 2D convolution
            with output shape `(batch, height, width, channels)`,
            and you wish to share parameters across space
            so that each filter only has one set of parameters,
            set `shared_axes=[1, 2]`.

    # References
        - [Parametric exponential linear unit for deep convolutional neural networks](
          https://arxiv.org/abs/1605.09332v3)
    """

    def __init__(self, alpha_initializer='ones',
                 alpha_regularizer=None,
                 alpha_constraint=None,
                 beta_initializer='ones',
                 beta_regularizer=None,
                 beta_constraint=None,
                 shared_axes=None,
                 **kwargs):
        super(PELU, self).__init__(**kwargs)
        self.supports_masking = True
        self.alpha_initializer = initializers.get(alpha_initializer)
        self.alpha_regularizer = regularizers.get(alpha_regularizer)
        self.alpha_constraint = constraints.get(alpha_constraint)
        self.beta_initializer = initializers.get(beta_initializer)
        self.beta_regularizer = regularizers.get(beta_regularizer)
        self.beta_constraint = constraints.get(beta_constraint)
        # Normalise shared_axes to a list (or None), whatever form it came in.
        if shared_axes is None:
            self.shared_axes = None
        elif not isinstance(shared_axes, (list, tuple)):
            self.shared_axes = [shared_axes]
        else:
            self.shared_axes = list(shared_axes)

    def build(self, input_shape):
        """Create the `alpha` and `beta` weights, collapsing shared axes to 1."""
        input_shape = to_tuple(input_shape)
        param_shape = list(input_shape[1:])
        self.param_broadcast = [False] * len(param_shape)
        if self.shared_axes is not None:
            # Shared axes get size 1 so the parameter broadcasts across them.
            for i in self.shared_axes:
                param_shape[i - 1] = 1
                self.param_broadcast[i - 1] = True
        param_shape = tuple(param_shape)
        # Initialised as ones to emulate the default ELU
        self.alpha = self.add_weight(shape=param_shape,
                                     name='alpha',
                                     initializer=self.alpha_initializer,
                                     regularizer=self.alpha_regularizer,
                                     constraint=self.alpha_constraint)
        self.beta = self.add_weight(shape=param_shape,
                                    name='beta',
                                    initializer=self.beta_initializer,
                                    regularizer=self.beta_regularizer,
                                    constraint=self.beta_constraint)
        # Set input spec: pin every non-shared axis to its build-time size.
        axes = {}
        if self.shared_axes:
            for i in range(1, len(input_shape)):
                if i not in self.shared_axes:
                    axes[i] = input_shape[i]
        self.input_spec = InputSpec(ndim=len(input_shape), axes=axes)
        self.built = True

    def call(self, x, mask=None):
        # Positive branch: (alpha / beta) * x; negative branch:
        # alpha * (exp(x / beta) - 1), selected via relu masking.
        if K.backend() == 'theano':
            # Theano requires explicit broadcast patterns on the size-1 axes.
            pos = K.relu(x) * (K.pattern_broadcast(self.alpha, self.param_broadcast) /
                               K.pattern_broadcast(self.beta, self.param_broadcast))
            neg = (K.pattern_broadcast(self.alpha, self.param_broadcast) *
                   (K.exp((-K.relu(-x))
                          / K.pattern_broadcast(self.beta, self.param_broadcast)) - 1))
        else:
            pos = K.relu(x) * self.alpha / self.beta
            neg = self.alpha * (K.exp((-K.relu(-x)) / self.beta) - 1)
        return neg + pos

    def get_config(self):
        """Return the layer configuration for serialization."""
        config = {
            'alpha_initializer': initializers.serialize(self.alpha_initializer),
            'alpha_regularizer': regularizers.serialize(self.alpha_regularizer),
            'alpha_constraint': constraints.serialize(self.alpha_constraint),
            'beta_initializer': initializers.serialize(self.beta_initializer),
            'beta_regularizer': regularizers.serialize(self.beta_regularizer),
            'beta_constraint': constraints.serialize(self.beta_constraint),
            'shared_axes': self.shared_axes
        }
        base_config = super(PELU, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    def compute_output_shape(self, input_shape):
        return input_shape
import keras.backend as K
from keras.layers import Layer
class SineReLU(Layer):
    """Sine Rectified Linear Unit to generate oscilations.

    It allows an oscilation in the gradients when the weights are negative.
    The oscilation can be controlled with a parameter, which makes it be close
    or equal to zero. The functional is diferentiable at any point due to
    its derivative.
    For instance, at 0, the derivative of 'sin(0) - cos(0)'
    is 'cos(0) + sin(0)' which is 1.

    # Input shape
        Arbitrary. Use the keyword argument `input_shape`
        (tuple of integers, does not include the samples axis)
        when using this layer as the first layer in a model.

    # Output shape
        Same shape as the input.

    # Arguments
        epsilon: float. Hyper-parameter used to control the amplitude of the
            sinusoidal wave when weights are negative.
            The default value, 0.0025, since it works better for CNN layers and
            those are the most used layers nowadays.
            When using Dense Networks, try something around 0.006.

    # References:
        - [SineReLU: An Alternative to the ReLU Activation Function](
          https://medium.com/@wilder.rodrigues/sinerelu-an-alternative-to-the-relu-activation-function-e46a6199997d).
        This function was
        first introduced at the Codemotion Amsterdam 2018 and then at
        the DevDays, in Vilnius, Lithuania.
        It has been extensively tested with Deep Nets, CNNs,
        LSTMs, Residual Nets and GANs, based
        on the MNIST, Kaggle Toxicity and IMDB datasets.

    # Performance:
        - Fashion MNIST
          * Mean of 6 runs per Activation Function
          * Fully Connection Network
            - SineReLU: loss mean -> 0.3522; accuracy mean -> 89.18;
              mean of std loss -> 0.08375204467435822
            - LeakyReLU: loss mean-> 0.3553; accuracy mean -> 88.98;
              mean of std loss -> 0.0831161868455245
            - ReLU: loss mean -> 0.3519; accuracy mean -> 88.84;
              mean of std loss -> 0.08358816501301362
          * Convolutional Neural Network
            - SineReLU: loss mean -> 0.2180; accuracy mean -> 92.49;
              mean of std loss -> 0.0781155784858847
            - LeakyReLU: loss mean -> 0.2205; accuracy mean -> 92.37;
              mean of std loss -> 0.09273670474788205
            - ReLU: loss mean -> 0.2144; accuracy mean -> 92.45;
              mean of std loss -> 0.09396114585977
        - MNIST
          * Mean of 6 runs per Activation Function
          * Fully Connection Network
            - SineReLU: loss mean -> 0.0623; accuracy mean -> 98.53;
              mean of std loss -> 0.06012015231824904
            - LeakyReLU: loss mean-> 0.0623; accuracy mean -> 98.50;
              mean of std loss -> 0.06052147632835356
            - ReLU: loss mean -> 0.0605; accuracy mean -> 98.49;
              mean of std loss -> 0.059599885665016096
          * Convolutional Neural Network
            - SineReLU: loss mean -> 0.0198; accuracy mean -> 99.51;
              mean of std loss -> 0.0425338329550847
            - LeakyReLU: loss mean -> 0.0216; accuracy mean -> 99.40;
              mean of std loss -> 0.04834468835196667
            - ReLU: loss mean -> 0.0185; accuracy mean -> 99.49;
              mean of std loss -> 0.05503719489690131

    # Jupyter Notebooks
        - https://github.com/ekholabs/DLinK/blob/master/notebooks/keras

    # Examples
        The Advanced Activation function SineReLU have to be imported from the
        keras_contrib.layers package.
        To see full source-code of this architecture and other examples,
        please follow this link: https://github.com/ekholabs/DLinK
        ```python
        model = Sequential()
        model.add(Dense(128, input_shape = (784,)))
        model.add(SineReLU())
        model.add(Dropout(0.2))
        model.add(Dense(256))
        model.add(SineReLU())
        model.add(Dropout(0.3))
        model.add(Dense(1024))
        model.add(SineReLU())
        model.add(Dropout(0.5))
        model.add(Dense(10, activation = 'softmax'))
        ```
    """

    def __init__(self, epsilon=0.0025, **kwargs):
        super(SineReLU, self).__init__(**kwargs)
        self.supports_masking = True
        # Stored at the backend's float precision.
        self.epsilon = K.cast_to_floatx(epsilon)

    def call(self, Z):
        # Damped sinusoidal floor; the output is whichever of the wave
        # and the identity is larger, elementwise.
        wave = self.epsilon * (K.sin(Z) - K.cos(Z))
        return K.maximum(wave, Z)

    def get_config(self):
        """Return the layer configuration for serialization."""
        merged = dict(super(SineReLU, self).get_config())
        merged['epsilon'] = float(self.epsilon)
        return merged

    def compute_output_shape(self, input_shape):
        # Elementwise activation: shape is unchanged.
        return input_shape
from keras.layers import Layer, InputSpec
from keras import initializers
import keras.backend as K
from keras_contrib.utils.test_utils import to_tuple
class SReLU(Layer):
    """S-shaped Rectified Linear Unit.

    It follows:
    `f(x) = t^r + a^r(x - t^r) for x >= t^r`,
    `f(x) = x for t^r > x > t^l`,
    `f(x) = t^l + a^l(x - t^l) for x <= t^l`.

    # Input shape
        Arbitrary. Use the keyword argument `input_shape`
        (tuple of integers, does not include the samples axis)
        when using this layer as the first layer in a model.

    # Output shape
        Same shape as the input.

    # Arguments
        t_left_initializer: initializer function for the left part intercept
        a_left_initializer: initializer function for the left part slope
        t_right_initializer: initializer function for the right part intercept
        a_right_initializer: initializer function for the right part slope
        shared_axes: the axes along which to share learnable
            parameters for the activation function.
            For example, if the incoming feature maps
            are from a 2D convolution
            with output shape `(batch, height, width, channels)`,
            and you wish to share parameters across space
            so that each filter only has one set of parameters,
            set `shared_axes=[1, 2]`.

    # References
        - [Deep Learning with S-shaped Rectified Linear Activation Units](
          http://arxiv.org/abs/1512.07030)
    """

    def __init__(self, t_left_initializer='zeros',
                 a_left_initializer=initializers.RandomUniform(minval=0, maxval=1),
                 t_right_initializer=initializers.RandomUniform(minval=0, maxval=5),
                 a_right_initializer='ones',
                 shared_axes=None,
                 **kwargs):
        super(SReLU, self).__init__(**kwargs)
        self.supports_masking = True
        self.t_left_initializer = initializers.get(t_left_initializer)
        self.a_left_initializer = initializers.get(a_left_initializer)
        self.t_right_initializer = initializers.get(t_right_initializer)
        self.a_right_initializer = initializers.get(a_right_initializer)
        # Normalise shared_axes to a list (or None), whatever form it came in.
        if shared_axes is None:
            self.shared_axes = None
        elif not isinstance(shared_axes, (list, tuple)):
            self.shared_axes = [shared_axes]
        else:
            self.shared_axes = list(shared_axes)

    def build(self, input_shape):
        """Create the four per-unit parameters, collapsing shared axes to 1."""
        input_shape = to_tuple(input_shape)
        param_shape = list(input_shape[1:])
        self.param_broadcast = [False] * len(param_shape)
        if self.shared_axes is not None:
            # Shared axes get size 1 so the parameter broadcasts across them.
            for i in self.shared_axes:
                param_shape[i - 1] = 1
                self.param_broadcast[i - 1] = True
        param_shape = tuple(param_shape)
        self.t_left = self.add_weight(shape=param_shape,
                                      name='t_left',
                                      initializer=self.t_left_initializer)
        self.a_left = self.add_weight(shape=param_shape,
                                      name='a_left',
                                      initializer=self.a_left_initializer)
        self.t_right = self.add_weight(shape=param_shape,
                                       name='t_right',
                                       initializer=self.t_right_initializer)
        self.a_right = self.add_weight(shape=param_shape,
                                       name='a_right',
                                       initializer=self.a_right_initializer)
        # Set input spec: pin every non-shared axis to its build-time size.
        axes = {}
        if self.shared_axes:
            for i in range(1, len(input_shape)):
                if i not in self.shared_axes:
                    axes[i] = input_shape[i]
        self.input_spec = InputSpec(ndim=len(input_shape), axes=axes)
        self.built = True

    def call(self, x, mask=None):
        # ensure that the right part is always to the right of the left
        t_right_actual = self.t_left + K.abs(self.t_right)
        if K.backend() == 'theano':
            # Theano requires explicit broadcast patterns on the size-1 axes.
            t_left = K.pattern_broadcast(self.t_left, self.param_broadcast)
            a_left = K.pattern_broadcast(self.a_left, self.param_broadcast)
            a_right = K.pattern_broadcast(self.a_right, self.param_broadcast)
            t_right_actual = K.pattern_broadcast(t_right_actual,
                                                 self.param_broadcast)
        else:
            t_left = self.t_left
            a_left = self.a_left
            a_right = self.a_right
        # Left + centre segments via a capped leaky relu; right segment added on.
        y_left_and_center = t_left + K.relu(x - t_left,
                                            a_left,
                                            t_right_actual - t_left)
        y_right = K.relu(x - t_right_actual) * a_right
        return y_left_and_center + y_right

    def get_config(self):
        """Return the layer configuration for serialization.

        Initializers are serialized to their config dicts (they were
        previously stored as raw objects, which is not JSON-serializable
        and inconsistent with the other layers in this file).
        """
        config = {
            't_left_initializer': initializers.serialize(self.t_left_initializer),
            'a_left_initializer': initializers.serialize(self.a_left_initializer),
            't_right_initializer': initializers.serialize(self.t_right_initializer),
            'a_right_initializer': initializers.serialize(self.a_right_initializer),
            'shared_axes': self.shared_axes
        }
        base_config = super(SReLU, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    def compute_output_shape(self, input_shape):
        return input_shape
from keras import backend as K
from keras.layers import Layer
class Swish(Layer):
    """ Swish (Ramachandranet al., 2017)

    # Input shape
        Arbitrary. Use the keyword argument `input_shape`
        (tuple of integers, does not include the samples axis)
        when using this layer as the first layer in a model.

    # Output shape
        Same shape as the input.

    # Arguments
        beta: float >= 0. Scaling factor
            if set to 1 and trainable set to False (default),
            Swish equals the SiLU activation (Elfwing et al., 2017)
        trainable: whether to learn the scaling factor during training or not

    # References
        - [Searching for Activation Functions](https://arxiv.org/abs/1710.05941)
        - [Sigmoid-weighted linear units for neural network function
          approximation in reinforcement learning](https://arxiv.org/abs/1702.03118)
    """

    def __init__(self, beta=1.0, trainable=False, **kwargs):
        super(Swish, self).__init__(**kwargs)
        self.supports_masking = True
        self.beta = beta
        self.trainable = trainable

    def build(self, input_shape):
        # The scaling factor is a scalar backend variable; it is only
        # registered as a trainable weight when `trainable=True`.
        self.scaling_factor = K.variable(self.beta,
                                         dtype=K.floatx(),
                                         name='scaling_factor')
        if self.trainable:
            self._trainable_weights.append(self.scaling_factor)
        super(Swish, self).build(input_shape)

    def call(self, inputs, mask=None):
        # swish(x) = x * sigmoid(beta * x)
        return inputs * K.sigmoid(self.scaling_factor * inputs)

    def get_config(self):
        """Return the layer configuration for serialization.

        When the scaling factor is trainable, its current (learned) value is
        cast to a plain Python float so the resulting config is
        JSON-serializable (`get_weights()` returns a numpy array).
        """
        config = {'beta': float(self.get_weights()[0]) if self.trainable
                  else self.beta,
                  'trainable': self.trainable}
        base_config = super(Swish, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    def compute_output_shape(self, input_shape):
        return input_shape
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from keras import backend as K
from keras import activations
from keras import regularizers
from keras import initializers
from keras import constraints
from keras.layers import Layer
from keras_contrib.utils.test_utils import to_tuple
class Capsule(Layer):
    """Capsule Layer implementation in Keras
    This implementation is based on Dynamic Routing of Capsules,
    Geoffrey Hinton et. al.
    The Capsule Layer is a Neural Network Layer which helps
    modeling relationships in image and sequential data better
    than just CNNs or RNNs. It achieves this by understanding
    the spatial relationships between objects (in images)
    or words (in text) by encoding additional information
    about the image or text, such as angle of rotation,
    thickness and brightness, relative proportions etc.
    This layer can be used instead of pooling layers to
    lower dimensions and still capture important information
    about the relationships and structures within the data.
    A normal pooling layer would lose a lot of
    this information.
    This layer can be used on the output of any layer
    which has a 3-D output (including batch_size). For example,
    in image classification, it can be used on the output of a
    Conv2D layer for Computer Vision applications. Also,
    it can be used on the output of a GRU or LSTM Layer
    (Bidirectional or Unidirectional) for NLP applications.
    The default activation function is 'linear'. But, this layer
    is generally used with the 'squash' activation function
    (recommended). To use the squash activation function, do :
    from keras_contrib.activations import squash
    capsule = Capsule(num_capsule=10,
                      dim_capsule=10,
                      routings=3,
                      share_weights=True,
                      activation=squash)

    # Example usage :
        1). COMPUTER VISION
        input_image = Input(shape=(None, None, 3))
        conv_2d = Conv2D(64,
                         (3, 3),
                         activation='relu')(input_image)
        capsule = Capsule(num_capsule=10,
                          dim_capsule=16,
                          routings=3,
                          activation='relu',
                          share_weights=True)(conv_2d)
        2). NLP
        maxlen = 72
        max_features = 120000
        input_text = Input(shape=(maxlen,))
        embedding = Embedding(max_features,
                              embed_size,
                              weights=[embedding_matrix],
                              trainable=False)(input_text)
        bi_gru = Bidirectional(GRU(64,
                                   return_sequences=True))(embedding)
        capsule = Capsule(num_capsule=5,
                          dim_capsule=5,
                          routings=4,
                          activation='sigmoid',
                          share_weights=True)(bi_gru)

    # Arguments
        num_capsule : Number of Capsules (int)
        dim_capsules : Dimensions of the vector output of each Capsule (int)
        routings : Number of dynamic routings in the Capsule Layer (int)
        share_weights : Whether to share weights between Capsules or not
            (boolean)
        activation : Activation function for the Capsules
        regularizer : Regularizer for the weights of the Capsules
        initializer : Initializer for the weights of the Capsules
        constraint : Constraint for the weights of the Capsules

    # Input shape
        3D tensor with shape:
        (batch_size, input_num_capsule, input_dim_capsule)
        [any 3-D Tensor with the first dimension as batch_size]

    # Output shape
        3D tensor with shape:
        (batch_size, num_capsule, dim_capsule)

    # References
        - [Dynamic-Routing-Between-Capsules]
          (https://arxiv.org/pdf/1710.09829.pdf)
        - [Keras-Examples-CIFAR10-CNN-Capsule]"""

    def __init__(self,
                 num_capsule,
                 dim_capsule,
                 routings=3,
                 share_weights=True,
                 initializer='glorot_uniform',
                 activation=None,
                 regularizer=None,
                 constraint=None,
                 **kwargs):
        super(Capsule, self).__init__(**kwargs)
        self.num_capsule = num_capsule
        self.dim_capsule = dim_capsule
        self.routings = routings
        self.share_weights = share_weights
        self.activation = activations.get(activation)
        self.regularizer = regularizers.get(regularizer)
        self.initializer = initializers.get(initializer)
        self.constraint = constraints.get(constraint)

    def build(self, input_shape):
        """Create the transformation kernel, shared across input capsules or not."""
        input_shape = to_tuple(input_shape)
        input_dim_capsule = input_shape[-1]
        if self.share_weights:
            # One kernel applied to every input capsule (via conv1d in call).
            self.W = self.add_weight(name='capsule_kernel',
                                     shape=(1,
                                            input_dim_capsule,
                                            self.num_capsule *
                                            self.dim_capsule),
                                     initializer=self.initializer,
                                     regularizer=self.regularizer,
                                     constraint=self.constraint,
                                     trainable=True)
        else:
            input_num_capsule = input_shape[-2]
            self.W = self.add_weight(name='capsule_kernel',
                                     shape=(input_num_capsule,
                                            input_dim_capsule,
                                            self.num_capsule *
                                            self.dim_capsule),
                                     initializer=self.initializer,
                                     regularizer=self.regularizer,
                                     constraint=self.constraint,
                                     trainable=True)
        # BUG FIX: was `self.build = True`, which replaced this method with a
        # boolean; the Keras convention is to set the `built` flag instead.
        self.built = True

    def call(self, inputs):
        if self.share_weights:
            u_hat_vectors = K.conv1d(inputs, self.W)
        else:
            u_hat_vectors = K.local_conv1d(inputs, self.W, [1], [1])
        # u_hat_vectors : The spatially transformed input vectors (with local_conv_1d)
        batch_size = K.shape(inputs)[0]
        input_num_capsule = K.shape(inputs)[1]
        u_hat_vectors = K.reshape(u_hat_vectors, (batch_size,
                                                  input_num_capsule,
                                                  self.num_capsule,
                                                  self.dim_capsule))
        u_hat_vectors = K.permute_dimensions(u_hat_vectors, (0, 2, 1, 3))
        # Dynamic routing: iteratively refine the coupling of input capsules
        # to output capsules by agreement (dot product) with the outputs.
        routing_weights = K.zeros_like(u_hat_vectors[:, :, :, 0])

        for i in range(self.routings):
            capsule_weights = K.softmax(routing_weights, 1)
            outputs = K.batch_dot(capsule_weights, u_hat_vectors, [2, 2])
            if K.ndim(outputs) == 4:
                outputs = K.sum(outputs, axis=1)
            if i < self.routings - 1:
                # Update agreements for the next routing iteration.
                outputs = K.l2_normalize(outputs, -1)
                routing_weights = K.batch_dot(outputs, u_hat_vectors, [2, 3])
                if K.ndim(routing_weights) == 4:
                    routing_weights = K.sum(routing_weights, axis=1)

        return self.activation(outputs)

    def compute_output_shape(self, input_shape):
        return (None, self.num_capsule, self.dim_capsule)

    def get_config(self):
        """Return the layer configuration for serialization."""
        config = {'num_capsule': self.num_capsule,
                  'dim_capsule': self.dim_capsule,
                  'routings': self.routings,
                  'share_weights': self.share_weights,
                  'activation': activations.serialize(self.activation),
                  'regularizer': regularizers.serialize(self.regularizer),
                  'initializer': initializers.serialize(self.initializer),
                  'constraint': constraints.serialize(self.constraint)}
        base_config = super(Capsule, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from functools import partial
from keras import backend as K
from keras_contrib import backend as KC
from keras import activations
from keras import initializers
from keras import regularizers
from keras import constraints
from keras.layers import Layer
from keras.layers import InputSpec
from keras_contrib.utils.conv_utils import conv_output_length
from keras_contrib.utils.conv_utils import normalize_data_format
from keras_contrib.utils.test_utils import to_tuple
import numpy as np
class CosineConvolution2D(Layer):
    """Cosine Normalized Convolution operator for filtering
    windows of two-dimensional inputs.

    # Examples
    ```python
        # apply a 3x3 convolution with 64 output filters on a 256x256 image:
        model = Sequential()
        model.add(CosineConvolution2D(64, 3, 3,
                                      padding='same',
                                      input_shape=(3, 256, 256)))
        # now model.output_shape == (None, 64, 256, 256)
        # add a 3x3 convolution on top, with 32 output filters:
        model.add(CosineConvolution2D(32, 3, 3, padding='same'))
        # now model.output_shape == (None, 32, 256, 256)
    ```

    # Arguments
        filters: Number of convolution filters to use.
        kernel_size: kernel_size: An integer or tuple/list of
            2 integers, specifying the
            dimensions of the convolution window.
        init: name of initialization function for the weights of the layer
            (see [initializers](https://keras.io/initializers)), or alternatively,
            Theano function to use for weights initialization.
            This parameter is only relevant if you don't pass
            a `weights` argument.
        activation: name of activation function to use
            (see [activations](https://keras.io/activations)),
            or alternatively, elementwise Theano function.
            If you don't specify anything, no activation is applied
            (ie. "linear" activation: a(x) = x).
        weights: list of numpy arrays to set as initial weights.
        padding: 'valid', 'same' or 'full'
            ('full' requires the Theano backend).
        strides: tuple of length 2. Factor by which to strides output.
            Also called strides elsewhere.
        kernel_regularizer: instance of [WeightRegularizer](
            https://keras.io/regularizers)
            (eg. L1 or L2 regularization), applied to the main weights matrix.
        bias_regularizer: instance of [WeightRegularizer](
            https://keras.io/regularizers), applied to the use_bias.
        activity_regularizer: instance of [ActivityRegularizer](
            https://keras.io/regularizers), applied to the network output.
        kernel_constraint: instance of the [constraints](
            https://keras.io/constraints) module
            (eg. maxnorm, nonneg), applied to the main weights matrix.
        bias_constraint: instance of the [constraints](
            https://keras.io/constraints) module, applied to the use_bias.
        data_format: 'channels_first' or 'channels_last'.
            In 'channels_first' mode, the channels dimension
            (the depth) is at index 1, in 'channels_last' mode is it at index 3.
            It defaults to the `image_data_format` value found in your
            Keras config file at `~/.keras/keras.json`.
            If you never set it, then it will be `'channels_last'`.
        use_bias: whether to include a use_bias
            (i.e. make the layer affine rather than linear).

    # Input shape
        4D tensor with shape:
        `(samples, channels, rows, cols)` if data_format='channels_first'
        or 4D tensor with shape:
        `(samples, rows, cols, channels)` if data_format='channels_last'.

    # Output shape
        4D tensor with shape:
        `(samples, filters, nekernel_rows, nekernel_cols)`
        if data_format='channels_first'
        or 4D tensor with shape:
        `(samples, nekernel_rows, nekernel_cols, filters)`
        if data_format='channels_last'.
        `rows` and `cols` values might have changed due to padding.

    # References
        - [Cosine Normalization: Using Cosine Similarity Instead
          of Dot Product in Neural Networks](https://arxiv.org/pdf/1702.05870.pdf)
    """

    def __init__(self, filters, kernel_size,
                 kernel_initializer='glorot_uniform', activation=None, weights=None,
                 padding='valid', strides=(1, 1), data_format=None,
                 kernel_regularizer=None, bias_regularizer=None,
                 activity_regularizer=None,
                 kernel_constraint=None, bias_constraint=None,
                 use_bias=True, **kwargs):
        if data_format is None:
            data_format = K.image_data_format()
        if padding not in {'valid', 'same', 'full'}:
            raise ValueError('Invalid border mode for CosineConvolution2D:', padding)
        self.filters = filters
        # FIX: the docstring promises "an integer or tuple/list of 2 integers",
        # but a plain int previously crashed on the tuple unpacking below.
        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)
        self.kernel_size = tuple(kernel_size)
        self.nb_row, self.nb_col = self.kernel_size
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.activation = activations.get(activation)
        self.padding = padding
        self.strides = tuple(strides)
        self.data_format = normalize_data_format(data_format)
        self.kernel_regularizer = regularizers.get(kernel_regularizer)
        self.bias_regularizer = regularizers.get(bias_regularizer)
        self.activity_regularizer = regularizers.get(activity_regularizer)
        self.kernel_constraint = constraints.get(kernel_constraint)
        self.bias_constraint = constraints.get(bias_constraint)
        self.use_bias = use_bias
        self.input_spec = [InputSpec(ndim=4)]
        self.initial_weights = weights
        super(CosineConvolution2D, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create the kernel, the all-ones normalization kernel, and the bias."""
        input_shape = to_tuple(input_shape)
        if self.data_format == 'channels_first':
            stack_size = input_shape[1]
            self.kernel_shape = (self.filters, stack_size, self.nb_row, self.nb_col)
            self.kernel_norm_shape = (1, stack_size, self.nb_row, self.nb_col)
        elif self.data_format == 'channels_last':
            stack_size = input_shape[3]
            self.kernel_shape = (self.nb_row, self.nb_col, stack_size, self.filters)
            self.kernel_norm_shape = (self.nb_row, self.nb_col, stack_size, 1)
        else:
            raise ValueError('Invalid data_format:', self.data_format)
        self.W = self.add_weight(shape=self.kernel_shape,
                                 initializer=partial(self.kernel_initializer),
                                 name='{}_W'.format(self.name),
                                 regularizer=self.kernel_regularizer,
                                 constraint=self.kernel_constraint)
        # Fixed all-ones kernel used to compute the windowed input norm.
        kernel_norm_name = '{}_kernel_norm'.format(self.name)
        self.kernel_norm = K.variable(np.ones(self.kernel_norm_shape),
                                      name=kernel_norm_name)
        if self.use_bias:
            self.b = self.add_weight(shape=(self.filters,),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.bias_regularizer,
                                     constraint=self.bias_constraint)
        else:
            self.b = None
        if self.initial_weights is not None:
            self.set_weights(self.initial_weights)
            del self.initial_weights
        self.built = True

    def compute_output_shape(self, input_shape):
        if self.data_format == 'channels_first':
            rows = input_shape[2]
            cols = input_shape[3]
        elif self.data_format == 'channels_last':
            rows = input_shape[1]
            cols = input_shape[2]
        else:
            raise ValueError('Invalid data_format:', self.data_format)
        rows = conv_output_length(rows, self.nb_row,
                                  self.padding, self.strides[0])
        cols = conv_output_length(cols, self.nb_col,
                                  self.padding, self.strides[1])
        if self.data_format == 'channels_first':
            return input_shape[0], self.filters, rows, cols
        elif self.data_format == 'channels_last':
            return input_shape[0], rows, cols, self.filters

    def call(self, x, mask=None):
        # Cosine normalization: divide the convolution output by the norms of
        # both the kernel (per filter) and the input window (per position);
        # the bias, when used, is folded into both norms (the `1.` terms).
        b, xb = 0., 0.
        if self.data_format == 'channels_first':
            kernel_sum_axes = [1, 2, 3]
            if self.use_bias:
                b = K.reshape(self.b, (self.filters, 1, 1, 1))
                xb = 1.
        elif self.data_format == 'channels_last':
            kernel_sum_axes = [0, 1, 2]
            if self.use_bias:
                b = K.reshape(self.b, (1, 1, 1, self.filters))
                xb = 1.
        tmp = K.sum(K.square(self.W), axis=kernel_sum_axes, keepdims=True)
        Wnorm = K.sqrt(tmp + K.square(b) + K.epsilon())
        tmp = KC.conv2d(K.square(x), self.kernel_norm, strides=self.strides,
                        padding=self.padding,
                        data_format=self.data_format,
                        filter_shape=self.kernel_norm_shape)
        xnorm = K.sqrt(tmp + xb + K.epsilon())
        W = self.W / Wnorm
        output = KC.conv2d(x, W, strides=self.strides,
                           padding=self.padding,
                           data_format=self.data_format,
                           filter_shape=self.kernel_shape)
        if K.backend() == 'theano':
            xnorm = K.pattern_broadcast(xnorm, [False, True, False, False])
        output /= xnorm
        if self.use_bias:
            b /= Wnorm
            if self.data_format == 'channels_first':
                b = K.reshape(b, (1, self.filters, 1, 1))
            elif self.data_format == 'channels_last':
                b = K.reshape(b, (1, 1, 1, self.filters))
            else:
                raise ValueError('Invalid data_format:', self.data_format)
            b /= xnorm
            output += b
        output = self.activation(output)
        return output

    def get_config(self):
        """Return the layer configuration for serialization."""
        config = {
            'filters': self.filters,
            'kernel_size': self.kernel_size,
            'kernel_initializer': initializers.serialize(self.kernel_initializer),
            'activation': activations.serialize(self.activation),
            'padding': self.padding,
            'strides': self.strides,
            'data_format': self.data_format,
            'kernel_regularizer': regularizers.serialize(self.kernel_regularizer),
            'bias_regularizer': regularizers.serialize(self.bias_regularizer),
            'activity_regularizer':
                regularizers.serialize(self.activity_regularizer),
            'kernel_constraint': constraints.serialize(self.kernel_constraint),
            'bias_constraint': constraints.serialize(self.bias_constraint),
            'use_bias': self.use_bias}
        base_config = super(CosineConvolution2D, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))


CosineConv2D = CosineConvolution2D
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from keras.layers import Layer
from keras_contrib import backend as KC
from keras_contrib.utils.conv_utils import normalize_data_format
class SubPixelUpscaling(Layer):
    """ Sub-pixel convolutional upscaling layer.

    This layer requires a Convolution2D prior to it,
    having output filters computed according to
    the formula :
        filters = k * (scale_factor * scale_factor)
        where k = a user defined number of filters (generally larger than 32)
              scale_factor = the upscaling factor (generally 2)
    This layer performs the depth to space operation on
    the convolution filters, and returns a
    tensor with the size as defined below.

    # Example :
    ```python
        # A standard subpixel upscaling block
        x = Convolution2D(256, 3, 3, padding='same', activation='relu')(...)
        u = SubPixelUpscaling(scale_factor=2)(x)
        # Optional
        x = Convolution2D(256, 3, 3, padding='same', activation='relu')(u)
    ```
        In practice, it is useful to have a second convolution layer after the
        SubPixelUpscaling layer to speed up the learning process.
        However, if you are stacking multiple
        SubPixelUpscaling blocks, it may increase
        the number of parameters greatly, so the
        Convolution layer after SubPixelUpscaling
        layer can be removed.

    # Arguments
        scale_factor: Upscaling factor.
        data_format: Can be None, 'channels_first' or 'channels_last'.

    # Input shape
        4D tensor with shape:
        `(samples, k * (scale_factor * scale_factor) channels, rows, cols)`
        if data_format='channels_first'
        or 4D tensor with shape:
        `(samples, rows, cols, k * (scale_factor * scale_factor) channels)`
        if data_format='channels_last'.

    # Output shape
        4D tensor with shape:
        `(samples, k channels, rows * scale_factor, cols * scale_factor))`
        if data_format='channels_first'
        or 4D tensor with shape:
        `(samples, rows * scale_factor, cols * scale_factor, k channels)`
        if data_format='channels_last'.

    # References
        - [Real-Time Single Image and Video Super-Resolution Using an
          Efficient Sub-Pixel Convolutional Neural Network](
          https://arxiv.org/abs/1609.05158)
    """

    def __init__(self, scale_factor=2, data_format=None, **kwargs):
        super(SubPixelUpscaling, self).__init__(**kwargs)
        self.scale_factor = scale_factor
        self.data_format = normalize_data_format(data_format)

    def build(self, input_shape):
        # No weights to create; the operation is a pure reshuffle.
        pass

    def call(self, x, mask=None):
        # Delegate to the backend depth-to-space (pixel shuffle) op.
        return KC.depth_to_space(x, self.scale_factor, self.data_format)

    def compute_output_shape(self, input_shape):
        factor = self.scale_factor
        if self.data_format == 'channels_first':
            batch, channels, height, width = input_shape
            # Channels shrink by factor^2; spatial dims grow by factor.
            return (batch, channels // (factor * factor),
                    height * factor, width * factor)
        batch, height, width, channels = input_shape
        return (batch, height * factor, width * factor,
                channels // (factor * factor))

    def get_config(self):
        """Return the layer configuration for serialization."""
        merged = dict(super(SubPixelUpscaling, self).get_config())
        merged.update({'scale_factor': self.scale_factor,
                       'data_format': self.data_format})
        return merged
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from keras import backend as K
from keras import activations
from keras import initializers
from keras import regularizers
from keras import constraints
from keras.layers import InputSpec
from keras.layers import Layer
from keras_contrib.utils.test_utils import to_tuple
class CosineDense(Layer):
    """A cosine normalized densely-connected NN layer.

    Instead of the plain affine map `x . W + b`, this layer computes the
    cosine similarity between the input and each weight column: the dot
    product is divided by the L2 norms of both operands.  When a bias is
    used it is treated as one extra weight paired with a constant input
    feature of 1, so it takes part in both norms (see `call`).

    # Example

    ```python
        # as first layer in a sequential model:
        model = Sequential()
        model.add(CosineDense(32, input_dim=16))
        # now the model will take as input arrays of shape (*, 16)
        # and output arrays of shape (*, 32)

        # this is equivalent to the above:
        model = Sequential()
        model.add(CosineDense(32, input_shape=(16,)))

        # after the first layer, you don't need to specify
        # the size of the input anymore:
        model.add(CosineDense(32))

        # Note that a regular Dense layer may work better as the final layer
    ```

    # Arguments
        units: Positive integer, dimensionality of the output space.
        kernel_initializer: name of initialization function for the weights
            of the layer (see [initializers](https://keras.io/initializers)),
            or alternatively, Theano function to use for weights
            initialization. This parameter is only relevant
            if you don't pass a `weights` argument.
        activation: name of activation function to use
            (see [activations](https://keras.io/activations)),
            or alternatively, elementwise Python function.
            If you don't specify anything, no activation is applied
            (ie. "linear" activation: a(x) = x).
        weights: list of Numpy arrays to set as initial weights.
            The list should have 2 elements, of shape `(input_dim, units)`
            and (units,) for weights and biases respectively.
        kernel_regularizer: instance of [WeightRegularizer](
            https://keras.io/regularizers)
            (eg. L1 or L2 regularization), applied to the main weights matrix.
        bias_regularizer: instance of [WeightRegularizer](
            https://keras.io/regularizers), applied to the bias.
        activity_regularizer: instance of [ActivityRegularizer](
            https://keras.io/regularizers), applied to the network output.
        kernel_constraint: instance of the [constraints](
            https://keras.io/constraints/) module
            (eg. maxnorm, nonneg), applied to the main weights matrix.
        bias_constraint: instance of the [constraints](
            https://keras.io/constraints/) module, applied to the bias.
        use_bias: whether to include a bias
            (i.e. make the layer affine rather than linear).
        input_dim: dimensionality of the input (integer). This argument
            (or alternatively, the keyword argument `input_shape`)
            is required when using this layer as the first layer in a model.

    # Input shape
        nD tensor with shape: `(nb_samples, ..., input_dim)`.
        The most common situation would be
        a 2D input with shape `(nb_samples, input_dim)`.

    # Output shape
        nD tensor with shape: `(nb_samples, ..., units)`.
        For instance, for a 2D input with shape `(nb_samples, input_dim)`,
        the output would have shape `(nb_samples, units)`.

    # References
        - [Cosine Normalization: Using Cosine Similarity Instead
           of Dot Product in Neural Networks](https://arxiv.org/pdf/1702.05870.pdf)
    """

    def __init__(self, units, kernel_initializer='glorot_uniform',
                 activation=None, weights=None,
                 kernel_regularizer=None, bias_regularizer=None,
                 activity_regularizer=None,
                 kernel_constraint=None, bias_constraint=None,
                 use_bias=True, **kwargs):
        # Legacy convenience: `input_dim=n` is equivalent to `input_shape=(n,)`.
        if 'input_shape' not in kwargs and 'input_dim' in kwargs:
            kwargs['input_shape'] = (kwargs.pop('input_dim'),)
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.activation = activations.get(activation)
        self.units = units
        self.kernel_regularizer = regularizers.get(kernel_regularizer)
        self.bias_regularizer = regularizers.get(bias_regularizer)
        self.activity_regularizer = regularizers.get(activity_regularizer)
        self.kernel_constraint = constraints.get(kernel_constraint)
        self.bias_constraint = constraints.get(bias_constraint)
        self.use_bias = use_bias
        # Kept until build() applies them, then deleted.
        self.initial_weights = weights
        super(CosineDense, self).__init__(**kwargs)

    def build(self, input_shape):
        # Create kernel (and optional bias), then apply any user-provided
        # initial weights.
        input_shape = to_tuple(input_shape)
        ndim = len(input_shape)
        assert ndim >= 2
        input_dim = input_shape[-1]
        self.input_dim = input_dim
        self.input_spec = [InputSpec(dtype=K.floatx(),
                                     ndim=ndim)]
        self.kernel = self.add_weight(shape=(input_dim, self.units),
                                      initializer=self.kernel_initializer,
                                      name='{}_W'.format(self.name),
                                      regularizer=self.kernel_regularizer,
                                      constraint=self.kernel_constraint)
        if self.use_bias:
            self.bias = self.add_weight(shape=(self.units,),
                                        initializer='zero',
                                        name='{}_b'.format(self.name),
                                        regularizer=self.bias_regularizer,
                                        constraint=self.bias_constraint)
        else:
            self.bias = None
        if self.initial_weights is not None:
            self.set_weights(self.initial_weights)
            del self.initial_weights
        self.built = True

    def call(self, x, mask=None):
        # b is the bias vector and xb the matching constant "1" input
        # feature; both are zero when use_bias is False so the formulas
        # below reduce to plain cosine similarity.
        if self.use_bias:
            b, xb = self.bias, 1.
        else:
            b, xb = 0., 0.
        # ||x|| with the virtual bias input appended; epsilon avoids
        # division by zero for all-zero inputs.
        xnorm = K.sqrt(K.sum(K.square(x), axis=-1, keepdims=True)
                       + xb
                       + K.epsilon())
        # ||W|| per output unit, with the bias treated as an extra weight.
        Wnorm = K.sqrt(K.sum(K.square(self.kernel), axis=0)
                       + K.square(b)
                       + K.epsilon())
        xWnorm = (xnorm * Wnorm)
        output = K.dot(x, self.kernel) / xWnorm
        if self.use_bias:
            output += (self.bias / xWnorm)
        return self.activation(output)

    def compute_output_shape(self, input_shape):
        # Only the last (feature) dimension changes: input_dim -> units.
        assert input_shape
        assert len(input_shape) >= 2
        assert input_shape[-1]
        output_shape = list(input_shape)
        output_shape[-1] = self.units
        return tuple(output_shape)

    def get_config(self):
        """Return the serializable configuration of this layer."""
        config = {
            'units': self.units,
            'kernel_initializer': initializers.serialize(self.kernel_initializer),
            'activation': activations.serialize(self.activation),
            'kernel_regularizer': regularizers.serialize(self.kernel_regularizer),
            'bias_regularizer': regularizers.serialize(self.bias_regularizer),
            'activity_regularizer':
                regularizers.serialize(self.activity_regularizer),
            'kernel_constraint': constraints.serialize(self.kernel_constraint),
            'bias_constraint': constraints.serialize(self.bias_constraint),
            'use_bias': self.use_bias
        }
        base_config = super(CosineDense, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
from __future__ import absolute_import
from __future__ import division
import warnings
from keras import backend as K
from keras import activations
from keras import initializers
from keras import regularizers
from keras import constraints
from keras.layers import Layer
from keras.layers import InputSpec
from keras_contrib.losses import crf_loss
from keras_contrib.metrics import crf_marginal_accuracy
from keras_contrib.metrics import crf_viterbi_accuracy
from keras_contrib.utils.test_utils import to_tuple
class CRF(Layer):
    """An implementation of linear chain conditional random field (CRF).

    An linear chain CRF is defined to maximize the following likelihood function:

    $$ L(W, U, b; y_1, ..., y_n) := \frac{1}{Z}
    \sum_{y_1, ..., y_n} \exp(-a_1' y_1 - a_n' y_n
        - \sum_{k=1^n}((f(x_k' W + b) y_k) + y_1' U y_2)), $$

    where:
        $Z$: normalization constant
        $x_k, y_k$: inputs and outputs

    This implementation has two modes for optimization:
    1. (`join mode`) optimized by maximizing join likelihood,
       which is optimal in theory of statistics.
       Note that in this case, CRF must be the output/last layer.
    2. (`marginal mode`) return marginal probabilities on each time
       step and optimized via composition
       likelihood (product of marginal likelihood), i.e.,
       using `categorical_crossentropy` loss.
       Note that in this case, CRF can be either the last layer or an
       intermediate layer (though not explored).

    For prediction (test phrase), one can choose either Viterbi
    best path (class indices) or marginal
    probabilities if probabilities are needed.
    However, if one chooses *join mode* for training,
    Viterbi output is typically better than marginal output,
    but the marginal output will still perform
    reasonably close, while if *marginal mode* is used for training,
    marginal output usually performs
    much better. The default behavior and `metrics.crf_accuracy`
    is set according to this observation.

    In addition, this implementation supports masking and accepts either
    onehot or sparse target.

    If you open a issue or a pull request about CRF, please
    add 'cc @lzfelix' to notify Luiz Felix.

    # Examples

    ```python
        from keras_contrib.layers import CRF
        from keras_contrib.losses import crf_loss
        from keras_contrib.metrics import crf_viterbi_accuracy

        model = Sequential()
        model.add(Embedding(3001, 300, mask_zero=True))

        # use learn_mode = 'join', test_mode = 'viterbi',
        # sparse_target = True (label indice output)
        crf = CRF(10, sparse_target=True)
        model.add(crf)

        # crf_accuracy is default to Viterbi acc if using join-mode (default).
        # One can add crf.marginal_acc if interested, but may slow down learning
        model.compile('adam', loss=crf_loss, metrics=[crf_viterbi_accuracy])

        # y must be label indices (with shape 1 at dim 3) here,
        # since `sparse_target=True`
        model.fit(x, y)

        # prediction give onehot representation of Viterbi best path
        y_hat = model.predict(x_test)
    ```

    The following snippet shows how to load a persisted
    model that uses the CRF layer:

    ```python
        from keras.models import load_model
        from keras_contrib.losses import crf_loss
        from keras_contrib.metrics import crf_viterbi_accuracy

        custom_objects={'CRF': CRF,
                        'crf_loss': crf_loss,
                        'crf_viterbi_accuracy': crf_viterbi_accuracy}

        loaded_model = load_model('<path_to_model>',
                                  custom_objects=custom_objects)
    ```

    # Arguments
        units: Positive integer, dimensionality of the output space.
        learn_mode: Either 'join' or 'marginal'.
            The former train the model by maximizing join likelihood while the latter
            maximize the product of marginal likelihood over all time steps.
            One should use `losses.crf_nll` for 'join' mode
            and `losses.categorical_crossentropy` or
            `losses.sparse_categorical_crossentropy` for
            `marginal` mode.  For convenience, simply
            use `losses.crf_loss`, which will decide the proper loss as described.
        test_mode: Either 'viterbi' or 'marginal'.
            The former is recommended and as default when `learn_mode = 'join'` and
            gives one-hot representation of the best path at test (prediction) time,
            while the latter is recommended and chosen as default
            when `learn_mode = 'marginal'`,
            which produces marginal probabilities for each time step.
            For evaluating metrics, one should
            use `metrics.crf_viterbi_accuracy` for 'viterbi' mode and
            'metrics.crf_marginal_accuracy' for 'marginal' mode, or
            simply use `metrics.crf_accuracy` for
            both which automatically decides it as described.
            One can also use both for evaluation at training.
        sparse_target: Boolean (default False) indicating
            if provided labels are one-hot or
            indices (with shape 1 at dim 3).
        use_boundary: Boolean (default True) indicating if trainable
            start-end chain energies
            should be added to model.
        use_bias: Boolean, whether the layer uses a bias vector.
        kernel_initializer: Initializer for the `kernel` weights matrix,
            used for the linear transformation of the inputs.
            (see [initializers](../initializers.md)).
        chain_initializer: Initializer for the `chain_kernel` weights matrix,
            used for the CRF chain energy.
            (see [initializers](../initializers.md)).
        boundary_initializer: Initializer for the `left_boundary`,
            'right_boundary' weights vectors,
            used for the start/left and end/right boundary energy.
            (see [initializers](../initializers.md)).
        bias_initializer: Initializer for the bias vector
            (see [initializers](../initializers.md)).
        activation: Activation function to use
            (see [activations](../activations.md)).
            If you pass None, no activation is applied
            (ie. "linear" activation: `a(x) = x`).
        kernel_regularizer: Regularizer function applied to
            the `kernel` weights matrix
            (see [regularizer](../regularizers.md)).
        chain_regularizer: Regularizer function applied to
            the `chain_kernel` weights matrix
            (see [regularizer](../regularizers.md)).
        boundary_regularizer: Regularizer function applied to
            the 'left_boundary', 'right_boundary' weight vectors
            (see [regularizer](../regularizers.md)).
        bias_regularizer: Regularizer function applied to the bias vector
            (see [regularizer](../regularizers.md)).
        kernel_constraint: Constraint function applied to
            the `kernel` weights matrix
            (see [constraints](../constraints.md)).
        chain_constraint: Constraint function applied to
            the `chain_kernel` weights matrix
            (see [constraints](../constraints.md)).
        boundary_constraint: Constraint function applied to
            the `left_boundary`, `right_boundary` weights vectors
            (see [constraints](../constraints.md)).
        bias_constraint: Constraint function applied to the bias vector
            (see [constraints](../constraints.md)).
        input_dim: dimensionality of the input (integer).
            This argument (or alternatively, the keyword argument `input_shape`)
            is required when using this layer as the first layer in a model.
        unroll: Boolean (default False). If True, the network will be
            unrolled, else a symbolic loop will be used.
            Unrolling can speed-up a RNN, although it tends
            to be more memory-intensive.
            Unrolling is only suitable for short sequences.

    # Input shape
        3D tensor with shape `(nb_samples, timesteps, input_dim)`.

    # Output shape
        3D tensor with shape `(nb_samples, timesteps, units)`.

    # Masking
        This layer supports masking for input data with a variable number
        of timesteps. To introduce masks to your data,
        use an [Embedding](embeddings.md) layer with the `mask_zero` parameter
        set to `True`.
    """

    def __init__(self, units,
                 learn_mode='join',
                 test_mode=None,
                 sparse_target=False,
                 use_boundary=True,
                 use_bias=True,
                 activation='linear',
                 kernel_initializer='glorot_uniform',
                 chain_initializer='orthogonal',
                 bias_initializer='zeros',
                 boundary_initializer='zeros',
                 kernel_regularizer=None,
                 chain_regularizer=None,
                 boundary_regularizer=None,
                 bias_regularizer=None,
                 kernel_constraint=None,
                 chain_constraint=None,
                 boundary_constraint=None,
                 bias_constraint=None,
                 input_dim=None,
                 unroll=False,
                 **kwargs):
        super(CRF, self).__init__(**kwargs)
        self.supports_masking = True
        self.units = units
        self.learn_mode = learn_mode
        assert self.learn_mode in ['join', 'marginal']
        self.test_mode = test_mode
        # Default test mode follows the learn mode, per the class docstring.
        if self.test_mode is None:
            self.test_mode = 'viterbi' if self.learn_mode == 'join' else 'marginal'
        else:
            assert self.test_mode in ['viterbi', 'marginal']
        self.sparse_target = sparse_target
        self.use_boundary = use_boundary
        self.use_bias = use_bias
        self.activation = activations.get(activation)
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.chain_initializer = initializers.get(chain_initializer)
        self.boundary_initializer = initializers.get(boundary_initializer)
        self.bias_initializer = initializers.get(bias_initializer)
        self.kernel_regularizer = regularizers.get(kernel_regularizer)
        self.chain_regularizer = regularizers.get(chain_regularizer)
        self.boundary_regularizer = regularizers.get(boundary_regularizer)
        self.bias_regularizer = regularizers.get(bias_regularizer)
        self.kernel_constraint = constraints.get(kernel_constraint)
        self.chain_constraint = constraints.get(chain_constraint)
        self.boundary_constraint = constraints.get(boundary_constraint)
        self.bias_constraint = constraints.get(bias_constraint)
        self.unroll = unroll

    def build(self, input_shape):
        # kernel: input->unary energies; chain_kernel: pairwise (transition)
        # energies; boundary weights: start/end energies when use_boundary.
        input_shape = to_tuple(input_shape)
        self.input_spec = [InputSpec(shape=input_shape)]
        self.input_dim = input_shape[-1]
        self.kernel = self.add_weight(shape=(self.input_dim, self.units),
                                      name='kernel',
                                      initializer=self.kernel_initializer,
                                      regularizer=self.kernel_regularizer,
                                      constraint=self.kernel_constraint)
        self.chain_kernel = self.add_weight(shape=(self.units, self.units),
                                            name='chain_kernel',
                                            initializer=self.chain_initializer,
                                            regularizer=self.chain_regularizer,
                                            constraint=self.chain_constraint)
        if self.use_bias:
            self.bias = self.add_weight(shape=(self.units,),
                                        name='bias',
                                        initializer=self.bias_initializer,
                                        regularizer=self.bias_regularizer,
                                        constraint=self.bias_constraint)
        else:
            # Scalar 0 so `+ self.bias` is a no-op in the energy computation.
            self.bias = 0
        if self.use_boundary:
            self.left_boundary = self.add_weight(shape=(self.units,),
                                                 name='left_boundary',
                                                 initializer=self.boundary_initializer,
                                                 regularizer=self.boundary_regularizer,
                                                 constraint=self.boundary_constraint)
            self.right_boundary = self.add_weight(shape=(self.units,),
                                                  name='right_boundary',
                                                  initializer=self.boundary_initializer,
                                                  regularizer=self.boundary_regularizer,
                                                  constraint=self.boundary_constraint)
        self.built = True

    def call(self, X, mask=None):
        # Test-time output is a decoded path (viterbi) or marginals;
        # in join mode the train-time output is a dummy zeros tensor,
        # because the real objective is computed by losses.crf_loss.
        if mask is not None:
            assert K.ndim(mask) == 2, 'Input mask to CRF must have dim 2 if not None'
        if self.test_mode == 'viterbi':
            test_output = self.viterbi_decoding(X, mask)
        else:
            test_output = self.get_marginal_prob(X, mask)
        self.uses_learning_phase = True
        if self.learn_mode == 'join':
            train_output = K.zeros_like(K.dot(X, self.kernel))
            out = K.in_train_phase(train_output, test_output)
        else:
            if self.test_mode == 'viterbi':
                train_output = self.get_marginal_prob(X, mask)
                out = K.in_train_phase(train_output, test_output)
            else:
                out = test_output
        return out

    def compute_output_shape(self, input_shape):
        # (batch, timesteps, input_dim) -> (batch, timesteps, units)
        return input_shape[:2] + (self.units,)

    def compute_mask(self, input, mask=None):
        # In join mode the whole sequence collapses to one loss value,
        # so reduce the timestep mask to a per-sample mask.
        if mask is not None and self.learn_mode == 'join':
            return K.any(mask, axis=1)
        return mask

    def get_config(self):
        """Return the serializable configuration of this layer."""
        config = {
            'units': self.units,
            'learn_mode': self.learn_mode,
            'test_mode': self.test_mode,
            'use_boundary': self.use_boundary,
            'use_bias': self.use_bias,
            'sparse_target': self.sparse_target,
            'kernel_initializer': initializers.serialize(self.kernel_initializer),
            'chain_initializer': initializers.serialize(self.chain_initializer),
            'boundary_initializer': initializers.serialize(
                self.boundary_initializer),
            'bias_initializer': initializers.serialize(self.bias_initializer),
            'activation': activations.serialize(self.activation),
            'kernel_regularizer': regularizers.serialize(self.kernel_regularizer),
            'chain_regularizer': regularizers.serialize(self.chain_regularizer),
            'boundary_regularizer': regularizers.serialize(
                self.boundary_regularizer),
            'bias_regularizer': regularizers.serialize(self.bias_regularizer),
            'kernel_constraint': constraints.serialize(self.kernel_constraint),
            'chain_constraint': constraints.serialize(self.chain_constraint),
            'boundary_constraint': constraints.serialize(self.boundary_constraint),
            'bias_constraint': constraints.serialize(self.bias_constraint),
            'input_dim': self.input_dim,
            'unroll': self.unroll}
        base_config = super(CRF, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    @property
    def loss_function(self):
        # Deprecated accessor kept for backward compatibility.
        warnings.warn('CRF.loss_function is deprecated '
                      'and it might be removed in the future. Please '
                      'use losses.crf_loss instead.')
        return crf_loss

    @property
    def accuracy(self):
        # Deprecated accessor kept for backward compatibility.
        warnings.warn('CRF.accuracy is deprecated and it '
                      'might be removed in the future. Please '
                      'use metrics.crf_accuracy')
        if self.test_mode == 'viterbi':
            return crf_viterbi_accuracy
        else:
            return crf_marginal_accuracy

    @property
    def viterbi_acc(self):
        # Deprecated accessor kept for backward compatibility.
        warnings.warn('CRF.viterbi_acc is deprecated and it might '
                      'be removed in the future. Please '
                      'use metrics.viterbi_acc instead.')
        return crf_viterbi_accuracy

    @property
    def marginal_acc(self):
        # Deprecated accessor kept for backward compatibility.
        # NOTE(review): the message below misspells 'marginal_acc' as
        # 'moarginal_acc'; it is a runtime string, left unchanged here.
        warnings.warn('CRF.moarginal_acc is deprecated and it '
                      'might be removed in the future. Please '
                      'use metrics.marginal_acc instead.')
        return crf_marginal_accuracy

    @staticmethod
    def softmaxNd(x, axis=-1):
        # Numerically stable softmax along `axis` (max-subtraction trick).
        m = K.max(x, axis=axis, keepdims=True)
        exp_x = K.exp(x - m)
        prob_x = exp_x / K.sum(exp_x, axis=axis, keepdims=True)
        return prob_x

    @staticmethod
    def shift_left(x, offset=1):
        # Shift the time axis left by `offset`, zero-padding on the right.
        assert offset > 0
        return K.concatenate([x[:, offset:], K.zeros_like(x[:, :offset])], axis=1)

    @staticmethod
    def shift_right(x, offset=1):
        # Shift the time axis right by `offset`, zero-padding on the left.
        assert offset > 0
        return K.concatenate([K.zeros_like(x[:, :offset]), x[:, :-offset]], axis=1)

    def add_boundary_energy(self, energy, mask, start, end):
        # Add start/end boundary energies at the first/last *valid*
        # timestep; with a mask these positions are found where the mask
        # transitions 0->1 (start) and 1->0 (end).
        start = K.expand_dims(K.expand_dims(start, 0), 0)
        end = K.expand_dims(K.expand_dims(end, 0), 0)
        if mask is None:
            energy = K.concatenate([energy[:, :1, :] + start, energy[:, 1:, :]],
                                   axis=1)
            energy = K.concatenate([energy[:, :-1, :], energy[:, -1:, :] + end],
                                   axis=1)
        else:
            mask = K.expand_dims(K.cast(mask, K.floatx()))
            start_mask = K.cast(K.greater(mask, self.shift_right(mask)), K.floatx())
            end_mask = K.cast(K.greater(self.shift_left(mask), mask), K.floatx())
            energy = energy + start_mask * start
            energy = energy + end_mask * end
        return energy

    def get_log_normalization_constant(self, input_energy, mask, **kwargs):
        """Compute logarithm of the normalization constant Z, where
        Z = sum exp(-E) -> logZ = log sum exp(-E) =: -nlogZ
        """
        # should have logZ[:, i] == logZ[:, j] for any i, j
        logZ = self.recursion(input_energy, mask, return_sequences=False, **kwargs)
        return logZ[:, 0]

    def get_energy(self, y_true, input_energy, mask):
        """Energy = a1' y1 + u1' y1 + y1' U y2 + u2' y2 + y2' U y3 + u3' y3 + an' y3
        """
        input_energy = K.sum(input_energy * y_true, 2)  # (B, T)
        # (B, T-1)
        chain_energy = K.sum(K.dot(y_true[:, :-1, :],
                                   self.chain_kernel) * y_true[:, 1:, :], 2)
        if mask is not None:
            mask = K.cast(mask, K.floatx())
            # (B, T-1), mask[:,:-1]*mask[:,1:] makes it work with any padding
            chain_mask = mask[:, :-1] * mask[:, 1:]
            input_energy = input_energy * mask
            chain_energy = chain_energy * chain_mask
        total_energy = K.sum(input_energy, -1) + K.sum(chain_energy, -1)  # (B, )
        return total_energy

    def get_negative_log_likelihood(self, y_true, X, mask):
        """Compute the loss, i.e., negative log likelihood (normalize by number of time steps)
        likelihood = 1/Z * exp(-E) -> neg_log_like = - log(1/Z * exp(-E)) = logZ + E
        """
        input_energy = self.activation(K.dot(X, self.kernel) + self.bias)
        if self.use_boundary:
            input_energy = self.add_boundary_energy(input_energy, mask,
                                                    self.left_boundary,
                                                    self.right_boundary)
        energy = self.get_energy(y_true, input_energy, mask)
        logZ = self.get_log_normalization_constant(input_energy, mask,
                                                   input_length=K.int_shape(X)[1])
        nloglik = logZ + energy
        # Normalize by the number of (unmasked) timesteps.
        if mask is not None:
            nloglik = nloglik / K.sum(K.cast(mask, K.floatx()), 1)
        else:
            nloglik = nloglik / K.cast(K.shape(X)[1], K.floatx())
        return nloglik

    def step(self, input_energy_t, states, return_logZ=True):
        # Single recursion step for K.rnn: forward/backward logZ pass
        # (return_logZ=True) or Viterbi minimization (return_logZ=False).
        # Note: in the following `prev_target_val` has shape = (B, F)
        # where B = batch_size, F = output feature dim
        # Note: `i` is of float32, due to the behavior of `K.rnn`
        prev_target_val, i, chain_energy = states[:3]
        t = K.cast(i[0, 0], dtype='int32')
        if len(states) > 3:
            if K.backend() == 'theano':
                m = states[3][:, t:(t + 2)]
            else:
                m = K.slice(states[3], [0, t], [-1, 2])
            input_energy_t = input_energy_t * K.expand_dims(m[:, 0])
            # (1, F, F)*(B, 1, 1) -> (B, F, F)
            chain_energy = chain_energy * K.expand_dims(
                K.expand_dims(m[:, 0] * m[:, 1]))
        if return_logZ:
            # shapes: (1, B, F) + (B, F, 1) -> (B, F, F)
            energy = chain_energy + K.expand_dims(input_energy_t - prev_target_val, 2)
            new_target_val = K.logsumexp(-energy, 1)  # shapes: (B, F)
            return new_target_val, [new_target_val, i + 1]
        else:
            energy = chain_energy + K.expand_dims(input_energy_t + prev_target_val, 2)
            min_energy = K.min(energy, 1)
            # cast for tf-version `K.rnn`
            argmin_table = K.cast(K.argmin(energy, 1), K.floatx())
            return argmin_table, [min_energy, i + 1]

    def recursion(self, input_energy, mask=None, go_backwards=False,
                  return_sequences=True, return_logZ=True, input_length=None):
        """Forward (alpha) or backward (beta) recursion

        If `return_logZ = True`, compute the logZ, the normalization constant:

        \[ Z = \sum_{y1, y2, y3} exp(-E) # energy
          = \sum_{y1, y2, y3} exp(-(u1' y1 + y1' W y2 + u2' y2 + y2' W y3 + u3' y3))
          = sum_{y2, y3} (exp(-(u2' y2 + y2' W y3 + u3' y3))
          sum_{y1} exp(-(u1' y1' + y1' W y2))) \]

        Denote:
            \[ S(y2) := sum_{y1} exp(-(u1' y1 + y1' W y2)), \]
            \[ Z = sum_{y2, y3} exp(log S(y2) - (u2' y2 + y2' W y3 + u3' y3)) \]
            \[ logS(y2) = log S(y2) = log_sum_exp(-(u1' y1' + y1' W y2)) \]

        Note that:
            yi's are one-hot vectors
            u1, u3: boundary energies have been merged

        If `return_logZ = False`, compute the Viterbi's best path lookup table.
        """
        chain_energy = self.chain_kernel
        # shape=(1, F, F): F=num of output features. 1st F is for t-1, 2nd F for t
        chain_energy = K.expand_dims(chain_energy, 0)
        # shape=(B, F), dtype=float32
        prev_target_val = K.zeros_like(input_energy[:, 0, :])
        if go_backwards:
            input_energy = K.reverse(input_energy, 1)
            if mask is not None:
                mask = K.reverse(mask, 1)
        initial_states = [prev_target_val, K.zeros_like(prev_target_val[:, :1])]
        constants = [chain_energy]
        if mask is not None:
            # Append a trailing zero column so step() can look at mask[t:t+2].
            mask2 = K.cast(K.concatenate([mask, K.zeros_like(mask[:, :1])], axis=1),
                           K.floatx())
            constants.append(mask2)

        def _step(input_energy_i, states):
            return self.step(input_energy_i, states, return_logZ)

        target_val_last, target_val_seq, _ = K.rnn(_step, input_energy,
                                                   initial_states,
                                                   constants=constants,
                                                   input_length=input_length,
                                                   unroll=self.unroll)
        if return_sequences:
            if go_backwards:
                target_val_seq = K.reverse(target_val_seq, 1)
            return target_val_seq
        else:
            return target_val_last

    def forward_recursion(self, input_energy, **kwargs):
        # Alpha pass (left-to-right).
        return self.recursion(input_energy, **kwargs)

    def backward_recursion(self, input_energy, **kwargs):
        # Beta pass (right-to-left).
        return self.recursion(input_energy, go_backwards=True, **kwargs)

    def get_marginal_prob(self, X, mask=None):
        # Forward-backward: marginals ∝ exp(-(alpha + e + beta)), computed
        # via softmax for numerical stability.
        input_energy = self.activation(K.dot(X, self.kernel) + self.bias)
        if self.use_boundary:
            input_energy = self.add_boundary_energy(input_energy, mask,
                                                    self.left_boundary,
                                                    self.right_boundary)
        input_length = K.int_shape(X)[1]
        alpha = self.forward_recursion(input_energy, mask=mask,
                                       input_length=input_length)
        beta = self.backward_recursion(input_energy, mask=mask,
                                       input_length=input_length)
        if mask is not None:
            input_energy = input_energy * K.expand_dims(K.cast(mask, K.floatx()))
        margin = -(self.shift_right(alpha) + input_energy + self.shift_left(beta))
        return self.softmaxNd(margin)

    def viterbi_decoding(self, X, mask=None):
        # Forward pass builds per-step argmin lookup tables; the backward
        # pass below follows them to recover the best path, returned one-hot.
        input_energy = self.activation(K.dot(X, self.kernel) + self.bias)
        if self.use_boundary:
            input_energy = self.add_boundary_energy(
                input_energy, mask, self.left_boundary, self.right_boundary)
        argmin_tables = self.recursion(input_energy, mask, return_logZ=False)
        argmin_tables = K.cast(argmin_tables, 'int32')
        # backward to find best path, `initial_best_idx` can be any,
        # as all elements in the last argmin_table are the same
        argmin_tables = K.reverse(argmin_tables, 1)
        # matrix instead of vector is required by tf `K.rnn`
        initial_best_idx = [K.expand_dims(argmin_tables[:, 0, 0])]
        if K.backend() == 'theano':
            from theano import tensor as T
            initial_best_idx = [T.unbroadcast(initial_best_idx[0], 1)]

        def gather_each_row(params, indices):
            # Per-row gather: out[i] = params[i, indices[i]].
            n = K.shape(indices)[0]
            if K.backend() == 'theano':
                from theano import tensor as T
                return params[T.arange(n), indices]
            elif K.backend() == 'tensorflow':
                import tensorflow as tf
                indices = K.transpose(K.stack([tf.range(n), indices]))
                return tf.gather_nd(params, indices)
            else:
                raise NotImplementedError

        def find_path(argmin_table, best_idx):
            # One backtracking step: look up the previous best state.
            next_best_idx = gather_each_row(argmin_table, best_idx[0][:, 0])
            next_best_idx = K.expand_dims(next_best_idx)
            if K.backend() == 'theano':
                from theano import tensor as T
                next_best_idx = T.unbroadcast(next_best_idx, 1)
            return next_best_idx, [next_best_idx]

        _, best_paths, _ = K.rnn(find_path, argmin_tables, initial_best_idx,
                                 input_length=K.int_shape(X)[1], unroll=self.unroll)
        best_paths = K.reverse(best_paths, 1)
        best_paths = K.squeeze(best_paths, 2)
        return K.one_hot(best_paths, self.units)
# --- Repository-page scrape residue (GitLab comment-box UI text), not
# --- source code; commented out so the module remains valid Python:
# Markdown is supported
# 0% or .
# You are about to add 0 people to the discussion. Proceed with caution.
# Finish editing this message first!
# Please register or to comment