Unverified Commit 9a0e70ea authored by moto, committed by GitHub

Run functional librosa compatibility test on CUDA as well (#1436)

parent 52e2943c
@@ -2,6 +2,7 @@ from .data_utils import (
get_asset_path,
get_whitenoise,
get_sinusoid,
get_spectrogram,
)
from .backend_utils import (
set_audio_backend,
@@ -30,8 +31,28 @@ from .parameterized_utils import (
nested_params
)
__all__ = ['get_asset_path', 'get_whitenoise', 'get_sinusoid', 'set_audio_backend',
'TempDirMixin', 'HttpServerMixin', 'TestBaseMixin', 'PytorchTestCase', 'TorchaudioTestCase',
'skipIfNoCuda', 'skipIfNoExec', 'skipIfNoModule', 'skipIfNoKaldi', 'skipIfNoSox',
'skipIfNoSoxBackend', 'skipIfRocm', 'get_wav_data', 'normalize_wav', 'load_wav', 'save_wav',
'load_params', 'nested_params']
__all__ = [
'get_asset_path',
'get_whitenoise',
'get_sinusoid',
'get_spectrogram',
'set_audio_backend',
'TempDirMixin',
'HttpServerMixin',
'TestBaseMixin',
'PytorchTestCase',
'TorchaudioTestCase',
'skipIfNoCuda',
'skipIfNoExec',
'skipIfNoModule',
'skipIfNoKaldi',
'skipIfNoSox',
'skipIfNoSoxBackend',
'skipIfRocm',
'get_wav_data',
'normalize_wav',
'load_wav',
'save_wav',
'load_params',
'nested_params',
]
import os.path
from typing import Union
from typing import Union, Optional
import torch
@@ -62,7 +62,7 @@ def get_whitenoise(
"""
if isinstance(dtype, str):
dtype = getattr(torch, dtype)
if dtype not in [torch.float32, torch.int32, torch.int16, torch.uint8]:
if dtype not in [torch.float64, torch.float32, torch.int32, torch.int16, torch.uint8]:
raise NotImplementedError(f'dtype {dtype} is not supported.')
# According to the doc, forking the RNG on all CUDA devices is slow when there are many CUDA devices,
# so we only fork on CPU, generate values and move the data to the given device.
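# Hypothetical sketch of the pattern the comment above describes; the real
# body lies outside this hunk and these variable names are illustrative only.
seed, shape, device = 0, (1, 8000), 'cpu'
with torch.random.fork_rng([]):  # fork the CPU RNG only, no CUDA devices
    torch.random.manual_seed(seed)
    tensor = torch.randn(shape, dtype=torch.float64)
tensor = tensor.to(device)  # move the generated data to the target device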
@@ -110,3 +110,43 @@ def get_sinusoid(
if not channels_first:
tensor = tensor.t()
return convert_tensor_encoding(tensor, dtype)
def get_spectrogram(
waveform,
*,
n_fft: int = 2048,
hop_length: Optional[int] = None,
win_length: Optional[int] = None,
window: Optional[torch.Tensor] = None,
center: bool = True,
pad_mode: str = 'reflect',
power: Optional[float] = None,
):
"""Generate a spectrogram of the given Tensor
Args:
n_fft: The number of FFT bins.
hop_length: Stride for sliding window. default: ``n_fft // 4``.
win_length: The size of window frame and STFT filter. default: ``n_fft``.
winwdow: Window function. default: Hann window
center: Pad the input sequence if True. See ``torch.stft`` for the detail.
pad_mode: Padding method used when center is True. Default: "reflect".
power: If ``None``, raw spectrogram with complex values are returned,
otherwise the norm of the spectrogram is returned.
"""
hop_length = hop_length or n_fft // 4
win_length = win_length or n_fft
window = torch.hann_window(win_length) if window is None else window
spec = torch.stft(
waveform,
n_fft=n_fft,
hop_length=hop_length,
win_length=win_length,
center=center,
window=window,
pad_mode=pad_mode,
return_complex=True)
if power is not None:
spec = spec.abs() ** power
return spec
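# Hypothetical usage sketch of get_spectrogram; the get_whitenoise arguments
# shown here are assumed for illustration, not taken from this diff.
waveform = get_whitenoise(sample_rate=8000, duration=1)
complex_spec = get_spectrogram(waveform, n_fft=400)           # power=None -> complex STFT
power_spec = get_spectrogram(waveform, n_fft=400, power=2.0)  # squared magnitude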
@@ -11,17 +11,41 @@ def load_params(*paths):
return [param(json.loads(line)) for line in file]
def nested_params(*params):
def _name_func(func, _, params):
strs = []
for arg in params.args:
if isinstance(arg, tuple):
strs.append("_".join(str(a) for a in arg))
else:
strs.append(str(arg))
return f'{func.__name__}_{"_".join(strs)}'
return parameterized.expand(
list(product(*params)),
name_func=_name_func
)
def _name_func(func, _, params):
strs = []
for arg in params.args:
if isinstance(arg, tuple):
strs.append("_".join(str(a) for a in arg))
else:
strs.append(str(arg))
return f'{func.__name__}_{"_".join(strs)}'
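# Illustrative example of the naming scheme above: expanding `test_foo` with
# params.args == (0.99, 'slaney') yields the test name 'test_foo_0.99_slaney',
# and a tuple argument such as (1, 2) contributes '1_2' to the name.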
def nested_params(*params_set):
"""Generate the cartesian product of the given list of parameters.
Args:
params_set (list of parameters): Parameters. When using ``parameterized.param`` class,
all the parameters have to be specified with the class, only using kwargs.
"""
flatten = [p for params in params_set for p in params]
# Parameters to be nested are given as lists of plain objects
if all(not isinstance(p, param) for p in flatten):
args = list(product(*params_set))
return parameterized.expand(args, name_func=_name_func)
# Parameters to be nested are given as lists of `parameterized.param`
if not all(isinstance(p, param) for p in flatten):
raise TypeError(
"When using ``parameterized.param``, "
"all the parameters have to be of the ``param`` type.")
if any(p.args for p in flatten):
raise ValueError(
"When using ``parameterized.param``, "
"all the parameters have to be provided as keyword argument."
)
args = [param()]
for params in params_set:
args = [param(**x.kwargs, **y.kwargs) for x in args for y in params]
return parameterized.expand(args)
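# Hypothetical usage sketch of the two accepted forms (function names and
# parameter values are illustrative only):
@nested_params([0, 0.99], [True, False])
def test_plain(self, momentum, center):
    ...  # expands to 4 cases: (0, True), (0, False), (0.99, True), (0.99, False)

@nested_params(
    [param(), param(n_mels=128)],
    [param(norm=n) for n in [None, 'slaney']],
)
def test_with_param(self, n_mels=40, norm=None):
    ...  # expands to 4 cases; the kwargs of each combination are merged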
from torchaudio_unittest.common_utils import PytorchTestCase
from .librosa_compatibility_test_impl import Functional, FunctionalComplex
class TestFunctionalCPU(Functional, PytorchTestCase):
device = 'cpu'
class TestFunctionalComplexCPU(FunctionalComplex, PytorchTestCase):
device = 'cpu'
from torchaudio_unittest.common_utils import PytorchTestCase, skipIfNoCuda
from .librosa_compatibility_test_impl import Functional, FunctionalComplex
@skipIfNoCuda
class TestFunctionalCUDA(Functional, PytorchTestCase):
device = 'cuda'
@skipIfNoCuda
class TestFunctionalComplexCUDA(FunctionalComplex, PytorchTestCase):
device = 'cuda'
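# skipIfNoCuda is imported from common_utils above; a minimal sketch of what
# such a decorator typically amounts to (an assumption, not the actual definition):
import unittest
import torch
skip_if_no_cuda = unittest.skipIf(not torch.cuda.is_available(), 'CUDA is not available')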
@@ -2,7 +2,7 @@ import unittest
from distutils.version import StrictVersion
import torch
from parameterized import parameterized, param
from parameterized import param
import torchaudio.functional as F
from torchaudio._internal.module_utils import is_module_available
@@ -13,44 +13,58 @@ if LIBROSA_AVAILABLE:
import numpy as np
import librosa
from torchaudio_unittest import common_utils
from torchaudio_unittest.common_utils import (
TestBaseMixin,
nested_params,
get_whitenoise,
get_spectrogram,
)
@unittest.skipIf(not LIBROSA_AVAILABLE, "Librosa not available")
class TestFunctional(common_utils.TorchaudioTestCase):
class Functional(TestBaseMixin):
"""Test suite for functions in `functional` module."""
def test_griffinlim(self):
# NOTE: This test is flaky without a fixed random seed
# See https://github.com/pytorch/audio/issues/382
torch.random.manual_seed(42)
tensor = torch.rand((1, 1000))
dtype = torch.float64
@nested_params([0, 0.99])
def test_griffinlim(self, momentum):
# FFT params
n_fft = 400
ws = 400
hop = 100
window = torch.hann_window(ws)
normalize = False
momentum = 0.99
win_length = n_fft
hop_length = n_fft // 4
window = torch.hann_window(win_length)
power = 1
# GriffinLim params
n_iter = 8
length = 1000
rand_init = False
init = 'random' if rand_init else None
specgram = F.spectrogram(tensor, 0, window, n_fft, hop, ws, 2, normalize).sqrt()
ta_out = F.griffinlim(specgram, window, n_fft, hop, ws, 1,
n_iter, momentum, length, rand_init)
lr_out = librosa.griffinlim(specgram.squeeze(0).numpy(), n_iter=n_iter, hop_length=hop,
momentum=momentum, init=init, length=length)
lr_out = torch.from_numpy(lr_out).unsqueeze(0)
self.assertEqual(ta_out, lr_out, atol=5e-5, rtol=1e-5)
@parameterized.expand([
param(norm=norm, mel_scale=mel_scale, **p.kwargs)
for p in [
waveform = get_whitenoise(device=self.device, dtype=self.dtype)
specgram = get_spectrogram(
waveform, n_fft=n_fft, hop_length=hop_length, power=power,
win_length=win_length, window=window)
result = F.griffinlim(
specgram,
window=window,
n_fft=n_fft,
hop_length=hop_length,
win_length=win_length,
power=power,
n_iter=n_iter,
momentum=momentum,
length=waveform.size(1),
rand_init=False)
expected = librosa.griffinlim(
specgram[0].cpu().numpy(),
n_iter=n_iter,
hop_length=hop_length,
momentum=momentum,
init=None,
length=waveform.size(1))[None, ...]
self.assertEqual(result, torch.from_numpy(expected), atol=5e-5, rtol=1e-07)
@nested_params(
[
param(),
param(n_mels=128, sample_rate=44100),
param(n_mels=128, fmin=2000.0, fmax=5000.0),
@@ -58,62 +72,60 @@ class TestFunctional(common_utils.TorchaudioTestCase):
param(n_mels=56, fmin=800.0, fmax=900.0),
param(n_mels=56, fmin=1900.0, fmax=900.0),
param(n_mels=10, fmin=1900.0, fmax=900.0),
]
for norm in [None, 'slaney']
for mel_scale in ['htk', 'slaney']
])
],
[param(norm=n) for n in [None, 'slaney']],
[param(mel_scale=s) for s in ['htk', 'slaney']],
)
def test_create_fb(self, n_mels=40, sample_rate=22050, n_fft=2048,
fmin=0.0, fmax=8000.0, norm=None, mel_scale="htk"):
if (norm == "slaney" and StrictVersion(librosa.__version__) < StrictVersion("0.7.2")):
self.skipTest('Test is known to fail with older versions of librosa.')
librosa_fb = librosa.filters.mel(sr=sample_rate,
n_fft=n_fft,
n_mels=n_mels,
fmax=fmax,
fmin=fmin,
htk=mel_scale == "htk",
norm=norm)
fb = F.create_fb_matrix(sample_rate=sample_rate,
n_mels=n_mels,
f_max=fmax,
f_min=fmin,
n_freqs=(n_fft // 2 + 1),
norm=norm,
mel_scale=mel_scale)
for i_mel_bank in range(n_mels):
self.assertEqual(
fb[:, i_mel_bank], torch.tensor(librosa_fb[i_mel_bank]), atol=1e-4, rtol=1e-5)
def test_amplitude_to_DB(self):
spec = torch.rand((6, 201))
if self.device != 'cpu':
self.skipTest('No need to run this test on CUDA')
expected = librosa.filters.mel(
sr=sample_rate,
n_fft=n_fft,
n_mels=n_mels,
fmax=fmax,
fmin=fmin,
htk=mel_scale == "htk",
norm=norm).T
result = F.create_fb_matrix(
sample_rate=sample_rate,
n_mels=n_mels,
f_max=fmax,
f_min=fmin,
n_freqs=(n_fft // 2 + 1),
norm=norm,
mel_scale=mel_scale)
self.assertEqual(result, torch.from_numpy(expected), atol=7e-5, rtol=1.3e-6)
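# Note on the `.T` above: librosa.filters.mel returns shape
# (n_mels, n_fft // 2 + 1) while F.create_fb_matrix returns (n_freqs, n_mels),
# so the transpose aligns the two before comparison.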
def test_amplitude_to_DB_power(self):
amin = 1e-10
db_multiplier = 0.0
top_db = 80.0
# Power to DB
multiplier = 10.0
ta_out = F.amplitude_to_DB(spec, multiplier, amin, db_multiplier, top_db)
lr_out = librosa.core.power_to_db(spec.numpy())
lr_out = torch.from_numpy(lr_out)
spec = get_spectrogram(get_whitenoise(device=self.device, dtype=self.dtype), power=2)
result = F.amplitude_to_DB(spec, multiplier, amin, db_multiplier, top_db)
expected = librosa.core.power_to_db(spec[0].cpu().numpy())[None, ...]
self.assertEqual(result, torch.from_numpy(expected))
self.assertEqual(ta_out, lr_out, atol=5e-5, rtol=1e-5)
# Amplitude to DB
def test_amplitude_to_DB(self):
amin = 1e-10
db_multiplier = 0.0
top_db = 80.0
multiplier = 20.0
ta_out = F.amplitude_to_DB(spec, multiplier, amin, db_multiplier, top_db)
lr_out = librosa.core.amplitude_to_db(spec.numpy())
lr_out = torch.from_numpy(lr_out)
self.assertEqual(ta_out, lr_out, atol=5e-5, rtol=1e-5)
spec = get_spectrogram(get_whitenoise(device=self.device, dtype=self.dtype), power=1)
result = F.amplitude_to_DB(spec, multiplier, amin, db_multiplier, top_db)
expected = librosa.core.amplitude_to_db(spec[0].cpu().numpy())[None, ...]
self.assertEqual(result, torch.from_numpy(expected))
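# The two multipliers follow the standard decibel definitions:
# dB = 10 * log10(power) for a power spectrogram (power=2 above) and
# dB = 20 * log10(amplitude) for an amplitude spectrogram (power=1).
# A tiny worked check of their consistency:
import math
power_val, amplitude_val = 4.0, 2.0  # amplitude_val ** 2 == power_val
assert math.isclose(10 * math.log10(power_val), 20 * math.log10(amplitude_val))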
@unittest.skipIf(not LIBROSA_AVAILABLE, "Librosa not available")
class TestFunctionalComplex(common_utils.TorchaudioTestCase):
class FunctionalComplex(TestBaseMixin):
@nested_params(
[0.5, 1.01, 1.3],
[True, False],
@@ -127,11 +139,12 @@ class TestFunctionalComplex(common_utils.TorchaudioTestCase):
# Due to the cumulative sum, numerical error when using torch.float32 would
# cause the bottom-right values of the stretched spectrogram to not
# match those from librosa.
spec = torch.randn(num_freq, num_frames, dtype=torch.complex128)
spec = torch.randn(num_freq, num_frames, device=self.device, dtype=torch.complex128)
phase_advance = torch.linspace(
0,
np.pi * hop_length,
num_freq,
device=self.device,
dtype=torch.float64)[..., None]
stretched = F.phase_vocoder(
@@ -139,7 +152,7 @@ class TestFunctionalComplex(common_utils.TorchaudioTestCase):
rate=rate, phase_advance=phase_advance)
expected_stretched = librosa.phase_vocoder(
spec.numpy(),
spec.cpu().numpy(),
rate=rate,
hop_length=hop_length)
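# Shape note (from the phase vocoder's documented behavior): stretching by
# `rate` changes the frame count, so an input of shape (num_freq, num_frames)
# yields roughly ceil(num_frames / rate) frames; rate > 1 speeds up the
# signal (fewer frames), rate < 1 slows it down.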