# Copyright 2021 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Tests for tf_utils.""" from absl.testing import parameterized import numpy as np import tensorflow as tf from tensorflow.python.distribute import combinations from tensorflow.python.distribute import strategy_combinations from official.modeling import tf_utils def all_strategy_combinations(): return combinations.combine( strategy=[ strategy_combinations.cloud_tpu_strategy, strategy_combinations.mirrored_strategy_with_two_gpus, ], mode='eager', ) class TFUtilsTest(tf.test.TestCase, parameterized.TestCase): @combinations.generate(all_strategy_combinations()) def test_cross_replica_concat(self, strategy): num_cores = strategy.num_replicas_in_sync shape = (2, 3, 4) def concat(axis): @tf.function def function(): replica_value = tf.fill(shape, tf_utils.get_replica_id()) return tf_utils.cross_replica_concat(replica_value, axis=axis) return function def expected(axis): values = [np.full(shape, i) for i in range(num_cores)] return np.concatenate(values, axis=axis) per_replica_results = strategy.run(concat(axis=0)) replica_0_result = per_replica_results.values[0].numpy() for value in per_replica_results.values[1:]: self.assertAllClose(value.numpy(), replica_0_result) self.assertAllClose(replica_0_result, expected(axis=0)) replica_0_result = strategy.run(concat(axis=1)).values[0].numpy() self.assertAllClose(replica_0_result, expected(axis=1)) replica_0_result = strategy.run(concat(axis=2)).values[0].numpy() self.assertAllClose(replica_0_result, expected(axis=2)) @combinations.generate(all_strategy_combinations()) def test_cross_replica_concat_gradient(self, strategy): num_cores = strategy.num_replicas_in_sync shape = (10, 5) @tf.function def function(): replica_value = tf.random.normal(shape) with tf.GradientTape() as tape: tape.watch(replica_value) concat_value = tf_utils.cross_replica_concat(replica_value, axis=0) output = tf.reduce_sum(concat_value) return tape.gradient(output, replica_value) per_replica_gradients = strategy.run(function) for gradient in per_replica_gradients.values: self.assertAllClose(gradient, num_cores * tf.ones(shape)) if __name__ == '__main__': tf.test.main()