Commit 68a18b70 authored by Toby Boyd's avatar Toby Boyd Committed by GitHub
Browse files

Merge pull request #1 from tensorflow/master

update to tensorflow/model master
parents bc70271a 2c4fea8d
# Copyright 2016 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Binary code sample generator."""
import numpy as np
# Reference bit pattern used by ComputeLineCrc(). Row `dy` is compared with
# code row `y - 1 - dy` (so row 0 is the row directly above the current
# position) and column `dx` with code column `x - 2 + dx`.
_CRC_LINE = [
    [0, 1, 0],
    [1, 1, 0],
    [1, 0, 0]
]
# Reference bit pattern used by ComputeDepthCrc(). Entry `delta` is compared
# with depth plane `d - 1 - delta` at the same (y, x) position.
_CRC_DEPTH = [1, 1, 0, 1]
def ComputeLineCrc(code, width, y, x, d):
  """Counts mismatches between previous rows of `code` and `_CRC_LINE`.

  The rows `y - 1`, `y - 2`, ... (one per row of `_CRC_LINE`) and the columns
  `x - 2`, `x - 1`, ... (one per column of `_CRC_LINE`) of depth plane `d` are
  compared with the reference pattern. Positions falling outside of the code
  (negative row, negative column or column >= width) are ignored.

  Args:
    code: Array-like indexed as code[row, column, depth].
    width: Number of valid columns in `code`.
    y: Row of the current position.
    x: Column of the current position.
    d: Depth plane of the current position.

  Returns:
    The number of in-bounds positions whose value differs from the pattern.
  """
  crc = 0
  # enumerate() replaces xrange(len(...)), which does not exist on Python 3.
  for dy, pattern_row in enumerate(_CRC_LINE):
    i = y - 1 - dy
    if i < 0:
      continue
    for dx, expected in enumerate(pattern_row):
      j = x - 2 + dx
      if j < 0 or j >= width:
        continue
      if code[i, j, d] != expected:
        crc += 1
  return crc
def ComputeDepthCrc(code, y, x, d):
  """Counts mismatches between previous depth planes and `_CRC_DEPTH`.

  The values of `code` at position (y, x) in depth planes `d - 1`, `d - 2`,
  ... (one per entry of `_CRC_DEPTH`) are compared with the reference pattern.
  Planes with a negative index are ignored.

  Args:
    code: Array-like indexed as code[row, column, depth].
    y: Row of the current position.
    x: Column of the current position.
    d: Depth plane of the current position.

  Returns:
    The number of existing previous planes whose value differs from the
    pattern.
  """
  crc = 0
  # enumerate() replaces xrange(len(...)), which does not exist on Python 3.
  for delta, expected in enumerate(_CRC_DEPTH):
    k = d - 1 - delta
    if k < 0:
      continue
    if code[y, x, k] != expected:
      crc += 1
  return crc
def GenerateSingleCode(code_shape):
  """Generates one random binary code with local correlations.

  The code is filled plane by plane, row by row, column by column. The value
  of each bit is derived from the mismatch counts returned by ComputeLineCrc()
  and ComputeDepthCrc() on the bits generated so far, and is then flipped with
  probability 0.2.

  Args:
    code_shape: Sequence (height, width, depth) giving the shape of the code.

  Returns:
    An integer numpy array of shape `code_shape` containing only 0s and 1s.
  """
  # `np.int` was only an alias of the builtin int and was removed from NumPy;
  # the builtin gives the same dtype.
  code = np.zeros(code_shape, dtype=int)

  keep_value_proba = 0.8

  height = code_shape[0]
  width = code_shape[1]
  depth = code_shape[2]

  # range() instead of xrange() so that the generator also runs on Python 3.
  for d in range(depth):
    for y in range(height):
      for x in range(width):
        v1 = ComputeLineCrc(code, width, y, x, d)
        v2 = ComputeDepthCrc(code, y, x, d)
        v = 1 if (v1 + v2 >= 6) else 0
        # Exactly one random draw per bit, in generation order.
        if np.random.rand() < keep_value_proba:
          code[y, x, d] = v
        else:
          code[y, x, d] = 1 - v

  return code
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Base class for Tensorflow building blocks."""
import collections
import contextlib
import itertools
import tensorflow as tf
# Maps a graph to the stack (list) of blocks currently being built in it; a
# missing graph gets a fresh empty stack on first access.
_block_stacks = collections.defaultdict(list)
class BlockBase(object):
  """Base class for transform wrappers of Tensorflow.

  Inherit from this class to implement a Tensorflow transform block.

  1. Create variables through NewVar(); do not override that method, e.g.
       a_variable = self.NewVar(initial_value)

  2. Run all Tensorflow-related code inside 'with self._BlockScope():',
     otherwise name scoping and the block hierarchy will not work. The
     _Apply() method is the exception: __call__() already invokes it inside
     that context manager.

  3. Override _Apply(); it is what __call__() executes.

  Typical usage:
    nn1 = NN(128, bias=Bias(0), act=tf.nn.relu)
    y = nn1(x)

  Things to consider:
    - Prefer lazy initialization, i.e. initialize at the first _Apply() call
      rather than in __init__().

  Note: if needed, the variables can be created on a specific parameter
  server by creating blocks in a scope like:
    with g.device(device):
      linear = Linear(...)
  """

  def __init__(self, name):
    """Registers the block in the hierarchy of the current default graph.

    Args:
      name: Name of the block. None falls back to the class name; an empty
        string disables the name scope for this block.
    """
    self._variables = []
    self._subblocks = []
    self._called = False

    # None and '' are deliberately treated differently: None means "use the
    # class name" while '' means "do not open a name scope".
    if name is None:
      name = self.__class__.__name__
    self.name = name
    self._graph = tf.get_default_graph()

    self._scope_str = ''
    if self.name:
      # The scope string is captured once, at construction time.
      with self._graph.name_scope(self.name) as scope:
        self._scope_str = scope

    # Stack of blocks being built in this graph; it encodes the hierarchy.
    self._stack = _block_stacks[self._graph]
    if self.__class__ is BlockBase:
      # Only the root is a plain BlockBase; it starts out initialized.
      assert not self._stack
      self._parent = None
      self._called = True
      return

    # Lazily create a placeholder root the first time a real block is built.
    if not self._stack:
      self._stack.append(BlockBase('NoOpRoot'))
    self._parent = self._stack[-1]
    self._parent._subblocks.append(self)  # pylint: disable=protected-access

  def __repr__(self):
    return '"{}" ({})'.format(self._scope_str, type(self).__name__)

  @contextlib.contextmanager
  def _OptionalNameScope(self, scope_str):
    """Opens a graph name scope unless `scope_str` is empty."""
    if not scope_str:
      yield
      return
    with self._graph.name_scope(scope_str):
      yield

  @contextlib.contextmanager
  def _BlockScope(self):
    """Context manager that handles graph, namescope, and nested blocks."""
    self._stack.append(self)
    try:
      with self._graph.as_default(), self._OptionalNameScope(self._scope_str):
        yield self
    finally:
      # Runs when leaving 'with self._BlockScope()', exception or not.
      self._stack.pop()

  def __call__(self, *args, **kwargs):
    """Runs _Apply() inside the block scope and marks the block as called."""
    assert self._stack is _block_stacks[self._graph]

    with self._BlockScope():
      result = self._Apply(*args, **kwargs)

    self._called = True
    return result

  def _Apply(self, *args, **kwargs):
    """Implementation of __call__(); must be provided by subclasses."""
    raise NotImplementedError()

  # All variable creation goes through this single function so that the
  # creation scheme can be changed in one place.
  def NewVar(self, value, **kwargs):
    """Creates a new variable owned by this block.

    Args:
      value: Initialization value of the variable. The shape and the data type
        of the variable is determined by this initial value.
      **kwargs: Extra named arguments passed to Variable.__init__().

    Returns:
      The newly created tf.Variable, which is also recorded in the list of
      variables of this block.
    """
    variable = tf.Variable(value, **kwargs)
    self._variables.append(variable)
    return variable

  @property
  def initialized(self):
    """Tells whether the block is initialized.

    BlockBase considers a block initialized once __call__() has completed at
    least once. Subclasses for which this is not the right criterion should
    override this property.

    Returns:
      True if initialized, False otherwise.
    """
    return self._called

  def AssertInitialized(self):
    """Raises RuntimeError if the block is not initialized."""
    if self.initialized:
      return
    raise RuntimeError('{} has not been initialized.'.format(self))

  def VariableList(self):
    """Returns the list of all tensorflow variables used inside this block.

    Variables of the sub-blocks come first, followed by the variables owned
    directly by this block.
    """
    # Own variables are looked up first so that an uninitialized block is
    # reported before any sub-block is visited.
    own = self._VariableList()
    collected = []
    for sub in self._subblocks:
      collected.extend(sub.VariableList())
    collected.extend(own)
    return collected

  def _VariableList(self):
    """Returns the list of all tensorflow variables owned by this block."""
    self.AssertInitialized()
    return self._variables

  def CreateWeightLoss(self):
    """Returns L2 loss list of (almost) all variables used inside this block.

    There are two ways to customize the result:
      1. Override CreateWeightLoss() to change the weight loss of all
         variables that belong to this block, directly and indirectly.
      2. Override _CreateWeightLoss() to change the weight loss of the
         variables that belong directly to this block only.

    Returns:
      A list of losses: those of the sub-blocks followed by those of this
      block.
    """
    # The losses of this block are created before the sub-blocks are visited.
    own = self._CreateWeightLoss()
    collected = []
    for sub in self._subblocks:
      collected.extend(sub.CreateWeightLoss())
    collected.extend(own)
    return collected

  def _CreateWeightLoss(self):
    """Returns weight loss list of variables that belong to this block."""
    self.AssertInitialized()
    with self._BlockScope():
      return [tf.nn.l2_loss(variable) for variable in self._variables]

  def CreateUpdateOps(self):
    """Creates update operations for this block and its sub-blocks."""
    # The ops of this block are created before the sub-blocks are visited.
    own = self._CreateUpdateOps()
    collected = []
    for sub in self._subblocks:
      collected.extend(sub.CreateUpdateOps())
    collected.extend(own)
    return collected

  def _CreateUpdateOps(self):
    """Creates update operations for this block (none by default)."""
    self.AssertInitialized()
    return []

  def MarkAsNonTrainable(self):
    """Mark all the variables of this block as non-trainable.

    Every variable owned directly or indirectly (through sub-blocks) is
    removed from the TRAINABLE_VARIABLES collection.

    This function along with CheckpointInitOp can be used to load a pretrained
    model that consists in only one part of the whole graph.
    """
    assert self._called

    owned = self.VariableList()
    trainable = tf.get_collection_ref(tf.GraphKeys.TRAINABLE_VARIABLES)
    for variable in owned:
      if variable in trainable:
        trainable.remove(variable)
def CreateWeightLoss():
  """Returns all weight losses from the blocks in the graph.

  The losses are collected from the bottom block of the stack attached to the
  current default graph; an empty list is returned when that stack is empty.
  """
  blocks = _block_stacks[tf.get_default_graph()]
  return blocks[0].CreateWeightLoss() if blocks else []
def CreateBlockUpdates():
  """Combines all updates from the blocks in the graph.

  The update ops are collected from the bottom block of the stack attached to
  the current default graph; an empty list is returned when that stack is
  empty.
  """
  blocks = _block_stacks[tf.get_default_graph()]
  return blocks[0].CreateUpdateOps() if blocks else []
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utility functions for blocks."""
from __future__ import division
from __future__ import unicode_literals
import math
import numpy as np
import tensorflow as tf
class RsqrtInitializer(object):
  """Gaussian initializer with standard deviation 1/sqrt(n).

  Note that tf.truncated_normal is used internally. Therefore any random sample
  outside two-sigma will be discarded and re-sampled.
  """

  def __init__(self, dims=(0,), **kwargs):
    """Creates an initializer.

    Args:
      dims: Dimension(s) index to compute standard deviation:
        1.0 / sqrt(product(shape[dims]))
        A single integer is accepted as a shorthand for a one-element list.
      **kwargs: Extra keyword arguments to pass to tf.truncated_normal.
    """
    # Local import: `long` does not exist on Python 3, numbers.Integral covers
    # int and long on Python 2, int on Python 3, and numpy integer scalars.
    import numbers

    if isinstance(dims, numbers.Integral):
      self._dims = [dims]
    else:
      self._dims = dims
    self._kwargs = kwargs

  def __call__(self, shape, dtype):
    """Returns a truncated-normal tensor of the given shape and dtype.

    Args:
      shape: Shape of the tensor to create; indexed by the configured dims.
      dtype: Data type of the tensor to create.
    """
    stddev = 1.0 / np.sqrt(np.prod([shape[x] for x in self._dims]))
    return tf.truncated_normal(
        shape=shape, dtype=dtype, stddev=stddev, **self._kwargs)
class RectifierInitializer(object):
  """Gaussian initializer with standard deviation sqrt(2/fan_in).

  Note that tf.random_normal is used internally to ensure the expected weight
  distribution. This is intended to be used with ReLU activations, specially
  in ResNets.

  For details please refer to:
  Delving Deep into Rectifiers: Surpassing Human-Level Performance on ImageNet
  Classification
  """

  def __init__(self, dims=(0,), scale=2.0, **kwargs):
    """Creates an initializer.

    Args:
      dims: Dimension(s) index to compute standard deviation:
        sqrt(scale / product(shape[dims]))
        A single integer is accepted as a shorthand for a one-element list.
      scale: A constant scaling for the initialization used as
        sqrt(scale / product(shape[dims])).
      **kwargs: Extra keyword arguments to pass to tf.random_normal.
    """
    # Local import: `long` does not exist on Python 3, numbers.Integral covers
    # int and long on Python 2, int on Python 3, and numpy integer scalars.
    import numbers

    if isinstance(dims, numbers.Integral):
      self._dims = [dims]
    else:
      self._dims = dims
    self._kwargs = kwargs
    self._scale = scale

  def __call__(self, shape, dtype):
    """Returns a normally distributed tensor of the given shape and dtype.

    Args:
      shape: Shape of the tensor to create; indexed by the configured dims.
      dtype: Data type of the tensor to create.
    """
    stddev = np.sqrt(self._scale / np.prod([shape[x] for x in self._dims]))
    return tf.random_normal(
        shape=shape, dtype=dtype, stddev=stddev, **self._kwargs)
class GaussianInitializer(object):
  """Gaussian initializer using a fixed, user-provided standard deviation.

  Samples are drawn with tf.truncated_normal, so any value further than two
  standard deviations from the mean is discarded and drawn again.
  """

  def __init__(self, stddev=1.0):
    """Stores the standard deviation used for every tensor created."""
    self._stddev = stddev

  def __call__(self, shape, dtype):
    """Returns a truncated-normal tensor of the given shape and dtype."""
    sigma = self._stddev
    return tf.truncated_normal(shape=shape, dtype=dtype, stddev=sigma)
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
from block_base import *
from block_util import *
from blocks_binarizer import *
from blocks_entropy_coding import *
from blocks_lstm import *
from blocks_masked_conv2d import *
from blocks_masked_conv2d_lstm import *
from blocks_operator import *
from blocks_std import *
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Activation and weight binarizer implementations."""
import math
import numpy as np
import tensorflow as tf
def ConvertSignCodeToZeroOneCode(x):
  """Maps codes in {-1, +1} to codes in {0, 1} (-1 -> 0, +1 -> 1)."""
  shifted = x + 1.0
  return 0.5 * shifted
def ConvertZeroOneCodeToSignCode(x):
  """Maps codes in {0, 1} to codes in {-1, +1} (0 -> -1, 1 -> +1)."""
  doubled = 2.0 * x
  return doubled - 1.0
def CheckZeroOneCode(x):
  """Returns a boolean tensor telling whether x * (x - 1) is 0 everywhere.

  The product vanishes exactly for the values 0 and 1, so the result is true
  when every element of `x` is a valid {0, 1} code.
  """
  residual = x * (x - 1.0)
  is_zero = tf.equal(residual, 0)
  return tf.reduce_all(is_zero)
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Set of blocks related to entropy coding."""
import math
import tensorflow as tf
import block_base
# pylint does not recognize block_base.BlockBase.__call__().
# pylint: disable=not-callable
class CodeLength(block_base.BlockBase):
  """Theoretical bound for a code length given a probability distribution."""

  def __init__(self, name=None):
    super(CodeLength, self).__init__(name)

  def _Apply(self, c, p):
    """Theoretical bound of the coded length given a probability distribution.

    Args:
      c: The binary codes. Belong to {0, 1}.
      p: The probability of: P(code==+1)

    Returns:
      The average code length, in bits.
      Note: the average code length can be greater than 1 bit (e.g. when
      encoding the least likely symbol).
    """
    # Natural-log likelihood of each code under the predicted probability.
    log_likelihood = (1.0 - c) * tf.log(1.0 - p) + c * tf.log(p)
    # Negate and convert from nats to bits.
    bits = log_likelihood / (-math.log(2))
    return tf.reduce_mean(bits)
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for basic tensorflow blocks_entropy_coding."""
from __future__ import division
from __future__ import unicode_literals
import math
import numpy as np
import tensorflow as tf
import blocks_entropy_coding
class BlocksEntropyCodingTest(tf.test.TestCase):
  """Checks CodeLength against a hand-computed mean code length."""

  def testCodeLength(self):
    shape = [2, 4]
    proba_feed = [[0.65, 0.25, 0.70, 0.10],
                  [0.28, 0.20, 0.44, 0.54]]
    symbol_feed = [[1.0, 0.0, 1.0, 0.0],
                   [0.0, 0.0, 0.0, 1.0]]

    # Probability given to the symbol actually observed: p where the symbol
    # is 1 and (1 - p) where it is 0, in row-major order.
    observed_probas = [0.65, 0.75, 0.70, 0.90, 0.72, 0.80, 0.56, 0.54]
    total_log = sum(math.log(q) for q in observed_probas)
    mean_code_length = - (total_log / math.log(2.0)) / (shape[0] * shape[1])

    symbol = tf.placeholder(dtype=tf.float32, shape=shape)
    proba = tf.placeholder(dtype=tf.float32, shape=shape)
    code_length_calculator = blocks_entropy_coding.CodeLength()
    code_length = code_length_calculator(symbol, proba)

    feeds = {symbol: symbol_feed, proba: proba_feed}
    with self.test_session():
      tf.global_variables_initializer().run()
      code_length_eval = code_length.eval(feed_dict=feeds)

      self.assertAllClose(mean_code_length, code_length_eval)
# Hand control to tf.test.main() when this file is executed as a script.
if __name__ == '__main__':
  tf.test.main()
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Blocks of LSTM and its variants."""
import numpy as np
import tensorflow as tf
import block_base
import block_util
import blocks_std
# pylint does not recognize block_base.BlockBase.__call__().
# pylint: disable=not-callable
def LSTMBiasInit(shape, dtype):
  """Returns ones for forget-gate, and zeros for the others.

  Args:
    shape: One-dimensional shape whose single entry is a multiple of 4.
    dtype: Data type of the returned tensor.

  Returns:
    A 1-D tensor whose first quarter is filled with ones and whose remaining
    three quarters are filled with zeros.
  """
  dims = np.array(shape)

  # Check internal consistencies.
  assert dims.shape == (1,), dims
  assert dims[0] % 4 == 0, dims

  gate_size = dims[0] // 4
  forget_part = tf.fill([gate_size], tf.constant(1, dtype=dtype))
  other_part = tf.fill([3 * gate_size], tf.constant(0, dtype=dtype))
  return tf.concat([forget_part, other_part], 0)
class LSTMBase(block_base.BlockBase):
  """Base class for LSTM implementations.

  These LSTM implementations use the pattern found in [1]. No peephole
  connection, i.e., cell content is not used in recurrence computation.
  Hidden units are also output units.

  The recurrent state is kept on the object: each call reads `hidden` and
  `cell`, and replaces both with the newly computed tensors.

  [1] Zaremba, Sutskever, Vinyals. Recurrent Neural Network Regularization,
  2015. arxiv:1409.2329.
  """

  def __init__(self, output_shape, name):
    """Initializes LSTMBase class object.

    Args:
      output_shape: List representing the LSTM output shape. This argument
        does not include batch dimension. For example, if the LSTM output has
        shape [batch, depth], then pass [depth].
      name: Name of this block.
    """
    super(LSTMBase, self).__init__(name)

    with self._BlockScope():
      # The leading None stands for the batch dimension.
      self._output_shape = [None] + list(output_shape)

    # None means "all zeros"; see the hidden and cell setters.
    self._hidden = None
    self._cell = None

  @property
  def hidden(self):
    """Returns the hidden units of this LSTM."""
    return self._hidden

  @hidden.setter
  def hidden(self, value):
    """Assigns to the hidden units of this LSTM.

    Args:
      value: The new value for the hidden units. If None, the hidden units are
        considered to be filled with zeros. Otherwise its static shape must be
        compatible with the output shape of this LSTM.
    """
    if value is not None:
      value.get_shape().assert_is_compatible_with(self._output_shape)
    self._hidden = value

  @property
  def cell(self):
    """Returns the cell units of this LSTM."""
    return self._cell

  @cell.setter
  def cell(self, value):
    """Assigns to the cell units of this LSTM.

    Args:
      value: The new value for the cell units. If None, the cell units are
        considered to be filled with zeros. Otherwise its static shape must be
        compatible with the output shape of this LSTM.
    """
    if value is not None:
      value.get_shape().assert_is_compatible_with(self._output_shape)
    self._cell = value

  # Consider moving bias terms to the base, and require this method to be
  # linear.
  def _TransformInputs(self, _):
    """Transforms the input units to (4 * depth) units.

    The forget-gate (f), input-gate (i), cell update (j) and output-gate (o)
    are computed as
      f, i, j, o = T(h) + R(x)
    where h is hidden units, x is input units, and T, R are transforms of
    h, x, respectively.

    This method implements R. Note that T is strictly linear, so if LSTM is
    going to use bias, this method must include the bias to the transformation.

    Subclasses must implement this method. See _Apply() for more details.
    """
    raise NotImplementedError()

  def _TransformHidden(self, _):
    """Transforms the hidden units to (4 * depth) units.

    The forget-gate (f), input-gate (i), cell update (j) and output-gate (o)
    are computed as
      f, i, j, o = T(h) + R(x)
    where h is hidden units, x is input units, and T, R are transforms of
    h, x, respectively.

    This method implements T in the equation. The method must implement a
    strictly linear transformation. For example, it may use MatMul or Conv2D,
    but must not add bias. This is because when hidden units are zeros, then
    the LSTM implementation will skip calling this method, instead of passing
    zeros to this function.

    Subclasses must implement this method. See _Apply() for more details.
    """
    raise NotImplementedError()

  def _Apply(self, *args):
    """Runs one LSTM step and returns the new hidden units.

    Args:
      *args: Inputs of the step, forwarded unchanged to _TransformInputs().

    Returns:
      The new hidden units, which are also stored in `self.hidden`. The new
      cell units are stored in `self.cell`.
    """
    xtransform = self._TransformInputs(*args)

    # The gates are stacked along the last axis of the output.
    depth_axis = len(self._output_shape) - 1

    if self.hidden is not None:
      htransform = self._TransformHidden(self.hidden)
      f, i, j, o = tf.split(
          value=htransform + xtransform, num_or_size_splits=4, axis=depth_axis)
    else:
      # Hidden units of None stand for zeros, so T(h) is skipped entirely.
      f, i, j, o = tf.split(
          value=xtransform, num_or_size_splits=4, axis=depth_axis)

    if self.cell is not None:
      self.cell = tf.sigmoid(f) * self.cell + tf.sigmoid(i) * tf.tanh(j)
    else:
      # Cell units of None stand for zeros, so the forget-gate term vanishes.
      self.cell = tf.sigmoid(i) * tf.tanh(j)

    self.hidden = tf.sigmoid(o) * tf.tanh(self.cell)
    return self.hidden
class LSTM(LSTMBase):
  """Efficient LSTM implementation used in [1].

  The inputs go through a biased NN block and the hidden units through a
  bias-free Linear block, each producing 4 * depth units.

  [1] Zaremba, Sutskever, Vinyals. Recurrent Neural Network Regularization,
  2015. arxiv:1409.2329.
  """

  def __init__(self,
               depth,
               bias=LSTMBiasInit,
               initializer=block_util.RsqrtInitializer(),
               name=None):
    """Creates the input and hidden transforms.

    Args:
      depth: Number of hidden units of the LSTM.
      bias: Bias argument forwarded to the input NN block.
      initializer: Weight initializer forwarded to both transforms.
      name: Name of this block.
    """
    super(LSTM, self).__init__([depth], name)

    # One slice of `depth` units for each of the four gates.
    gate_units = 4 * depth
    with self._BlockScope():
      self._depth = depth
      self._nn = blocks_std.NN(
          gate_units, bias=bias, act=None, initializer=initializer)
      self._hidden_linear = blocks_std.Linear(
          gate_units, initializer=initializer)

  def _TransformInputs(self, *args):
    """Applies the (biased) input transform R."""
    return self._nn(*args)

  def _TransformHidden(self, h):
    """Applies the linear hidden transform T."""
    return self._hidden_linear(h)
class Conv2DLSTM(LSTMBase):
  """Convolutional LSTM implementation with optimizations inspired by [1].

  Note that when using the batch normalization feature, the bias initializer
  will not be used, since BN effectively cancels its effect out.

  [1] Zaremba, Sutskever, Vinyals. Recurrent Neural Network Regularization,
  2015. arxiv:1409.2329.
  """

  def __init__(self,
               depth,
               filter_size,
               hidden_filter_size,
               strides,
               padding,
               bias=LSTMBiasInit,
               initializer=block_util.RsqrtInitializer(dims=(0, 1, 2)),
               use_moving_average=False,
               name=None):
    """Creates the input convolution, hidden convolution and bias blocks.

    Args:
      depth: Number of output channels of the LSTM.
      filter_size: Filter size of the convolution applied to the inputs.
      hidden_filter_size: Filter size of the convolution applied to the
        hidden units (always stride [1, 1] and 'SAME' padding).
      strides: Strides of the input convolution.
      padding: Padding of the input convolution.
      bias: Bias initializer, or None to use no bias at all.
      initializer: Weight initializer of both convolutions.
      use_moving_average: Not used by this constructor.
      name: Name of this block.
    """
    super(Conv2DLSTM, self).__init__([None, None, depth], name)
    self._iter = 0

    # One slice of `depth` channels for each of the four gates.
    gate_depth = 4 * depth
    with self._BlockScope():
      self._input_conv = blocks_std.Conv2D(
          gate_depth,
          filter_size,
          strides,
          padding,
          bias=None,
          act=None,
          initializer=initializer,
          name='input_conv2d')

      self._hidden_conv = blocks_std.Conv2D(
          gate_depth,
          hidden_filter_size,
          [1, 1],
          'SAME',
          bias=None,
          act=None,
          initializer=initializer,
          name='hidden_conv2d')

      if bias is None:
        self._bias = blocks_std.PassThrough()
      else:
        self._bias = blocks_std.BiasAdd(bias, name='biases')

  def _TransformInputs(self, x):
    """Applies the input convolution followed by the bias block."""
    return self._bias(self._input_conv(x))

  def _TransformHidden(self, h):
    """Applies the bias-free hidden convolution."""
    return self._hidden_conv(h)

  def _Apply(self, *args):
    """Runs one LSTM step and counts it in `self._iter`."""
    # The step itself is the one implemented by LSTMBase._Apply().
    hidden = super(Conv2DLSTM, self)._Apply(*args)
    self._iter += 1
    return hidden
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for LSTM tensorflow blocks."""
from __future__ import division
import numpy as np
import tensorflow as tf
import block_base
import blocks_std
import blocks_lstm
class BlocksLSTMTest(tf.test.TestCase):
  """Checks the graph structure and the bias initialization of the LSTMs."""

  def CheckUnary(self, y, op_type):
    """Asserts y comes from a 1-input op of `op_type`; returns that input."""
    producer = y.op
    self.assertEqual(op_type, producer.type)
    self.assertEqual(1, len(producer.inputs))
    return producer.inputs[0]

  def CheckBinary(self, y, op_type):
    """Asserts y comes from a 2-input op of `op_type`; returns the inputs."""
    producer = y.op
    self.assertEqual(op_type, producer.type)
    self.assertEqual(2, len(producer.inputs))
    return producer.inputs

  def _CheckLSTMGraph(self, lstm, y, split_names):
    """Walks back from output y and checks the gate wiring of one step.

    Args:
      lstm: The LSTM block that produced y.
      y: Output of the LSTM step.
      split_names: Expected tensor names of the f, i, j and o gate inputs.
    """
    f_name, i_name, j_name, o_name = split_names

    o, tanhc = self.CheckBinary(y, 'Mul')
    self.assertEqual(self.CheckUnary(o, 'Sigmoid').name, o_name)

    self.assertIs(lstm.cell, self.CheckUnary(tanhc, 'Tanh'))
    fc, ij = self.CheckBinary(lstm.cell, 'Add')

    f, _ = self.CheckBinary(fc, 'Mul')
    self.assertEqual(self.CheckUnary(f, 'Sigmoid').name, f_name)

    i, j = self.CheckBinary(ij, 'Mul')
    self.assertEqual(self.CheckUnary(i, 'Sigmoid').name, i_name)
    j = self.CheckUnary(j, 'Tanh')
    self.assertEqual(j.name, j_name)

  def _CheckForgetBias(self, bias_block):
    """Checks a 36-unit bias is 9 ones followed by 27 zeros."""
    with self.test_session():
      tf.global_variables_initializer().run()
      bias_var = bias_block._bias.eval()

      comp = ([1.0] * 9) + ([0.0] * 27)
      self.assertAllEqual(bias_var, comp)

  def testLSTM(self):
    lstm = blocks_lstm.LSTM(10)
    lstm.hidden = tf.zeros(shape=[10, 10], dtype=tf.float32)
    lstm.cell = tf.zeros(shape=[10, 10], dtype=tf.float32)
    x = tf.placeholder(dtype=tf.float32, shape=[10, 11])
    y = lstm(x)

    self._CheckLSTMGraph(
        lstm, y,
        ['LSTM/split:0', 'LSTM/split:1', 'LSTM/split:2', 'LSTM/split:3'])

  def testLSTMBiasInit(self):
    lstm = blocks_lstm.LSTM(9)
    x = tf.placeholder(dtype=tf.float32, shape=[15, 7])
    lstm(x)
    b = lstm._nn._bias

    self._CheckForgetBias(b)

  def testConv2DLSTM(self):
    lstm = blocks_lstm.Conv2DLSTM(depth=10,
                                  filter_size=[1, 1],
                                  hidden_filter_size=[1, 1],
                                  strides=[1, 1],
                                  padding='SAME')
    lstm.hidden = tf.zeros(shape=[10, 11, 11, 10], dtype=tf.float32)
    lstm.cell = tf.zeros(shape=[10, 11, 11, 10], dtype=tf.float32)
    x = tf.placeholder(dtype=tf.float32, shape=[10, 11, 11, 1])
    y = lstm(x)

    self._CheckLSTMGraph(
        lstm, y,
        ['Conv2DLSTM/split:0', 'Conv2DLSTM/split:1', 'Conv2DLSTM/split:2',
         'Conv2DLSTM/split:3'])

  def testConv2DLSTMBiasInit(self):
    lstm = blocks_lstm.Conv2DLSTM(9, 1, 1, [1, 1], 'SAME')
    x = tf.placeholder(dtype=tf.float32, shape=[1, 7, 7, 7])
    lstm(x)
    b = lstm._bias

    self._CheckForgetBias(b)
# Hand control to tf.test.main() when this file is executed as a script.
if __name__ == '__main__':
  tf.test.main()
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Define some typical masked 2D convolutions."""
import numpy as np
import tensorflow as tf
import block_util
import blocks_std
# pylint does not recognize block_base.BlockBase.__call__().
# pylint: disable=not-callable
class RasterScanConv2D(blocks_std.Conv2DBase):
  """Conv2D with no dependency on future pixels (in raster scan order).

  For example, assuming a 5 x 5 kernel, the kernel is applied a spatial mask:
    T T T T T
    T T T T T
    T T x F F
    F F F F F
    F F F F F
  where 'T' are pixels which are available when computing the convolution
  for pixel 'x'. All the pixels marked with 'F' are not available.
  'x' itself is not available if strict_order is True, otherwise, it is
  available.
  """

  def __init__(self, depth, filter_size, strides, padding,
               strict_order=True,
               bias=None, act=None, initializer=None, name=None):
    """Creates the masked convolution block.

    Args:
      depth: Forwarded to the Conv2DBase constructor.
      filter_size: Kernel size as (height, width); both must be odd.
      strides: Forwarded to the Conv2DBase constructor.
      padding: Forwarded to the Conv2DBase constructor.
      strict_order: If True, the center pixel of the kernel is masked too.
      bias: Forwarded to the Conv2DBase constructor.
      act: Forwarded to the Conv2DBase constructor.
      initializer: Kernel initializer called as initializer(shape, dtype);
        defaults to block_util.RsqrtInitializer(dims=(0, 1, 2)).
      name: Name of this block.

    Raises:
      ValueError: If one of the kernel dimensions is even.
    """
    super(RasterScanConv2D, self).__init__(
        depth, filter_size, strides, padding, bias, act, name=name)

    if (filter_size[0] % 2) != 1 or (filter_size[1] % 2) != 1:
      raise ValueError('Kernel size should be odd.')

    with self._BlockScope():
      if initializer is None:
        initializer = block_util.RsqrtInitializer(dims=(0, 1, 2))
      self._initializer = initializer
      self._strict_order = strict_order

  def _CreateKernel(self, shape, dtype):
    """Creates the kernel variable and returns it multiplied by the mask.

    Args:
      shape: Kernel shape; the first two entries are the spatial dimensions.
      dtype: Tensorflow data type of the kernel.

    Returns:
      The masked kernel tensor.
    """
    init = self._initializer(shape, dtype)
    kernel = self.NewVar(init)

    mask = np.ones(shape[:2], dtype=dtype.as_numpy_dtype)
    # Computed per element: `shape[:2] // 2` only works when shape is a numpy
    # array and raises TypeError for a plain list or tuple.
    center_row = shape[0] // 2
    center_col = shape[1] // 2

    # Every row below the center row is in the future.
    mask[center_row + 1:, :] = 0
    # On the center row, mask everything after the center pixel, and the
    # center pixel itself when the order is strict.
    first_masked_col = center_col if self._strict_order else center_col + 1
    mask[center_row, first_masked_col:] = 0

    # Add singleton input/output depth axes so that the mask broadcasts.
    mask = mask.reshape(mask.shape + (1, 1))
    return tf.convert_to_tensor(mask, dtype) * kernel
class DepthOrderConv2D(blocks_std.Conv2DBase):
  """Conv2D with no dependency on higher depth dimensions.

  More precisely, the output depth #n has only dependencies on input depths #k
  for k < n (if strict_order is True) or for k <= n (if strict_order is False).
  """

  def __init__(self, depth, filter_size, strides, padding,
               strict_order=True,
               bias=None, act=None, initializer=None, name=None):
    """Creates the depth-masked convolution block.

    Args:
      depth: Forwarded to the Conv2DBase constructor.
      filter_size: Forwarded to the Conv2DBase constructor.
      strides: Forwarded to the Conv2DBase constructor.
      padding: Forwarded to the Conv2DBase constructor.
      strict_order: If True, output depth #n does not depend on input depth
        #n itself.
      bias: Forwarded to the Conv2DBase constructor.
      act: Forwarded to the Conv2DBase constructor.
      initializer: Kernel initializer called as initializer(shape, dtype);
        defaults to block_util.RsqrtInitializer(dims=(0, 1, 2)).
      name: Name of this block.
    """
    super(DepthOrderConv2D, self).__init__(
        depth, filter_size, strides, padding, bias, act, name=name)

    with self._BlockScope():
      if initializer is None:
        initializer = block_util.RsqrtInitializer(dims=(0, 1, 2))
      self._initializer = initializer
      self._strict_order = strict_order

  def _CreateKernel(self, shape, dtype):
    """Creates the kernel variable and returns it multiplied by the mask.

    Args:
      shape: Kernel shape; entries 2 and 3 are the input and output depths.
      dtype: Tensorflow data type of the kernel.

    Returns:
      The masked kernel tensor.
    """
    init = self._initializer(shape, dtype)
    kernel = self.NewVar(init)

    # mask[k, n] tells whether output depth n may read input depth k.
    mask = np.ones(shape[2:], dtype=dtype.as_numpy_dtype)
    depth_output = shape[3]
    # range() instead of xrange() so that the block also works on Python 3.
    for d in range(depth_output):
      if self._strict_order:
        mask[d:, d] = 0
      else:
        mask[d + 1:, d] = 0

    # Add singleton spatial axes so that the mask broadcasts over the kernel.
    mask = mask.reshape((1, 1) + mask.shape)
    return tf.convert_to_tensor(mask, dtype) * kernel
class GroupRasterScanConv2D(blocks_std.Conv2DBase):
  """Conv2D with no dependency on future pixels (in raster scan order).

  This version only introduces dependencies on previous pixels in raster scan
  order. It can also introduce some dependencies on previous depth positions
  of the current pixel (current pixel = center pixel of the kernel) in the
  following way:
  the depth dimension of the input is split into Ki groups of size
  |input_group_size|, the output dimension is split into Ko groups of size
  |output_group_size| (usually Ki == Ko). Each output group ko of the current
  pixel position can only depend on previous input groups ki
  (i.e. ki < ko if strict_order is True or ki <= ko if strict_order is False).

  Notes:
  - Block RasterScanConv2D is a special case of GroupRasterScanConv2D
    where Ki == Ko == 1 (i.e. input_group_size == input_depth and
    output_group_size == output_depth).
  - For 1x1 convolution, block DepthOrderConv2D is a special case of
    GroupRasterScanConv2D where input_group_size == 1 and
    output_group_size == 1.
  """

  def __init__(self, depth, filter_size, strides, padding,
               strict_order=True,
               input_group_size=1,
               output_group_size=1,
               bias=None, act=None, initializer=None, name=None):
    """Initializes the grouped raster-scan convolution block.

    Args:
      depth: Output depth; must be a multiple of output_group_size.
      filter_size: Indexable pair (height, width); both must be odd.
      strides: Strides, forwarded to the base convolution block.
      padding: Padding mode, forwarded to the base convolution block.
      strict_order: If True, output group ko cannot see input group ko of the
        current pixel.
      input_group_size: Number of input depths per input group.
      output_group_size: Number of output depths per output group.
      bias: Bias argument forwarded to the base convolution block.
      act: Activation argument forwarded to the base convolution block.
      initializer: Callable (shape, dtype) producing the initial kernel value.
        Defaults to block_util.RsqrtInitializer(dims=(0, 1, 2)).
      name: Name of this block.

    Raises:
      ValueError: if a filter size is even, or if depth is not a multiple of
        output_group_size.
    """
    super(GroupRasterScanConv2D, self).__init__(
        depth, filter_size, strides, padding, bias, act, name=name)

    if (filter_size[0] % 2) != 1 or (filter_size[1] % 2) != 1:
      raise ValueError('Kernel size should be odd.')

    with self._BlockScope():
      if initializer is None:
        initializer = block_util.RsqrtInitializer(dims=(0, 1, 2))
      self._initializer = initializer
      self._input_group_size = input_group_size
      self._output_group_size = output_group_size
      self._strict_order = strict_order

      if depth % self._output_group_size != 0:
        raise ValueError(
            'Invalid depth group size: {} for depth {}'.format(
                self._output_group_size, depth))
      self._output_group_count = depth // self._output_group_size

  def _CreateKernel(self, shape, dtype):
    """Creates the kernel variable and multiplies it by the group mask.

    Args:
      shape: Kernel shape [height, width, input depth, output depth].
        NOTE(review): `shape[:2] // 2` needs an array-like (e.g. numpy) shape.
      dtype: TensorFlow dtype of the kernel.

    Returns:
      The kernel variable multiplied by a constant 0/1 mask.

    Raises:
      ValueError: if the input depth is not a multiple of input_group_size.
    """
    init = self._initializer(shape, dtype)
    kernel = self.NewVar(init)

    depth_input = shape[2]
    if depth_input % self._input_group_size != 0:
      raise ValueError(
          'Invalid depth group size: {} for depth {}'.format(
              self._input_group_size, depth_input))
    input_group_count = depth_input // self._input_group_size
    output_group_count = self._output_group_count

    # Set the mask to 0 for future pixels in raster scan order.
    center = shape[:2] // 2
    mask = np.ones([shape[0], shape[1],
                    input_group_count, self._input_group_size,
                    output_group_count, self._output_group_size],
                   dtype=dtype.as_numpy_dtype)
    mask[center[0] + 1:, :, :, :, :, :] = 0
    mask[center[0], center[1] + 1:, :, :, :, :] = 0

    # Adjust the mask for the current position (the center position): output
    # group ko only sees input groups ki < ko (strict) or ki <= ko.
    # The input-group axis is addressed with a slice, so an output group index
    # beyond the number of input groups is a no-op instead of an IndexError.
    depth_output = shape[3]
    for group in range(output_group_count):
      first_hidden_group = group if self._strict_order else group + 1
      mask[center[0], center[1], first_hidden_group:, :, group, :] = 0

    mask = mask.reshape([shape[0], shape[1], depth_input, depth_output])
    return tf.convert_to_tensor(mask, dtype) * kernel
class InFillingConv2D(blocks_std.Conv2DBase):
  """Conv2D whose kernel ignores the current (center) pixel.

  For example, with a 5 x 5 kernel the following spatial mask is applied:
    T T T T T
    T T T T T
    T T x T T
    T T T T T
    T T T T T
  'T' marks a pixel that contributes to the convolution computed for pixel
  'x'; 'x' itself does not contribute.
  """

  def __init__(self, depth, filter_size, strides, padding,
               bias=None, act=None, initializer=None, name=None):
    """Initializes the in-filling convolution block.

    Args:
      depth: Output depth, forwarded to the base convolution block.
      filter_size: Indexable pair (height, width); both odd and not 1x1.
      strides: Strides, forwarded to the base convolution block.
      padding: Padding mode, forwarded to the base convolution block.
      bias: Bias argument forwarded to the base convolution block.
      act: Activation argument forwarded to the base convolution block.
      initializer: Callable (shape, dtype) producing the initial kernel value.
        Defaults to block_util.RsqrtInitializer(dims=(0, 1, 2)).
      name: Name of this block.

    Raises:
      ValueError: if a filter size is even or the filter is 1x1.
    """
    super(InFillingConv2D, self).__init__(
        depth, filter_size, strides, padding, bias, act, name=name)

    if not (filter_size[0] % 2 == 1 and filter_size[1] % 2 == 1):
      raise ValueError('Kernel size should be odd.')
    # A 1x1 kernel would be masked entirely.
    if (filter_size[0], filter_size[1]) == (1, 1):
      raise ValueError('Kernel size should be larger than 1x1.')

    with self._BlockScope():
      self._initializer = (
          block_util.RsqrtInitializer(dims=(0, 1, 2))
          if initializer is None else initializer)

  def _CreateKernel(self, shape, dtype):
    """Creates the kernel variable and zeroes out its center position.

    Args:
      shape: Kernel shape; its first two entries are the spatial sizes.
        NOTE(review): `shape[:2] // 2` needs an array-like (e.g. numpy) shape.
      dtype: TensorFlow dtype of the kernel.

    Returns:
      The kernel variable multiplied by a constant 0/1 mask.
    """
    weights = self.NewVar(self._initializer(shape, dtype))

    center_row, center_col = shape[:2] // 2
    hole_mask = np.ones(shape[:2], dtype=dtype.as_numpy_dtype)
    hole_mask[center_row, center_col] = 0

    # Add two trailing unit axes so the mask broadcasts over both depths.
    hole_mask = hole_mask.reshape(hole_mask.shape + (1, 1))
    return tf.convert_to_tensor(hole_mask, dtype) * weights
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Masked conv2d LSTM."""
import block_base
import block_util
import blocks_masked_conv2d
import blocks_lstm
import blocks_std
# pylint: disable=not-callable
class RasterScanConv2DLSTM(blocks_lstm.LSTMBase):
  """Convolutional LSTM whose input transform is a raster-scan masked conv.

  The implementation follows optimizations inspired by [1].

  NOTE(review): the original notes mention that, when batch normalization is
  used, the bias initializer is ignored since BN cancels its effect; no batch
  normalization is visible in this class, confirm against LSTMBase.

  [1] Zaremba, Sutskever, Vinyals. Recurrent Neural Network Regularization,
  2015. arxiv:1409.2329.
  """

  def __init__(self,
               depth,
               filter_size,
               hidden_filter_size,
               strides,
               padding,
               bias=blocks_lstm.LSTMBiasInit,
               initializer=block_util.RsqrtInitializer(dims=(0, 1, 2)),
               name=None):
    """Builds the input convolution, hidden convolution and bias blocks.

    Args:
      depth: Depth of the LSTM; both convolutions output 4 * depth channels.
      filter_size: Filter size of the masked convolution applied to inputs.
      hidden_filter_size: Filter size of the convolution applied to the
        hidden state.
      strides: Strides of the input convolution.
      padding: Padding mode of the input convolution.
      bias: Bias initializer; if None no bias is added.
      initializer: Kernel initializer shared by both convolutions.
      name: Name of this block.
    """
    super(RasterScanConv2DLSTM, self).__init__([None, None, depth], name)

    conv_depth = 4 * depth
    with self._BlockScope():
      # Non-strict ordering: the current pixel of the input stays visible.
      self._input_conv = blocks_masked_conv2d.RasterScanConv2D(
          conv_depth, filter_size, strides, padding,
          strict_order=False,
          bias=None,
          act=None,
          initializer=initializer,
          name='input_conv2d')

      # The hidden state always uses a unit-stride 'SAME' convolution.
      self._hidden_conv = blocks_std.Conv2D(
          conv_depth, hidden_filter_size, [1, 1], 'SAME',
          bias=None,
          act=None,
          initializer=initializer,
          name='hidden_conv2d')

      if bias is None:
        self._bias = blocks_std.PassThrough()
      else:
        self._bias = blocks_std.BiasAdd(bias, name='biases')

  def _TransformInputs(self, x):
    """Applies the masked input convolution followed by the bias block."""
    convolved = self._input_conv(x)
    return self._bias(convolved)

  def _TransformHidden(self, h):
    """Applies the hidden-state convolution."""
    return self._hidden_conv(h)
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests of the 2D masked convolution blocks."""
from __future__ import division
from __future__ import unicode_literals
import numpy as np
import tensorflow as tf
import blocks_masked_conv2d
class MaskedConv2DTest(tf.test.TestCase):
  """Checks the kernels and outputs of the masked 2D convolution blocks."""

  def testRasterScanKernel(self):
    """The default (strict) raster-scan mask zeroes the center and beyond."""
    kernel_size = 5
    input_depth = 1
    output_depth = 1
    kernel_shape = [kernel_size, kernel_size, input_depth, output_depth]

    # pylint: disable=bad-whitespace
    kernel_feed = [[ 1.0,  2.0,  3.0,  4.0,  5.0],
                   [ 6.0,  7.0,  8.0,  9.0, 10.0],
                   [11.0, 12.0, 13.0, 14.0, 15.0],
                   [16.0, 17.0, 18.0, 19.0, 20.0],
                   [21.0, 22.0, 23.0, 24.0, 25.0]]
    kernel_feed = np.reshape(kernel_feed, kernel_shape)
    kernel_expected = [[ 1.0,  2.0,  3.0,  4.0,  5.0],
                       [ 6.0,  7.0,  8.0,  9.0, 10.0],
                       [11.0, 12.0,  0.0,  0.0,  0.0],
                       [ 0.0,  0.0,  0.0,  0.0,  0.0],
                       [ 0.0,  0.0,  0.0,  0.0,  0.0]]
    kernel_expected = np.reshape(kernel_expected, kernel_shape)
    # pylint: enable=bad-whitespace

    init_kernel = lambda s, t: tf.constant(kernel_feed, dtype=t, shape=s)
    masked_conv2d = blocks_masked_conv2d.RasterScanConv2D(
        output_depth, [kernel_size] * 2, [1] * 2, 'SAME',
        initializer=init_kernel)
    x = tf.placeholder(dtype=tf.float32, shape=[10] * 3 + [input_depth])
    # The block must be applied once so that its kernel gets created.
    _ = masked_conv2d(x)

    with self.test_session():
      tf.global_variables_initializer().run()
      kernel_value = masked_conv2d._kernel.eval()

    self.assertAllEqual(kernel_expected, kernel_value)

  def testDepthOrderKernel(self):
    """With all-ones kernel and input, strict output depth d equals d."""
    kernel_size = 1
    input_depth = 7
    output_depth = input_depth
    kernel_shape = [kernel_size, kernel_size, input_depth, output_depth]
    kernel_feed = np.ones(kernel_shape)
    x_shape = [5] * 3 + [input_depth]
    x_feed = np.ones(x_shape)
    y_expected = np.zeros(x_shape[0:3] + [output_depth])
    y_expected[:, :, :] = np.arange(output_depth)

    init_kernel = lambda s, t: tf.constant(kernel_feed, dtype=t, shape=s)
    masked_conv2d = blocks_masked_conv2d.DepthOrderConv2D(
        output_depth, [kernel_size] * 2, [1] * 2, 'SAME',
        strict_order=True,
        initializer=init_kernel)
    x = tf.placeholder(dtype=tf.float32, shape=x_shape)
    y = masked_conv2d(x)

    with self.test_session():
      tf.global_variables_initializer().run()
      y_value = y.eval(feed_dict={x: x_feed})

    self.assertAllEqual(y_expected, y_value)

  def testGroupRasterScanKernel(self):
    """Checks the output of a grouped raster-scan conv on an all-ones input."""
    kernel_size = 3
    input_depth = 4
    input_group_size = 2
    output_depth = 2
    output_group_size = 1
    kernel_shape = [kernel_size, kernel_size, input_depth, output_depth]
    kernel_feed = np.ones(shape=kernel_shape)
    height = 5
    width = 5
    x_shape = [1, height, width, input_depth]
    x_feed = np.ones(shape=x_shape)

    # pylint: disable=bad-whitespace
    y_expected = [
        [[ 0,  2], [ 4,  6], [ 4,  6], [ 4,  6], [ 4,  6]],
        [[ 8, 10], [16, 18], [16, 18], [16, 18], [12, 14]],
        [[ 8, 10], [16, 18], [16, 18], [16, 18], [12, 14]],
        [[ 8, 10], [16, 18], [16, 18], [16, 18], [12, 14]],
        [[ 8, 10], [16, 18], [16, 18], [16, 18], [12, 14]],
    ]
    y_expected = np.reshape(y_expected, [1, height, width, output_depth])
    # pylint: enable=bad-whitespace

    init_kernel = lambda s, t: tf.constant(kernel_feed, dtype=t, shape=s)
    masked_conv2d = blocks_masked_conv2d.GroupRasterScanConv2D(
        output_depth, [kernel_size] * 2, [1] * 2, 'SAME',
        strict_order=True,
        input_group_size=input_group_size,
        output_group_size=output_group_size,
        initializer=init_kernel)
    x = tf.placeholder(dtype=tf.float32, shape=x_shape)
    y = masked_conv2d(x)

    with self.test_session():
      tf.global_variables_initializer().run()
      y_value = y.eval(feed_dict={x: x_feed})

    self.assertAllEqual(y_expected, y_value)

  def testInFillingKernel(self):
    """The in-filling mask zeroes only the center of the kernel."""
    kernel_size = 5
    input_depth = 1
    output_depth = 1
    kernel_shape = [kernel_size, kernel_size, input_depth, output_depth]

    # pylint: disable=bad-whitespace
    kernel_feed = [[ 1.0,  2.0,  3.0,  4.0,  5.0],
                   [ 6.0,  7.0,  8.0,  9.0, 10.0],
                   [11.0, 12.0, 13.0, 14.0, 15.0],
                   [16.0, 17.0, 18.0, 19.0, 20.0],
                   [21.0, 22.0, 23.0, 24.0, 25.0]]
    kernel_feed = np.reshape(kernel_feed, kernel_shape)
    kernel_expected = [[ 1.0,  2.0,  3.0,  4.0,  5.0],
                       [ 6.0,  7.0,  8.0,  9.0, 10.0],
                       [11.0, 12.0,  0.0, 14.0, 15.0],
                       [16.0, 17.0, 18.0, 19.0, 20.0],
                       [21.0, 22.0, 23.0, 24.0, 25.0]]
    kernel_expected = np.reshape(kernel_expected, kernel_shape)
    # pylint: enable=bad-whitespace

    init_kernel = lambda s, t: tf.constant(kernel_feed, dtype=t, shape=s)
    masked_conv2d = blocks_masked_conv2d.InFillingConv2D(
        output_depth, [kernel_size] * 2, [1] * 2, 'SAME',
        initializer=init_kernel)
    x = tf.placeholder(dtype=tf.float32, shape=[10] * 3 + [input_depth])
    # The block must be applied once so that its kernel gets created.
    _ = masked_conv2d(x)

    with self.test_session():
      tf.global_variables_initializer().run()
      kernel_value = masked_conv2d._kernel.eval()

    self.assertAllEqual(kernel_expected, kernel_value)

  def testConv2DMaskedNumerics(self):
    """Compares a strict raster-scan conv against an explicit summation."""
    kernel_size = 5
    input_shape = [1, 10, 10, 1]
    filter_shape = [kernel_size, kernel_size, 1, 1]
    strides = [1, 1, 1, 1]
    output_shape = [1, 10, 10, 1]

    conv = blocks_masked_conv2d.RasterScanConv2D(
        depth=filter_shape[-1],
        filter_size=filter_shape[0:2],
        strides=strides[1:3],
        padding='SAME',
        initializer=tf.constant_initializer(value=1.0))
    x = tf.placeholder(dtype=tf.float32, shape=input_shape)
    y = conv(x)

    x_feed = - np.ones(input_shape, dtype=float)
    y_expected = np.ones(output_shape, dtype=float)
    half = kernel_size // 2
    # `range` (not the Python 2 only `xrange`) keeps the test runnable on
    # both Python 2 and Python 3.
    for i in range(input_shape[1]):
      for j in range(input_shape[2]):
        x_feed[0, i, j, 0] = 10 * (j + 1) + i
        # With an all-ones kernel, the expected output is the sum of the input
        # values over the visible window: the rows above pixel (i, j), plus
        # the pixels strictly left of it on its own row.
        v = 0
        ki_start = max(i - half, 0)
        kj_start = max(j - half, 0)
        kj_end = min(j + half, input_shape[2] - 1)
        for ki in range(ki_start, i + 1):
          for kj in range(kj_start, kj_end + 1):
            if ki == i and kj >= j:
              continue
            v += 10 * (kj + 1) + ki
        y_expected[0, i, j, 0] = v

    with self.test_session():
      tf.global_variables_initializer().run()
      y_value = y.eval(feed_dict={x: x_feed})

    self.assertAllEqual(y_expected, y_value)
# Run the test cases of this file when it is executed as a script.
if __name__ == '__main__':
  tf.test.main()
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Common blocks which work as operators on other blocks."""
import tensorflow as tf
import block_base
# pylint: disable=not-callable
class CompositionOperator(block_base.BlockBase):
  """Chains blocks: the output of each block is the input of the next."""

  def __init__(self, block_list, name=None):
    """Stores the blocks to chain.

    Args:
      block_list: Iterable of blocks (callables), applied in iteration order.
      name: Name of this block.
    """
    super(CompositionOperator, self).__init__(name)
    self._blocks = block_list

  def _Apply(self, x):
    """Feeds x through every block in order and returns the last output."""
    output = x
    for block in self._blocks:
      output = block(output)
    return output
class LineOperator(block_base.BlockBase):
  """Applies one block to each slice of the input along axis 1."""

  def __init__(self, block, name=None):
    """Stores the block that is applied to every line.

    Args:
      block: Block (callable) applied to each slice.
      name: Name of this block.
    """
    super(LineOperator, self).__init__(name)
    self._block = block

  def _Apply(self, x):
    """Splits x along axis 1, applies the block per slice, and re-joins.

    Args:
      x: Tensor whose size along axis 1 is known at graph construction time.

    Returns:
      The per-slice outputs concatenated along axis 1.

    Raises:
      ValueError: if the size of x along axis 1 is unknown.
    """
    num_lines = x.get_shape()[1].value
    if num_lines is None:
      raise ValueError('Unknown tensor height')

    lines = tf.split(value=x, num_or_size_splits=num_lines, axis=1)
    return tf.concat(values=[self._block(line) for line in lines], axis=1)
class TowerOperator(block_base.BlockBase):
  """Applies several blocks to the same input and concatenates the outputs."""

  def __init__(self, block_list, dim=3, name=None):
    """Stores the parallel blocks and the concatenation axis.

    Args:
      block_list: Iterable of blocks (callables), each applied to the same
        input.
      dim: The dimension on which the outputs are concatenated.
      name: Name of this block.
    """
    super(TowerOperator, self).__init__(name)
    self._blocks = block_list
    self._concat_dim = dim

  def _Apply(self, x):
    """Applies every block to x and concatenates the results."""
    branch_outputs = []
    for block in self._blocks:
      branch_outputs.append(block(x))
    return tf.concat(branch_outputs, self._concat_dim)
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests of the block operators."""
import numpy as np
import tensorflow as tf
import block_base
import blocks_operator
class AddOneBlock(block_base.BlockBase):
  """Test block returning its input plus one."""

  def __init__(self, name=None):
    super(AddOneBlock, self).__init__(name)

  def _Apply(self, x):
    """Returns x + 1.0."""
    incremented = x + 1.0
    return incremented
class SquareBlock(block_base.BlockBase):
  """Test block returning its input multiplied by itself."""

  def __init__(self, name=None):
    super(SquareBlock, self).__init__(name)

  def _Apply(self, x):
    """Returns x * x."""
    squared = x * x
    return squared
class BlocksOperatorTest(tf.test.TestCase):
  """Tests of the block operators."""

  def testComposition(self):
    """Composing AddOneBlock then SquareBlock yields (x + 1) ** 2."""
    inputs = np.array([[1.0, 2.0, 3.0],
                       [-1.0, -2.0, -3.0]])
    expected = np.array([[4.0, 9.0, 16.0],
                         [0.0, 1.0, 4.0]])

    placeholder = tf.placeholder(dtype=tf.float32, shape=[2, 3])
    add_then_square = blocks_operator.CompositionOperator(
        [AddOneBlock(), SquareBlock()])
    output = add_then_square(placeholder)

    with self.test_session():
      actual = output.eval(feed_dict={placeholder: inputs})

    self.assertAllClose(expected, actual)
# Run the test cases of this file when it is executed as a script.
if __name__ == '__main__':
  tf.test.main()
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Basic blocks for building tensorflow models."""
import numpy as np
import tensorflow as tf
import block_base
import block_util
# pylint does not recognize block_base.BlockBase.__call__().
# pylint: disable=not-callable
def HandleConvPaddingModes(x, padding, kernel_shape, strides):
  """Pads x manually when the padding mode is REFLECT or SYMMETRIC.

  Args:
    x: A 4D tensor with shape [batch_size, height, width, depth].
    padding: Padding mode (SAME, VALID, REFLECT, or SYMMETRIC).
    kernel_shape: Shape of convolution kernel that will be applied.
      NOTE(review): the `kernel_shape[:2] == 1` test is elementwise only for
      an array-like (e.g. numpy) shape.
    strides: Convolution stride that will be used.

  Returns:
    A pair (x, padding): for REFLECT and SYMMETRIC, x is the manually padded
    tensor and padding is 'VALID'; for a 1x1 kernel, x is unchanged and
    padding is 'VALID'; otherwise both are returned unchanged.
  """
  # For 1x1 convolution, all padding modes are the same.
  if np.all(kernel_shape[:2] == 1):
    return x, 'VALID'

  # Only the mirror padding modes need the manual padding below.
  if padding not in ('REFLECT', 'SYMMETRIC'):
    return x, padding

  # The amount of padding is computed as if the mode were 'SAME'.
  # From the Tensorflow kernel:
  #   output_shape = ceil(input_shape / strides)
  #   paddings = (output_shape - 1) * strides + filter_size - input_shape
  # Write x, y, s for input_shape, output_shape and strides, and let
  # (x - 1) = sn + r with 0 <= r < s. Then, provided x > 0,
  #   y - 1 = ceil(x / s) - 1 = floor((x - 1) / s) = n
  # and therefore
  #   paddings = n * s + filter_size - (sn + r + 1) = filter_size - r - 1.
  static_shape = x.get_shape()  # shape at graph construction time
  dynamic_hw = tf.shape(x)[1:3]  # image shape (no batch) at run time
  remainder = tf.mod(dynamic_hw - 1, strides[1:3])
  total_pad = kernel_shape[:2] - remainder - 1
  total_rows = total_pad[0]
  total_cols = total_pad[1]

  # Split each total into (before, after); the extra unit, if any, goes after.
  row_pad = tf.stack([total_rows // 2, (total_rows + 1) // 2])
  col_pad = tf.stack([total_cols // 2, (total_cols + 1) // 2])
  paddings = tf.stack([[0, 0], row_pad, col_pad, [0, 0]])

  # Manually pad the input; the caller then uses the 'VALID' padding mode.
  padded = tf.pad(x, paddings, mode=padding)
  # Restore the batch and depth dimensions known before padding.
  padded.set_shape([static_shape[0], padded.get_shape()[1],
                    padded.get_shape()[2], static_shape[3]])
  return padded, 'VALID'
class PassThrough(block_base.BlockBase):
  """Identity block: returns its input unchanged."""

  def __init__(self):
    # An empty name disables name scoping for this block.
    super(PassThrough, self).__init__(name='')

  def _Apply(self, inp):
    """Returns inp itself."""
    return inp

  @property
  def initialized(self):
    """Reports this block as initialized, unconditionally."""
    return True
class Bias(object):
  """Holds the constant value used to initialize a BiasAdd block."""

  def __init__(self, value=0):
    """Stores the initial bias value.

    Args:
      value: Initial value of the bias; defaults to 0.
    """
    initial_value = value
    self.value = initial_value
class BiasAdd(block_base.BlockBase):
  """A tf.nn.bias_add wrapper.

  This wrapper may act as a PassThrough block depending on the initializer
  provided, to make easier optional bias applications in NN blocks, etc.
  See __init__() for the details.
  """

  def __init__(self, initializer=Bias(0), name=None):
    """Initializes Bias block.

    |initializer| parameter has two special cases.

    1. If initializer is None, then this block works as a PassThrough.
    2. If initializer is a Bias class object, then tf.constant_initializer is
       used with the stored value.

    Args:
      initializer: An initializer for the bias variable.
      name: Name of this block.
    """
    super(BiasAdd, self).__init__(name)

    with self._BlockScope():
      if isinstance(initializer, Bias):
        self._initializer = tf.constant_initializer(value=initializer.value)
      else:
        self._initializer = initializer

      # The bias variable is created on first use, once the depth is known.
      self._bias = None

  def _Apply(self, x):
    """Adds the bias to x, creating the bias variable on first use.

    Args:
      x: Input tensor; the bias has one entry per element of its last
        dimension.

    Returns:
      x unchanged if the initializer is None, tf.nn.bias_add(x, bias)
      otherwise.
    """
    # Special case 1 of __init__: without initializer there is no bias.
    # Previously this path attempted to call None.
    if self._initializer is None:
      return x

    # Compare to None explicitly rather than relying on the truth value of a
    # TensorFlow variable object.
    if self._bias is None:
      init = self._initializer([int(x.get_shape()[-1])], x.dtype)
      self._bias = self.NewVar(init)

    return tf.nn.bias_add(x, self._bias)

  def CreateWeightLoss(self):
    """Returns an empty list: this block contributes no weight loss."""
    return []
class LinearBase(block_base.BlockBase):
  """A matmul wrapper.

  Returns input * W, where matrix W can be customized through derivation.
  """

  def __init__(self, depth, name=None):
    """Initializes the block.

    Args:
      depth: Number of columns of the matrix W (size of the output).
      name: Name of this block.
    """
    super(LinearBase, self).__init__(name)
    with self._BlockScope():
      self._depth = depth
      # W is created on first use, once the input size is known.
      self._matrix = None

  def _CreateKernel(self, shape, dtype):
    """Creates the matrix W; must be implemented by sub-classes.

    Args:
      shape: [input size, depth].
      dtype: dtype of the input.
    """
    raise NotImplementedError('This method must be sub-classed.')

  def _Apply(self, x):
    """Returns tf.matmul(x, W), creating W on first use.

    Args:
      x: Input tensor whose last dimension is statically known.
    """
    # Compare to None explicitly (as Conv2DBase does for its kernel) rather
    # than relying on the truth value of whatever _CreateKernel returned.
    if self._matrix is None:
      shape = [int(x.get_shape()[-1]), self._depth]
      self._matrix = self._CreateKernel(shape, x.dtype)
    return tf.matmul(x, self._matrix)
class Linear(LinearBase):
  """A matmul wrapper whose matrix W is a variable.

  Returns input * W, where matrix W is learned.
  """

  def __init__(self,
               depth,
               initializer=block_util.RsqrtInitializer(),
               name=None):
    """Initializes the block.

    Args:
      depth: Number of columns of the matrix W (size of the output).
      initializer: Callable (shape, dtype) producing the initial value of W.
      name: Name of this block.
    """
    super(Linear, self).__init__(depth, name)
    with self._BlockScope():
      self._initializer = initializer

  def _CreateKernel(self, shape, dtype):
    """Returns a new variable initialized by the stored initializer."""
    return self.NewVar(self._initializer(shape, dtype))
class NN(block_base.BlockBase):
  """A neural network layer wrapper.

  Returns act(input * W + b), where matrix W, bias b are learned, and act is an
  optional activation function (i.e., nonlinearity).

  This transform block can handle multiple inputs. If x_1, x_2, ..., x_m are
  the inputs, then returns act(x_1 * W_1 + ... + x_m * W_m + b).
  """

  def __init__(self,
               depth,
               bias=Bias(0),
               act=None,  # e.g., tf.nn.relu
               initializer=block_util.RsqrtInitializer(),
               linear_block_factory=(lambda d, i: Linear(d, initializer=i)),
               name=None):
    """Initializes NN block.

    Args:
      depth: The depth of the output.
      bias: An initializer for the bias, or a Bias class object. If None, there
        will be no bias term for this NN block. See BiasAdd block.
      act: Optional activation function. If None, no activation is applied.
      initializer: The initialization method for the matrix weights.
      linear_block_factory: A function used to create a linear block; it is
        called with (depth, initializer).
      name: The name of this block.
    """
    super(NN, self).__init__(name)

    with self._BlockScope():
      self._linear_block_factory = linear_block_factory
      self._depth = depth
      self._initializer = initializer
      # One linear block per input, created on the first call.
      self._matrices = None

      if bias:
        self._bias = BiasAdd(bias)
      else:
        self._bias = PassThrough()

      if act:
        self._act = act
      else:
        self._act = PassThrough()

  def _Apply(self, *args):
    """Returns act(sum_i args[i] * W_i + b).

    Args:
      *args: Input tensors; their number must stay the same across calls.

    Raises:
      ValueError: if the number of inputs differs from the number of linear
        blocks created on the first call.
    """
    if not self._matrices:
      self._matrices = [
          self._linear_block_factory(self._depth, self._initializer)
          for _ in args]

    expected_count = len(self._matrices)
    observed_count = len(args)
    if expected_count != observed_count:
      raise ValueError('{} expected {} inputs, but observed {} inputs'.format(
          self.name, expected_count, observed_count))

    if observed_count > 1:
      projections = [
          linear(inp) for linear, inp in zip(self._matrices, args)]
      pre_bias = tf.add_n(projections)
    else:
      pre_bias = self._matrices[0](args[0])

    biased = self._bias(pre_bias)
    return self._act(biased)
class Conv2DBase(block_base.BlockBase):
  """Base block wrapping a 2D convolution op with a lazily created kernel."""

  def __init__(self, depth, filter_size, strides, padding,
               bias=None, act=None, atrous_rate=None, conv=tf.nn.conv2d,
               name=None):
    """Initializes a Conv2DBase block.

    Arguments:
      depth: The output depth of the block (i.e. #filters); if negative, the
        output depth will be set to be the same as the input depth.
      filter_size: The size of the 2D filter. If it's specified as an integer,
        it's going to create a square filter. Otherwise, this is a tuple
        specifying the height x width of the filter.
      strides: A tuple specifying the y and x stride.
      padding: One of the valid padding modes allowed by tf.nn.conv2d, or
        'REFLECT'/'SYMMETRIC' for mirror padding.
      bias: An initializer for the bias, or a Bias class object. If None, there
        will be no bias in this block. See BiasAdd block.
      act: Optional activation function applied to the output.
      atrous_rate: optional input rate for ATrous convolution. If not None, this
        will be used and the strides will be ignored.
      conv: The convolution function to use (e.g. tf.nn.conv2d).
      name: The name for this conv2d op.
    """
    super(Conv2DBase, self).__init__(name)

    with self._BlockScope():
      if act:
        self._act = act
      else:
        self._act = PassThrough()

      if bias:
        self._bias = BiasAdd(bias)
      else:
        self._bias = PassThrough()

      # [filter height, filter width, input depth, output depth]. The input
      # depth stays 0 until the first call of _Apply().
      kernel_shape = np.zeros((4,), dtype=np.int32)
      kernel_shape[:2] = filter_size
      kernel_shape[3] = depth
      self._kernel_shape = kernel_shape

      # Only entries 1 and 2 of the 4-entry strides are configurable.
      full_strides = np.ones((4,), dtype=np.int32)
      full_strides[1:3] = strides
      self._strides = list(full_strides)

      self._padding = padding
      self._kernel = None
      self._conv = conv
      self._atrous_rate = atrous_rate

  def _CreateKernel(self, shape, dtype):
    """Creates the convolution kernel; must be implemented by sub-classes."""
    raise NotImplementedError('This method must be sub-classed')

  def _Apply(self, x):
    """Apply the self._conv op.

    Arguments:
      x: input tensor. It needs to be a 4D tensor of the form
        [batch, height, width, channels].

    Returns:
      The output of the convolution of x with the current convolutional
      kernel.

    Raises:
      ValueError: if number of channels is not defined at graph construction.
    """
    input_shape = x.get_shape().with_rank(4)
    input_shape[3:].assert_is_fully_defined()  # channels must be defined

    if self._kernel is None:
      # First call: complete the kernel shape with the input depth.
      assert self._kernel_shape[2] == 0, self._kernel_shape
      self._kernel_shape[2] = input_shape[3].value
      if self._kernel_shape[3] < 0:
        # Make output depth be the same as input depth.
        self._kernel_shape[3] = self._kernel_shape[2]
      self._kernel = self._CreateKernel(self._kernel_shape, x.dtype)

    x, padding = HandleConvPaddingModes(
        x, self._padding, self._kernel_shape, self._strides)

    if self._atrous_rate is not None:
      x = self._conv(x, self._kernel, rate=self._atrous_rate, padding=padding)
    else:
      x = self._conv(x, self._kernel, strides=self._strides, padding=padding)

    if self._padding != 'VALID':
      # Manually update shape. Known shape information can be lost by tf.pad().
      static_dims = []
      for axis in (1, 2):
        size = input_shape[axis].value
        static_dims.append(
            1 + (size - 1) // self._strides[axis] if size else None)
      shape = x.get_shape()
      x.set_shape([shape[0], static_dims[0], static_dims[1], shape[3]])

    return self._act(self._bias(x))
class Conv2D(Conv2DBase):
  """A tf.nn.conv2d operator whose kernel is a variable."""

  def __init__(self, depth, filter_size, strides, padding,
               bias=None, act=None, initializer=None, name=None):
    """Initializes a Conv2D block.

    Arguments:
      depth: The output depth of the block (i.e., #filters)
      filter_size: The size of the 2D filter. If it's specified as an integer,
        it's going to create a square filter. Otherwise, this is a tuple
        specifying the height x width of the filter.
      strides: A tuple specifying the y and x stride.
      padding: One of the valid padding modes allowed by tf.nn.conv2d, or
        'REFLECT'/'SYMMETRIC' for mirror padding.
      bias: An initializer for the bias, or a Bias class object. If None, there
        will be no bias in this block. See BiasAdd block.
      act: Optional activation function applied to the output.
      initializer: Optional initializer for weights. Defaults to
        block_util.RsqrtInitializer(dims=(0, 1, 2)).
      name: The name for this conv2d op.
    """
    super(Conv2D, self).__init__(depth, filter_size, strides, padding, bias,
                                 act, conv=tf.nn.conv2d, name=name)

    with self._BlockScope():
      self._initializer = (
          block_util.RsqrtInitializer(dims=(0, 1, 2))
          if initializer is None else initializer)

  def _CreateKernel(self, shape, dtype):
    """Returns a new variable initialized by the stored initializer."""
    initial_value = self._initializer(shape, dtype)
    return self.NewVar(initial_value)
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for basic tensorflow blocks_std."""
from __future__ import division
from __future__ import unicode_literals
import math
import os
import numpy as np
import tensorflow as tf
import blocks_std
def _NumpyConv2D(x, f, strides, padding, rate=1):
  """Reference 2D convolution implemented with numpy.

  Args:
    x: Input array indexed as [batch, row, col, channel].
    f: Filter array indexed as [row, col, input channel, output channel].
    strides: Sequence of 4 strides; the first and the last must be 1.
    padding: 'VALID', 'SAME', 'REFLECT' or 'SYMMETRIC'.
    rate: Dilation rate; when above 1, the filter taps are placed `rate`
      positions apart with zeros in between.

  Returns:
    Array indexed as [batch, output row, output col, output channel].

  Raises:
    ValueError: if padding is not one of the supported modes.
  """
  assert strides[0] == 1 and strides[3] == 1, strides

  if rate > 1:
    # Dilate the filter: keep the original taps every `rate` positions.
    rows, cols, depth_in, depth_out = f.shape
    dilated = np.zeros([(rows - 1) * rate + 1, (cols - 1) * rate + 1,
                        depth_in, depth_out])
    dilated[::rate, ::rate, :, :] = f
    f = dilated

  if padding != 'VALID':
    assert x.shape[1] > 0 and x.shape[2] > 0, x.shape

    # str() keeps the numpy mode a native string even when the file enables
    # unicode_literals on Python 2.
    pad_modes = {
        'SAME': str('constant'),
        'REFLECT': str('reflect'),
        'SYMMETRIC': str('symmetric'),
    }
    if padding not in pad_modes:
      raise ValueError('Unsupported padding mode: {}'.format(padding))

    # Compute the number of padded rows and cols.
    # See Conv2D block comments for a math explanation.
    remainder = ((x.shape[1] - 1) % strides[1], (x.shape[2] - 1) % strides[2])
    pad_rows = f.shape[0] - remainder[0] - 1
    pad_cols = f.shape[1] - remainder[1] - 1
    pad = ((0, 0),
           (pad_rows // 2, (pad_rows + 1) // 2),
           (pad_cols // 2, (pad_cols + 1) // 2),
           (0, 0))

    # Pad the input using numpy.pad().
    x = np.pad(x, pad, mode=pad_modes[padding])

  # Since x is now properly padded, proceed as if padding mode is VALID.
  # -(-a // b) is an integer ceil(a / b); it does not depend on true division
  # being enabled in this file.
  out_rows = int(-(-(x.shape[1] - f.shape[0] + 1) // strides[1]))
  out_cols = int(-(-(x.shape[2] - f.shape[1] + 1) // strides[2]))
  x_window = np.empty(
      (x.shape[0], out_rows, out_cols, np.prod(f.shape[:3])))

  # The output at pixel location (i, j) is the result of linear transformation
  # applied to the window whose top-left corner is at
  # (i * row_stride, j * col_stride).
  for i in range(out_rows):
    k = i * strides[1]
    for j in range(out_cols):
      l = j * strides[2]
      x_window[:, i, j, :] = x[:,
                               k:(k + f.shape[0]),
                               l:(l + f.shape[1]),
                               :].reshape((x_window.shape[0], -1))

  y = np.tensordot(x_window, f.reshape((-1, f.shape[3])), axes=1)
  return y
class BlocksStdTest(tf.test.TestCase):
def CheckUnary(self, y, op_type):
  """Asserts y is produced by a one-input op of type op_type.

  Returns:
    The single input of the op producing y.
  """
  producer = y.op
  self.assertEqual(op_type, producer.type)
  self.assertEqual(1, len(producer.inputs))
  return producer.inputs[0]
def CheckBinary(self, y, op_type):
  """Asserts y is produced by a two-input op of type op_type.

  Returns:
    The inputs of the op producing y.
  """
  producer = y.op
  self.assertEqual(op_type, producer.type)
  self.assertEqual(2, len(producer.inputs))
  return producer.inputs
def testPassThrough(self):
  """PassThrough returns the very same tensor object it was given."""
  identity_block = blocks_std.PassThrough()
  inp = tf.placeholder(dtype=tf.float32, shape=[1])
  out = identity_block(inp)
  self.assertIs(out, inp)
def CheckBiasAdd(self, y, b):
  """Asserts y is a BiasAdd op using the bias variable of block b.

  Returns:
    The non-bias input of the BiasAdd op.
  """
  data_input, bias_input = self.CheckBinary(y, 'BiasAdd')
  self.assertIs(bias_input, b._bias.value())
  self.assertEqual(data_input.dtype, bias_input.dtype.base_dtype)
  return data_input
def testBiasAdd(self):
  """The bias has the size of the last input dimension and feeds BiasAdd."""
  bias_block = blocks_std.BiasAdd()
  inp = tf.placeholder(dtype=tf.float32, shape=[4, 8])
  out = bias_block(inp)
  self.assertEqual(bias_block._bias.get_shape(), inp.get_shape()[-1:])
  self.assertIs(inp, self.CheckBiasAdd(out, bias_block))
def testBiasRankTest(self):
  """Applying BiasAdd to a rank-1 input raises ValueError."""
  bias_block = blocks_std.BiasAdd()
  rank_one_input = tf.placeholder(dtype=tf.float32, shape=[10])
  with self.assertRaises(ValueError):
    bias_block(rank_one_input)
def CheckLinear(self, y, m):
  """Asserts y is a MatMul op using the matrix variable of block m.

  Returns:
    The non-matrix input of the MatMul op.
  """
  data_input, matrix_input = self.CheckBinary(y, 'MatMul')
  self.assertIs(matrix_input, m._matrix.value())
  self.assertEqual(data_input.dtype, matrix_input.dtype.base_dtype)
  return data_input
def testLinear(self):
  """Linear creates an [input size, depth] matrix feeding a MatMul op."""
  linear_block = blocks_std.Linear(10)
  inp = tf.placeholder(dtype=tf.float32, shape=[8, 9])
  out = linear_block(inp)
  self.assertEqual(linear_block._matrix.get_shape(), [9, 10])
  self.assertIs(inp, self.CheckLinear(out, linear_block))
def testLinearShared(self):
# Create a linear map which is applied twice on different inputs
# (i.e. the weights of the map are shared).
linear_map = blocks_std.Linear(6)
x1 = tf.random_normal(shape=[1, 5])
x2 = tf.random_normal(shape=[1, 5])
xs = x1 + x2
# Apply the transform with the same weights.
y1 = linear_map(x1)
y2 = linear_map(x2)
ys = linear_map(xs)
with self.test_session() as sess:
# Initialize all the variables of the graph.
tf.global_variables_initializer().run()
y1_res, y2_res, ys_res = sess.run([y1, y2, ys])
self.assertAllClose(y1_res + y2_res, ys_res)
def CheckNN(self, y, nn, act=None):
if act:
pre_act = self.CheckUnary(y, act)
else:
pre_act = y
if not isinstance(nn._bias, blocks_std.PassThrough):
pre_bias = self.CheckBiasAdd(pre_act, nn._bias)
else:
pre_bias = pre_act
if len(nn._matrices) > 1:
self.assertEqual('AddN', pre_bias.op.type)
pre_bias = pre_bias.op.inputs
else:
pre_bias = [pre_bias]
self.assertEqual(len(pre_bias), len(nn._matrices))
return [self.CheckLinear(u, m) for u, m in zip(pre_bias, nn._matrices)]
def testNNWithoutActWithoutBias(self):
nn = blocks_std.NN(10, act=None, bias=None)
x = tf.placeholder(dtype=tf.float32, shape=[5, 7])
y = nn(x)
self.assertIs(x, self.CheckNN(y, nn)[0])
def testNNWithoutBiasWithAct(self):
nn = blocks_std.NN(10, act=tf.nn.relu, bias=None)
x = tf.placeholder(dtype=tf.float32, shape=[5, 7])
y = nn(x)
self.assertIs(x, self.CheckNN(y, nn, 'Relu')[0])
def testNNWithBiasWithoutAct(self):
nn = blocks_std.NN(10, bias=blocks_std.Bias(0), act=None)
x = tf.placeholder(dtype=tf.float32, shape=[5, 7])
y = nn(x)
self.assertIs(x, self.CheckNN(y, nn)[0])
def testNNWithBiasWithAct(self):
nn = blocks_std.NN(10, bias=blocks_std.Bias(0), act=tf.square)
x = tf.placeholder(dtype=tf.float32, shape=[5, 7])
y = nn(x)
self.assertIs(x, self.CheckNN(y, nn, 'Square')[0])
def testNNMultipleInputs(self):
nn = blocks_std.NN(10, bias=blocks_std.Bias(0), act=tf.tanh)
x = [tf.placeholder(dtype=tf.float32, shape=[5, 7]),
tf.placeholder(dtype=tf.float32, shape=[5, 3]),
tf.placeholder(dtype=tf.float32, shape=[5, 5])]
y = nn(*x)
xs = self.CheckNN(y, nn, 'Tanh')
self.assertEqual(len(x), len(xs))
for u, v in zip(x, xs):
self.assertIs(u, v)
def testConv2DSAME(self):
np.random.seed(142536)
x_shape = [4, 16, 11, 5]
f_shape = [4, 3, 5, 6]
strides = [1, 2, 2, 1]
padding = 'SAME'
conv = blocks_std.Conv2D(depth=f_shape[-1],
filter_size=f_shape[0:2],
strides=strides[1:3],
padding=padding,
act=None,
bias=None)
x_value = np.random.normal(size=x_shape)
x = tf.convert_to_tensor(x_value, dtype=tf.float32)
y = conv(x)
with self.test_session():
tf.global_variables_initializer().run()
f_value = conv._kernel.eval()
y_value = y.eval()
y_expected = _NumpyConv2D(x_value, f_value,
strides=strides, padding=padding)
self.assertAllClose(y_expected, y_value)
def testConv2DValid(self):
np.random.seed(253647)
x_shape = [4, 11, 12, 5]
f_shape = [5, 2, 5, 5]
strides = [1, 2, 2, 1]
padding = 'VALID'
conv = blocks_std.Conv2D(depth=f_shape[-1],
filter_size=f_shape[0:2],
strides=strides[1:3],
padding=padding,
act=None,
bias=None)
x_value = np.random.normal(size=x_shape)
x = tf.convert_to_tensor(x_value, dtype=tf.float32)
y = conv(x)
with self.test_session():
tf.global_variables_initializer().run()
f_value = conv._kernel.eval()
y_value = y.eval()
y_expected = _NumpyConv2D(x_value, f_value,
strides=strides, padding=padding)
self.assertAllClose(y_expected, y_value)
def testConv2DSymmetric(self):
np.random.seed(364758)
x_shape = [4, 10, 12, 6]
f_shape = [3, 4, 6, 5]
strides = [1, 1, 1, 1]
padding = 'SYMMETRIC'
conv = blocks_std.Conv2D(depth=f_shape[-1],
filter_size=f_shape[0:2],
strides=strides[1:3],
padding=padding,
act=None,
bias=None)
x_value = np.random.normal(size=x_shape)
x = tf.convert_to_tensor(x_value, dtype=tf.float32)
y = conv(x)
with self.test_session():
tf.global_variables_initializer().run()
f_value = conv._kernel.eval()
y_value = y.eval()
y_expected = _NumpyConv2D(x_value, f_value,
strides=strides, padding=padding)
self.assertAllClose(y_expected, y_value)
def testConv2DReflect(self):
np.random.seed(768798)
x_shape = [4, 10, 12, 6]
f_shape = [3, 4, 6, 5]
strides = [1, 2, 2, 1]
padding = 'REFLECT'
conv = blocks_std.Conv2D(depth=f_shape[-1],
filter_size=f_shape[0:2],
strides=strides[1:3],
padding=padding,
act=None,
bias=None)
x_value = np.random.normal(size=x_shape)
x = tf.convert_to_tensor(x_value, dtype=tf.float32)
y = conv(x)
with self.test_session():
tf.global_variables_initializer().run()
f_value = conv._kernel.eval()
y_value = y.eval()
y_expected = _NumpyConv2D(x_value, f_value,
strides=strides, padding=padding)
self.assertAllClose(y_expected, y_value)
def testConv2DBias(self):
input_shape = [19, 14, 14, 64]
filter_shape = [3, 7, 64, 128]
strides = [1, 2, 2, 1]
output_shape = [19, 6, 4, 128]
conv = blocks_std.Conv2D(depth=filter_shape[-1],
filter_size=filter_shape[0:2],
strides=strides[1:3],
padding='VALID',
act=None,
bias=blocks_std.Bias(1))
x = tf.placeholder(dtype=tf.float32, shape=input_shape)
y = conv(x)
self.CheckBiasAdd(y, conv._bias)
self.assertEqual(output_shape, y.get_shape().as_list())
# Script entry point: hand control to the TensorFlow test runner.
if __name__ == '__main__':
  tf.test.main()
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Entropy coder model."""
class EntropyCoderModel(object):
  """Base interface of an entropy coder model.

  `Initialize` and `BuildGraph` raise NotImplementedError in this class.

  Attributes:
    loss: Loss used for training the model. Set to None by the constructor.
    train_op: Tensorflow op to run to train the model. Set to None by the
      constructor.
    average_code_length: Tensor corresponding to the average code length of
      the input bit field tensor. The average code length is a number of
      output bits per input bit. To get an effective compression, this number
      should be between 0.0 and 1.0 (1.0 corresponds to no compression).
      Set to None by the constructor.
  """

  def __init__(self):
    self.loss = None
    self.train_op = None
    self.average_code_length = None

  def Initialize(self, global_step, optimizer, config_string):
    """Not implemented in the base class.

    Args:
      global_step: Unused here.
      optimizer: Unused here.
      config_string: Unused here.

    Raises:
      NotImplementedError: Always.
    """
    raise NotImplementedError

  def BuildGraph(self, input_codes):
    """Build the Tensorflow graph corresponding to the entropy coder model.

    Args:
      input_codes: Tensor of size: batch_size x height x width x bit_depth
        corresponding to the codes to compress.
        The input codes are {-1, +1} codes.

    Raises:
      NotImplementedError: Always, in this base class.
    """
    # TODO:
    # - consider switching to {0, 1} codes.
    # - consider passing an extra tensor which gives, for each (b, y, x),
    #   the actual depth (which would allow using more or fewer bits for
    #   each (y, x) location).
    raise NotImplementedError

  def GetConfigStringForUnitTest(self):
    """Returns a default model configuration to be used for unit tests.

    This base implementation returns None.
    """
    return None
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Entropy coder model registrar."""
class ModelFactory(object):
  """Factory of encoder/decoder models.

  Stores a mapping from a model name to a callable; `CreateModel` looks the
  callable up by name and calls it without arguments.
  """

  def __init__(self):
    self._model_dictionary = {}

  def RegisterModel(self,
                    entropy_coder_model_name,
                    entropy_coder_model_factory):
    """Associates a model factory with a name.

    Registering a name that is already present replaces the previous factory.

    Args:
      entropy_coder_model_name: Key under which the factory is stored.
      entropy_coder_model_factory: Callable invoked with no arguments by
        `CreateModel` to build the model.
    """
    self._model_dictionary[entropy_coder_model_name] = (
        entropy_coder_model_factory)

  def CreateModel(self, model_name):
    """Builds a model by calling the factory registered under `model_name`.

    Args:
      model_name: Name previously passed to `RegisterModel`.

    Returns:
      Whatever the registered factory returns.

    Raises:
      KeyError: If no factory was registered under `model_name`.
    """
    current_model_factory = self._model_dictionary[model_name]
    return current_model_factory()

  def GetAvailableModels(self):
    """Returns the names of all registered models as a new list.

    A list is returned rather than `dict.keys()` so that the result is a
    snapshot: on Python 3 `keys()` is a live view that is not indexable and
    changes when further models are registered. On Python 2 the result is
    unchanged.
    """
    return list(self._model_dictionary)
# Module-level ModelFactory instance. It is the object returned by
# GetModelRegistry() and the one RegisterEntropyCoderModel registers into.
_model_registry = ModelFactory()
def GetModelRegistry():
  """Returns the module-level `_model_registry` object."""
  return _model_registry
class RegisterEntropyCoderModel(object):
  """Decorator registering a callable in the module-level model registry.

  The instance is built with the name to register under. Calling the instance
  with a callable stores that callable in `_model_registry` under that name
  and returns the callable unchanged.
  """

  def __init__(self, model_name):
    # Name under which the decorated callable will be registered.
    self._model_name = model_name

  def __call__(self, f):
    """Registers `f` under the stored model name and returns `f` itself."""
    _model_registry.RegisterModel(self._model_name, f)
    return f
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment