Unverified Commit 02b898ff authored by engineerchuan, committed by GitHub

Get rid of whitenoise and sinewave files from test (#783)



* Get rid of sine wave and white noise asset files
* Refactor integer encoding into a shared convert_tensor_encoding helper
* Relax rtol from 1e-8 to 1e-7 in the kaldi compliance tests
* Relax atol from 1e-8 to 1e-7 in the multi-channel waveform resample test
* Relax the length-consistency tolerance for the sox speed effect test
Co-authored-by: moto <855818+mthrok@users.noreply.github.com>
parent 8181a83b
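
The common thread in these changes: tests now synthesize their input signals in memory through the shared test helpers instead of loading checked-in wav assets. Below is a minimal sketch of the new pattern, assuming only the helper names and keyword arguments that appear in the diff; running it requires the torchaudio test package, where these helpers live in the local common_utils module.

# Inside a torchaudio test module (the tests import the helpers as
# `from . import common_utils`).

# Deterministic white noise: seeded RNG forked on CPU, clamped to [-1, 1],
# then converted to the requested dtype.
waveform = common_utils.get_whitenoise(sample_rate=44100, duration=5)

# Pure tone replacing the old 100Hz/440Hz 16-bit wav assets; the result has
# shape (n_channels, sample_rate * duration), so it drops straight into the
# existing functional tests.
sine = common_utils.get_sinusoid(frequency=440, sample_rate=44100,
                                 duration=5, n_channels=1)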
@@ -13,6 +13,28 @@ def get_asset_path(*paths):
     return os.path.join(_TEST_DIR_PATH, 'assets', *paths)


+def convert_tensor_encoding(
+        tensor: torch.tensor,
+        dtype: torch.dtype,
+):
+    """Convert input tensor with values between -1 and 1 to integer encoding
+    Args:
+        tensor: input tensor, assumed between -1 and 1
+        dtype: desired output tensor dtype
+    Returns:
+        Tensor: shape of (n_channels, sample_rate * duration)
+    """
+    if dtype == torch.int32:
+        tensor *= (tensor > 0) * 2147483647 + (tensor < 0) * 2147483648
+    if dtype == torch.int16:
+        tensor *= (tensor > 0) * 32767 + (tensor < 0) * 32768
+    if dtype == torch.uint8:
+        tensor *= (tensor > 0) * 127 + (tensor < 0) * 128
+        tensor += 128
+    tensor = tensor.to(dtype)
+    return tensor
+
+
 def get_whitenoise(
         *,
         sample_rate: int = 16000,
@@ -43,25 +65,17 @@ def get_whitenoise(
     if dtype not in [torch.float32, torch.int32, torch.int16, torch.uint8]:
         raise NotImplementedError(f'dtype {dtype} is not supported.')
     # According to the doc, folking rng on all CUDA devices is slow when there are many CUDA devices,
-    # so we only folk on CPU, generate values and move the data to the given device
+    # so we only fork on CPU, generate values and move the data to the given device
     with torch.random.fork_rng([]):
         torch.random.manual_seed(seed)
-        tensor = torch.randn([sample_rate * duration], dtype=torch.float32, device='cpu')
+        tensor = torch.randn([int(sample_rate * duration)], dtype=torch.float32, device='cpu')
     tensor /= 2.0
     tensor *= scale_factor
     tensor.clamp_(-1.0, 1.0)
-    if dtype == torch.int32:
-        tensor *= (tensor > 0) * 2147483647 + (tensor < 0) * 2147483648
-    if dtype == torch.int16:
-        tensor *= (tensor > 0) * 32767 + (tensor < 0) * 32768
-    if dtype == torch.uint8:
-        tensor *= (tensor > 0) * 127 + (tensor < 0) * 128
-        tensor += 128
-    tensor = tensor.to(dtype)
     tensor = tensor.repeat([n_channels, 1])
     if not channels_first:
         tensor = tensor.t()
-    return tensor.to(device=device)
+    return convert_tensor_encoding(tensor, dtype)


 def get_sinusoid(
@@ -91,8 +105,8 @@ def get_sinusoid(
     dtype = getattr(torch, dtype)
     pie2 = 2 * 3.141592653589793
     end = pie2 * frequency * duration
-    theta = torch.linspace(0, end, sample_rate * duration, dtype=dtype, device=device)
-    sin = torch.sin(theta, out=None).repeat([n_channels, 1])
+    theta = torch.linspace(0, end, int(sample_rate * duration), dtype=torch.float32, device=device)
+    tensor = torch.sin(theta, out=None).repeat([n_channels, 1])
     if not channels_first:
-        sin = sin.t()
-    return sin
+        tensor = tensor.t()
+    return convert_tensor_encoding(tensor, dtype)
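
For reference, a self-contained sketch of the value mapping applied by the new convert_tensor_encoding helper; the per-sign scale factors are taken directly from the hunk above, everything else is illustrative. Floats at the extremes of [-1, 1] land on the asymmetric limits of each integer dtype.

import torch

# Quick check of the integer-encoding refactor: the same per-sign scale
# factors used by convert_tensor_encoding map [-1, 1] onto the full range.
x = torch.tensor([-1.0, 0.0, 1.0])

int16_scaled = x * ((x > 0) * 32767 + (x < 0) * 32768)
print(int16_scaled.to(torch.int16))   # -32768, 0, 32767

uint8_scaled = x * ((x > 0) * 127 + (x < 0) * 128) + 128
print(uint8_scaled.to(torch.uint8))   # 0, 128, 255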
@@ -4,6 +4,7 @@ import unittest
 import torch
 import torchaudio
 import torchaudio.functional as F
+from parameterized import parameterized
 import pytest

 from . import common_utils
@@ -299,24 +300,18 @@ class TestIstft(common_utils.TorchaudioTestCase):

 class TestDetectPitchFrequency(common_utils.TorchaudioTestCase):
-    def test_pitch(self):
-        test_filepath_100 = common_utils.get_asset_path("100Hz_44100Hz_16bit_05sec.wav")
-        test_filepath_440 = common_utils.get_asset_path("440Hz_44100Hz_16bit_05sec.wav")
-
-        # Files from https://www.mediacollege.com/audio/tone/download/
-        tests = [
-            (test_filepath_100, 100),
-            (test_filepath_440, 440),
-        ]
-        for filename, freq_ref in tests:
-            waveform, sample_rate = common_utils.load_wav(filename)
-            freq = torchaudio.functional.detect_pitch_frequency(waveform, sample_rate)
-
-            threshold = 1
-            s = ((freq - freq_ref).abs() > threshold).sum()
-            self.assertFalse(s)
+    @parameterized.expand([(100,), (440,)])
+    def test_pitch(self, frequency):
+        sample_rate = 44100
+        test_sine_waveform = common_utils.get_sinusoid(
+            frequency=frequency, sample_rate=sample_rate, duration=5,
+        )
+
+        freq = torchaudio.functional.detect_pitch_frequency(test_sine_waveform, sample_rate)
+
+        threshold = 1
+        s = ((freq - frequency).abs() > threshold).sum()
+        self.assertFalse(s)


 class TestDB_to_amplitude(common_utils.TorchaudioTestCase):
...
"""Test numerical consistency among single input and batched input.""" """Test numerical consistency among single input and batched input."""
import unittest import unittest
import itertools
from parameterized import parameterized
import torch import torch
import torchaudio import torchaudio
@@ -47,17 +49,15 @@ class TestFunctional(common_utils.TorchaudioTestCase):
             F.griffinlim, tensor, window, n_fft, hop, ws, power, normalize, n_iter, momentum, length, 0, atol=5e-5
         )

-    def test_detect_pitch_frequency(self):
-        filenames = [
-            'steam-train-whistle-daniel_simon.wav',  # 2ch 44100Hz
-            # Files from https://www.mediacollege.com/audio/tone/download/
-            '100Hz_44100Hz_16bit_05sec.wav',  # 1ch
-            '440Hz_44100Hz_16bit_05sec.wav',  # 1ch
-        ]
-        for filename in filenames:
-            filepath = common_utils.get_asset_path(filename)
-            waveform, sample_rate = torchaudio.load(filepath)
-            self.assert_batch_consistencies(F.detect_pitch_frequency, waveform, sample_rate)
+    @parameterized.expand(list(itertools.product(
+        [100, 440],
+        [8000, 16000, 44100],
+        [1, 2],
+    )), name_func=lambda f, _, p: f'{f.__name__}_{"_".join(str(arg) for arg in p.args)}')
+    def test_detect_pitch_frequency(self, frequency, sample_rate, n_channels):
+        waveform = common_utils.get_sinusoid(frequency=frequency, sample_rate=sample_rate,
+                                             n_channels=n_channels, duration=5)
+        self.assert_batch_consistencies(F.detect_pitch_frequency, waveform, sample_rate)

     def test_istft(self):
         stft = torch.tensor([
@@ -80,8 +80,10 @@ class TestFunctional(common_utils.TorchaudioTestCase):
         self.assert_batch_consistencies(F.overdrive, waveform, gain=45, colour=30)

     def test_phaser(self):
-        filepath = common_utils.get_asset_path("whitenoise.wav")
-        waveform, sample_rate = torchaudio.load(filepath)
+        sample_rate = 44100
+        waveform = common_utils.get_whitenoise(
+            sample_rate=sample_rate, duration=5,
+        )
         self.assert_batch_consistencies(F.phaser, waveform, sample_rate)

     def test_flanger(self):
...
@@ -47,14 +47,25 @@ def extract_window(window, wave, f, frame_length, frame_shift, snip_edges):

 @common_utils.skipIfNoSoxBackend
-class Test_Kaldi(common_utils.TorchaudioTestCase):
+class Test_Kaldi(common_utils.TempDirMixin, common_utils.TorchaudioTestCase):
     backend = 'sox'

-    test_filepath = common_utils.get_asset_path('kaldi_file.wav')
-    test_8000_filepath = common_utils.get_asset_path('kaldi_file_8000.wav')
     kaldi_output_dir = common_utils.get_asset_path('kaldi')
+    test_filepath = common_utils.get_asset_path('kaldi_file.wav')
     test_filepaths = {prefix: [] for prefix in compliance_utils.TEST_PREFIX}

+    def setUp(self):
+        super().setUp()
+        # 1. test signal for testing resampling
+        self.test1_signal_sr = 16000
+        self.test1_signal = common_utils.get_whitenoise(
+            sample_rate=self.test1_signal_sr, duration=0.5,
+        )
+        # 2. test audio file corresponding to saved kaldi ark files
+        self.test2_filepath = common_utils.get_asset_path('kaldi_file_8000.wav')
+
     # separating test files by their types (e.g 'spec', 'fbank', etc.)
     for f in os.listdir(kaldi_output_dir):
         dash_idx = f.find('-')
@@ -94,7 +105,6 @@ class Test_Kaldi(common_utils.TorchaudioTestCase):

     def _create_data_set(self):
         # used to generate the dataset to test on. this is not used in testing (offline procedure)
-        test_filepath = common_utils.get_asset_path('kaldi_file.wav')
         sr = 16000
         x = torch.arange(0, 20).float()
         # between [-6,6]
@@ -103,8 +113,8 @@ class Test_Kaldi(common_utils.TorchaudioTestCase):
         y = (y / 6 * (1 << 30)).long()
         # clear the last 16 bits because they aren't used anyways
         y = ((y >> 16) << 16).float()
-        torchaudio.save(test_filepath, y, sr)
-        sound, sample_rate = torchaudio.load(test_filepath, normalization=False)
+        torchaudio.save(self.test_filepath, y, sr)
+        sound, sample_rate = torchaudio.load(self.test_filepath, normalization=False)
         print(y >> 16)
         self.assertTrue(sample_rate == sr)
         torch.testing.assert_allclose(y, sound)
@@ -123,7 +133,7 @@ class Test_Kaldi(common_utils.TorchaudioTestCase):
         print('relative_mse:', relative_mse.item(), 'relative_max_error:', relative_max_error.item())

     def _compliance_test_helper(self, sound_filepath, filepath_key, expected_num_files,
-                                expected_num_args, get_output_fn, atol=1e-5, rtol=1e-8):
+                                expected_num_args, get_output_fn, atol=1e-5, rtol=1e-7):
         """
         Inputs:
             sound_filepath (str): The location of the sound file
@@ -135,7 +145,7 @@ class Test_Kaldi(common_utils.TorchaudioTestCase):
             atol (float): absolute tolerance
             rtol (float): relative tolerance
         """
-        sound, sample_rate = torchaudio.load_wav(sound_filepath)
+        sound, sr = torchaudio.load_wav(sound_filepath)
        files = self.test_filepaths[filepath_key]
         assert len(files) == expected_num_files, ('number of kaldi %s file changed to %d' % (filepath_key, len(files)))
@@ -170,22 +180,19 @@ class Test_Kaldi(common_utils.TorchaudioTestCase):
             output = kaldi.resample_waveform(sound, args[1], args[2])
             return output

-        self._compliance_test_helper(self.test_8000_filepath, 'resample', 32, 3, get_output_fn, atol=1e-2, rtol=1e-5)
+        self._compliance_test_helper(self.test2_filepath, 'resample', 32, 3, get_output_fn, atol=1e-2, rtol=1e-5)

     def test_resample_waveform_upsample_size(self):
-        sound, sample_rate = torchaudio.load_wav(self.test_8000_filepath)
-        upsample_sound = kaldi.resample_waveform(sound, sample_rate, sample_rate * 2)
-        self.assertTrue(upsample_sound.size(-1) == sound.size(-1) * 2)
+        upsample_sound = kaldi.resample_waveform(self.test1_signal, self.test1_signal_sr, self.test1_signal_sr * 2)
+        self.assertTrue(upsample_sound.size(-1) == self.test1_signal.size(-1) * 2)

     def test_resample_waveform_downsample_size(self):
-        sound, sample_rate = torchaudio.load_wav(self.test_8000_filepath)
-        downsample_sound = kaldi.resample_waveform(sound, sample_rate, sample_rate // 2)
-        self.assertTrue(downsample_sound.size(-1) == sound.size(-1) // 2)
+        downsample_sound = kaldi.resample_waveform(self.test1_signal, self.test1_signal_sr, self.test1_signal_sr // 2)
+        self.assertTrue(downsample_sound.size(-1) == self.test1_signal.size(-1) // 2)

     def test_resample_waveform_identity_size(self):
-        sound, sample_rate = torchaudio.load_wav(self.test_8000_filepath)
-        downsample_sound = kaldi.resample_waveform(sound, sample_rate, sample_rate)
-        self.assertTrue(downsample_sound.size(-1) == sound.size(-1))
+        downsample_sound = kaldi.resample_waveform(self.test1_signal, self.test1_signal_sr, self.test1_signal_sr)
+        self.assertTrue(downsample_sound.size(-1) == self.test1_signal.size(-1))

     def _test_resample_waveform_accuracy(self, up_scale_factor=None, down_scale_factor=None,
                                          atol=1e-1, rtol=1e-4):
@@ -226,19 +233,19 @@ class Test_Kaldi(common_utils.TorchaudioTestCase):
     def test_resample_waveform_multi_channel(self):
         num_channels = 3

-        sound, sample_rate = torchaudio.load_wav(self.test_8000_filepath)  # (1, 8000)
-        multi_sound = sound.repeat(num_channels, 1)  # (num_channels, 8000)
+        multi_sound = self.test1_signal.repeat(num_channels, 1)  # (num_channels, 8000 smp)

         for i in range(num_channels):
             multi_sound[i, :] *= (i + 1) * 1.5

-        multi_sound_sampled = kaldi.resample_waveform(multi_sound, sample_rate, sample_rate // 2)
+        multi_sound_sampled = kaldi.resample_waveform(multi_sound, self.test1_signal_sr, self.test1_signal_sr // 2)

         # check that sampling is same whether using separately or in a tensor of size (c, n)
         for i in range(num_channels):
-            single_channel = sound * (i + 1) * 1.5
-            single_channel_sampled = kaldi.resample_waveform(single_channel, sample_rate, sample_rate // 2)
-            torch.testing.assert_allclose(multi_sound_sampled[i, :], single_channel_sampled[0], rtol=1e-4, atol=1e-8)
+            single_channel = self.test1_signal * (i + 1) * 1.5
+            single_channel_sampled = kaldi.resample_waveform(single_channel, self.test1_signal_sr,
+                                                             self.test1_signal_sr // 2)
+            torch.testing.assert_allclose(multi_sound_sampled[i, :], single_channel_sampled[0], rtol=1e-4, atol=1e-7)


 if __name__ == '__main__':
...
@@ -45,8 +45,8 @@ class Test_SoxEffectsChain(common_utils.TorchaudioTestCase):
         E.append_effect_to_chain("speed", speed)
         E.append_effect_to_chain("rate", si.rate)
         x, sr = E.sox_build_flow_effects()
-        # check if effects worked
-        self.assertEqual(x.size(1), int((si.length / si.channels) / speed))
+        # check if effects worked, add small tolerance for rounding effects
+        self.assertEqual(x.size(1), int((si.length / si.channels) / speed), atol=1, rtol=1e-8)

     def test_ulaw_and_siginfo(self):
         si_out = torchaudio.sox_signalinfo_t()
...