Commit 9dcc7a15 authored by flyingdown's avatar flyingdown
Browse files

init v0.10.0

parent db2b0b79
Pipeline #254 failed with stages
in 0 seconds
from itertools import product
import torch
from torch.testing._internal.common_utils import TestCase
from parameterized import parameterized
from . import sdr_reference
from source_separation.utils import metrics
class TestSDR(TestCase):
    """Compare `source_separation.utils.metrics` against the reference SDR code."""

    @parameterized.expand([(1, ), (2, ), (32, )])
    def test_sdr(self, batch_size):
        """`metrics.sdr` produces the same result as the reference implementation."""
        num_frames = 256
        estimated = torch.rand(batch_size, num_frames)
        reference = torch.rand(batch_size, num_frames)
        expected = sdr_reference.calc_sdr_torch(estimated, reference)
        result = metrics.sdr(estimated.unsqueeze(1), reference.unsqueeze(1)).squeeze(1)
        self.assertEqual(result, expected)

    @parameterized.expand(list(product([1, 2, 32], [2, 3, 4, 5])))
    def test_sdr_pit(self, batch_size, num_sources):
        """`metrics.sdr_pit` produces the same result as the reference implementation."""
        num_frames = 256
        estimated = torch.randn(batch_size, num_sources, num_frames)
        reference = torch.randn(batch_size, num_sources, num_frames)
        # Zero-mean the signals along the frame axis before comparison.
        estimated = estimated - estimated.mean(axis=2, keepdim=True)
        reference = reference - reference.mean(axis=2, keepdim=True)
        expected = sdr_reference.batch_SDR_torch(estimated, reference)
        result = metrics.sdr_pit(estimated, reference)
        self.assertEqual(result, expected)
"""Reference Implementation of SDR and PIT SDR.
This module was taken from the following implementation
https://github.com/naplab/Conv-TasNet/blob/e66d82a8f956a69749ec8a4ae382217faa097c5c/utility/sdr.py
which was made available by Yi Luo under the following license,
Creative Commons Attribution-NonCommercial-ShareAlike 3.0 United States License.
The module was modified in the following manner;
- Remove the functions other than `calc_sdr_torch` and `batch_SDR_torch`,
- Remove the import statements required only for the removed functions.
- Add `# flake8: noqa` so as not to report any format issue on this module.
The implementation of the retained functions and their formats are kept as-is.
"""
# flake8: noqa
import numpy as np
from itertools import permutations
import torch
def calc_sdr_torch(estimation, origin, mask=None):
    """
    Batch-wise SDR calculation for one audio file on pytorch Variables.

    estimation: (batch, nsample)
    origin: (batch, nsample)
    mask: optional, (batch, nsample), binary
    """
    if mask is not None:
        origin = origin * mask
        estimation = estimation * mask

    # Power of the reference; 1e-8 guards against division by zero for silence.
    origin_power = torch.pow(origin, 2).sum(1, keepdim=True) + 1e-8  # (batch, 1)

    # Optimal scaling of the reference onto the estimate (scale-invariant SDR).
    scale = torch.sum(origin*estimation, 1, keepdim=True) / origin_power  # (batch, 1)

    est_true = scale * origin  # target component, (batch, nsample)
    est_res = estimation - est_true  # residual/distortion component, (batch, nsample)

    true_power = torch.pow(est_true, 2).sum(1)
    res_power = torch.pow(est_res, 2).sum(1)

    # Ratio expressed in dB.  NOTE: sum(1) without keepdim makes this (batch,),
    # not (batch, 1) as the original comment claimed.
    return 10*torch.log10(true_power) - 10*torch.log10(res_power)  # (batch,)
def batch_SDR_torch(estimation, origin, mask=None):
    """
    Batch-wise, permutation-invariant SDR calculation for multiple audio files.

    estimation: (batch, nsource, nsample)
    origin: (batch, nsource, nsample)
    mask: optional, (batch, nsample), binary

    Returns the best SDR over all source permutations, averaged over sources;
    shape (batch,).
    """
    batch_size_est, nsource_est, nsample_est = estimation.size()
    batch_size_ori, nsource_ori, nsample_ori = origin.size()
    assert batch_size_est == batch_size_ori, "Estimation and original sources should have same shape."
    assert nsource_est == nsource_ori, "Estimation and original sources should have same shape."
    assert nsample_est == nsample_ori, "Estimation and original sources should have same shape."
    assert nsource_est < nsample_est, "Axis 1 should be the number of sources, and axis 2 should be the signal."

    batch_size = batch_size_est
    nsource = nsource_est
    nsample = nsample_est

    # zero mean signals
    estimation = estimation - torch.mean(estimation, 2, keepdim=True).expand_as(estimation)
    # NOTE(review): `expand_as(estimation)` below looks like it was meant to be
    # `expand_as(origin)`; the assertions above guarantee identical shapes, so
    # the behavior is the same either way.
    origin = origin - torch.mean(origin, 2, keepdim=True).expand_as(estimation)

    # possible permutations of the source axis
    perm = list(set(permutations(np.arange(nsource))))

    # pair-wise SDR: SDR[:, i, j] = SDR of estimated source i against true source j
    SDR = torch.zeros((batch_size, nsource, nsource)).type(estimation.type())
    for i in range(nsource):
        for j in range(nsource):
            SDR[:,i,j] = calc_sdr_torch(estimation[:,i], origin[:,j], mask)

    # choose the best permutation (largest total SDR across sources)
    SDR_max = []
    SDR_perm = []
    for permute in perm:
        sdr = []
        for idx in range(len(permute)):
            sdr.append(SDR[:,idx,permute[idx]].view(batch_size,-1))
        sdr = torch.sum(torch.cat(sdr, 1), 1)
        SDR_perm.append(sdr.view(batch_size, 1))
    SDR_perm = torch.cat(SDR_perm, 1)
    SDR_max, _ = torch.max(SDR_perm, dim=1)

    # average over sources
    return SDR_max / nsource
import os
from torchaudio_unittest.common_utils import (
TempDirMixin,
TorchaudioTestCase,
get_whitenoise,
save_wav,
normalize_wav,
)
from source_separation.utils.dataset import wsj0mix
# File names following the wsj0-mix style naming (two utterance ids with their
# mixing SNRs).  The same names are created in each of the "mix"/"s1"/"s2"/...
# directories by the mock-dataset builder below; list order defines the
# expected iteration order of the dataset.
_FILENAMES = [
    "012c0207_1.9952_01cc0202_-1.9952.wav",
    "01co0302_1.63_014c020q_-1.63.wav",
    "01do0316_0.24011_205a0104_-0.24011.wav",
    "01lc020x_1.1301_027o030r_-1.1301.wav",
    "01mc0202_0.34056_205o0106_-0.34056.wav",
    "01nc020t_0.53821_018o030w_-0.53821.wav",
    "01po030f_2.2136_40ko031a_-2.2136.wav",
    "01ra010o_2.4098_403a010f_-2.4098.wav",
    "01xo030b_0.22377_016o031a_-0.22377.wav",
    "02ac020x_0.68566_01ec020b_-0.68566.wav",
    "20co010m_0.82801_019c0212_-0.82801.wav",
    "20da010u_1.2483_017c0211_-1.2483.wav",
    "20oo010d_1.0631_01ic020s_-1.0631.wav",
    "20sc0107_2.0222_20fo010h_-2.0222.wav",
    "20tc010f_0.051456_404a0110_-0.051456.wav",
    "407c0214_1.1712_02ca0113_-1.1712.wav",
    "40ao030w_2.4697_20vc010a_-2.4697.wav",
    "40pa0101_1.1087_40ea0107_-1.1087.wav",
]
def _mock_dataset(root_dir, num_speaker):
    """Generate a white-noise dataset mimicking the wsj0-mix directory layout.

    Creates ``mix``/``s1``/.../``s<num_speaker>`` directories under ``root_dir``
    and writes one wav file per name in ``_FILENAMES`` into each of them.

    Returns a list of ``(sample_rate, mix_waveform, [source_waveforms])``
    tuples, in ``_FILENAMES`` order, for use as the expected dataset content.
    """
    dirnames = ["mix"] + [f"s{i+1}" for i in range(num_speaker)]
    for dirname in dirnames:
        os.makedirs(os.path.join(root_dir, dirname), exist_ok=True)

    seed = 0
    sample_rate = 8000
    expected = []
    for filename in _FILENAMES:
        mix = None
        src = []
        for dirname in dirnames:
            waveform = get_whitenoise(
                # Use the `sample_rate` variable (was a hard-coded 8000, which
                # could silently diverge from the rate passed to save_wav).
                sample_rate=sample_rate, duration=1, n_channels=1, dtype="int16", seed=seed
            )
            seed += 1  # distinct seed per file so every waveform differs
            path = os.path.join(root_dir, dirname, filename)
            save_wav(path, waveform, sample_rate)
            waveform = normalize_wav(waveform)
            if dirname == "mix":
                mix = waveform
            else:
                src.append(waveform)
        expected.append((sample_rate, mix, src))
    return expected
class TestWSJ0Mix2(TempDirMixin, TorchaudioTestCase):
    """WSJ0Mix dataset test against a mocked 2-speaker directory tree."""
    backend = "default"
    root_dir = None
    expected = None

    @classmethod
    def setUpClass(cls):
        cls.root_dir = cls.get_base_temp_dir()
        cls.expected = _mock_dataset(cls.root_dir, 2)

    def test_wsj0mix(self):
        """Every sample matches the mocked data, and nothing is skipped."""
        dataset = wsj0mix.WSJ0Mix(self.root_dir, num_speakers=2, sample_rate=8000)
        num_seen = 0
        for i, (_, sample_mix, sample_src) in enumerate(dataset):
            _, expected_mix, expected_src = self.expected[i]
            self.assertEqual(sample_mix, expected_mix, atol=5e-5, rtol=1e-8)
            self.assertEqual(sample_src[0], expected_src[0], atol=5e-5, rtol=1e-8)
            self.assertEqual(sample_src[1], expected_src[1], atol=5e-5, rtol=1e-8)
            num_seen += 1
        assert num_seen == len(self.expected)
class TestWSJ0Mix3(TempDirMixin, TorchaudioTestCase):
    """WSJ0Mix dataset test against a mocked 3-speaker directory tree."""
    backend = "default"
    root_dir = None
    expected = None

    @classmethod
    def setUpClass(cls):
        cls.root_dir = cls.get_base_temp_dir()
        cls.expected = _mock_dataset(cls.root_dir, 3)

    def test_wsj0mix(self):
        """Every sample matches the mocked data, and nothing is skipped."""
        dataset = wsj0mix.WSJ0Mix(self.root_dir, num_speakers=3, sample_rate=8000)
        num_seen = 0
        for i, (_, sample_mix, sample_src) in enumerate(dataset):
            _, expected_mix, expected_src = self.expected[i]
            self.assertEqual(sample_mix, expected_mix, atol=5e-5, rtol=1e-8)
            for k in range(3):
                self.assertEqual(sample_src[k], expected_src[k], atol=5e-5, rtol=1e-8)
            num_seen += 1
        assert num_seen == len(self.expected)
import torch
from .tacotron2_loss_impl import (
Tacotron2LossShapeTests,
Tacotron2LossTorchscriptTests,
Tacotron2LossGradcheckTests,
)
from torchaudio_unittest.common_utils import PytorchTestCase
# Concrete CPU instantiations of the shared Tacotron2 loss test suites
# defined in `tacotron2_loss_impl`; each class only pins dtype and device.
class TestTacotron2LossShapeFloat32CPU(Tacotron2LossShapeTests, PytorchTestCase):
    dtype = torch.float32
    device = torch.device("cpu")


class TestTacotron2TorchsciptFloat32CPU(Tacotron2LossTorchscriptTests, PytorchTestCase):
    dtype = torch.float32
    device = torch.device("cpu")


class TestTacotron2GradcheckFloat64CPU(Tacotron2LossGradcheckTests, PytorchTestCase):
    dtype = torch.float64  # gradcheck needs a higher numerical accuracy
    device = torch.device("cpu")
import torch
from .tacotron2_loss_impl import (
Tacotron2LossShapeTests,
Tacotron2LossTorchscriptTests,
Tacotron2LossGradcheckTests,
)
from torchaudio_unittest.common_utils import skipIfNoCuda, PytorchTestCase
# Concrete CUDA instantiations of the shared Tacotron2 loss test suites.
# NOTE(review): the base-class order here (PytorchTestCase first) is the
# reverse of the CPU counterparts — presumably unintentional; left as-is
# since changing it would alter the MRO.
@skipIfNoCuda
class TestTacotron2LossShapeFloat32CUDA(PytorchTestCase, Tacotron2LossShapeTests):
    dtype = torch.float32
    device = torch.device("cuda")


@skipIfNoCuda
class TestTacotron2TorchsciptFloat32CUDA(PytorchTestCase, Tacotron2LossTorchscriptTests):
    dtype = torch.float32
    device = torch.device("cuda")


@skipIfNoCuda
class TestTacotron2GradcheckFloat64CUDA(PytorchTestCase, Tacotron2LossGradcheckTests):
    dtype = torch.float64  # gradcheck needs a higher numerical accuracy
    device = torch.device("cuda")
import torch
from torch.autograd import gradcheck, gradgradcheck
from pipeline_tacotron2.loss import Tacotron2Loss
from torchaudio_unittest.common_utils import (
TestBaseMixin,
torch_script,
)
class Tacotron2LossInputMixin(TestBaseMixin):
    """Shared random-input factory for the Tacotron2 loss test suites."""

    def _get_inputs(self, n_mel=80, n_batch=16, max_mel_specgram_length=300):
        """Return (outputs, targets) tensors for Tacotron2Loss on self.device."""
        spec_shape = (n_batch, n_mel, max_mel_specgram_length)
        tensor_kwargs = {"dtype": self.dtype, "device": self.device}
        # NOTE: the creation order below fixes the RNG stream; keep it stable.
        mel_specgram = torch.rand(*spec_shape, **tensor_kwargs)
        mel_specgram_postnet = torch.rand(*spec_shape, **tensor_kwargs)
        gate_out = torch.rand(n_batch, **tensor_kwargs)
        truth_mel_specgram = torch.rand(*spec_shape, **tensor_kwargs)
        truth_gate_out = torch.rand(n_batch, **tensor_kwargs)
        # Targets never participate in differentiation.
        truth_mel_specgram.requires_grad = False
        truth_gate_out.requires_grad = False
        return (
            mel_specgram,
            mel_specgram_postnet,
            gate_out,
            truth_mel_specgram,
            truth_gate_out,
        )
class Tacotron2LossShapeTests(Tacotron2LossInputMixin):
    def test_tacotron2_loss_shape(self):
        """Tacotron2Loss returns three scalar (0-dim) losses."""
        n_batch = 16
        (
            mel_specgram,
            mel_specgram_postnet,
            gate_out,
            truth_mel_specgram,
            truth_gate_out,
        ) = self._get_inputs(n_batch=n_batch)

        mel_loss, mel_postnet_loss, gate_loss = Tacotron2Loss()(
            (mel_specgram, mel_specgram_postnet, gate_out),
            (truth_mel_specgram, truth_gate_out)
        )

        scalar = torch.Size([])
        for loss in (mel_loss, mel_postnet_loss, gate_loss):
            self.assertEqual(loss.size(), scalar)
class Tacotron2LossTorchscriptTests(Tacotron2LossInputMixin):
    def _assert_torchscript_consistency(self, fn, tensors):
        """Scripted and eager invocations must produce identical results."""
        ts_func = torch_script(fn)
        eager_output = fn(tensors[:3], tensors[3:])
        scripted_output = ts_func(tensors[:3], tensors[3:])
        self.assertEqual(scripted_output, eager_output)

    def test_tacotron2_loss_torchscript_consistency(self):
        """Validate the torchscript consistency of Tacotron2Loss."""
        self._assert_torchscript_consistency(Tacotron2Loss(), self._get_inputs())
class Tacotron2LossGradcheckTests(Tacotron2LossInputMixin):
    def test_tacotron2_loss_gradcheck(self):
        """Performing gradient check on Tacotron2Loss."""
        (
            mel_specgram,
            mel_specgram_postnet,
            gate_out,
            truth_mel_specgram,
            truth_gate_out,
        ) = self._get_inputs()

        # Only the model outputs are differentiated; targets stay frozen.
        for tensor in (mel_specgram, mel_specgram_postnet, gate_out):
            tensor.requires_grad_(True)

        def _fn(mel_specgram, mel_specgram_postnet, gate_out, truth_mel_specgram, truth_gate_out):
            return Tacotron2Loss()(
                (mel_specgram, mel_specgram_postnet, gate_out),
                (truth_mel_specgram, truth_gate_out),
            )

        all_args = (mel_specgram, mel_specgram_postnet, gate_out, truth_mel_specgram, truth_gate_out)
        gradcheck(_fn, all_args, fast_mode=True)
        gradgradcheck(_fn, all_args, fast_mode=True)
from parameterized import parameterized
from torchaudio._internal.module_utils import is_module_available
from torchaudio_unittest.common_utils import TorchaudioTestCase, skipIfNoModule
if is_module_available("unidecode") and is_module_available("inflect"):
from pipeline_tacotron2.text.text_preprocessing import text_to_sequence
from pipeline_tacotron2.text.numbers import (
_remove_commas,
_expand_pounds,
_expand_dollars,
_expand_decimal_point,
_expand_ordinal,
_expand_number,
)
@skipIfNoModule("unidecode")
@skipIfNoModule("inflect")
class TestTextPreprocessor(TorchaudioTestCase):
    """Tests for the Tacotron2 pipeline text normalization helpers.

    Fix: removed a stray `_expand_ordinal,` line left in the class body by a
    bad merge.  It was evaluated at class-definition time (the skip decorators
    do not prevent that), raising NameError whenever `unidecode`/`inflect`
    were unavailable and the conditional import above was skipped.
    """

    @parameterized.expand(
        [
            ["dr. Strange?", [15, 26, 14, 31, 26, 29, 11, 30, 31, 29, 12, 25, 18, 16, 10]],
            ["ML, is fun.", [24, 23, 6, 11, 20, 30, 11, 17, 32, 25, 7]],
            ["I love torchaudio!", [20, 11, 23, 26, 33, 16, 11, 31, 26, 29, 14, 19, 12, 32, 15, 20, 26, 2]],
            # 'one thousand dollars, twenty cents'
            ["$1,000.20", [26, 25, 16, 11, 31, 19, 26, 32, 30, 12, 25, 15, 11, 15, 26, 23, 23,
                           12, 29, 30, 6, 11, 31, 34, 16, 25, 31, 36, 11, 14, 16, 25, 31, 30]],
        ]
    )
    def test_text_to_sequence(self, sent, seq):
        assert (text_to_sequence(sent) == seq)

    @parameterized.expand(
        [
            ["He, she, and I have $1,000", "He, she, and I have $1000"],
        ]
    )
    def test_remove_commas(self, sent, truth):
        assert (_remove_commas(sent) == truth)

    @parameterized.expand(
        [
            ["He, she, and I have £1000", "He, she, and I have 1000 pounds"],
        ]
    )
    def test_expand_pounds(self, sent, truth):
        assert (_expand_pounds(sent) == truth)

    @parameterized.expand(
        [
            ["He, she, and I have $1000", "He, she, and I have 1000 dollars"],
            ["He, she, and I have $3000.01", "He, she, and I have 3000 dollars, 1 cent"],
            ["He has $500.20 and she has $1000.50.",
             "He has 500 dollars, 20 cents and she has 1000 dollars, 50 cents."],
        ]
    )
    def test_expand_dollars(self, sent, truth):
        assert (_expand_dollars(sent) == truth)

    @parameterized.expand(
        [
            ["1000.20", "1000 point 20"],
            ["1000.1", "1000 point 1"],
        ]
    )
    def test_expand_decimal_point(self, sent, truth):
        assert (_expand_decimal_point(sent) == truth)

    @parameterized.expand(
        [
            ["21st centry", "twenty-first centry"],
            ["20th centry", "twentieth centry"],
            ["2nd place.", "second place."],
        ]
    )
    def test_expand_ordinal(self, sent, truth):
        assert (_expand_ordinal(sent) == truth)

    @parameterized.expand(
        [
            ["100020 dollars.", "one hundred thousand twenty dollars."],
            ["1234567890!", "one billion, two hundred thirty-four million, "
             "five hundred sixty-seven thousand, eight hundred ninety!"],
        ]
    )
    def test_expand_number(self, sent, truth):
        assert (_expand_number(sent) == truth)
import torch
from .autograd_impl import Autograd, AutogradFloat32
from torchaudio_unittest import common_utils
# CPU instantiations of the shared autograd test suites from `autograd_impl`.
class TestAutogradLfilterCPU(Autograd, common_utils.PytorchTestCase):
    dtype = torch.float64  # full gradcheck precision
    device = torch.device('cpu')


class TestAutogradRNNTCPU(AutogradFloat32, common_utils.PytorchTestCase):
    dtype = torch.float32
    device = torch.device('cpu')
import torch
from .autograd_impl import Autograd, AutogradFloat32
from torchaudio_unittest import common_utils
# CUDA instantiations of the shared autograd test suites from `autograd_impl`.
@common_utils.skipIfNoCuda
class TestAutogradLfilterCUDA(Autograd, common_utils.PytorchTestCase):
    dtype = torch.float64  # full gradcheck precision
    device = torch.device('cuda')


@common_utils.skipIfNoCuda
class TestAutogradRNNTCUDA(AutogradFloat32, common_utils.PytorchTestCase):
    dtype = torch.float32
    device = torch.device('cuda')
from typing import Callable, Tuple
from functools import partial
import torch
from parameterized import parameterized
from torch import Tensor
import torchaudio.functional as F
from torch.autograd import gradcheck, gradgradcheck
from torchaudio_unittest.common_utils import (
TestBaseMixin,
get_whitenoise,
rnnt_utils,
)
class Autograd(TestBaseMixin):
    """gradcheck/gradgradcheck tests for `torchaudio.functional` filtering ops.

    Subclasses (via TestBaseMixin) supply `dtype` and `device`.
    """

    def assert_grad(
            self,
            transform: Callable[..., Tensor],
            inputs: Tuple[torch.Tensor],
            *,
            enable_all_grad: bool = True,
    ):
        """Move tensor inputs to self.dtype/self.device and run gradcheck
        and gradgradcheck on `transform`.

        When `enable_all_grad` is False, only tensors whose `requires_grad`
        was already set by the caller are differentiated.
        """
        inputs_ = []
        for i in inputs:
            if torch.is_tensor(i):
                i = i.to(dtype=self.dtype, device=self.device)
                if enable_all_grad:
                    i.requires_grad = True
            # Non-tensor arguments are forwarded unchanged.
            inputs_.append(i)
        assert gradcheck(transform, inputs_)
        assert gradgradcheck(transform, inputs_)

    # --- lfilter: gradient w.r.t. each argument separately, then jointly ---

    def test_lfilter_x(self):
        torch.random.manual_seed(2434)
        x = get_whitenoise(sample_rate=22050, duration=0.01, n_channels=2)
        a = torch.tensor([0.7, 0.2, 0.6])
        b = torch.tensor([0.4, 0.2, 0.9])
        x.requires_grad = True
        self.assert_grad(F.lfilter, (x, a, b), enable_all_grad=False)

    def test_lfilter_a(self):
        torch.random.manual_seed(2434)
        x = get_whitenoise(sample_rate=22050, duration=0.01, n_channels=2)
        a = torch.tensor([0.7, 0.2, 0.6])
        b = torch.tensor([0.4, 0.2, 0.9])
        a.requires_grad = True
        self.assert_grad(F.lfilter, (x, a, b), enable_all_grad=False)

    def test_lfilter_b(self):
        torch.random.manual_seed(2434)
        x = get_whitenoise(sample_rate=22050, duration=0.01, n_channels=2)
        a = torch.tensor([0.7, 0.2, 0.6])
        b = torch.tensor([0.4, 0.2, 0.9])
        b.requires_grad = True
        self.assert_grad(F.lfilter, (x, a, b), enable_all_grad=False)

    def test_lfilter_all_inputs(self):
        torch.random.manual_seed(2434)
        x = get_whitenoise(sample_rate=22050, duration=0.01, n_channels=2)
        a = torch.tensor([0.7, 0.2, 0.6])
        b = torch.tensor([0.4, 0.2, 0.9])
        self.assert_grad(F.lfilter, (x, a, b))

    def test_lfilter_filterbanks(self):
        # Multiple filters applied to multiple channels (batching=False).
        torch.random.manual_seed(2434)
        x = get_whitenoise(sample_rate=22050, duration=0.01, n_channels=3)
        a = torch.tensor([[0.7, 0.2, 0.6],
                          [0.8, 0.2, 0.9]])
        b = torch.tensor([[0.4, 0.2, 0.9],
                          [0.7, 0.2, 0.6]])
        self.assert_grad(partial(F.lfilter, batching=False), (x, a, b))

    def test_lfilter_batching(self):
        # One filter per channel (default batching=True).
        torch.random.manual_seed(2434)
        x = get_whitenoise(sample_rate=22050, duration=0.01, n_channels=2)
        a = torch.tensor([[0.7, 0.2, 0.6],
                          [0.8, 0.2, 0.9]])
        b = torch.tensor([[0.4, 0.2, 0.9],
                          [0.7, 0.2, 0.6]])
        self.assert_grad(F.lfilter, (x, a, b))

    # --- filtfilt ---

    def test_filtfilt_a(self):
        torch.random.manual_seed(2434)
        x = get_whitenoise(sample_rate=22050, duration=0.01, n_channels=2)
        a = torch.tensor([0.7, 0.2, 0.6])
        b = torch.tensor([0.4, 0.2, 0.9])
        a.requires_grad = True
        self.assert_grad(F.filtfilt, (x, a, b), enable_all_grad=False)

    def test_filtfilt_b(self):
        torch.random.manual_seed(2434)
        x = get_whitenoise(sample_rate=22050, duration=0.01, n_channels=2)
        a = torch.tensor([0.7, 0.2, 0.6])
        b = torch.tensor([0.4, 0.2, 0.9])
        b.requires_grad = True
        self.assert_grad(F.filtfilt, (x, a, b), enable_all_grad=False)

    def test_filtfilt_all_inputs(self):
        torch.random.manual_seed(2434)
        x = get_whitenoise(sample_rate=22050, duration=0.01, n_channels=2)
        a = torch.tensor([0.7, 0.2, 0.6])
        b = torch.tensor([0.4, 0.2, 0.9])
        self.assert_grad(F.filtfilt, (x, a, b))

    def test_filtfilt_batching(self):
        torch.random.manual_seed(2434)
        x = get_whitenoise(sample_rate=22050, duration=0.01, n_channels=2)
        a = torch.tensor([[0.7, 0.2, 0.6],
                          [0.8, 0.2, 0.9]])
        b = torch.tensor([[0.4, 0.2, 0.9],
                          [0.7, 0.2, 0.6]])
        self.assert_grad(F.filtfilt, (x, a, b))

    # --- biquad family: each case differentiates w.r.t. all tensor inputs ---

    def test_biquad(self):
        torch.random.manual_seed(2434)
        x = get_whitenoise(sample_rate=22050, duration=0.01, n_channels=1)
        a = torch.tensor([0.7, 0.2, 0.6])
        b = torch.tensor([0.4, 0.2, 0.9])
        self.assert_grad(F.biquad, (x, b[0], b[1], b[2], a[0], a[1], a[2]))

    @parameterized.expand([
        (800, 0.7, True),
        (800, 0.7, False),
    ])
    def test_band_biquad(self, central_freq, Q, noise):
        torch.random.manual_seed(2434)
        sr = 22050
        x = get_whitenoise(sample_rate=sr, duration=0.01, n_channels=1)
        central_freq = torch.tensor(central_freq)
        Q = torch.tensor(Q)
        self.assert_grad(F.band_biquad, (x, sr, central_freq, Q, noise))

    @parameterized.expand([
        (800, 0.7, 10),
        (800, 0.7, -10),
    ])
    def test_bass_biquad(self, central_freq, Q, gain):
        torch.random.manual_seed(2434)
        sr = 22050
        x = get_whitenoise(sample_rate=sr, duration=0.01, n_channels=1)
        central_freq = torch.tensor(central_freq)
        Q = torch.tensor(Q)
        gain = torch.tensor(gain)
        self.assert_grad(F.bass_biquad, (x, sr, gain, central_freq, Q))

    @parameterized.expand([
        (3000, 0.7, 10),
        (3000, 0.7, -10),
    ])
    def test_treble_biquad(self, central_freq, Q, gain):
        torch.random.manual_seed(2434)
        sr = 22050
        x = get_whitenoise(sample_rate=sr, duration=0.01, n_channels=1)
        central_freq = torch.tensor(central_freq)
        Q = torch.tensor(Q)
        gain = torch.tensor(gain)
        self.assert_grad(F.treble_biquad, (x, sr, gain, central_freq, Q))

    @parameterized.expand([
        (800, 0.7, ),
    ])
    def test_allpass_biquad(self, central_freq, Q):
        torch.random.manual_seed(2434)
        sr = 22050
        x = get_whitenoise(sample_rate=sr, duration=0.01, n_channels=1)
        central_freq = torch.tensor(central_freq)
        Q = torch.tensor(Q)
        self.assert_grad(F.allpass_biquad, (x, sr, central_freq, Q))

    @parameterized.expand([
        (800, 0.7, ),
    ])
    def test_lowpass_biquad(self, cutoff_freq, Q):
        torch.random.manual_seed(2434)
        sr = 22050
        x = get_whitenoise(sample_rate=sr, duration=0.01, n_channels=1)
        cutoff_freq = torch.tensor(cutoff_freq)
        Q = torch.tensor(Q)
        self.assert_grad(F.lowpass_biquad, (x, sr, cutoff_freq, Q))

    @parameterized.expand([
        (800, 0.7, ),
    ])
    def test_highpass_biquad(self, cutoff_freq, Q):
        torch.random.manual_seed(2434)
        sr = 22050
        x = get_whitenoise(sample_rate=sr, duration=0.01, n_channels=1)
        cutoff_freq = torch.tensor(cutoff_freq)
        Q = torch.tensor(Q)
        self.assert_grad(F.highpass_biquad, (x, sr, cutoff_freq, Q))

    @parameterized.expand([
        (800, 0.7, True),
        (800, 0.7, False),
    ])
    def test_bandpass_biquad(self, central_freq, Q, const_skirt_gain):
        torch.random.manual_seed(2434)
        sr = 22050
        x = get_whitenoise(sample_rate=sr, duration=0.01, n_channels=1)
        central_freq = torch.tensor(central_freq)
        Q = torch.tensor(Q)
        self.assert_grad(F.bandpass_biquad, (x, sr, central_freq, Q, const_skirt_gain))

    @parameterized.expand([
        (800, 0.7, 10),
        (800, 0.7, -10),
    ])
    def test_equalizer_biquad(self, central_freq, Q, gain):
        torch.random.manual_seed(2434)
        sr = 22050
        x = get_whitenoise(sample_rate=sr, duration=0.01, n_channels=1)
        central_freq = torch.tensor(central_freq)
        Q = torch.tensor(Q)
        gain = torch.tensor(gain)
        self.assert_grad(F.equalizer_biquad, (x, sr, central_freq, gain, Q))

    @parameterized.expand([
        (800, 0.7, ),
    ])
    def test_bandreject_biquad(self, central_freq, Q):
        torch.random.manual_seed(2434)
        sr = 22050
        x = get_whitenoise(sample_rate=sr, duration=0.01, n_channels=1)
        central_freq = torch.tensor(central_freq)
        Q = torch.tensor(Q)
        self.assert_grad(F.bandreject_biquad, (x, sr, central_freq, Q))
class AutogradFloat32(TestBaseMixin):
    """Gradient tests for ops exercised in float32 (e.g. RNNT loss).

    float32 gradcheck needs relaxed `eps`/`atol`, and only first-order
    gradients are checked (no gradgradcheck), unlike the `Autograd` suite.
    """

    def assert_grad(
            self,
            transform: Callable[..., Tensor],
            inputs: Tuple[torch.Tensor],
            enable_all_grad: bool = True,
    ):
        """Move tensor inputs to self.dtype/self.device and run gradcheck."""
        inputs_ = []
        for i in inputs:
            if torch.is_tensor(i):
                i = i.to(dtype=self.dtype, device=self.device)
                if enable_all_grad:
                    i.requires_grad = True
            inputs_.append(i)
        # Fix: run gradcheck on the converted tensors (`inputs_`), not the
        # original `inputs` -- otherwise the dtype/device conversion and the
        # `requires_grad` flags set above were silently discarded (compare
        # with Autograd.assert_grad).
        # gradcheck with float32 requires higher atol and epsilon
        assert gradcheck(transform, inputs_, eps=1e-3, atol=1e-3, nondet_tol=0.)

    @parameterized.expand([
        (rnnt_utils.get_B1_T10_U3_D4_data, ),
        (rnnt_utils.get_B2_T4_U3_D3_data, ),
        (rnnt_utils.get_B1_T2_U3_D5_data, ),
    ])
    def test_rnnt_loss(self, data_func):
        def get_data(data_func, device):
            # Some factories return (data, ...) tuples; unwrap to the dict.
            data = data_func()
            if type(data) == tuple:
                data = data[0]
            return data

        data = get_data(data_func, self.device)
        inputs = (
            data["logits"].to(torch.float32),  # logits
            data["targets"],  # targets
            data["logit_lengths"],  # logit_lengths
            data["target_lengths"],  # target_lengths
            data["blank"],  # blank
            -1,  # clamp
        )
        self.assert_grad(F.rnnt_loss, inputs, enable_all_grad=False)
"""Test numerical consistency among single input and batched input."""
import itertools
import math
from parameterized import parameterized, parameterized_class
import torch
import torchaudio.functional as F
from torchaudio_unittest import common_utils
def _name_from_args(func, _, params):
"""Return a parameterized test name, based on parameter values."""
return "{}_{}".format(
func.__name__,
"_".join(str(arg) for arg in params.args))
@parameterized_class([
    # Single-item batch isolates problems that come purely from adding a
    # dimension (rather than processing multiple items)
    {"batch_size": 1},
    {"batch_size": 3},
])
class TestFunctional(common_utils.TorchaudioTestCase):
    """Test functions defined in `functional` module"""
    backend = 'default'

    def assert_batch_consistency(
            self, functional, batch, *args, atol=1e-8, rtol=1e-5, seed=42,
            **kwargs):
        """Assert that applying `functional` to the whole batch equals
        applying it to each item separately, and that the input tensor is
        not mutated.  The RNG is re-seeded identically before both runs so
        any randomness inside `functional` is reproduced.
        """
        n = batch.size(0)

        # Compute items separately, then batch the result
        torch.random.manual_seed(seed)
        items_input = batch.clone()
        items_result = torch.stack([
            functional(items_input[i], *args, **kwargs) for i in range(n)
        ])

        # Batch the input and run
        torch.random.manual_seed(seed)
        batch_input = batch.clone()
        batch_result = functional(batch_input, *args, **kwargs)

        self.assertEqual(items_input, batch_input, rtol=rtol, atol=atol)
        self.assertEqual(items_result, batch_result, rtol=rtol, atol=atol)

    def test_griffinlim(self):
        n_fft = 400
        ws = 400
        hop = 200
        window = torch.hann_window(ws)
        power = 2
        momentum = 0.99
        n_iter = 32
        length = 1000
        torch.random.manual_seed(0)
        batch = torch.rand(self.batch_size, 1, 201, 6)
        self.assert_batch_consistency(
            F.griffinlim, batch, window, n_fft, hop, ws, power,
            n_iter, momentum, length, 0, atol=5e-5)

    @parameterized.expand(list(itertools.product(
        [8000, 16000, 44100],
        [1, 2],
    )), name_func=_name_from_args)
    def test_detect_pitch_frequency(self, sample_rate, n_channels):
        # Use different frequencies to ensure each item in the batch returns a
        # different answer.
        torch.manual_seed(0)
        frequencies = torch.randint(100, 1000, [self.batch_size])
        waveforms = torch.stack([
            common_utils.get_sinusoid(
                frequency=frequency, sample_rate=sample_rate,
                n_channels=n_channels, duration=5)
            for frequency in frequencies
        ])
        self.assert_batch_consistency(
            F.detect_pitch_frequency, waveforms, sample_rate)

    def test_amplitude_to_DB(self):
        torch.manual_seed(0)
        spec = torch.rand(self.batch_size, 2, 100, 100) * 200
        amplitude_mult = 20.
        amin = 1e-10
        ref = 1.0
        db_mult = math.log10(max(amin, ref))
        # Test with & without a `top_db` clamp
        self.assert_batch_consistency(
            F.amplitude_to_DB, spec, amplitude_mult,
            amin, db_mult, top_db=None)
        self.assert_batch_consistency(
            F.amplitude_to_DB, spec, amplitude_mult,
            amin, db_mult, top_db=40.)

    def test_amplitude_to_DB_itemwise_clamps(self):
        """Ensure that the clamps are separate for each spectrogram in a batch.

        The clamp was determined per-batch in a prior implementation, which
        meant it was determined by the loudest item, thus items weren't
        independent. See:

        https://github.com/pytorch/audio/issues/994
        """
        amplitude_mult = 20.
        amin = 1e-10
        ref = 1.0
        db_mult = math.log10(max(amin, ref))
        top_db = 20.

        # Make a batch of noise
        torch.manual_seed(0)
        spec = torch.rand([2, 2, 100, 100]) * 200
        # Make one item blow out the other
        spec[0] += 50

        batchwise_dbs = F.amplitude_to_DB(spec, amplitude_mult, amin,
                                          db_mult, top_db=top_db)
        itemwise_dbs = torch.stack([
            F.amplitude_to_DB(item, amplitude_mult, amin,
                              db_mult, top_db=top_db)
            for item in spec
        ])

        self.assertEqual(batchwise_dbs, itemwise_dbs)

    def test_amplitude_to_DB_not_channelwise_clamps(self):
        """Check that clamps are applied per-item, not per channel."""
        amplitude_mult = 20.
        amin = 1e-10
        ref = 1.0
        db_mult = math.log10(max(amin, ref))
        top_db = 40.

        torch.manual_seed(0)
        spec = torch.rand([1, 2, 100, 100]) * 200
        # Make one channel blow out the other
        spec[:, 0] += 50

        specwise_dbs = F.amplitude_to_DB(spec, amplitude_mult, amin,
                                         db_mult, top_db=top_db)
        channelwise_dbs = torch.stack([
            F.amplitude_to_DB(spec[:, i], amplitude_mult, amin,
                              db_mult, top_db=top_db)
            for i in range(spec.size(-3))
        ])

        # Just check channelwise gives a different answer.
        difference = (specwise_dbs - channelwise_dbs).abs()
        assert (difference >= 1e-5).any()

    def test_contrast(self):
        torch.random.manual_seed(0)
        waveforms = torch.rand(self.batch_size, 2, 100) - 0.5
        self.assert_batch_consistency(
            F.contrast, waveforms, enhancement_amount=80.)

    def test_dcshift(self):
        torch.random.manual_seed(0)
        waveforms = torch.rand(self.batch_size, 2, 100) - 0.5
        self.assert_batch_consistency(
            F.dcshift, waveforms, shift=0.5, limiter_gain=0.05)

    def test_overdrive(self):
        torch.random.manual_seed(0)
        waveforms = torch.rand(self.batch_size, 2, 100) - 0.5
        self.assert_batch_consistency(
            F.overdrive, waveforms, gain=45, colour=30)

    def test_phaser(self):
        sample_rate = 44100
        n_channels = 2
        waveform = common_utils.get_whitenoise(
            sample_rate=sample_rate, n_channels=self.batch_size * n_channels,
            duration=1)
        batch = waveform.view(self.batch_size, n_channels, waveform.size(-1))
        self.assert_batch_consistency(F.phaser, batch, sample_rate)

    def test_flanger(self):
        torch.random.manual_seed(0)
        waveforms = torch.rand(self.batch_size, 2, 100) - 0.5
        sample_rate = 44100
        self.assert_batch_consistency(F.flanger, waveforms, sample_rate)

    @parameterized.expand(list(itertools.product(
        [True, False],  # center
        [True, False],  # norm_vars
    )), name_func=_name_from_args)
    def test_sliding_window_cmn(self, center, norm_vars):
        torch.manual_seed(0)
        spectrogram = torch.rand(self.batch_size, 2, 1024, 1024) * 200
        self.assert_batch_consistency(
            F.sliding_window_cmn, spectrogram, center=center,
            norm_vars=norm_vars)

    # NOTE: the parentheses below are not tuples (no trailing commas), so
    # each parameter set is the bare string -- parameterized handles both.
    @parameterized.expand([("sinc_interpolation"), ("kaiser_window")])
    def test_resample_waveform(self, resampling_method):
        num_channels = 3
        sr = 16000
        new_sr = sr // 2
        multi_sound = common_utils.get_whitenoise(sample_rate=sr, n_channels=num_channels, duration=0.5,)
        self.assert_batch_consistency(
            F.resample, multi_sound, orig_freq=sr, new_freq=new_sr,
            resampling_method=resampling_method, rtol=1e-4, atol=1e-7)

    @common_utils.skipIfNoKaldi
    def test_compute_kaldi_pitch(self):
        sample_rate = 44100
        n_channels = 2
        waveform = common_utils.get_whitenoise(
            sample_rate=sample_rate, n_channels=self.batch_size * n_channels)
        batch = waveform.view(self.batch_size, n_channels, waveform.size(-1))
        self.assert_batch_consistency(
            F.compute_kaldi_pitch, batch, sample_rate=sample_rate)

    def test_lfilter(self):
        signal_length = 2048
        torch.manual_seed(2434)
        x = torch.randn(self.batch_size, signal_length)
        a = torch.rand(self.batch_size, 3)
        b = torch.rand(self.batch_size, 3)

        batchwise_output = F.lfilter(x, a, b, batching=True)
        itemwise_output = torch.stack([
            F.lfilter(x[i], a[i], b[i])
            for i in range(self.batch_size)
        ])

        self.assertEqual(batchwise_output, itemwise_output)

    def test_filtfilt(self):
        signal_length = 2048
        torch.manual_seed(2434)
        x = torch.randn(self.batch_size, signal_length)
        a = torch.rand(self.batch_size, 3)
        b = torch.rand(self.batch_size, 3)

        batchwise_output = F.filtfilt(x, a, b)
        itemwise_output = torch.stack([
            F.filtfilt(x[i], a[i], b[i])
            for i in range(self.batch_size)
        ])

        self.assertEqual(batchwise_output, itemwise_output)
import torch
import torchaudio.functional as F
import unittest
from parameterized import parameterized
from torchaudio_unittest.common_utils import PytorchTestCase, TorchaudioTestCase, skipIfNoSox
from .functional_impl import Functional, FunctionalCPUOnly
class TestFunctionalFloat32(Functional, FunctionalCPUOnly, PytorchTestCase):
    """Run the shared functional test suite on CPU in float32."""
    dtype = torch.float32
    device = torch.device('cpu')

    @unittest.expectedFailure
    def test_lfilter_9th_order_filter_stability(self):
        # Known to fail in float32 (hence expectedFailure); the float64
        # variant below runs the same case without the marker.
        super().test_lfilter_9th_order_filter_stability()


class TestFunctionalFloat64(Functional, PytorchTestCase):
    """Run the shared functional test suite on CPU in float64."""
    dtype = torch.float64
    device = torch.device('cpu')
@skipIfNoSox
class TestApplyCodec(TorchaudioTestCase):
    """Smoke tests for `F.apply_codec` across supported formats."""
    backend = "sox_io"

    def _smoke_test(self, format, compression, check_num_frames):
        """
        The purpose of this test suite is to verify that apply_codec functionalities do not exhibit
        abnormal behaviors.
        """
        torch.random.manual_seed(42)
        sample_rate = 8000
        num_frames = 3 * sample_rate
        num_channels = 2
        waveform = torch.rand(num_channels, num_frames)

        augmented = F.apply_codec(waveform, sample_rate, format, True, compression)

        assert augmented.dtype == waveform.dtype
        assert augmented.shape[0] == num_channels
        if check_num_frames:
            # Lossy codecs may pad/trim, so the frame count is only checked
            # for formats that preserve it.
            assert augmented.shape[1] == num_frames

    def test_wave(self):
        self._smoke_test("wav", compression=None, check_num_frames=True)

    @parameterized.expand([(bitrate,) for bitrate in (96, 128, 160, 192, 224, 256, 320)])
    def test_mp3(self, compression):
        self._smoke_test("mp3", compression, check_num_frames=False)

    @parameterized.expand([(level,) for level in range(9)])
    def test_flac(self, compression):
        self._smoke_test("flac", compression, check_num_frames=False)

    @parameterized.expand([(quality,) for quality in (-1, 0, 1, 2, 3, 3.6, 5, 10)])
    def test_vorbis(self, compression):
        self._smoke_test("vorbis", compression, check_num_frames=False)
import torch
import unittest
from torchaudio_unittest.common_utils import PytorchTestCase, skipIfNoCuda
from .functional_impl import Functional
@skipIfNoCuda
class TestFunctionalFloat32(Functional, PytorchTestCase):
    """Run the shared functional test suite on CUDA in single precision."""
    dtype = torch.float32
    device = torch.device('cuda')

    @unittest.expectedFailure
    def test_lfilter_9th_order_filter_stability(self):
        # Expected to fail at float32 precision, same as on CPU.
        super().test_lfilter_9th_order_filter_stability()
@skipIfNoCuda
class TestLFilterFloat64(Functional, PytorchTestCase):
    """Run the shared functional test suite on CUDA in double precision."""
    dtype = torch.float64
    device = torch.device('cuda')
"""Test definition common to CPU and CUDA"""
import math
import itertools
import warnings
import numpy as np
import torch
import torchaudio.functional as F
from parameterized import parameterized
from scipy import signal
from torchaudio_unittest.common_utils import (
TestBaseMixin,
get_sinusoid,
nested_params,
get_whitenoise,
rnnt_utils,
)
class Functional(TestBaseMixin):
    """Test definitions for ``torchaudio.functional`` shared by CPU and CUDA runs.

    Concrete subclasses provide ``dtype`` and ``device`` via ``TestBaseMixin``.
    """

    def _test_resample_waveform_accuracy(self, up_scale_factor=None, down_scale_factor=None,
                                         resampling_method="sinc_interpolation", atol=1e-1, rtol=1e-4):
        """Resample a 3 Hz cosine and compare against the analytic ground truth.

        The up/down scale factors derive the target sample rate from 1000 Hz;
        callers in this file pass exactly one of them.
        """
        # resample the signal and compare it to the ground truth
        n_to_trim = 20
        sample_rate = 1000
        new_sample_rate = sample_rate
        if up_scale_factor is not None:
            new_sample_rate = int(new_sample_rate * up_scale_factor)
        if down_scale_factor is not None:
            new_sample_rate = int(new_sample_rate / down_scale_factor)
        duration = 5  # seconds
        original_timestamps = torch.arange(0, duration, 1.0 / sample_rate)
        sound = 123 * torch.cos(2 * math.pi * 3 * original_timestamps).unsqueeze(0)
        estimate = F.resample(sound, sample_rate, new_sample_rate,
                              resampling_method=resampling_method).squeeze()
        # Truncate the analytic signal in case resampling produced fewer samples.
        new_timestamps = torch.arange(0, duration, 1.0 / new_sample_rate)[:estimate.size(0)]
        ground_truth = 123 * torch.cos(2 * math.pi * 3 * new_timestamps)
        # trim the first/last n samples as these points have boundary effects
        ground_truth = ground_truth[..., n_to_trim:-n_to_trim]
        estimate = estimate[..., n_to_trim:-n_to_trim]
        self.assertEqual(estimate, ground_truth, atol=atol, rtol=rtol)

    def _test_costs_and_gradients(
        self, data, ref_costs, ref_gradients, atol=1e-6, rtol=1e-2
    ):
        """Compute RNNT loss costs/gradients with PyTorch and compare to references."""
        logits_shape = data["logits"].shape
        costs, gradients = rnnt_utils.compute_with_pytorch_transducer(data=data)
        self.assertEqual(costs, ref_costs, atol=atol, rtol=rtol)
        # Gradients must match the logits in shape as well as in value.
        self.assertEqual(logits_shape, gradients.shape)
        self.assertEqual(gradients, ref_gradients, atol=atol, rtol=rtol)
def test_lfilter_simple(self):
"""
Create a very basic signal,
Then make a simple 4th order delay
The output should be same as the input but shifted
"""
torch.random.manual_seed(42)
waveform = torch.rand(2, 44100 * 1, dtype=self.dtype, device=self.device)
b_coeffs = torch.tensor([0, 0, 0, 1], dtype=self.dtype, device=self.device)
a_coeffs = torch.tensor([1, 0, 0, 0], dtype=self.dtype, device=self.device)
output_waveform = F.lfilter(waveform, a_coeffs, b_coeffs)
self.assertEqual(output_waveform[:, 3:], waveform[:, 0:-3], atol=1e-5, rtol=1e-5)
def test_lfilter_clamp(self):
input_signal = torch.ones(1, 44100 * 1, dtype=self.dtype, device=self.device)
b_coeffs = torch.tensor([1, 0], dtype=self.dtype, device=self.device)
a_coeffs = torch.tensor([1, -0.95], dtype=self.dtype, device=self.device)
output_signal = F.lfilter(input_signal, a_coeffs, b_coeffs, clamp=True)
assert output_signal.max() <= 1
output_signal = F.lfilter(input_signal, a_coeffs, b_coeffs, clamp=False)
assert output_signal.max() > 1
    @parameterized.expand([
        # (input_shape, coeff_shape, expected_output_shape)
        ((44100,), (4,), (44100,)),
        ((3, 44100), (4,), (3, 44100,)),
        ((2, 3, 44100), (4,), (2, 3, 44100,)),
        ((1, 2, 3, 44100), (4,), (1, 2, 3, 44100,)),
        # With a leading coefficient dim (a filter bank), that dim is inserted
        # into the output shape, as the cases below exercise.
        ((44100,), (2, 4), (2, 44100)),
        ((3, 44100), (1, 4), (3, 1, 44100)),
        ((1, 2, 44100), (3, 4), (1, 2, 3, 44100))
    ])
    def test_lfilter_shape(self, input_shape, coeff_shape, target_shape):
        """lfilter with batching=False produces the expected output shape."""
        torch.random.manual_seed(42)
        waveform = torch.rand(*input_shape, dtype=self.dtype, device=self.device)
        b_coeffs = torch.rand(*coeff_shape, dtype=self.dtype, device=self.device)
        a_coeffs = torch.rand(*coeff_shape, dtype=self.dtype, device=self.device)
        output_waveform = F.lfilter(waveform, a_coeffs, b_coeffs, batching=False)
        # The input must not have been modified in place.
        assert input_shape == waveform.size()
        assert target_shape == output_waveform.size()

    def test_lfilter_9th_order_filter_stability(self):
        """
        Validate the precision of lfilter against reference scipy implementation when using high order filter.
        The reference implementation use cascaded second-order filters so is more numerically accurate.
        """
        # create an impulse signal
        x = torch.zeros(1024, dtype=self.dtype, device=self.device)
        x[0] = 1
        # get target impulse response
        sos = signal.butter(9, 850, 'hp', fs=22050, output='sos')
        y = torch.from_numpy(signal.sosfilt(sos, x.cpu().numpy())).to(self.dtype).to(self.device)
        # get lfilter coefficients
        b, a = signal.butter(9, 850, 'hp', fs=22050, output='ba')
        b, a = torch.from_numpy(b).to(self.dtype).to(self.device), torch.from_numpy(
            a).to(self.dtype).to(self.device)
        # predict impulse response; 4th positional arg disables clamping so
        # precision differences are not masked by the clamp
        yhat = F.lfilter(x, a, b, False)
        self.assertEqual(yhat, y, atol=1e-4, rtol=1e-5)
def test_filtfilt_simple(self):
"""
Check that, for an arbitrary signal, applying filtfilt with filter coefficients
corresponding to a pure delay filter imparts no time delay.
"""
waveform = get_whitenoise(sample_rate=8000, n_channels=2, dtype=self.dtype).to(
device=self.device
)
b_coeffs = torch.tensor([0, 0, 0, 1], dtype=self.dtype, device=self.device)
a_coeffs = torch.tensor([1, 0, 0, 0], dtype=self.dtype, device=self.device)
padded_waveform = torch.cat(
(waveform, torch.zeros(2, 3, dtype=self.dtype, device=self.device)), axis=1
)
output_waveform = F.filtfilt(padded_waveform, a_coeffs, b_coeffs)
self.assertEqual(output_waveform, padded_waveform, atol=1e-5, rtol=1e-5)
    def test_filtfilt_filter_sinusoid(self):
        """
        Check that, for a signal comprising two sinusoids, applying filtfilt
        with appropriate filter coefficients correctly removes the higher-frequency
        sinusoid while imparting no time delay.
        """
        T = 1.0
        samples = 1000
        # Low-frequency component (5 cycles/T): should survive the low-pass filter.
        waveform_k0 = get_sinusoid(
            frequency=5, sample_rate=samples // T, dtype=self.dtype, device=self.device
        ).squeeze(0)
        # High-frequency component (200 cycles/T): should be removed.
        waveform_k1 = get_sinusoid(
            frequency=200,
            sample_rate=samples // T,
            dtype=self.dtype,
            device=self.device,
        ).squeeze(0)
        waveform = waveform_k0 + waveform_k1
        # Transfer function numerator and denominator polynomial coefficients
        # corresponding to 8th-order Butterworth filter with 100-cycle/T cutoff.
        # Generated with
        # >>> from scipy import signal
        # >>> b_coeffs, a_coeffs = signal.butter(8, 0.2)
        b_coeffs = torch.tensor(
            [
                2.39596441e-05,
                1.91677153e-04,
                6.70870035e-04,
                1.34174007e-03,
                1.67717509e-03,
                1.34174007e-03,
                6.70870035e-04,
                1.91677153e-04,
                2.39596441e-05,
            ],
            dtype=self.dtype,
            device=self.device,
        )
        a_coeffs = torch.tensor(
            [
                1.0,
                -4.78451489,
                10.44504107,
                -13.45771989,
                11.12933104,
                -6.0252604,
                2.0792738,
                -0.41721716,
                0.0372001,
            ],
            dtype=self.dtype,
            device=self.device,
        )
        # Extend waveform in each direction, preserving periodicity.
        padded_waveform = torch.cat((waveform[:-1], waveform, waveform[1:]))
        output_waveform = F.filtfilt(padded_waveform, a_coeffs, b_coeffs)
        # Remove padding from output waveform; confirm that result
        # closely matches waveform_k0.
        self.assertEqual(
            output_waveform[samples - 1: 2 * samples - 1],
            waveform_k0,
            atol=1e-3,
            rtol=1e-3,
        )
    @parameterized.expand([(0., ), (1., ), (2., ), (3., )])
    def test_spectogram_grad_at_zero(self, power):
        """The gradient of power spectrogram should not be nan but zero near x=0
        https://github.com/pytorch/audio/issues/993
        """
        # NOTE(review): "spectogram" in the method name is a typo for
        # "spectrogram"; kept as-is since renaming changes the reported test id.
        x = torch.zeros(1, 22050, requires_grad=True)
        spec = F.spectrogram(
            x,
            pad=0,
            window=None,
            n_fft=2048,
            hop_length=None,
            win_length=None,
            power=power,
            normalized=False,
        )
        spec.sum().backward()
        # Any NaN in the gradient means the backward pass is broken at x=0.
        assert not x.grad.isnan().sum()
def test_compute_deltas_one_channel(self):
specgram = torch.tensor([[[1.0, 2.0, 3.0, 4.0]]], dtype=self.dtype, device=self.device)
expected = torch.tensor([[[0.5, 1.0, 1.0, 0.5]]], dtype=self.dtype, device=self.device)
computed = F.compute_deltas(specgram, win_length=3)
self.assertEqual(computed, expected)
def test_compute_deltas_two_channels(self):
specgram = torch.tensor([[[1.0, 2.0, 3.0, 4.0],
[1.0, 2.0, 3.0, 4.0]]], dtype=self.dtype, device=self.device)
expected = torch.tensor([[[0.5, 1.0, 1.0, 0.5],
[0.5, 1.0, 1.0, 0.5]]], dtype=self.dtype, device=self.device)
computed = F.compute_deltas(specgram, win_length=3)
self.assertEqual(computed, expected)
@parameterized.expand([(100,), (440,)])
def test_detect_pitch_frequency_pitch(self, frequency):
sample_rate = 44100
test_sine_waveform = get_sinusoid(
frequency=frequency, sample_rate=sample_rate, duration=5
)
freq = F.detect_pitch_frequency(test_sine_waveform, sample_rate)
threshold = 1
s = ((freq - frequency).abs() > threshold).sum()
self.assertFalse(s)
    @parameterized.expand([([100, 100],), ([2, 100, 100],), ([2, 2, 100, 100],)])
    def test_amplitude_to_DB_reversible(self, shape):
        """Round trip between amplitude and db should return the original for various shape
        This implicitly also tests `DB_to_amplitude`.
        """
        amplitude_mult = 20.
        power_mult = 10.
        amin = 1e-10
        ref = 1.0
        db_mult = math.log10(max(amin, ref))
        torch.manual_seed(0)
        spec = torch.rand(*shape, dtype=self.dtype, device=self.device) * 200
        # Spectrogram amplitude -> DB -> amplitude
        db = F.amplitude_to_DB(spec, amplitude_mult, amin, db_mult, top_db=None)
        x2 = F.DB_to_amplitude(db, ref, 0.5)
        self.assertEqual(x2, spec, atol=5e-5, rtol=1e-5)
        # Spectrogram power -> DB -> power
        db = F.amplitude_to_DB(spec, power_mult, amin, db_mult, top_db=None)
        x2 = F.DB_to_amplitude(db, ref, 1.)
        self.assertEqual(x2, spec)

    @parameterized.expand([([100, 100],), ([2, 100, 100],), ([2, 2, 100, 100],)])
    def test_amplitude_to_DB_top_db_clamp(self, shape):
        """Ensure values are properly clamped when `top_db` is supplied."""
        amplitude_mult = 20.
        amin = 1e-10
        ref = 1.0
        db_mult = math.log10(max(amin, ref))
        top_db = 40.
        torch.manual_seed(0)
        # A random tensor is used for increased entropy, but the max and min for
        # each spectrogram still need to be predictable. The max determines the
        # decibel cutoff, and the distance from the min must be large enough
        # that it triggers a clamp.
        spec = torch.rand(*shape, dtype=self.dtype, device=self.device)
        # Ensure each spectrogram has a min of 0 and a max of 1.
        spec -= spec.amin([-2, -1])[..., None, None]
        spec /= spec.amax([-2, -1])[..., None, None]
        # Expand the range to (0, 200) - wide enough to properly test clamping.
        spec *= 200
        decibels = F.amplitude_to_DB(spec, amplitude_mult, amin,
                                     db_mult, top_db=top_db)
        # Ensure the clamp was applied.
        # Max is 200 -> 20*log10(200) ~= 46.02 dB; top_db=40 puts the expected
        # floor at ~6.02 dB, hence the 6.0205 cutoff below.
        below_limit = decibels < 6.0205
        assert not below_limit.any(), (
            "{} decibel values were below the expected cutoff:\n{}".format(
                below_limit.sum().item(), decibels
            )
        )
        # Ensure it didn't over-clamp
        close_to_limit = decibels < 6.0207
        assert close_to_limit.any(), (
            f"No values were close to the limit. Did it over-clamp?\n{decibels}"
        )
    @parameterized.expand(
        list(itertools.product([(1, 2, 1025, 400, 2), (1025, 400, 2)], [1, 2, 0.7]))
    )
    def test_complex_norm(self, shape, power):
        """complex_norm matches (re^2 + im^2) ** (power / 2) on pseudo-complex input."""
        torch.random.manual_seed(42)
        complex_tensor = torch.randn(*shape, dtype=self.dtype, device=self.device)
        # The trailing dim of size 2 holds the (real, imag) pair; reducing it
        # yields the norm raised to `power`.
        expected_norm_tensor = complex_tensor.pow(2).sum(-1).pow(power / 2)
        norm_tensor = F.complex_norm(complex_tensor, power)
        self.assertEqual(norm_tensor, expected_norm_tensor, atol=1e-5, rtol=1e-5)

    @parameterized.expand(
        list(itertools.product([(2, 1025, 400), (1, 201, 100)], [100], [0., 30.], [1, 2]))
    )
    def test_mask_along_axis(self, shape, mask_param, mask_value, axis):
        """The number of fully masked columns along `axis` must stay below mask_param."""
        torch.random.manual_seed(42)
        specgram = torch.randn(*shape, dtype=self.dtype, device=self.device)
        mask_specgram = F.mask_along_axis(specgram, mask_param, mask_value, axis)
        other_axis = 1 if axis == 2 else 2
        # A column counts as masked when every element along the other axis
        # equals the mask value.
        masked_columns = (mask_specgram == mask_value).sum(other_axis)
        num_masked_columns = (masked_columns == mask_specgram.size(other_axis)).sum()
        # Normalize by the leading (channel) dim, since the same mask spans it.
        num_masked_columns = torch.div(
            num_masked_columns, mask_specgram.size(0), rounding_mode='floor')
        assert mask_specgram.size() == specgram.size()
        assert num_masked_columns < mask_param
    @parameterized.expand(list(itertools.product([100], [0., 30.], [2, 3])))
    def test_mask_along_axis_iid(self, mask_param, mask_value, axis):
        """Per-example masking: each example's masked width stays below mask_param."""
        torch.random.manual_seed(42)
        specgrams = torch.randn(4, 2, 1025, 400, dtype=self.dtype, device=self.device)
        mask_specgrams = F.mask_along_axis_iid(specgrams, mask_param, mask_value, axis)
        other_axis = 2 if axis == 3 else 3
        masked_columns = (mask_specgrams == mask_value).sum(other_axis)
        num_masked_columns = (masked_columns == mask_specgrams.size(other_axis)).sum(-1)
        assert mask_specgrams.size() == specgrams.size()
        # Every (batch, channel) entry must individually satisfy the bound.
        assert (num_masked_columns < mask_param).sum() == num_masked_columns.numel()

    @parameterized.expand(
        list(itertools.product([(2, 1025, 400), (1, 201, 100)], [100], [0., 30.], [1, 2]))
    )
    def test_mask_along_axis_preserve(self, shape, mask_param, mask_value, axis):
        """mask_along_axis should not alter original input Tensor
        Test is run 5 times to bound the probability of no masking occurring to 1e-10
        See https://github.com/pytorch/audio/issues/1478
        """
        torch.random.manual_seed(42)
        for _ in range(5):
            specgram = torch.randn(*shape, dtype=self.dtype, device=self.device)
            specgram_copy = specgram.clone()
            # Return value is deliberately ignored; only in-place mutation matters here.
            F.mask_along_axis(specgram, mask_param, mask_value, axis)
            self.assertEqual(specgram, specgram_copy)

    @parameterized.expand(list(itertools.product([100], [0., 30.], [2, 3])))
    def test_mask_along_axis_iid_preserve(self, mask_param, mask_value, axis):
        """mask_along_axis_iid should not alter original input Tensor
        Test is run 5 times to bound the probability of no masking occurring to 1e-10
        See https://github.com/pytorch/audio/issues/1478
        """
        torch.random.manual_seed(42)
        for _ in range(5):
            specgrams = torch.randn(4, 2, 1025, 400, dtype=self.dtype, device=self.device)
            specgrams_copy = specgrams.clone()
            F.mask_along_axis_iid(specgrams, mask_param, mask_value, axis)
            self.assertEqual(specgrams, specgrams_copy)
@parameterized.expand(list(itertools.product(
["sinc_interpolation", "kaiser_window"],
[16000, 44100],
)))
def test_resample_identity(self, resampling_method, sample_rate):
waveform = get_whitenoise(sample_rate=sample_rate, duration=1)
resampled = F.resample(waveform, sample_rate, sample_rate)
self.assertEqual(waveform, resampled)
@parameterized.expand([("sinc_interpolation"), ("kaiser_window")])
def test_resample_waveform_upsample_size(self, resampling_method):
sr = 16000
waveform = get_whitenoise(sample_rate=sr, duration=0.5,)
upsampled = F.resample(waveform, sr, sr * 2, resampling_method=resampling_method)
assert upsampled.size(-1) == waveform.size(-1) * 2
@parameterized.expand([("sinc_interpolation"), ("kaiser_window")])
def test_resample_waveform_downsample_size(self, resampling_method):
sr = 16000
waveform = get_whitenoise(sample_rate=sr, duration=0.5,)
downsampled = F.resample(waveform, sr, sr // 2, resampling_method=resampling_method)
assert downsampled.size(-1) == waveform.size(-1) // 2
@parameterized.expand([("sinc_interpolation"), ("kaiser_window")])
def test_resample_waveform_identity_size(self, resampling_method):
sr = 16000
waveform = get_whitenoise(sample_rate=sr, duration=0.5,)
resampled = F.resample(waveform, sr, sr, resampling_method=resampling_method)
assert resampled.size(-1) == waveform.size(-1)
@parameterized.expand(list(itertools.product(
["sinc_interpolation", "kaiser_window"],
list(range(1, 20)),
)))
def test_resample_waveform_downsample_accuracy(self, resampling_method, i):
self._test_resample_waveform_accuracy(down_scale_factor=i * 2, resampling_method=resampling_method)
@parameterized.expand(list(itertools.product(
["sinc_interpolation", "kaiser_window"],
list(range(1, 20)),
)))
def test_resample_waveform_upsample_accuracy(self, resampling_method, i):
self._test_resample_waveform_accuracy(up_scale_factor=1.0 + i / 20.0, resampling_method=resampling_method)
    @nested_params(
        [0.5, 1.01, 1.3],
        [True, False],
    )
    def test_phase_vocoder_shape(self, rate, test_pseudo_complex):
        """Verify the output shape of phase vocoder"""
        hop_length = 256
        num_freq = 1025
        num_frames = 400
        batch_size = 2
        torch.random.manual_seed(42)
        spec = torch.randn(
            batch_size, num_freq, num_frames, dtype=self.complex_dtype, device=self.device)
        if test_pseudo_complex:
            # Also exercise the legacy (..., 2) real-pair representation.
            spec = torch.view_as_real(spec)
        phase_advance = torch.linspace(
            0,
            np.pi * hop_length,
            num_freq,
            dtype=self.dtype, device=self.device)[..., None]
        spec_stretch = F.phase_vocoder(spec, rate=rate, phase_advance=phase_advance)
        assert spec.dim() == spec_stretch.dim()
        # The time axis is stretched by 1/rate, rounded up.
        expected_shape = torch.Size([batch_size, num_freq, int(np.ceil(num_frames / rate))])
        output_shape = (torch.view_as_complex(spec_stretch) if test_pseudo_complex else spec_stretch).shape
        assert output_shape == expected_shape
    @parameterized.expand(
        [
            # words
            ["", "", 0],  # equal
            ["abc", "abc", 0],
            ["ᑌᑎIᑕO", "ᑌᑎIᑕO", 0],
            ["abc", "", 3],  # deletion
            ["aa", "aaa", 1],
            ["aaa", "aa", 1],
            ["ᑌᑎI", "ᑌᑎIᑕO", 2],
            ["aaa", "aba", 1],  # substitution
            ["aba", "aaa", 1],
            ["aba", "   ", 3],
            ["abc", "bcd", 2],  # mix deletion and substitution
            ["0ᑌᑎI", "ᑌᑎIᑕO", 3],
            # sentences
            [["hello", "", "Tᕮ᙭T"], ["hello", "", "Tᕮ᙭T"], 0],  # equal
            [[], [], 0],
            [["hello", "world"], ["hello", "world", "!"], 1],  # deletion
            [["hello", "world"], ["world"], 1],
            [["hello", "world"], [], 2],
            [["Tᕮ᙭T", ], ["world"], 1],  # substitution
            [["Tᕮ᙭T", "XD"], ["world", "hello"], 2],
            [["", "XD"], ["world", ""], 2],
            # NOTE(review): the next case duplicates the word-level
            # ["aba", " ", 3] entry above; harmless but redundant.
            ["aba", "   ", 3],
            [["hello", "world"], ["world", "hello", "!"], 2],  # mix deletion and substitution
            [["Tᕮ᙭T", "world", "LOL", "XD"], ["world", "hello", "ʕ•́ᴥ•̀ʔっ"], 3],
        ]
    )
    def test_simple_case_edit_distance(self, seq1, seq2, distance):
        """edit_distance is correct on these cases and symmetric in its arguments."""
        assert F.edit_distance(seq1, seq2) == distance
        assert F.edit_distance(seq2, seq1) == distance
@nested_params(
[-4, -2, 0, 2, 4],
)
def test_pitch_shift_shape(self, n_steps):
sample_rate = 16000
torch.random.manual_seed(42)
waveform = torch.rand(2, 44100 * 1, dtype=self.dtype, device=self.device)
waveform_shift = F.pitch_shift(waveform, sample_rate, n_steps)
assert waveform.size() == waveform_shift.size()
    def test_rnnt_loss_basic_backward(self):
        """rnnt_loss on basic fixture data must support backward() without error."""
        logits, targets, logit_lengths, target_lengths = rnnt_utils.get_basic_data(self.device)
        loss = F.rnnt_loss(logits, targets, logit_lengths, target_lengths)
        loss.backward()

    def test_rnnt_loss_basic_forward_no_grad(self):
        """In early stage, calls to `rnnt_loss` resulted in segmentation fault when
        `logits` have `requires_grad = False`. This test makes sure that this no longer
        occurs and the functional call runs without error.
        See https://github.com/pytorch/audio/pull/1707
        """
        logits, targets, logit_lengths, target_lengths = rnnt_utils.get_basic_data(self.device)
        logits.requires_grad_(False)
        F.rnnt_loss(logits, targets, logit_lengths, target_lengths)

    @parameterized.expand([
        # (fixture factory, dtype, atol, rtol) — float16 gets looser atol
        (rnnt_utils.get_B1_T2_U3_D5_data, torch.float32, 1e-6, 1e-2),
        (rnnt_utils.get_B2_T4_U3_D3_data, torch.float32, 1e-6, 1e-2),
        (rnnt_utils.get_B1_T2_U3_D5_data, torch.float16, 1e-3, 1e-2),
        (rnnt_utils.get_B2_T4_U3_D3_data, torch.float16, 1e-3, 1e-2),
    ])
    def test_rnnt_loss_costs_and_gradients(self, data_func, dtype, atol, rtol):
        """Costs and gradients on fixed fixtures match precomputed references."""
        data, ref_costs, ref_gradients = data_func(
            dtype=dtype,
            device=self.device,
        )
        self._test_costs_and_gradients(
            data=data,
            ref_costs=ref_costs,
            ref_gradients=ref_gradients,
            atol=atol,
            rtol=rtol,
        )

    def test_rnnt_loss_costs_and_gradients_random_data_with_numpy_fp32(self):
        """Costs and gradients on random data match the NumPy reference transducer."""
        seed = 777
        for i in range(5):
            data = rnnt_utils.get_random_data(dtype=torch.float32, device=self.device, seed=(seed + i))
            ref_costs, ref_gradients = rnnt_utils.compute_with_numpy_transducer(data=data)
            self._test_costs_and_gradients(
                data=data, ref_costs=ref_costs, ref_gradients=ref_gradients
            )
class FunctionalCPUOnly(TestBaseMixin):
    """CPU-only functional tests: melscale_fbanks warning behavior."""

    def _melscale_warning_count(self, n_freqs, n_mels):
        # Run melscale_fbanks with fixed frequency range / sample rate and
        # return the number of warnings it raised.
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            F.melscale_fbanks(n_freqs, 0, 8000, n_mels, 16000)
        return len(caught)

    def test_melscale_fbanks_no_warning_high_n_freq(self):
        assert self._melscale_warning_count(288, 128) == 0

    def test_melscale_fbanks_no_warning_low_n_mels(self):
        assert self._melscale_warning_count(201, 89) == 0

    def test_melscale_fbanks_warning(self):
        assert self._melscale_warning_count(201, 128) == 1
import torch
from torchaudio_unittest.common_utils import PytorchTestCase
from .kaldi_compatibility_test_impl import Kaldi, KaldiCPUOnly
class TestKaldiCPUOnly(KaldiCPUOnly, PytorchTestCase):
    """CPU-only Kaldi compatibility tests in single precision."""
    dtype = torch.float32
    device = torch.device('cpu')


class TestKaldiFloat32(Kaldi, PytorchTestCase):
    """Kaldi compatibility tests on CPU, single precision."""
    dtype = torch.float32
    device = torch.device('cpu')


class TestKaldiFloat64(Kaldi, PytorchTestCase):
    """Kaldi compatibility tests on CPU, double precision."""
    dtype = torch.float64
    device = torch.device('cpu')
import torch
from torchaudio_unittest.common_utils import PytorchTestCase, skipIfNoCuda
from .kaldi_compatibility_test_impl import Kaldi
@skipIfNoCuda
class TestKaldiFloat32(Kaldi, PytorchTestCase):
    """Kaldi compatibility tests on CUDA, single precision."""
    dtype = torch.float32
    device = torch.device('cuda')


@skipIfNoCuda
class TestKaldiFloat64(Kaldi, PytorchTestCase):
    """Kaldi compatibility tests on CUDA, double precision."""
    dtype = torch.float64
    device = torch.device('cuda')
from parameterized import parameterized
import torch
import torchaudio.functional as F
from torchaudio_unittest.common_utils import (
get_sinusoid,
load_params,
save_wav,
skipIfNoExec,
TempDirMixin,
TestBaseMixin,
)
from torchaudio_unittest.common_utils.kaldi_utils import (
convert_args,
run_kaldi,
)
class Kaldi(TempDirMixin, TestBaseMixin):
    """Tests comparing torchaudio.functional against Kaldi command-line tools."""

    def assert_equal(self, output, *, expected, rtol=None, atol=None):
        # Move the expected tensor onto the dtype/device under test first.
        expected = expected.to(dtype=self.dtype, device=self.device)
        self.assertEqual(output, expected, rtol=rtol, atol=atol)

    @skipIfNoExec('apply-cmvn-sliding')
    def test_sliding_window_cmn(self):
        """sliding_window_cmn should be numerically compatible with apply-cmvn-sliding"""
        kwargs = {
            'cmn_window': 600,
            'min_cmn_window': 100,
            'center': False,
            'norm_vars': False,
        }
        tensor = torch.randn(40, 10, dtype=self.dtype, device=self.device)
        result = F.sliding_window_cmn(tensor, **kwargs)
        # Run the reference Kaldi binary with the same options over stdin/stdout archives.
        command = ['apply-cmvn-sliding'] + convert_args(**kwargs) + ['ark:-', 'ark:-']
        kaldi_result = run_kaldi(command, 'ark', tensor)
        self.assert_equal(result, expected=kaldi_result)
class KaldiCPUOnly(TempDirMixin, TestBaseMixin):
    """Kaldi compatibility tests that only run on CPU (pitch features)."""

    def assert_equal(self, output, *, expected, rtol=None, atol=None):
        # Move the expected tensor onto the dtype/device under test first.
        expected = expected.to(dtype=self.dtype, device=self.device)
        self.assertEqual(output, expected, rtol=rtol, atol=atol)

    @parameterized.expand(load_params('kaldi_test_pitch_args.jsonl'))
    @skipIfNoExec('compute-kaldi-pitch-feats')
    def test_pitch_feats(self, kwargs):
        """compute_kaldi_pitch produces numerically compatible result with compute-kaldi-pitch-feats"""
        sample_rate = kwargs['sample_rate']
        # float32 sinusoid feeds the torchaudio implementation directly.
        waveform = get_sinusoid(dtype='float32', sample_rate=sample_rate)
        result = F.compute_kaldi_pitch(waveform[0], **kwargs)
        # int16 sinusoid is written to a wav file for the Kaldi binary.
        waveform = get_sinusoid(dtype='int16', sample_rate=sample_rate)
        wave_file = self.get_temp_path('test.wav')
        save_wav(wave_file, waveform, sample_rate)
        command = ['compute-kaldi-pitch-feats'] + convert_args(**kwargs) + ['scp:-', 'ark:-']
        kaldi_result = run_kaldi(command, 'scp', wave_file)
        self.assert_equal(result, expected=kaldi_result)
from torchaudio_unittest.common_utils import PytorchTestCase
from .librosa_compatibility_test_impl import Functional, FunctionalComplex
class TestFunctionalCPU(Functional, PytorchTestCase):
    """Librosa-compatibility functional tests on CPU."""
    device = 'cpu'


class TestFunctionalComplexCPU(FunctionalComplex, PytorchTestCase):
    """Librosa-compatibility complex-valued functional tests on CPU."""
    device = 'cpu'
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment