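"""Test torchaudio.compliance.kaldi against reference outputs generated by Kaldi.

The reference features are stored as .ark files under the 'kaldi' asset
directory; each filename encodes the Kaldi configuration used to produce it.
"""
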
import os
import math

import torch
import torchaudio
import torchaudio.compliance.kaldi as kaldi

from torchaudio_unittest import common_utils
from .compliance import utils as compliance_utils


def extract_window(window, wave, f, frame_length, frame_shift, snip_edges):
    # a Python port of ExtractWindow from Kaldi's feature-window.cc
    def first_sample_of_frame(frame, window_size, window_shift, snip_edges):
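        # snip_edges=True: frame f starts at f * window_shift;
        # snip_edges=False: frame f is centered around (f + 0.5) * window_shift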
        if snip_edges:
            return frame * window_shift
        else:
            midpoint_of_frame = frame * window_shift + window_shift // 2
            beginning_of_frame = midpoint_of_frame - window_size // 2
            return beginning_of_frame

    sample_offset = 0
    num_samples = sample_offset + wave.size(0)
    start_sample = first_sample_of_frame(f, frame_length, frame_shift, snip_edges)
    end_sample = start_sample + frame_length

    if snip_edges:
        assert start_sample >= sample_offset and end_sample <= num_samples
    else:
        assert sample_offset == 0 or start_sample >= sample_offset

    wave_start = start_sample - sample_offset
    wave_end = wave_start + frame_length
    if wave_start >= 0 and wave_end <= wave.size(0):
        window[f, :] = wave[wave_start:(wave_start + frame_length)]
    else:
        wave_dim = wave.size(0)
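        # indices outside the waveform are filled by reflecting at the edges,
        # repeatedly if the frame extends far past the signal boundaries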
        for s in range(frame_length):
            s_in_wave = s + wave_start
            while s_in_wave < 0 or s_in_wave >= wave_dim:
                if s_in_wave < 0:
                    s_in_wave = -s_in_wave - 1
                else:
                    s_in_wave = 2 * wave_dim - 1 - s_in_wave
            window[f, s] = wave[s_in_wave]


@common_utils.skipIfNoSoxBackend
class Test_Kaldi(common_utils.TempDirMixin, common_utils.TorchaudioTestCase):
    backend = 'sox'

    kaldi_output_dir = common_utils.get_asset_path('kaldi')
    test_filepath = common_utils.get_asset_path('kaldi_file.wav')
    test_filepaths = {prefix: [] for prefix in compliance_utils.TEST_PREFIX}

    def setUp(self):
        super().setUp()

        # 1. test signal for testing resampling
        self.test1_signal_sr = 16000
        self.test1_signal = common_utils.get_whitenoise(
            sample_rate=self.test1_signal_sr, duration=0.5,
        )

        # 2. test audio file corresponding to saved kaldi ark files
        self.test2_filepath = common_utils.get_asset_path('kaldi_file_8000.wav')

    # separate the test files by their feature type (e.g. 'spec', 'fbank', etc.);
    # this loop runs at class-definition time to populate the test_filepaths class attribute
    for f in os.listdir(kaldi_output_dir):
        dash_idx = f.find('-')
        assert f.endswith('.ark') and dash_idx != -1
        key = f[:dash_idx]
        assert key in test_filepaths
        test_filepaths[key].append(f)

    def _test_get_strided_helper(self, num_samples, window_size, window_shift, snip_edges):
        waveform = torch.arange(num_samples).float()
        output = kaldi._get_strided(waveform, window_size, window_shift, snip_edges)

        # from NumFrames in feature-window.cc
        n = window_size
        if snip_edges:
            m = 0 if num_samples < window_size else 1 + (num_samples - window_size) // window_shift
        else:
            m = (num_samples + (window_shift // 2)) // window_shift
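            # round(num_samples / window_shift) frames; the last frames may extend
            # past the end of the waveform and are filled by reflection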

        self.assertEqual(output.dim(), 2)
        self.assertEqual(output.shape, (m, n))

        window = torch.empty((m, window_size))

        for r in range(m):
            extract_window(window, waveform, r, window_size, window_shift, snip_edges)
        torch.testing.assert_allclose(window, output)

    def test_get_strided(self):
        # generate any combination where 0 < window_size <= num_samples and
        # 0 < window_shift.
        for num_samples in range(1, 20):
            for window_size in range(1, num_samples + 1):
                for window_shift in range(1, 2 * num_samples + 1):
                    for snip_edges in range(0, 2):
                        self._test_get_strided_helper(num_samples, window_size, window_shift, snip_edges)

    def _create_data_set(self):
        # used to generate the dataset to test on. this is not used in testing (offline procedure)
        sr = 16000
        x = torch.arange(0, 20).float()
        # between [-6,6]
        y = torch.cos(2 * math.pi * x) + 3 * torch.sin(math.pi * x) + 2 * torch.cos(x)
        # between [-2^30, 2^30]
        y = (y / 6 * (1 << 30)).long()
        # clear the low 16 bits because they aren't used anyway
        y = ((y >> 16) << 16).float()

        torchaudio.save(self.test_filepath, y, sr)
        sound, sample_rate = torchaudio.load(self.test_filepath, normalization=False)

        self.assertTrue(sample_rate == sr)
        torch.testing.assert_allclose(y, sound)

    def _print_diagnostic(self, output, expect_output):
        # given an output and an expected output, print the absolute and relative errors (mean squared and max)
        abs_error = output - expect_output
        abs_mse = abs_error.pow(2).sum() / output.numel()
        abs_max_error = torch.max(abs_error.abs())

        relative_error = abs_error / expect_output
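        # note: elementwise division, so this assumes expect_output contains no zeros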
        relative_mse = relative_error.pow(2).sum() / output.numel()
        relative_max_error = torch.max(relative_error.abs())

        print('abs_mse:', abs_mse.item(), 'abs_max_error:', abs_max_error.item())
        print('relative_mse:', relative_mse.item(), 'relative_max_error:', relative_max_error.item())

    def _compliance_test_helper(self, sound_filepath, filepath_key, expected_num_files,
                                expected_num_args, get_output_fn, atol=1e-5, rtol=1e-7):
        """
        Inputs:
jamarshon's avatar
jamarshon committed
138
            sound_filepath (str): The location of the sound file
jamarshon's avatar
jamarshon committed
139
140
141
142
143
            filepath_key (str): A key to `test_filepaths` which matches which files to use
            expected_num_files (int): The expected number of kaldi files to read
            expected_num_args (int): The expected number of arguments used in a kaldi configuration
            get_output_fn (Callable[[Tensor, List], Tensor]): A function that takes in a sound signal
                and a configuration and returns an output
jamarshon's avatar
jamarshon committed
144
145
            atol (float): absolute tolerance
            rtol (float): relative tolerance
jamarshon's avatar
jamarshon committed
146
        """
        sound, sr = common_utils.load_wav(sound_filepath, normalize=False)
        files = self.test_filepaths[filepath_key]

        assert len(files) == expected_num_files, \
            ('number of kaldi {} files changed to {}'.format(
                filepath_key, len(files)))

        for f in files:
            print(f)

            # Read kaldi's output from file
            kaldi_output_path = os.path.join(self.kaldi_output_dir, f)
            kaldi_output_dict = dict(torchaudio.kaldi_io.read_mat_ark(kaldi_output_path))

            assert len(kaldi_output_dict) == 1 and 'my_id' in kaldi_output_dict, 'invalid test kaldi ark file'
            kaldi_output = kaldi_output_dict['my_id']

            # Construct the same configuration used by kaldi
            args = f.split('-')
            args[-1] = os.path.splitext(args[-1])[0]
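            # the filename (minus extension) encodes the kaldi configuration as
            # '-'-separated tokens that compliance_utils.parse converts to typed values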
            assert len(args) == expected_num_args, 'invalid test kaldi file name'
            args = [compliance_utils.parse(arg) for arg in args]

            output = get_output_fn(sound, args)

            self._print_diagnostic(output, kaldi_output)
            torch.testing.assert_allclose(output, kaldi_output, atol=atol, rtol=rtol)

    def test_mfcc_empty(self):
        # Passing in an empty tensor should result in an error
        self.assertRaises(AssertionError, kaldi.mfcc, torch.empty(0))

    def test_resample_waveform(self):
        def get_output_fn(sound, args):
            output = kaldi.resample_waveform(sound, args[1], args[2])
            return output

        self._compliance_test_helper(self.test2_filepath, 'resample', 32, 3, get_output_fn, atol=1e-2, rtol=1e-5)

    def test_resample_waveform_upsample_size(self):
        upsample_sound = kaldi.resample_waveform(self.test1_signal, self.test1_signal_sr, self.test1_signal_sr * 2)
        self.assertTrue(upsample_sound.size(-1) == self.test1_signal.size(-1) * 2)

    def test_resample_waveform_downsample_size(self):
        downsample_sound = kaldi.resample_waveform(self.test1_signal, self.test1_signal_sr, self.test1_signal_sr // 2)
        self.assertTrue(downsample_sound.size(-1) == self.test1_signal.size(-1) // 2)

    def test_resample_waveform_identity_size(self):
        identity_sound = kaldi.resample_waveform(self.test1_signal, self.test1_signal_sr, self.test1_signal_sr)
        self.assertTrue(identity_sound.size(-1) == self.test1_signal.size(-1))

    def _test_resample_waveform_accuracy(self, up_scale_factor=None, down_scale_factor=None,
                                         atol=1e-1, rtol=1e-4):
        # resample the signal and compare it to the ground truth
        n_to_trim = 20
        sample_rate = 1000
        new_sample_rate = sample_rate

        if up_scale_factor is not None:
            new_sample_rate *= up_scale_factor

        if down_scale_factor is not None:
            new_sample_rate //= down_scale_factor

        duration = 5  # seconds
        original_timestamps = torch.arange(0, duration, 1.0 / sample_rate)

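        # a pure 3 Hz cosine, so the resampled ground truth can be computed analytically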
        sound = 123 * torch.cos(2 * math.pi * 3 * original_timestamps).unsqueeze(0)
        estimate = kaldi.resample_waveform(sound, sample_rate, new_sample_rate).squeeze()

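        # evaluate the analytic signal at the new sample positions; slice to the
        # estimate's length since the resampler's output length may differ slightly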
        new_timestamps = torch.arange(0, duration, 1.0 / new_sample_rate)[:estimate.size(0)]
        ground_truth = 123 * torch.cos(2 * math.pi * 3 * new_timestamps)

        # trim the first/last n samples as these points have boundary effects
        ground_truth = ground_truth[..., n_to_trim:-n_to_trim]
        estimate = estimate[..., n_to_trim:-n_to_trim]

        torch.testing.assert_allclose(estimate, ground_truth, atol=atol, rtol=rtol)

    def test_resample_waveform_downsample_accuracy(self):
        for i in range(1, 20):
            self._test_resample_waveform_accuracy(down_scale_factor=i * 2)

    def test_resample_waveform_upsample_accuracy(self):
        for i in range(1, 20):
            self._test_resample_waveform_accuracy(up_scale_factor=1.0 + i / 20.0)

    def test_resample_waveform_multi_channel(self):
        num_channels = 3

        multi_sound = self.test1_signal.repeat(num_channels, 1)  # (num_channels, 8000 samples)

        for i in range(num_channels):
            multi_sound[i, :] *= (i + 1) * 1.5

        multi_sound_sampled = kaldi.resample_waveform(multi_sound, self.test1_signal_sr, self.test1_signal_sr // 2)

        # check that resampling is the same whether channels are processed separately or together in a (c, n) tensor
        for i in range(num_channels):
            single_channel = self.test1_signal * (i + 1) * 1.5
            single_channel_sampled = kaldi.resample_waveform(single_channel, self.test1_signal_sr,
                                                             self.test1_signal_sr // 2)
            torch.testing.assert_allclose(multi_sound_sampled[i, :], single_channel_sampled[0], rtol=1e-4, atol=1e-7)