# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for the attention layer."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import tensorflow as tf

from tensorflow.python.keras import keras_parameterized  # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling.layers import attention


# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
# guarantees forward compatibility of this code for the V2 switchover.
@keras_parameterized.run_all_keras_modes
class MultiHeadAttentionTest(keras_parameterized.TestCase):

  def test_non_masked_attention(self):
    """Test that the attention layer can be created without a mask tensor."""
    test_layer = attention.MultiHeadAttention(num_heads=12, head_size=64)
    # Create a 3-dimensional input (the first dimension is implicit).
    from_tensor = tf.keras.Input(shape=(40, 80))
    to_tensor = tf.keras.Input(shape=(20, 80))
    output = test_layer([from_tensor, to_tensor])
    self.assertEqual(output.shape.as_list(), [None, 40, 12, 64])

  def test_non_masked_self_attention(self):
    """Test with one input (self-attenntion) and no mask tensor."""
    test_layer = attention.MultiHeadAttention(num_heads=12, head_size=64)
    # Create a 3-dimensional input (the first dimension is implicit).
    from_tensor = tf.keras.Input(shape=(40, 80))
    output = test_layer([from_tensor, from_tensor])
    self.assertEqual(output.shape.as_list(), [None, 40, 12, 64])

  def test_masked_attention(self):
    """Test with a mask tensor."""
    test_layer = attention.MultiHeadAttention(num_heads=2, head_size=2)
    # Create a 3-dimensional input (the first dimension is implicit).
    from_tensor = tf.keras.Input(shape=(4, 8))
    to_tensor = tf.keras.Input(shape=(2, 8))
    mask_tensor = tf.keras.Input(shape=(4, 2))
    output = test_layer([from_tensor, to_tensor, mask_tensor])

    # Create a model containing the test layer.
    model = tf.keras.Model([from_tensor, to_tensor, mask_tensor], output)

    # Generate data for the input (non-mask) tensors.
    from_data = 10 * np.random.random_sample((3, 4, 8))
    to_data = 10 * np.random.random_sample((3, 2, 8))

    # Invoke the data with a random set of mask data. This should mask at least
    # one element.
    mask_data = np.random.randint(2, size=(3, 4, 2))
    masked_output_data = model.predict([from_data, to_data, mask_data])

    # Invoke the same data, but with a null mask (where no elements are masked).
    null_mask_data = np.ones((3, 4, 2))
    unmasked_output_data = model.predict([from_data, to_data, null_mask_data])

    # Because one run is masked and one is not, the outputs should not be the
    # same.
    self.assertNotAllClose(masked_output_data, unmasked_output_data)

  def test_initializer(self):
    """Test with a specified initializer."""
    test_layer = attention.MultiHeadAttention(
        num_heads=12,
        head_size=64,
        kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.02))
    # Create a 3-dimensional input (the first dimension is implicit).
    from_tensor = tf.keras.Input(shape=(40, 80))
    output = test_layer([from_tensor, from_tensor])
    self.assertEqual(output.shape.as_list(), [None, 40, 12, 64])


def _create_cache(batch_size, init_decode_length, num_heads, head_size):
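  """Builds a zero-initialized key/value cache for the decoding tests."""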
  return {
      "key":
          tf.zeros([batch_size, init_decode_length, num_heads, head_size],
                   dtype=tf.float32),
      "value":
          tf.zeros([batch_size, init_decode_length, num_heads, head_size],
                   dtype=tf.float32)
  }


@keras_parameterized.run_all_keras_modes
class CachedAttentionTest(keras_parameterized.TestCase):

  def test_masked_attention(self):
    """Test with a mask tensor."""
    num_heads, head_size = 2, 2
    # Create a 3-dimensional input (the first dimension is implicit).
    from_seq_length = 4
    batch_size = 3
    # GPU/CPU case: start with an empty cache that grows as decoding proceeds.
    init_decode_length = 0
    # Directly tests the keras layer.
    cache = _create_cache(batch_size, init_decode_length, num_heads, head_size)
    layer = attention.CachedAttention(num_heads=num_heads, head_size=head_size)

    # Generate data for the input (non-mask) tensors.
    from_data = tf.zeros((batch_size, from_seq_length, 8), dtype=np.float32)
    # Invoke the data with a random set of mask data. This should mask at least
    # one element.
    mask_data = np.random.randint(
        2, size=(batch_size, from_seq_length, from_seq_length))
    masked_output_data, cache = layer([from_data, from_data, mask_data, cache])
    self.assertEqual(masked_output_data.shape, (3, 4, 2, 2))
    self.assertEqual(cache["value"].shape, (3, 4, 2, 2))

    # Tests inputs without cache.
    masked_output_data, cache = layer([from_data, from_data, mask_data])
    self.assertEqual(masked_output_data.shape, (3, 4, 2, 2))
    self.assertIsNone(cache)

  def test_padded_decode(self):
    """Test with a mask tensor."""
    num_heads, head_size = 2, 2
    from_seq_length = 4
    # TPU decoding should pre-allocate the entire sequence.
    batch_size = 3
    init_decode_length = from_seq_length

    # Directly tests the keras layer.
    cache = _create_cache(batch_size, init_decode_length, num_heads, head_size)
    layer = attention.CachedAttention(num_heads=num_heads, head_size=head_size)

    # Generate data for the input (non-mask) tensors.
    from_data = tf.zeros((batch_size, from_seq_length, 8), dtype=np.float32)
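    # Step index at which the pre-allocated cache is updated during decoding.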
    decode_loop_step = 2
    mask_data = np.random.randint(
        2, size=(batch_size, from_seq_length, from_seq_length), dtype=np.int32)
    # Call the layer directly, as Keras cannot consume these inputs correctly.
    masked_output_data, cache = layer([from_data, from_data, mask_data, cache],
                                      decode_loop_step=decode_loop_step)
    self.assertEqual(masked_output_data.shape, (3, 4, 2, 2))
    self.assertEqual(cache["value"].shape, (3, 4, 2, 2))


if __name__ == "__main__":
  tf.test.main()