Commit 120b1fb6 authored by Damien Vincent

Image compression: initial version of the entropy coder.

parent 7a2bcdc5
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Activation and weight binarizer implementations."""
import tensorflow as tf
def ConvertSignCodeToZeroOneCode(x):
"""Conversion from codes {-1, +1} to codes {0, 1}."""
return 0.5 * (x + 1.0)
def ConvertZeroOneCodeToSignCode(x):
"""Convert from codes {0, 1} to codes {-1, +1}."""
return 2.0 * x - 1.0
def CheckZeroOneCode(x):
  """Returns a boolean tensor: True iff all entries of x are exactly 0 or 1."""
  return tf.reduce_all(tf.equal(x * (x - 1.0), 0))
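# Illustrative sketch added by the editor (not part of the original commit):
# the two conversions are inverses of each other, and CheckZeroOneCode accepts
# only tensors whose entries are exactly 0.0 or 1.0.
def _ExampleCodeConversion():
  sign_codes = tf.constant([-1.0, 1.0, 1.0, -1.0])
  zero_one = ConvertSignCodeToZeroOneCode(sign_codes)  # -> [0., 1., 1., 0.]
  round_trip = ConvertZeroOneCodeToSignCode(zero_one)  # -> [-1., 1., 1., -1.]
  return zero_one, round_trip, CheckZeroOneCode(zero_one)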
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Set of blocks related to entropy coding."""
import math
import tensorflow as tf
import block_base
# pylint does not recognize block_base.BlockBase.__call__().
# pylint: disable=not-callable
class CodeLength(block_base.BlockBase):
"""Theoretical bound for a code length given a probability distribution.
"""
def __init__(self, name=None):
super(CodeLength, self).__init__(name)
def _Apply(self, c, p):
"""Theoretical bound of the coded length given a probability distribution.
Args:
      c: The binary codes, with values in {0, 1}.
      p: The probability P(code == 1).
Returns:
The average code length.
Note: the average code length can be greater than 1 bit (e.g. when
encoding the least likely symbol).
"""
entropy = ((1.0 - c) * tf.log(1.0 - p) + c * tf.log(p)) / (-math.log(2))
entropy = tf.reduce_mean(entropy)
return entropy
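# Editor's illustrative sketch (not in the original commit): for a single
# binary symbol c with p = P(code == 1), the expression in _Apply reduces to
# the Shannon code length -log2(p) when c == 1 and -log2(1 - p) when c == 0.
# Encoding an unlikely symbol, e.g. c = 1 under p = 0.1, costs
# -log2(0.1) ~= 3.32 bits, which is why the average can exceed 1 bit.
def _ExampleCodeLength():
  code_length_calculator = CodeLength()
  c = tf.constant([[1.0]])
  p = tf.constant([[0.1]])
  return code_length_calculator(c, p)  # evaluates to about 3.32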
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for basic tensorflow blocks_entropy_coding."""
from __future__ import division
from __future__ import unicode_literals
import math
import numpy as np
import tensorflow as tf
import blocks_entropy_coding
class BlocksEntropyCodingTest(tf.test.TestCase):
def testCodeLength(self):
shape = [2, 4]
proba_feed = [[0.65, 0.25, 0.70, 0.10],
[0.28, 0.20, 0.44, 0.54]]
symbol_feed = [[1.0, 0.0, 1.0, 0.0],
[0.0, 0.0, 0.0, 1.0]]
mean_code_length = - (
(math.log(0.65) + math.log(0.75) + math.log(0.70) + math.log(0.90) +
math.log(0.72) + math.log(0.80) + math.log(0.56) + math.log(0.54)) /
math.log(2.0)) / (shape[0] * shape[1])
symbol = tf.placeholder(dtype=tf.float32, shape=shape)
proba = tf.placeholder(dtype=tf.float32, shape=shape)
code_length_calculator = blocks_entropy_coding.CodeLength()
code_length = code_length_calculator(symbol, proba)
with self.test_session():
tf.global_variables_initializer().run()
code_length_eval = code_length.eval(
feed_dict={symbol: symbol_feed, proba: proba_feed})
self.assertAllClose(mean_code_length, code_length_eval)
if __name__ == '__main__':
tf.test.main()
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Blocks of LSTM and its variants."""
import numpy as np
import tensorflow as tf
import block_base
import block_util
import blocks_std
# pylint does not recognize block_base.BlockBase.__call__().
# pylint: disable=not-callable
def LSTMBiasInit(shape, dtype):
"""Returns ones for forget-gate, and zeros for the others."""
shape = np.array(shape)
# Check internal consistencies.
assert shape.shape == (1,), shape
assert shape[0] % 4 == 0, shape
n = shape[0] // 4
ones = tf.fill([n], tf.constant(1, dtype=dtype))
zeros = tf.fill([3 * n], tf.constant(0, dtype=dtype))
return tf.concat([ones, zeros], 0)
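# Editor's note (illustrative, not in the original commit): for shape [8],
# i.e. four gates of depth n = 2, LSTMBiasInit yields
#   [1., 1., 0., 0., 0., 0., 0., 0.]
# so the forget gate starts near sigmoid(1) ~= 0.73 and the LSTM initially
# retains most of its cell state.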
class LSTMBase(block_base.BlockBase):
"""Base class for LSTM implementations.
  These LSTM implementations follow the pattern in [1]: there are no peephole
  connections (i.e., the cell content is not used in the recurrence
  computation), and the hidden units also serve as the output units.
[1] Zaremba, Sutskever, Vinyals. Recurrent Neural Network Regularization,
2015. arxiv:1409.2329.
"""
def __init__(self, output_shape, name):
"""Initializes LSTMBase class object.
Args:
output_shape: List representing the LSTM output shape. This argument
does not include batch dimension. For example, if the LSTM output has
shape [batch, depth], then pass [depth].
name: Name of this block.
"""
super(LSTMBase, self).__init__(name)
with self._BlockScope():
self._output_shape = [None] + list(output_shape)
self._hidden = None
self._cell = None
@property
def hidden(self):
"""Returns the hidden units of this LSTM."""
return self._hidden
@hidden.setter
def hidden(self, value):
"""Assigns to the hidden units of this LSTM.
Args:
value: The new value for the hidden units. If None, the hidden units are
considered to be filled with zeros.
"""
if value is not None:
value.get_shape().assert_is_compatible_with(self._output_shape)
self._hidden = value
@property
def cell(self):
"""Returns the cell units of this LSTM."""
return self._cell
@cell.setter
def cell(self, value):
"""Assigns to the cell units of this LSTM.
Args:
value: The new value for the cell units. If None, the cell units are
considered to be filled with zeros.
"""
if value is not None:
value.get_shape().assert_is_compatible_with(self._output_shape)
self._cell = value
  # Consider moving the bias terms to this base class, and requiring this
  # method to be strictly linear.
def _TransformInputs(self, _):
"""Transforms the input units to (4 * depth) units.
    The forget-gate, input-gate, cell update, and output-gate are computed as
      f, i, j, o = T(h) + R(x)
    where h is the hidden units, x is the input units, and T, R are transforms
    of h and x, respectively.
    This method implements R. Note that T is strictly linear, so if the LSTM
    is going to use a bias, this method must include the bias in its
    transformation.
Subclasses must implement this method. See _Apply() for more details.
"""
raise NotImplementedError()
def _TransformHidden(self, _):
"""Transforms the hidden units to (4 * depth) units.
    The forget-gate, input-gate, cell update, and output-gate are computed as
      f, i, j, o = T(h) + R(x)
    where h is the hidden units, x is the input units, and T, R are transforms
    of h and x, respectively.
    This method implements T in the equation. The method must implement a
    strictly linear transformation. For example, it may use MatMul or Conv2D,
    but must not add a bias. This is because when the hidden units are zero,
    the LSTM implementation skips calling this method entirely, rather than
    passing in zeros.
Subclasses must implement this method. See _Apply() for more details.
"""
raise NotImplementedError()
def _Apply(self, *args):
xtransform = self._TransformInputs(*args)
depth_axis = len(self._output_shape) - 1
if self.hidden is not None:
htransform = self._TransformHidden(self.hidden)
f, i, j, o = tf.split(
value=htransform + xtransform, num_or_size_splits=4, axis=depth_axis)
else:
f, i, j, o = tf.split(
value=xtransform, num_or_size_splits=4, axis=depth_axis)
if self.cell is not None:
self.cell = tf.sigmoid(f) * self.cell + tf.sigmoid(i) * tf.tanh(j)
else:
self.cell = tf.sigmoid(i) * tf.tanh(j)
self.hidden = tf.sigmoid(o) * tf.tanh(self.cell)
return self.hidden
class LSTM(LSTMBase):
"""Efficient LSTM implementation used in [1].
[1] Zaremba, Sutskever, Vinyals. Recurrent Neural Network Regularization,
2015. arxiv:1409.2329.
"""
def __init__(self,
depth,
bias=LSTMBiasInit,
initializer=block_util.RsqrtInitializer(),
name=None):
super(LSTM, self).__init__([depth], name)
with self._BlockScope():
self._depth = depth
self._nn = blocks_std.NN(
4 * depth, bias=bias, act=None, initializer=initializer)
self._hidden_linear = blocks_std.Linear(
4 * depth, initializer=initializer)
def _TransformInputs(self, *args):
return self._nn(*args)
def _TransformHidden(self, h):
return self._hidden_linear(h)
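# Editor's illustrative sketch (not part of the original commit): the block is
# stateful, so unrolling over time just means calling it once per step; the
# hidden and cell values are carried on the block object between calls.
def _ExampleUnrolledLSTM(inputs):
  """inputs: a list of [batch, input_depth] tensors, one per timestep."""
  lstm = LSTM(depth=16)
  outputs = []
  for x_t in inputs:
    outputs.append(lstm(x_t))  # updates lstm.hidden and lstm.cell
  return outputs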
class Conv2DLSTM(LSTMBase):
"""Convolutional LSTM implementation with optimizations inspired by [1].
Note that when using the batch normalization feature, the bias initializer
will not be used, since BN effectively cancels its effect out.
[1] Zaremba, Sutskever, Vinyals. Recurrent Neural Network Regularization,
2015. arxiv:1409.2329.
"""
def __init__(self,
depth,
filter_size,
hidden_filter_size,
strides,
padding,
bias=LSTMBiasInit,
initializer=block_util.RsqrtInitializer(dims=(0, 1, 2)),
use_moving_average=False,
name=None):
super(Conv2DLSTM, self).__init__([None, None, depth], name)
self._iter = 0
with self._BlockScope():
self._input_conv = blocks_std.Conv2D(
4 * depth,
filter_size,
strides,
padding,
bias=None,
act=None,
initializer=initializer,
name='input_conv2d')
self._hidden_conv = blocks_std.Conv2D(
4 * depth,
hidden_filter_size,
[1, 1],
'SAME',
bias=None,
act=None,
initializer=initializer,
name='hidden_conv2d')
if bias is not None:
self._bias = blocks_std.BiasAdd(bias, name='biases')
else:
self._bias = blocks_std.PassThrough()
def _TransformInputs(self, x):
return self._bias(self._input_conv(x))
def _TransformHidden(self, h):
return self._hidden_conv(h)
def _Apply(self, *args):
xtransform = self._TransformInputs(*args)
depth_axis = len(self._output_shape) - 1
if self.hidden is not None:
htransform = self._TransformHidden(self.hidden)
f, i, j, o = tf.split(
value=htransform + xtransform, num_or_size_splits=4, axis=depth_axis)
else:
f, i, j, o = tf.split(
value=xtransform, num_or_size_splits=4, axis=depth_axis)
if self.cell is not None:
self.cell = tf.sigmoid(f) * self.cell + tf.sigmoid(i) * tf.tanh(j)
else:
self.cell = tf.sigmoid(i) * tf.tanh(j)
self.hidden = tf.sigmoid(o) * tf.tanh(self.cell)
self._iter += 1
return self.hidden
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for LSTM tensorflow blocks."""
from __future__ import division
import numpy as np
import tensorflow as tf
import block_base
import blocks_std
import blocks_lstm
class BlocksLSTMTest(tf.test.TestCase):
def CheckUnary(self, y, op_type):
self.assertEqual(op_type, y.op.type)
self.assertEqual(1, len(y.op.inputs))
return y.op.inputs[0]
def CheckBinary(self, y, op_type):
self.assertEqual(op_type, y.op.type)
self.assertEqual(2, len(y.op.inputs))
return y.op.inputs
def testLSTM(self):
lstm = blocks_lstm.LSTM(10)
lstm.hidden = tf.zeros(shape=[10, 10], dtype=tf.float32)
lstm.cell = tf.zeros(shape=[10, 10], dtype=tf.float32)
x = tf.placeholder(dtype=tf.float32, shape=[10, 11])
y = lstm(x)
o, tanhc = self.CheckBinary(y, 'Mul')
self.assertEqual(self.CheckUnary(o, 'Sigmoid').name, 'LSTM/split:3')
self.assertIs(lstm.cell, self.CheckUnary(tanhc, 'Tanh'))
fc, ij = self.CheckBinary(lstm.cell, 'Add')
f, _ = self.CheckBinary(fc, 'Mul')
self.assertEqual(self.CheckUnary(f, 'Sigmoid').name, 'LSTM/split:0')
i, j = self.CheckBinary(ij, 'Mul')
self.assertEqual(self.CheckUnary(i, 'Sigmoid').name, 'LSTM/split:1')
j = self.CheckUnary(j, 'Tanh')
self.assertEqual(j.name, 'LSTM/split:2')
def testLSTMBiasInit(self):
lstm = blocks_lstm.LSTM(9)
x = tf.placeholder(dtype=tf.float32, shape=[15, 7])
lstm(x)
b = lstm._nn._bias
with self.test_session():
tf.global_variables_initializer().run()
bias_var = b._bias.eval()
comp = ([1.0] * 9) + ([0.0] * 27)
self.assertAllEqual(bias_var, comp)
def testConv2DLSTM(self):
lstm = blocks_lstm.Conv2DLSTM(depth=10,
filter_size=[1, 1],
hidden_filter_size=[1, 1],
strides=[1, 1],
padding='SAME')
lstm.hidden = tf.zeros(shape=[10, 11, 11, 10], dtype=tf.float32)
lstm.cell = tf.zeros(shape=[10, 11, 11, 10], dtype=tf.float32)
x = tf.placeholder(dtype=tf.float32, shape=[10, 11, 11, 1])
y = lstm(x)
o, tanhc = self.CheckBinary(y, 'Mul')
self.assertEqual(self.CheckUnary(o, 'Sigmoid').name, 'Conv2DLSTM/split:3')
self.assertIs(lstm.cell, self.CheckUnary(tanhc, 'Tanh'))
fc, ij = self.CheckBinary(lstm.cell, 'Add')
f, _ = self.CheckBinary(fc, 'Mul')
self.assertEqual(self.CheckUnary(f, 'Sigmoid').name, 'Conv2DLSTM/split:0')
i, j = self.CheckBinary(ij, 'Mul')
self.assertEqual(self.CheckUnary(i, 'Sigmoid').name, 'Conv2DLSTM/split:1')
j = self.CheckUnary(j, 'Tanh')
self.assertEqual(j.name, 'Conv2DLSTM/split:2')
def testConv2DLSTMBiasInit(self):
lstm = blocks_lstm.Conv2DLSTM(9, 1, 1, [1, 1], 'SAME')
x = tf.placeholder(dtype=tf.float32, shape=[1, 7, 7, 7])
lstm(x)
b = lstm._bias
with self.test_session():
tf.global_variables_initializer().run()
bias_var = b._bias.eval()
comp = ([1.0] * 9) + ([0.0] * 27)
self.assertAllEqual(bias_var, comp)
if __name__ == '__main__':
tf.test.main()
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Define some typical masked 2D convolutions."""
import numpy as np
import tensorflow as tf
import block_util
import blocks_std
# pylint does not recognize block_base.BlockBase.__call__().
# pylint: disable=not-callable
class RasterScanConv2D(blocks_std.Conv2DBase):
"""Conv2D with no dependency on future pixels (in raster scan order).
  For example, assuming a 5 x 5 kernel, the kernel is multiplied elementwise
  by the spatial mask:
T T T T T
T T T T T
T T x F F
F F F F F
F F F F F
  where 'T' marks pixels that are available when computing the convolution
  for pixel 'x', and 'F' marks pixels that are not available. 'x' itself is
  available only if strict_order is False.
"""
def __init__(self, depth, filter_size, strides, padding,
strict_order=True,
bias=None, act=None, initializer=None, name=None):
super(RasterScanConv2D, self).__init__(
depth, filter_size, strides, padding, bias, act, name=name)
if (filter_size[0] % 2) != 1 or (filter_size[1] % 2) != 1:
raise ValueError('Kernel size should be odd.')
with self._BlockScope():
if initializer is None:
initializer = block_util.RsqrtInitializer(dims=(0, 1, 2))
self._initializer = initializer
self._strict_order = strict_order
def _CreateKernel(self, shape, dtype):
init = self._initializer(shape, dtype)
kernel = self.NewVar(init)
mask = np.ones(shape[:2], dtype=dtype.as_numpy_dtype)
center = shape[:2] // 2
mask[center[0] + 1:, :] = 0
if not self._strict_order:
mask[center[0], center[1] + 1:] = 0
else:
mask[center[0], center[1]:] = 0
mask = mask.reshape(mask.shape + (1, 1))
return tf.convert_to_tensor(mask, dtype) * kernel
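# Editor's illustrative sketch (not in the original commit): the spatial mask
# built above, spelled out for a 3x3 kernel. With strict_order=True the center
# tap itself is masked out as well:
#   strict_order=True -> [[1, 1, 1],   strict_order=False -> [[1, 1, 1],
#                         [1, 0, 0],                          [1, 1, 0],
#                         [0, 0, 0]]                          [0, 0, 0]]
def _ExampleRasterScanMask(strict_order=True):
  mask = np.ones((3, 3))
  center = np.array([3, 3]) // 2  # same as shape[:2] // 2 for a 3x3 kernel
  mask[center[0] + 1:, :] = 0
  if not strict_order:
    mask[center[0], center[1] + 1:] = 0
  else:
    mask[center[0], center[1]:] = 0
  return mask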
class DepthOrderConv2D(blocks_std.Conv2DBase):
"""Conv2D with no dependency on higher depth dimensions.
More precisely, the output depth #n has only dependencies on input depths #k
for k < n (if strict_order is True) or for k <= n (if strict_order is False).
"""
def __init__(self, depth, filter_size, strides, padding,
strict_order=True,
bias=None, act=None, initializer=None, name=None):
super(DepthOrderConv2D, self).__init__(
depth, filter_size, strides, padding, bias, act, name=name)
with self._BlockScope():
if initializer is None:
initializer = block_util.RsqrtInitializer(dims=(0, 1, 2))
self._initializer = initializer
self._strict_order = strict_order
def _CreateKernel(self, shape, dtype):
init = self._initializer(shape, dtype)
kernel = self.NewVar(init)
mask = np.ones(shape[2:], dtype=dtype.as_numpy_dtype)
depth_output = shape[3]
    for d in range(depth_output):
if self._strict_order:
mask[d:, d] = 0
else:
mask[d + 1:, d] = 0
mask = mask.reshape((1, 1) + mask.shape)
return tf.convert_to_tensor(mask, dtype) * kernel
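# Editor's illustrative note (not in the original commit): the depth mask for
# input_depth = output_depth = 3; rows index input depth, columns index output
# depth.
#   strict_order=True -> [[0, 1, 1],   strict_order=False -> [[1, 1, 1],
#                         [0, 0, 1],                          [0, 1, 1],
#                         [0, 0, 0]]                          [0, 0, 1]]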
class GroupRasterScanConv2D(blocks_std.Conv2DBase):
"""Conv2D with no dependency on future pixels (in raster scan order).
  This version only introduces dependencies on previous pixels in raster scan
  order. It can also introduce dependencies on previous depth positions of the
  current pixel (i.e. the center pixel of the kernel), as follows:
  the depth dimension of the input is split into Ki groups of size
  |input_group_size|, and the output depth is split into Ko groups of size
  |output_group_size| (usually Ki == Ko). Each output group ko of the current
  pixel position can only depend on earlier input groups ki
  (i.e. ki < ko if strict_order is True, or ki <= ko if strict_order is
  False).
Notes:
- Block RasterScanConv2D is a special case of GroupRasterScanConv2D
where Ki == Ko == 1 (i.e. input_group_size == input_depth and
output_group_size == output_depth).
- For 1x1 convolution, block DepthOrderConv2D is a special case of
GroupRasterScanConv2D where input_group_size == 1 and
output_group_size == 1.
"""
def __init__(self, depth, filter_size, strides, padding,
strict_order=True,
input_group_size=1,
output_group_size=1,
bias=None, act=None, initializer=None, name=None):
super(GroupRasterScanConv2D, self).__init__(
depth, filter_size, strides, padding, bias, act, name=name)
if (filter_size[0] % 2) != 1 or (filter_size[1] % 2) != 1:
raise ValueError('Kernel size should be odd.')
with self._BlockScope():
if initializer is None:
initializer = block_util.RsqrtInitializer(dims=(0, 1, 2))
self._initializer = initializer
self._input_group_size = input_group_size
self._output_group_size = output_group_size
self._strict_order = strict_order
if depth % self._output_group_size != 0:
raise ValueError(
'Invalid depth group size: {} for depth {}'.format(
self._output_group_size, depth))
self._output_group_count = depth // self._output_group_size
def _CreateKernel(self, shape, dtype):
init = self._initializer(shape, dtype)
kernel = self.NewVar(init)
depth_input = shape[2]
if depth_input % self._input_group_size != 0:
raise ValueError(
'Invalid depth group size: {} for depth {}'.format(
self._input_group_size, depth_input))
input_group_count = depth_input // self._input_group_size
output_group_count = self._output_group_count
# Set the mask to 0 for future pixels in raster scan order.
center = shape[:2] // 2
mask = np.ones([shape[0], shape[1],
input_group_count, self._input_group_size,
output_group_count, self._output_group_size],
dtype=dtype.as_numpy_dtype)
mask[center[0] + 1:, :, :, :, :, :] = 0
mask[center[0], center[1] + 1:, :, :, :, :] = 0
# Adjust the mask for the current position (the center position).
depth_output = shape[3]
    for d in range(output_group_count):
mask[center[0], center[1], d + 1:, :, d:d + 1, :] = 0
if self._strict_order:
mask[center[0], center[1], d, :, d:d + 1, :] = 0
mask = mask.reshape([shape[0], shape[1], depth_input, depth_output])
return tf.convert_to_tensor(mask, dtype) * kernel
class InFillingConv2D(blocks_std.Conv2DBase):
"""Conv2D with kernel having no dependency on the current pixel.
  For example, assuming a 5 x 5 kernel, the kernel is multiplied elementwise
  by the spatial mask:
T T T T T
T T T T T
T T x T T
T T T T T
T T T T T
where 'T' marks a pixel which is available when computing the convolution
for pixel 'x'. 'x' itself is not available.
"""
def __init__(self, depth, filter_size, strides, padding,
bias=None, act=None, initializer=None, name=None):
super(InFillingConv2D, self).__init__(
depth, filter_size, strides, padding, bias, act, name=name)
if (filter_size[0] % 2) != 1 or (filter_size[1] % 2) != 1:
raise ValueError('Kernel size should be odd.')
if filter_size[0] == 1 and filter_size[1] == 1:
raise ValueError('Kernel size should be larger than 1x1.')
with self._BlockScope():
if initializer is None:
initializer = block_util.RsqrtInitializer(dims=(0, 1, 2))
self._initializer = initializer
def _CreateKernel(self, shape, dtype):
init = self._initializer(shape, dtype)
kernel = self.NewVar(init)
mask = np.ones(shape[:2], dtype=dtype.as_numpy_dtype)
center = shape[:2] // 2
mask[center[0], center[1]] = 0
mask = mask.reshape(mask.shape + (1, 1))
return tf.convert_to_tensor(mask, dtype) * kernel
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Masked conv2d LSTM."""
import block_util
import blocks_lstm
import blocks_masked_conv2d
import blocks_std
# pylint: disable=not-callable
class RasterScanConv2DLSTM(blocks_lstm.LSTMBase):
"""Convolutional LSTM implementation with optimizations inspired by [1].
Note that when using the batch normalization feature, the bias initializer
will not be used, since BN effectively cancels its effect out.
[1] Zaremba, Sutskever, Vinyals. Recurrent Neural Network Regularization,
2015. arxiv:1409.2329.
"""
def __init__(self,
depth,
filter_size,
hidden_filter_size,
strides,
padding,
bias=blocks_lstm.LSTMBiasInit,
initializer=block_util.RsqrtInitializer(dims=(0, 1, 2)),
name=None):
super(RasterScanConv2DLSTM, self).__init__([None, None, depth], name)
with self._BlockScope():
self._input_conv = blocks_masked_conv2d.RasterScanConv2D(
4 * depth,
filter_size,
strides,
padding,
strict_order=False,
bias=None,
act=None,
initializer=initializer,
name='input_conv2d')
self._hidden_conv = blocks_std.Conv2D(
4 * depth,
hidden_filter_size,
[1, 1],
'SAME',
bias=None,
act=None,
initializer=initializer,
name='hidden_conv2d')
if bias is not None:
self._bias = blocks_std.BiasAdd(bias, name='biases')
else:
self._bias = blocks_std.PassThrough()
def _TransformInputs(self, x):
return self._bias(self._input_conv(x))
def _TransformHidden(self, h):
return self._hidden_conv(h)
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests of the 2D masked convolution blocks."""
from __future__ import division
from __future__ import unicode_literals
import numpy as np
import tensorflow as tf
import blocks_masked_conv2d
class MaskedConv2DTest(tf.test.TestCase):
def testRasterScanKernel(self):
kernel_size = 5
input_depth = 1
output_depth = 1
kernel_shape = [kernel_size, kernel_size, input_depth, output_depth]
# pylint: disable=bad-whitespace
kernel_feed = [[ 1.0, 2.0, 3.0, 4.0, 5.0],
[ 6.0, 7.0, 8.0, 9.0, 10.0],
[11.0, 12.0, 13.0, 14.0, 15.0],
[16.0, 17.0, 18.0, 19.0, 20.0],
[21.0, 22.0, 23.0, 24.0, 25.0]]
kernel_feed = np.reshape(kernel_feed, kernel_shape)
kernel_expected = [[ 1.0, 2.0, 3.0, 4.0, 5.0],
[ 6.0, 7.0, 8.0, 9.0, 10.0],
[11.0, 12.0, 0.0, 0.0, 0.0],
[ 0.0, 0.0, 0.0, 0.0, 0.0],
[ 0.0, 0.0, 0.0, 0.0, 0.0]]
kernel_expected = np.reshape(kernel_expected, kernel_shape)
# pylint: enable=bad-whitespace
init_kernel = lambda s, t: tf.constant(kernel_feed, dtype=t, shape=s)
masked_conv2d = blocks_masked_conv2d.RasterScanConv2D(
output_depth, [kernel_size] * 2, [1] * 2, 'SAME',
initializer=init_kernel)
x = tf.placeholder(dtype=tf.float32, shape=[10] * 3 + [input_depth])
_ = masked_conv2d(x)
with self.test_session():
tf.global_variables_initializer().run()
kernel_value = masked_conv2d._kernel.eval()
self.assertAllEqual(kernel_expected, kernel_value)
def testDepthOrderKernel(self):
kernel_size = 1
input_depth = 7
output_depth = input_depth
kernel_shape = [kernel_size, kernel_size, input_depth, output_depth]
kernel_feed = np.ones(kernel_shape)
x_shape = [5] * 3 + [input_depth]
x_feed = np.ones(x_shape)
y_expected = np.zeros(x_shape[0:3] + [output_depth])
y_expected[:, :, :] = np.arange(output_depth)
init_kernel = lambda s, t: tf.constant(kernel_feed, dtype=t, shape=s)
masked_conv2d = blocks_masked_conv2d.DepthOrderConv2D(
output_depth, [kernel_size] * 2, [1] * 2, 'SAME',
strict_order=True,
initializer=init_kernel)
x = tf.placeholder(dtype=tf.float32, shape=x_shape)
y = masked_conv2d(x)
with self.test_session():
tf.global_variables_initializer().run()
y_value = y.eval(feed_dict={x: x_feed})
self.assertAllEqual(y_expected, y_value)
def testGroupRasterScanKernel(self):
kernel_size = 3
input_depth = 4
input_group_size = 2
output_depth = 2
output_group_size = 1
kernel_shape = [kernel_size, kernel_size, input_depth, output_depth]
kernel_feed = np.ones(shape=kernel_shape)
height = 5
width = 5
x_shape = [1, height, width, input_depth]
x_feed = np.ones(shape=x_shape)
# pylint: disable=bad-whitespace
y_expected = [
[[ 0, 2], [ 4, 6], [ 4, 6], [ 4, 6], [ 4, 6]],
[[ 8, 10], [16, 18], [16, 18], [16, 18], [12, 14]],
[[ 8, 10], [16, 18], [16, 18], [16, 18], [12, 14]],
[[ 8, 10], [16, 18], [16, 18], [16, 18], [12, 14]],
[[ 8, 10], [16, 18], [16, 18], [16, 18], [12, 14]],
]
y_expected = np.reshape(y_expected, [1, height, width, output_depth])
# pylint: enable=bad-whitespace
init_kernel = lambda s, t: tf.constant(kernel_feed, dtype=t, shape=s)
masked_conv2d = blocks_masked_conv2d.GroupRasterScanConv2D(
output_depth, [kernel_size] * 2, [1] * 2, 'SAME',
strict_order=True,
input_group_size=input_group_size,
output_group_size=output_group_size,
initializer=init_kernel)
x = tf.placeholder(dtype=tf.float32, shape=x_shape)
y = masked_conv2d(x)
with self.test_session():
tf.global_variables_initializer().run()
y_value = y.eval(feed_dict={x: x_feed})
self.assertAllEqual(y_expected, y_value)
def testInFillingKernel(self):
kernel_size = 5
input_depth = 1
output_depth = 1
kernel_shape = [kernel_size, kernel_size, input_depth, output_depth]
# pylint: disable=bad-whitespace
kernel_feed = [[ 1.0, 2.0, 3.0, 4.0, 5.0],
[ 6.0, 7.0, 8.0, 9.0, 10.0],
[11.0, 12.0, 13.0, 14.0, 15.0],
[16.0, 17.0, 18.0, 19.0, 20.0],
[21.0, 22.0, 23.0, 24.0, 25.0]]
kernel_feed = np.reshape(kernel_feed, kernel_shape)
kernel_expected = [[ 1.0, 2.0, 3.0, 4.0, 5.0],
[ 6.0, 7.0, 8.0, 9.0, 10.0],
[11.0, 12.0, 0.0, 14.0, 15.0],
[16.0, 17.0, 18.0, 19.0, 20.0],
[21.0, 22.0, 23.0, 24.0, 25.0]]
kernel_expected = np.reshape(kernel_expected, kernel_shape)
# pylint: enable=bad-whitespace
init_kernel = lambda s, t: tf.constant(kernel_feed, dtype=t, shape=s)
masked_conv2d = blocks_masked_conv2d.InFillingConv2D(
output_depth, [kernel_size] * 2, [1] * 2, 'SAME',
initializer=init_kernel)
x = tf.placeholder(dtype=tf.float32, shape=[10] * 3 + [input_depth])
_ = masked_conv2d(x)
with self.test_session():
tf.global_variables_initializer().run()
kernel_value = masked_conv2d._kernel.eval()
self.assertAllEqual(kernel_expected, kernel_value)
def testConv2DMaskedNumerics(self):
kernel_size = 5
input_shape = [1, 10, 10, 1]
filter_shape = [kernel_size, kernel_size, 1, 1]
strides = [1, 1, 1, 1]
output_shape = [1, 10, 10, 1]
conv = blocks_masked_conv2d.RasterScanConv2D(
depth=filter_shape[-1],
filter_size=filter_shape[0:2],
strides=strides[1:3],
padding='SAME',
initializer=tf.constant_initializer(value=1.0))
x = tf.placeholder(dtype=tf.float32, shape=input_shape)
y = conv(x)
x_feed = - np.ones(input_shape, dtype=float)
y_expected = np.ones(output_shape, dtype=float)
    for i in range(input_shape[1]):
      for j in range(input_shape[2]):
x_feed[0, i, j, 0] = 10 * (j + 1) + i
v = 0
ki_start = max(i - kernel_size // 2, 0)
kj_start = max(j - kernel_size // 2, 0)
kj_end = min(j + kernel_size // 2, input_shape[2] - 1)
for ki in range(ki_start, i + 1):
for kj in range(kj_start, kj_end + 1):
if ki > i:
continue
if ki == i and kj >= j:
continue
v += 10 * (kj + 1) + ki
y_expected[0, i, j, 0] = v
with self.test_session():
tf.global_variables_initializer().run()
y_value = y.eval(feed_dict={x: x_feed})
self.assertAllEqual(y_expected, y_value)
if __name__ == '__main__':
tf.test.main()
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Common blocks which work as operators on other blocks."""
import tensorflow as tf
import block_base
# pylint: disable=not-callable
class CompositionOperator(block_base.BlockBase):
"""Composition of several blocks."""
def __init__(self, block_list, name=None):
"""Initialization of the composition operator.
Args:
block_list: List of blocks.BlockBase that are chained to create
a new blocks.BlockBase.
name: Name of this block.
"""
super(CompositionOperator, self).__init__(name)
self._blocks = block_list
def _Apply(self, x):
"""Apply successively all the blocks on the given input tensor."""
h = x
for layer in self._blocks:
h = layer(h)
return h
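# Editor's illustrative sketch (not part of the original commit): chaining two
# hypothetical blocks; composed(x) is equivalent to block_b(block_a(x)).
def _ExampleComposition(x, block_a, block_b):
  composed = CompositionOperator([block_a, block_b])
  return composed(x)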
class LineOperator(block_base.BlockBase):
"""Repeat the same block over all the lines of an input tensor."""
def __init__(self, block, name=None):
super(LineOperator, self).__init__(name)
self._block = block
def _Apply(self, x):
height = x.get_shape()[1].value
if height is None:
raise ValueError('Unknown tensor height')
all_line_x = tf.split(value=x, num_or_size_splits=height, axis=1)
y = []
for line_x in all_line_x:
y.append(self._block(line_x))
y = tf.concat(values=y, axis=1)
return y
class TowerOperator(block_base.BlockBase):
"""Parallel execution with concatenation of several blocks."""
def __init__(self, block_list, dim=3, name=None):
"""Initialization of the parallel exec + concat (Tower).
Args:
block_list: List of blocks.BlockBase that are chained to create
a new blocks.BlockBase.
dim: the dimension on which to concat.
name: Name of this block.
"""
super(TowerOperator, self).__init__(name)
self._blocks = block_list
self._concat_dim = dim
def _Apply(self, x):
"""Apply successively all the blocks on the given input tensor."""
outputs = [layer(x) for layer in self._blocks]
return tf.concat(outputs, self._concat_dim)
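# Editor's illustrative note (not in the original commit): TowerOperator runs
# every block on the same input and concatenates the results along `dim`
# (depth by default). E.g. two blocks each producing [b, h, w, 8] yield a
# single [b, h, w, 16] tensor.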
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests of the block operators."""
import numpy as np
import tensorflow as tf
import block_base
import blocks_operator
class AddOneBlock(block_base.BlockBase):
def __init__(self, name=None):
super(AddOneBlock, self).__init__(name)
def _Apply(self, x):
return x + 1.0
class SquareBlock(block_base.BlockBase):
def __init__(self, name=None):
super(SquareBlock, self).__init__(name)
def _Apply(self, x):
return x * x
class BlocksOperatorTest(tf.test.TestCase):
def testComposition(self):
x_value = np.array([[1.0, 2.0, 3.0],
[-1.0, -2.0, -3.0]])
y_expected_value = np.array([[4.0, 9.0, 16.0],
[0.0, 1.0, 4.0]])
x = tf.placeholder(dtype=tf.float32, shape=[2, 3])
complex_block = blocks_operator.CompositionOperator(
[AddOneBlock(),
SquareBlock()])
y = complex_block(x)
with self.test_session():
y_value = y.eval(feed_dict={x: x_value})
self.assertAllClose(y_expected_value, y_value)
if __name__ == '__main__':
tf.test.main()
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Basic blocks for building tensorflow models."""
import numpy as np
import tensorflow as tf
import block_base
import block_util
# pylint does not recognize block_base.BlockBase.__call__().
# pylint: disable=not-callable
def HandleConvPaddingModes(x, padding, kernel_shape, strides):
"""Returns an updated tensor and padding type for REFLECT and SYMMETRIC.
Args:
x: A 4D tensor with shape [batch_size, height, width, depth].
padding: Padding mode (SAME, VALID, REFLECT, or SYMMETRIC).
kernel_shape: Shape of convolution kernel that will be applied.
strides: Convolution stride that will be used.
Returns:
x and padding after adjustments for REFLECT and SYMMETRIC.
"""
# For 1x1 convolution, all padding modes are the same.
if np.all(kernel_shape[:2] == 1):
return x, 'VALID'
if padding == 'REFLECT' or padding == 'SYMMETRIC':
# We manually compute the number of paddings as if 'SAME'.
# From Tensorflow kernel, the formulas are as follows.
# output_shape = ceil(input_shape / strides)
# paddings = (output_shape - 1) * strides + filter_size - input_shape
# Let x, y, s be a shorthand notations for input_shape, output_shape, and
# strides, respectively. Let (x - 1) = sn + r where 0 <= r < s. Note that
# y - 1 = ceil(x / s) - 1 = floor((x - 1) / s) = n
# provided that x > 0. Therefore
# paddings = n * s + filter_size - (sn + r + 1)
# = filter_size - r - 1.
input_shape = x.get_shape() # shape at graph construction time
img_shape = tf.shape(x)[1:3] # image shape (no batch) at run time
remainder = tf.mod(img_shape - 1, strides[1:3])
pad_sizes = kernel_shape[:2] - remainder - 1
pad_rows = pad_sizes[0]
pad_cols = pad_sizes[1]
pad = tf.stack([[0, 0], tf.stack([pad_rows // 2, (pad_rows + 1) // 2]),
tf.stack([pad_cols // 2, (pad_cols + 1) // 2]), [0, 0]])
# Manually pad the input and switch the padding mode to 'VALID'.
x = tf.pad(x, pad, mode=padding)
x.set_shape([input_shape[0], x.get_shape()[1],
x.get_shape()[2], input_shape[3]])
padding = 'VALID'
return x, padding
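# Editor's worked example (not in the original commit) of the padding formula
# above: for input width x = 10, stride s = 2 and filter_size = 5, we have
# x - 1 = 9 = 2 * 4 + 1, so r = 1 and
#   paddings = filter_size - r - 1 = 5 - 1 - 1 = 3,
# split as 3 // 2 = 1 before and (3 + 1) // 2 = 2 after, giving an output
# width of ceil(10 / 2) = 5, as in 'SAME' padding.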
class PassThrough(block_base.BlockBase):
"""A dummy transform block that does nothing."""
def __init__(self):
# Pass an empty string to disable name scoping.
super(PassThrough, self).__init__(name='')
def _Apply(self, inp):
return inp
@property
def initialized(self):
"""Always returns True."""
return True
class Bias(object):
"""An initialization helper class for BiasAdd block below."""
def __init__(self, value=0):
self.value = value
class BiasAdd(block_base.BlockBase):
"""A tf.nn.bias_add wrapper.
  This wrapper may act as a PassThrough block, depending on the initializer
  provided, which makes optional bias application easier in NN blocks, etc.
  See __init__() for the details.
"""
def __init__(self, initializer=Bias(0), name=None):
"""Initializes Bias block.
    The |initializer| parameter has two special cases.
1. If initializer is None, then this block works as a PassThrough.
2. If initializer is a Bias class object, then tf.constant_initializer is
used with the stored value.
Args:
initializer: An initializer for the bias variable.
name: Name of this block.
"""
super(BiasAdd, self).__init__(name)
with self._BlockScope():
if isinstance(initializer, Bias):
self._initializer = tf.constant_initializer(value=initializer.value)
else:
self._initializer = initializer
self._bias = None
def _Apply(self, x):
if not self._bias:
init = self._initializer([int(x.get_shape()[-1])], x.dtype)
self._bias = self.NewVar(init)
return tf.nn.bias_add(x, self._bias)
def CreateWeightLoss(self):
return []
class LinearBase(block_base.BlockBase):
"""A matmul wrapper.
Returns input * W, where matrix W can be customized through derivation.
"""
def __init__(self, depth, name=None):
super(LinearBase, self).__init__(name)
with self._BlockScope():
self._depth = depth
self._matrix = None
def _CreateKernel(self, shape, dtype):
raise NotImplementedError('This method must be sub-classed.')
def _Apply(self, x):
if not self._matrix:
shape = [int(x.get_shape()[-1]), self._depth]
self._matrix = self._CreateKernel(shape, x.dtype)
return tf.matmul(x, self._matrix)
class Linear(LinearBase):
"""A matmul wrapper.
Returns input * W, where matrix W is learned.
"""
def __init__(self,
depth,
initializer=block_util.RsqrtInitializer(),
name=None):
super(Linear, self).__init__(depth, name)
with self._BlockScope():
self._initializer = initializer
def _CreateKernel(self, shape, dtype):
init = self._initializer(shape, dtype)
return self.NewVar(init)
class NN(block_base.BlockBase):
"""A neural network layer wrapper.
Returns act(input * W + b), where matrix W, bias b are learned, and act is an
optional activation function (i.e., nonlinearity).
This transform block can handle multiple inputs. If x_1, x_2, ..., x_m are
the inputs, then returns act(x_1 * W_1 + ... + x_m * W_m + b).
Attributes:
nunits: The dimension of the output.
"""
def __init__(self,
depth,
bias=Bias(0),
act=None, # e.g., tf.nn.relu
initializer=block_util.RsqrtInitializer(),
linear_block_factory=(lambda d, i: Linear(d, initializer=i)),
name=None):
"""Initializes NN block.
Args:
depth: The depth of the output.
bias: An initializer for the bias, or a Bias class object. If None, there
will be no bias term for this NN block. See BiasAdd block.
act: Optional activation function. If None, no activation is applied.
initializer: The initialization method for the matrix weights.
linear_block_factory: A function used to create a linear block.
name: The name of this block.
"""
super(NN, self).__init__(name)
with self._BlockScope():
self._linear_block_factory = linear_block_factory
self._depth = depth
self._initializer = initializer
self._matrices = None
self._bias = BiasAdd(bias) if bias else PassThrough()
self._act = act if act else PassThrough()
# TODO(sjhwang): Stop using **kwargs, if we ever switch to python3.
def _Apply(self, *args):
if not self._matrices:
self._matrices = [
self._linear_block_factory(self._depth, self._initializer)
for _ in args]
if len(self._matrices) != len(args):
raise ValueError('{} expected {} inputs, but observed {} inputs'.format(
self.name, len(self._matrices), len(args)))
if len(args) > 1:
y = tf.add_n([m(x) for m, x in zip(self._matrices, args)])
else:
y = self._matrices[0](args[0])
return self._act(self._bias(y))
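# Editor's illustrative sketch (not part of the original commit): with two
# inputs, the block learns one matrix per input and shares a single bias,
# i.e. it computes tanh(x1 * W1 + x2 * W2 + b).
def _ExampleMultiInputNN(x1, x2):
  nn = NN(10, bias=Bias(0), act=tf.tanh)
  return nn(x1, x2)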
class Conv2DBase(block_base.BlockBase):
"""A tf.nn.conv2d operator."""
def __init__(self, depth, filter_size, strides, padding,
bias=None, act=None, atrous_rate=None, conv=tf.nn.conv2d,
name=None):
"""Initializes a Conv2DBase block.
Arguments:
depth: The output depth of the block (i.e. #filters); if negative, the
output depth will be set to be the same as the input depth.
filter_size: The size of the 2D filter. If it's specified as an integer,
it's going to create a square filter. Otherwise, this is a tuple
specifying the height x width of the filter.
strides: A tuple specifying the y and x stride.
padding: One of the valid padding modes allowed by tf.nn.conv2d, or
'REFLECT'/'SYMMETRIC' for mirror padding.
bias: An initializer for the bias, or a Bias class object. If None, there
will be no bias in this block. See BiasAdd block.
act: Optional activation function applied to the output.
atrous_rate: optional input rate for ATrous convolution. If not None, this
will be used and the strides will be ignored.
conv: The convolution function to use (e.g. tf.nn.conv2d).
name: The name for this conv2d op.
"""
super(Conv2DBase, self).__init__(name)
with self._BlockScope():
self._act = act if act else PassThrough()
self._bias = BiasAdd(bias) if bias else PassThrough()
self._kernel_shape = np.zeros((4,), dtype=np.int32)
self._kernel_shape[:2] = filter_size
self._kernel_shape[3] = depth
self._strides = np.ones((4,), dtype=np.int32)
self._strides[1:3] = strides
self._strides = list(self._strides)
self._padding = padding
self._kernel = None
self._conv = conv
self._atrous_rate = atrous_rate
def _CreateKernel(self, shape, dtype):
raise NotImplementedError('This method must be sub-classed')
def _Apply(self, x):
"""Apply the self._conv op.
Arguments:
x: input tensor. It needs to be a 4D tensor of the form
[batch, height, width, channels].
Returns:
The output of the convolution of x with the current convolutional
kernel.
Raises:
ValueError: if number of channels is not defined at graph construction.
"""
input_shape = x.get_shape().with_rank(4)
input_shape[3:].assert_is_fully_defined() # channels must be defined
if self._kernel is None:
assert self._kernel_shape[2] == 0, self._kernel_shape
self._kernel_shape[2] = input_shape[3].value
if self._kernel_shape[3] < 0:
# Make output depth be the same as input depth.
self._kernel_shape[3] = self._kernel_shape[2]
self._kernel = self._CreateKernel(self._kernel_shape, x.dtype)
x, padding = HandleConvPaddingModes(
x, self._padding, self._kernel_shape, self._strides)
if self._atrous_rate is None:
x = self._conv(x, self._kernel, strides=self._strides, padding=padding)
else:
x = self._conv(x, self._kernel, rate=self._atrous_rate, padding=padding)
if self._padding != 'VALID':
# Manually update shape. Known shape information can be lost by tf.pad().
height = (1 + (input_shape[1].value - 1) // self._strides[1]
if input_shape[1].value else None)
width = (1 + (input_shape[2].value - 1) // self._strides[2]
if input_shape[2].value else None)
shape = x.get_shape()
x.set_shape([shape[0], height, width, shape[3]])
return self._act(self._bias(x))
class Conv2D(Conv2DBase):
"""A tf.nn.conv2d operator."""
def __init__(self, depth, filter_size, strides, padding,
bias=None, act=None, initializer=None, name=None):
"""Initializes a Conv2D block.
Arguments:
depth: The output depth of the block (i.e., #filters)
filter_size: The size of the 2D filter. If it's specified as an integer,
it's going to create a square filter. Otherwise, this is a tuple
specifying the height x width of the filter.
strides: A tuple specifying the y and x stride.
padding: One of the valid padding modes allowed by tf.nn.conv2d, or
'REFLECT'/'SYMMETRIC' for mirror padding.
bias: An initializer for the bias, or a Bias class object. If None, there
will be no bias in this block. See BiasAdd block.
act: Optional activation function applied to the output.
initializer: Optional initializer for weights.
name: The name for this conv2d op.
"""
super(Conv2D, self).__init__(depth, filter_size, strides, padding, bias,
act, conv=tf.nn.conv2d, name=name)
with self._BlockScope():
if initializer is None:
initializer = block_util.RsqrtInitializer(dims=(0, 1, 2))
self._initializer = initializer
def _CreateKernel(self, shape, dtype):
return self.NewVar(self._initializer(shape, dtype))
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for basic tensorflow blocks_std."""
from __future__ import division
from __future__ import unicode_literals
import math
import numpy as np
import tensorflow as tf
import blocks_std
def _NumpyConv2D(x, f, strides, padding, rate=1):
assert strides[0] == 1 and strides[3] == 1, strides
if rate > 1:
f_shape = f.shape
expand_f = np.zeros([f_shape[0], ((f_shape[1] - 1) * rate + 1),
f_shape[2], f_shape[3]])
expand_f[:, [y * rate for y in range(f_shape[1])], :, :] = f
f = np.zeros([((f_shape[0] - 1) * rate + 1), expand_f.shape[1],
f_shape[2], f_shape[3]])
f[[y * rate for y in range(f_shape[0])], :, :, :] = expand_f
if padding != 'VALID':
assert x.shape[1] > 0 and x.shape[2] > 0, x.shape
# Compute the number of padded rows and cols.
# See Conv2D block comments for a math explanation.
remainder = ((x.shape[1] - 1) % strides[1], (x.shape[2] - 1) % strides[2])
pad_rows = f.shape[0] - remainder[0] - 1
pad_cols = f.shape[1] - remainder[1] - 1
pad = ((0, 0),
(pad_rows // 2, (pad_rows + 1) // 2),
(pad_cols // 2, (pad_cols + 1) // 2),
(0, 0))
# Pad the input using numpy.pad().
mode = None
if padding == 'SAME':
mode = str('constant')
if padding == 'REFLECT':
mode = str('reflect')
if padding == 'SYMMETRIC':
mode = str('symmetric')
x = np.pad(x, pad, mode=mode)
# Since x is now properly padded, proceed as if padding mode is VALID.
x_window = np.empty(
(x.shape[0],
int(math.ceil((x.shape[1] - f.shape[0] + 1) / strides[1])),
int(math.ceil((x.shape[2] - f.shape[1] + 1) / strides[2])),
np.prod(f.shape[:3])))
# The output at pixel location (i, j) is the result of linear transformation
# applied to the window whose top-left corner is at
# (i * row_stride, j * col_stride).
  for i in range(x_window.shape[1]):
k = i * strides[1]
    for j in range(x_window.shape[2]):
l = j * strides[2]
x_window[:, i, j, :] = x[:,
k:(k + f.shape[0]),
l:(l + f.shape[1]),
:].reshape((x_window.shape[0], -1))
y = np.tensordot(x_window, f.reshape((-1, f.shape[3])), axes=1)
return y
class BlocksStdTest(tf.test.TestCase):
def CheckUnary(self, y, op_type):
self.assertEqual(op_type, y.op.type)
self.assertEqual(1, len(y.op.inputs))
return y.op.inputs[0]
def CheckBinary(self, y, op_type):
self.assertEqual(op_type, y.op.type)
self.assertEqual(2, len(y.op.inputs))
return y.op.inputs
def testPassThrough(self):
p = blocks_std.PassThrough()
x = tf.placeholder(dtype=tf.float32, shape=[1])
self.assertIs(p(x), x)
def CheckBiasAdd(self, y, b):
x, u = self.CheckBinary(y, 'BiasAdd')
self.assertIs(u, b._bias.value())
self.assertEqual(x.dtype, u.dtype.base_dtype)
return x
def testBiasAdd(self):
b = blocks_std.BiasAdd()
x = tf.placeholder(dtype=tf.float32, shape=[4, 8])
y = b(x)
self.assertEqual(b._bias.get_shape(), x.get_shape()[-1:])
self.assertIs(x, self.CheckBiasAdd(y, b))
def testBiasRankTest(self):
b = blocks_std.BiasAdd()
x = tf.placeholder(dtype=tf.float32, shape=[10])
with self.assertRaises(ValueError):
b(x)
def CheckLinear(self, y, m):
x, w = self.CheckBinary(y, 'MatMul')
self.assertIs(w, m._matrix.value())
self.assertEqual(x.dtype, w.dtype.base_dtype)
return x
def testLinear(self):
m = blocks_std.Linear(10)
x = tf.placeholder(dtype=tf.float32, shape=[8, 9])
y = m(x)
self.assertEqual(m._matrix.get_shape(), [9, 10])
self.assertIs(x, self.CheckLinear(y, m))
def testLinearShared(self):
# Create a linear map which is applied twice on different inputs
# (i.e. the weights of the map are shared).
# TODO(sjhwang): Make this test deterministic.
linear_map = blocks_std.Linear(6)
x1 = tf.random_normal(shape=[1, 5])
x2 = tf.random_normal(shape=[1, 5])
xs = x1 + x2
# Apply the transform with the same weights.
y1 = linear_map(x1)
y2 = linear_map(x2)
ys = linear_map(xs)
with self.test_session() as sess:
# Initialize all the variables of the graph.
tf.global_variables_initializer().run()
y1_res, y2_res, ys_res = sess.run([y1, y2, ys])
self.assertAllClose(y1_res + y2_res, ys_res)
def CheckNN(self, y, nn, act=None):
if act:
pre_act = self.CheckUnary(y, act)
else:
pre_act = y
if not isinstance(nn._bias, blocks_std.PassThrough):
pre_bias = self.CheckBiasAdd(pre_act, nn._bias)
else:
pre_bias = pre_act
if len(nn._matrices) > 1:
self.assertEqual('AddN', pre_bias.op.type)
pre_bias = pre_bias.op.inputs
else:
pre_bias = [pre_bias]
self.assertEqual(len(pre_bias), len(nn._matrices))
return [self.CheckLinear(u, m) for u, m in zip(pre_bias, nn._matrices)]
def testNNWithoutActWithoutBias(self):
nn = blocks_std.NN(10, act=None, bias=None)
x = tf.placeholder(dtype=tf.float32, shape=[5, 7])
y = nn(x)
self.assertIs(x, self.CheckNN(y, nn)[0])
def testNNWithoutBiasWithAct(self):
nn = blocks_std.NN(10, act=tf.nn.relu, bias=None)
x = tf.placeholder(dtype=tf.float32, shape=[5, 7])
y = nn(x)
self.assertIs(x, self.CheckNN(y, nn, 'Relu')[0])
def testNNWithBiasWithoutAct(self):
nn = blocks_std.NN(10, bias=blocks_std.Bias(0), act=None)
x = tf.placeholder(dtype=tf.float32, shape=[5, 7])
y = nn(x)
self.assertIs(x, self.CheckNN(y, nn)[0])
def testNNWithBiasWithAct(self):
nn = blocks_std.NN(10, bias=blocks_std.Bias(0), act=tf.square)
x = tf.placeholder(dtype=tf.float32, shape=[5, 7])
y = nn(x)
self.assertIs(x, self.CheckNN(y, nn, 'Square')[0])
def testNNMultipleInputs(self):
nn = blocks_std.NN(10, bias=blocks_std.Bias(0), act=tf.tanh)
x = [tf.placeholder(dtype=tf.float32, shape=[5, 7]),
tf.placeholder(dtype=tf.float32, shape=[5, 3]),
tf.placeholder(dtype=tf.float32, shape=[5, 5])]
y = nn(*x)
xs = self.CheckNN(y, nn, 'Tanh')
self.assertEqual(len(x), len(xs))
for u, v in zip(x, xs):
self.assertIs(u, v)
def testConv2DSAME(self):
np.random.seed(142536)
x_shape = [4, 16, 11, 5]
f_shape = [4, 3, 5, 6]
strides = [1, 2, 2, 1]
padding = 'SAME'
conv = blocks_std.Conv2D(depth=f_shape[-1],
filter_size=f_shape[0:2],
strides=strides[1:3],
padding=padding,
act=None,
bias=None)
x_value = np.random.normal(size=x_shape)
x = tf.convert_to_tensor(x_value, dtype=tf.float32)
y = conv(x)
with self.test_session():
tf.global_variables_initializer().run()
f_value = conv._kernel.eval()
y_value = y.eval()
y_expected = _NumpyConv2D(x_value, f_value,
strides=strides, padding=padding)
self.assertAllClose(y_expected, y_value)
def testConv2DValid(self):
np.random.seed(253647)
x_shape = [4, 11, 12, 5]
f_shape = [5, 2, 5, 5]
strides = [1, 2, 2, 1]
padding = 'VALID'
conv = blocks_std.Conv2D(depth=f_shape[-1],
filter_size=f_shape[0:2],
strides=strides[1:3],
padding=padding,
act=None,
bias=None)
x_value = np.random.normal(size=x_shape)
x = tf.convert_to_tensor(x_value, dtype=tf.float32)
y = conv(x)
with self.test_session():
tf.global_variables_initializer().run()
f_value = conv._kernel.eval()
y_value = y.eval()
y_expected = _NumpyConv2D(x_value, f_value,
strides=strides, padding=padding)
self.assertAllClose(y_expected, y_value)
def testConv2DSymmetric(self):
np.random.seed(364758)
x_shape = [4, 10, 12, 6]
f_shape = [3, 4, 6, 5]
strides = [1, 1, 1, 1]
padding = 'SYMMETRIC'
conv = blocks_std.Conv2D(depth=f_shape[-1],
filter_size=f_shape[0:2],
strides=strides[1:3],
padding=padding,
act=None,
bias=None)
x_value = np.random.normal(size=x_shape)
x = tf.convert_to_tensor(x_value, dtype=tf.float32)
y = conv(x)
with self.test_session():
tf.global_variables_initializer().run()
f_value = conv._kernel.eval()
y_value = y.eval()
y_expected = _NumpyConv2D(x_value, f_value,
strides=strides, padding=padding)
self.assertAllClose(y_expected, y_value)
def testConv2DReflect(self):
np.random.seed(768798)
x_shape = [4, 10, 12, 6]
f_shape = [3, 4, 6, 5]
strides = [1, 2, 2, 1]
padding = 'REFLECT'
conv = blocks_std.Conv2D(depth=f_shape[-1],
filter_size=f_shape[0:2],
strides=strides[1:3],
padding=padding,
act=None,
bias=None)
x_value = np.random.normal(size=x_shape)
x = tf.convert_to_tensor(x_value, dtype=tf.float32)
y = conv(x)
with self.test_session():
tf.global_variables_initializer().run()
f_value = conv._kernel.eval()
y_value = y.eval()
y_expected = _NumpyConv2D(x_value, f_value,
strides=strides, padding=padding)
self.assertAllClose(y_expected, y_value)
def testConv2DBias(self):
input_shape = [19, 14, 14, 64]
filter_shape = [3, 7, 64, 128]
strides = [1, 2, 2, 1]
output_shape = [19, 6, 4, 128]
conv = blocks_std.Conv2D(depth=filter_shape[-1],
filter_size=filter_shape[0:2],
strides=strides[1:3],
padding='VALID',
act=None,
bias=blocks_std.Bias(1))
x = tf.placeholder(dtype=tf.float32, shape=input_shape)
y = conv(x)
self.CheckBiasAdd(y, conv._bias)
self.assertEqual(output_shape, y.get_shape().as_list())
if __name__ == '__main__':
tf.test.main()
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Entropy coder model."""
class EntropyCoderModel(object):
"""Entropy coder model."""
def __init__(self):
# Loss used for training the model.
self.loss = None
# Tensorflow op to run to train the model.
self.train_op = None
    # Tensor holding the average code length of the input bit-field tensor,
    # i.e. the number of output bits per input bit. For effective compression,
    # this number should be between 0.0 and 1.0 (1.0 corresponds to no
    # compression).
self.average_code_length = None
def Initialize(self, global_step, optimizer, config_string):
raise NotImplementedError()
def BuildGraph(self, input_codes):
"""Build the Tensorflow graph corresponding to the entropy coder model.
Args:
input_codes: Tensor of size: batch_size x height x width x bit_depth
corresponding to the codes to compress.
The input codes are {-1, +1} codes.
"""
    # TODO(damienv):
    # - consider switching to {0, 1} codes.
    # - consider passing an extra tensor which gives, for each (b, y, x),
    #   the actual depth (which would allow using more or fewer bits
    #   for each (y, x) location).
raise NotImplementedError()
def GetConfigStringForUnitTest(self):
"""Returns a default model configuration to be used for unit tests."""
return None
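
# Minimal sketch of a concrete subclass (illustration only; this class is
# not part of the original code, and tensorflow is imported here because the
# base module above does not import it itself).
import tensorflow as tf


class _UniformBitsModel(EntropyCoderModel):
  """Toy model assuming independent bits with P(code == +1) = 0.5."""

  def Initialize(self, global_step, optimizer, config_string):
    pass  # Nothing to configure in this toy model.

  def BuildGraph(self, input_codes):
    # With P(code == +1) fixed at 0.5, every bit costs exactly
    # -log2(0.5) = 1.0 bit: no compression at all.
    self.average_code_length = tf.constant(1.0)
    self.loss = self.average_code_length
    self.train_op = None
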
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Entropy coder model registrar."""
class ModelFactory(object):
"""Factory of encoder/decoder models."""
def __init__(self):
self._model_dictionary = dict()
def RegisterModel(self,
entropy_coder_model_name,
entropy_coder_model_factory):
self._model_dictionary[entropy_coder_model_name] = (
entropy_coder_model_factory)
def CreateModel(self, model_name):
current_model_factory = self._model_dictionary[model_name]
return current_model_factory()
def GetAvailableModels(self):
return self._model_dictionary.keys()
_model_registry = ModelFactory()
def GetModelRegistry():
return _model_registry
class RegisterEntropyCoderModel(object):
  """Decorator used to register an entropy coder model factory by name."""
def __init__(self, model_name):
self._model_name = model_name
def __call__(self, f):
_model_registry.RegisterModel(self._model_name, f)
return f
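
# Usage sketch for the registry (illustration only; MyModel is a
# hypothetical class):
#
#   @RegisterEntropyCoderModel('my_model')
#   def CreateMyModel():
#     return MyModel()
#
#   model = GetModelRegistry().CreateModel('my_model')
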
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Code probability model used for entropy coding."""
import json
import tensorflow as tf
from entropy_coder.lib import blocks
from entropy_coder.model import entropy_coder_model
from entropy_coder.model import model_factory
# pylint: disable=not-callable
class BrnnPredictor(blocks.BlockBase):
"""BRNN prediction applied on one layer."""
def __init__(self, code_depth, name=None):
super(BrnnPredictor, self).__init__(name)
with self._BlockScope():
hidden_depth = 2 * code_depth
      # The state coming from the previous layer/iteration goes through a
      # regular Conv2D layer, whereas the binary codes of the current
      # layer/iteration go through a masked (raster scan) convolution.
self._adaptation0 = blocks.RasterScanConv2D(
hidden_depth, [7, 7], [1, 1], 'SAME',
strict_order=True,
bias=blocks.Bias(0), act=tf.tanh)
self._adaptation1 = blocks.Conv2D(
hidden_depth, [3, 3], [1, 1], 'SAME',
bias=blocks.Bias(0), act=tf.tanh)
self._predictor = blocks.CompositionOperator([
blocks.LineOperator(
blocks.RasterScanConv2DLSTM(
depth=hidden_depth,
filter_size=[1, 3],
hidden_filter_size=[1, 3],
strides=[1, 1],
padding='SAME')),
blocks.Conv2D(hidden_depth, [1, 1], [1, 1], 'SAME',
bias=blocks.Bias(0), act=tf.tanh),
blocks.Conv2D(code_depth, [1, 1], [1, 1], 'SAME',
bias=blocks.Bias(0), act=tf.tanh)
])
def _Apply(self, x, s):
# Code estimation using both:
# - the state from the previous iteration/layer,
# - the binary codes that are before in raster scan order.
h = tf.concat(values=[self._adaptation0(x), self._adaptation1(s)], axis=3)
estimated_codes = self._predictor(h)
return estimated_codes
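
# Shape sketch for BrnnPredictor._Apply (assuming x and s both have shape
# [batch, height, width, code_depth] and hidden_depth = 2 * code_depth;
# all convolutions are 'SAME' with stride 1, so the spatial size is kept):
#   self._adaptation0(x)   -> [batch, height, width, 2 * code_depth]
#   self._adaptation1(s)   -> [batch, height, width, 2 * code_depth]
#   tf.concat(..., axis=3) -> [batch, height, width, 4 * code_depth]
#   self._predictor(h)     -> [batch, height, width, code_depth]
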
class LayerPrediction(blocks.BlockBase):
"""Binary code prediction for one layer."""
def __init__(self, layer_count, code_depth, name=None):
super(LayerPrediction, self).__init__(name)
self._layer_count = layer_count
# No previous layer.
self._layer_state = None
self._current_layer = 0
with self._BlockScope():
# Layers used to do the conditional code prediction.
self._brnn_predictors = []
for _ in xrange(layer_count):
self._brnn_predictors.append(BrnnPredictor(code_depth))
# Layers used to generate the input of the LSTM operating on the
# iteration/depth domain.
hidden_depth = 2 * code_depth
self._state_blocks = []
for _ in xrange(layer_count):
self._state_blocks.append(blocks.CompositionOperator([
blocks.Conv2D(
hidden_depth, [3, 3], [1, 1], 'SAME',
bias=blocks.Bias(0), act=tf.tanh),
blocks.Conv2D(
code_depth, [3, 3], [1, 1], 'SAME',
bias=blocks.Bias(0), act=tf.tanh)
]))
# Memory of the RNN is equivalent to the size of 2 layers of binary
# codes.
hidden_depth = 2 * code_depth
self._layer_rnn = blocks.CompositionOperator([
blocks.Conv2DLSTM(
depth=hidden_depth,
filter_size=[1, 1],
hidden_filter_size=[1, 1],
strides=[1, 1],
padding='SAME'),
blocks.Conv2D(hidden_depth, [1, 1], [1, 1], 'SAME',
bias=blocks.Bias(0), act=tf.tanh),
blocks.Conv2D(code_depth, [1, 1], [1, 1], 'SAME',
bias=blocks.Bias(0), act=tf.tanh)
])
def _Apply(self, x):
assert self._current_layer < self._layer_count
# Layer state is set to 0 when there is no previous iteration.
if self._layer_state is None:
self._layer_state = tf.zeros_like(x, dtype=tf.float32)
# Code estimation using both:
# - the state from the previous iteration/layer,
# - the binary codes that are before in raster scan order.
estimated_codes = self._brnn_predictors[self._current_layer](
x, self._layer_state)
# Compute the updated layer state.
h = self._state_blocks[self._current_layer](x)
self._layer_state = self._layer_rnn(h)
self._current_layer += 1
return estimated_codes
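
# Usage sketch (illustration only, with assumed shapes): LayerPrediction is
# stateful and must be called once per layer, in order.
#
#   layer_prediction = LayerPrediction(layer_count=8, code_depth=1)
#   for layer_codes in code_layers:        # each of shape [batch, h, w, 1]
#     predicted_codes = layer_prediction(layer_codes)
#
# Each call consumes self._layer_state left by the previous call and
# advances self._current_layer by one.
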
class ProgressiveModel(entropy_coder_model.EntropyCoderModel):
"""Progressive BRNN entropy coder model."""
def __init__(self):
super(ProgressiveModel, self).__init__()
def Initialize(self, global_step, optimizer, config_string):
if config_string is None:
raise ValueError('The progressive model requires a configuration.')
config = json.loads(config_string)
if 'coded_layer_count' not in config:
config['coded_layer_count'] = 0
self._config = config
self._optimizer = optimizer
self._global_step = global_step
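
  # Example configuration string (illustration only): 'coded_layer_count'
  # is optional and defaults to 0, which means that all the input layers
  # are coded.
  #
  #   {
  #     "layer_depth": 1,
  #     "layer_count": 8,
  #     "coded_layer_count": 4
  #   }
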
def BuildGraph(self, input_codes):
"""Build the graph corresponding to the progressive BRNN model."""
layer_depth = self._config['layer_depth']
layer_count = self._config['layer_count']
code_shape = input_codes.get_shape()
code_depth = code_shape[-1].value
if self._config['coded_layer_count'] > 0:
prefix_depth = self._config['coded_layer_count'] * layer_depth
if code_depth < prefix_depth:
        raise ValueError('Invalid prefix depth: {} vs {}'.format(
            prefix_depth, code_depth))
input_codes = input_codes[:, :, :, :prefix_depth]
code_shape = input_codes.get_shape()
code_depth = code_shape[-1].value
if code_depth % layer_depth != 0:
raise ValueError(
'Code depth must be a multiple of the layer depth: {} vs {}'.format(
code_depth, layer_depth))
code_layer_count = code_depth // layer_depth
if code_layer_count > layer_count:
raise ValueError('Input codes have too many layers: {}, max={}'.format(
code_layer_count, layer_count))
# Block used to estimate binary codes.
layer_prediction = LayerPrediction(layer_count, layer_depth)
# Block used to compute code lengths.
code_length_block = blocks.CodeLength()
# Loop over all the layers.
code_length = []
code_layers = tf.split(
value=input_codes, num_or_size_splits=code_layer_count, axis=3)
for k in xrange(code_layer_count):
x = code_layers[k]
predicted_x = layer_prediction(x)
      # Saturate the prediction to avoid an infinite code length (a bit
      # predicted with probability 0 would cost -log2(0) = +inf bits).
epsilon = 0.001
predicted_x = tf.clip_by_value(
predicted_x, -1 + epsilon, +1 - epsilon)
code_length.append(code_length_block(
blocks.ConvertSignCodeToZeroOneCode(x),
blocks.ConvertSignCodeToZeroOneCode(predicted_x)))
tf.contrib.deprecated.scalar_summary('code_length_layer_{:02d}'.format(k),
code_length[-1])
code_length = tf.stack(code_length)
self.loss = tf.reduce_mean(code_length)
tf.contrib.deprecated.scalar_summary('loss', self.loss)
# Loop over all the remaining layers just to make sure they are
# instantiated. Otherwise, loading model params could fail.
dummy_x = tf.zeros_like(code_layers[0])
for _ in xrange(layer_count - code_layer_count):
      layer_prediction(dummy_x)
    # Average code length (in bits per input bit) over all the coded layers.
self.average_code_length = tf.reduce_mean(code_length)
if self._optimizer:
optim_op = self._optimizer.minimize(self.loss,
global_step=self._global_step)
block_updates = blocks.CreateBlockUpdates()
if block_updates:
with tf.get_default_graph().control_dependencies([optim_op]):
self.train_op = tf.group(*block_updates)
else:
self.train_op = optim_op
else:
self.train_op = None
def GetConfigStringForUnitTest(self):
s = '{\n'
s += '"layer_depth": 1,\n'
s += '"layer_count": 8\n'
s += '}\n'
return s
@model_factory.RegisterEntropyCoderModel('progressive')
def CreateProgressiveModel():
return ProgressiveModel()
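
# End-to-end sketch (illustration only; the shapes and the untrained setup
# are assumptions): build the progressive model on random {-1, +1} codes.
#
#   model = model_factory.GetModelRegistry().CreateModel('progressive')
#   model.Initialize(global_step=None, optimizer=None,
#                    config_string=model.GetConfigStringForUnitTest())
#   codes = 2.0 * tf.to_float(
#       tf.random_uniform([2, 8, 8, 8], 0, 2, dtype=tf.int32)) - 1.0
#   model.BuildGraph(codes)
#
# For truly random bits no prediction is possible, so after training
# model.average_code_length should stay close to 1.0 bit per input bit.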