"""Utilities related to Keras unit tests.""" import sys import numpy as np from numpy.testing import assert_allclose import inspect import keras from keras.layers import Input from keras.models import Model from keras import backend as K def get_test_data(num_train=1000, num_test=500, input_shape=(10,), output_shape=(2,), classification=True, num_classes=2): """Generates test data to train a model on. classification=True overrides output_shape (i.e. output_shape is set to (1,)) and the output consists in integers in [0, num_class-1]. Otherwise: float output with shape output_shape. """ samples = num_train + num_test if classification: y = np.random.randint(0, num_classes, size=(samples,)) X = np.zeros((samples,) + input_shape) for i in range(samples): X[i] = np.random.normal(loc=y[i], scale=0.7, size=input_shape) else: y_loc = np.random.random((samples,)) X = np.zeros((samples,) + input_shape) y = np.zeros((samples,) + output_shape) for i in range(samples): X[i] = np.random.normal(loc=y_loc[i], scale=0.7, size=input_shape) y[i] = np.random.normal(loc=y_loc[i], scale=0.7, size=output_shape) return (X[:num_train], y[:num_train]), (X[num_train:], y[num_train:]) def layer_test(layer_cls, kwargs={}, input_shape=None, input_dtype=None, input_data=None, expected_output=None, expected_output_dtype=None, fixed_batch_size=False): """Test routine for a layer with a single input tensor and single output tensor. Copy of the function in keras-team/keras because it's not in the public API. If we use the one from keras-team/keras it won't work with tf.keras. """ # generate input data if input_data is None: assert input_shape if not input_dtype: input_dtype = K.floatx() input_data_shape = list(input_shape) for i, e in enumerate(input_data_shape): if e is None: input_data_shape[i] = np.random.randint(1, 4) input_data = (10 * np.random.random(input_data_shape)) input_data = input_data.astype(input_dtype) else: if input_shape is None: input_shape = input_data.shape if input_dtype is None: input_dtype = input_data.dtype if expected_output_dtype is None: expected_output_dtype = input_dtype # instantiation layer = layer_cls(**kwargs) # test get_weights , set_weights at layer level weights = layer.get_weights() layer.set_weights(weights) expected_output_shape = layer.compute_output_shape(input_shape) # test in functional API if fixed_batch_size: x = Input(batch_shape=input_shape, dtype=input_dtype) else: x = Input(shape=input_shape[1:], dtype=input_dtype) y = layer(x) assert K.dtype(y) == expected_output_dtype # check with the functional API model = Model(x, y) actual_output = model.predict(input_data) actual_output_shape = actual_output.shape for expected_dim, actual_dim in zip(expected_output_shape, actual_output_shape): if expected_dim is not None: assert expected_dim == actual_dim if expected_output is not None: assert_allclose(actual_output, expected_output, rtol=1e-3) # test serialization, weight setting at model level model_config = model.get_config() custom_objects = {layer.__class__.__name__: layer.__class__} recovered_model = model.__class__.from_config(model_config, custom_objects) if model.weights: weights = model.get_weights() recovered_model.set_weights(weights) _output = recovered_model.predict(input_data) assert_allclose(_output, actual_output, rtol=1e-3) # test training mode (e.g. useful when the layer has a # different behavior at training and testing time). if has_arg(layer.call, 'training'): model.compile('rmsprop', 'mse') model.train_on_batch(input_data, actual_output) # test instantiation from layer config layer_config = layer.get_config() layer_config['batch_input_shape'] = input_shape layer = layer.__class__.from_config(layer_config) # for further checks in the caller function return actual_output def has_arg(fn, name, accept_all=False): """Checks if a callable accepts a given keyword argument. For Python 2, checks if there is an argument with the given name. For Python 3, checks if there is an argument with the given name, and also whether this argument can be called with a keyword (i.e. if it is not a positional-only argument). This function is a copy of the one in keras-team/keras because it's not in the public API. # Arguments fn: Callable to inspect. name: Check if `fn` can be called with `name` as a keyword argument. accept_all: What to return if there is no parameter called `name` but the function accepts a `**kwargs` argument. # Returns bool, whether `fn` accepts a `name` keyword argument. """ if sys.version_info < (3,): arg_spec = inspect.getargspec(fn) if accept_all and arg_spec.keywords is not None: return True return name in arg_spec.args elif sys.version_info < (3, 3): arg_spec = inspect.getfullargspec(fn) if accept_all and arg_spec.varkw is not None: return True return (name in arg_spec.args or name in arg_spec.kwonlyargs) else: signature = inspect.signature(fn) parameter = signature.parameters.get(name) if parameter is None: if accept_all: for param in signature.parameters.values(): if param.kind == inspect.Parameter.VAR_KEYWORD: return True return False return (parameter.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)) def to_list(x, allow_tuple=False): if isinstance(x, list): return x if allow_tuple and isinstance(x, tuple): return list(x) return [x] def unpack_singleton(x): if len(x) == 1: return x[0] return x if keras.__name__ == 'keras': is_tf_keras = False elif keras.__name__ == 'tensorflow.keras': is_tf_keras = True else: raise KeyError('Cannot detect if using keras or tf.keras.') def to_tuple(shape): """This functions is here to fix an inconsistency between keras and tf.keras. In tf.keras, the input_shape argument is an tuple with `Dimensions` objects. In keras, the input_shape is a simple tuple of ints or `None`. We'll work with tuples of ints or `None` to be consistent with keras-team/keras. So we must apply this function to all input_shapes of the build methods in custom layers. """ if is_tf_keras: import tensorflow as tf return tuple(tf.TensorShape(shape).as_list()) else: return shape