Commit b4cc0f33 authored by hwangjeff's avatar hwangjeff Committed by Facebook GitHub Bot

Move data augmentation transforms out of prototype (#3009)

Summary:
Moves `AddNoise`, `Convolve`, `FFTConvolve`, `Speed`, `SpeedPerturbation`, `Deemphasis`, and `Preemphasis` out of `torchaudio.prototype.transforms` and into `torchaudio.transforms`.
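For downstream users the change amounts to an import-path update: the transforms are now available directly under `torchaudio.transforms`. A minimal usage sketch under the new location (shapes and parameter values are illustrative, and assume a torchaudio build that includes this commit):

import torch
import torchaudio.transforms as T  # previously torchaudio.prototype.transforms

waveform = torch.rand(2, 16000)        # (channel, time)
noise = torch.rand(2, 16000)
snr = torch.tensor([10.0, 10.0])       # per-channel SNR in dB

noisy = T.AddNoise()(waveform, noise, snr)
faster, new_lengths = T.Speed(orig_freq=16000, factor=1.1)(waveform, torch.tensor([16000, 16000]))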

Pull Request resolved: https://github.com/pytorch/audio/pull/3009

Reviewed By: xiaohui-zhang, mthrok

Differential Revision: D42730322

Pulled By: hwangjeff

fbshipit-source-id: 43739ac31437150d3127e51eddc0f0bba5facb15
parent 7ea69e61
......@@ -9,13 +9,6 @@ torchaudio.prototype.transforms
:toctree: generated
:nosignatures:
AddNoise
Convolve
FFTConvolve
BarkScale
InverseBarkScale
BarkSpectrogram
Speed
SpeedPerturbation
Deemphasis
Preemphasis
......@@ -89,6 +89,13 @@ Utility
Fade
Vol
Loudness
AddNoise
Convolve
FFTConvolve
Speed
SpeedPerturbation
Deemphasis
Preemphasis
Feature Extractions
-------------------
......
......@@ -27,18 +27,6 @@ class Autograd(TestBaseMixin):
assert gradcheck(transform, inputs_)
assert gradgradcheck(transform, inputs_, nondet_tol=nondet_tol)
@nested_params(
[T.Convolve, T.FFTConvolve],
["full", "valid", "same"],
)
def test_Convolve(self, cls, mode):
leading_dims = (4, 3, 2)
L_x, L_y = 23, 40
x = torch.rand(*leading_dims, L_x, dtype=self.dtype, device=self.device)
y = torch.rand(*leading_dims, L_y, dtype=self.dtype, device=self.device)
convolve = cls(mode=mode).to(dtype=self.dtype, device=self.device)
self.assert_grad(convolve, [x, y])
def test_barkspectrogram(self):
# replication_pad1d_backward_cuda is not deterministic and
# gives very small (~e-16) differences.
......@@ -56,50 +44,3 @@ class Autograd(TestBaseMixin):
get_whitenoise(sample_rate=sample_rate, duration=0.05, n_channels=2), n_fft=n_fft, power=1
)
self.assert_grad(transform, [spec])
def test_Speed(self):
leading_dims = (3, 2)
time = 200
waveform = torch.rand(*leading_dims, time, dtype=torch.float64, device=self.device, requires_grad=True)
lengths = torch.randint(1, time, leading_dims, dtype=torch.float64, device=self.device)
speed = T.Speed(1000, 1.1).to(device=self.device, dtype=torch.float64)
assert gradcheck(speed, (waveform, lengths))
assert gradgradcheck(speed, (waveform, lengths))
def test_SpeedPerturbation(self):
leading_dims = (3, 2)
time = 200
waveform = torch.rand(*leading_dims, time, dtype=torch.float64, device=self.device, requires_grad=True)
lengths = torch.randint(1, time, leading_dims, dtype=torch.float64, device=self.device)
speed = T.SpeedPerturbation(1000, [0.9]).to(device=self.device, dtype=torch.float64)
assert gradcheck(speed, (waveform, lengths))
assert gradgradcheck(speed, (waveform, lengths))
@nested_params([True, False])
def test_AddNoise(self, use_lengths):
leading_dims = (2, 3)
L = 31
waveform = torch.rand(*leading_dims, L, dtype=torch.float64, device=self.device, requires_grad=True)
noise = torch.rand(*leading_dims, L, dtype=torch.float64, device=self.device, requires_grad=True)
if use_lengths:
lengths = torch.rand(*leading_dims, dtype=torch.float64, device=self.device, requires_grad=True)
else:
lengths = None
snr = torch.rand(*leading_dims, dtype=torch.float64, device=self.device, requires_grad=True) * 10
add_noise = T.AddNoise().to(self.device, torch.float64)
assert gradcheck(add_noise, (waveform, noise, snr, lengths))
assert gradgradcheck(add_noise, (waveform, noise, snr, lengths))
def test_Preemphasis(self):
waveform = torch.rand(3, 4, 10, dtype=torch.float64, device=self.device, requires_grad=True)
preemphasis = T.Preemphasis(coeff=0.97).to(dtype=torch.float64, device=self.device)
assert gradcheck(preemphasis, (waveform,))
assert gradgradcheck(preemphasis, (waveform,))
def test_Deemphasis(self):
waveform = torch.rand(3, 4, 10, dtype=torch.float64, device=self.device, requires_grad=True)
deemphasis = T.Deemphasis(coeff=0.97).to(dtype=torch.float64, device=self.device)
assert gradcheck(deemphasis, (waveform,))
assert gradgradcheck(deemphasis, (waveform,))
......@@ -3,7 +3,7 @@ import os
import torch
import torchaudio.prototype.transforms as T
import torchaudio.transforms as transforms
from torchaudio_unittest.common_utils import nested_params, TorchaudioTestCase
from torchaudio_unittest.common_utils import TorchaudioTestCase
class BatchConsistencyTest(TorchaudioTestCase):
......@@ -23,29 +23,6 @@ class BatchConsistencyTest(TorchaudioTestCase):
self.assertEqual(items_input, batch_input, rtol=rtol, atol=atol)
self.assertEqual(items_result, batch_result, rtol=rtol, atol=atol)
@nested_params(
[T.Convolve, T.FFTConvolve],
["full", "valid", "same"],
)
def test_Convolve(self, cls, mode):
leading_dims = (2, 3)
L_x, L_y = 89, 43
x = torch.rand(*leading_dims, L_x, dtype=self.dtype, device=self.device)
y = torch.rand(*leading_dims, L_y, dtype=self.dtype, device=self.device)
convolve = cls(mode=mode)
actual = convolve(x, y)
expected = torch.stack(
[
torch.stack(
[convolve(x[i, j].unsqueeze(0), y[i, j].unsqueeze(0)).squeeze(0) for j in range(leading_dims[1])]
)
for i in range(leading_dims[0])
]
)
self.assertEqual(expected, actual)
def test_batch_BarkScale(self):
specgram = torch.randn(3, 2, 201, 256)
......@@ -63,99 +40,3 @@ class BatchConsistencyTest(TorchaudioTestCase):
# Because InverseBarkScale runs SGD on randomly initialized values, it does not yield
# exactly the same result. For this reason, the tolerance is very relaxed here.
self.assert_batch_consistency(transform, bark_spec, atol=1.0, rtol=1e-5)
def test_Speed(self):
B = 5
orig_freq = 100
factor = 0.8
input_lengths = torch.randint(1, 1000, (B,), dtype=torch.int32)
speed = T.Speed(orig_freq, factor)
unbatched_input = [torch.ones((int(length),)) * 1.0 for length in input_lengths]
batched_input = torch.nn.utils.rnn.pad_sequence(unbatched_input, batch_first=True)
output, output_lengths = speed(batched_input, input_lengths)
unbatched_output = []
unbatched_output_lengths = []
for idx in range(len(unbatched_input)):
w, l = speed(unbatched_input[idx], input_lengths[idx])
unbatched_output.append(w)
unbatched_output_lengths.append(l)
self.assertEqual(output_lengths, torch.stack(unbatched_output_lengths))
for idx in range(len(unbatched_output)):
w, l = output[idx], output_lengths[idx]
self.assertEqual(unbatched_output[idx], w[:l])
def test_SpeedPerturbation(self):
B = 5
orig_freq = 100
factor = 0.8
input_lengths = torch.randint(1, 1000, (B,), dtype=torch.int32)
speed = T.SpeedPerturbation(orig_freq, [factor])
unbatched_input = [torch.ones((int(length),)) * 1.0 for length in input_lengths]
batched_input = torch.nn.utils.rnn.pad_sequence(unbatched_input, batch_first=True)
output, output_lengths = speed(batched_input, input_lengths)
unbatched_output = []
unbatched_output_lengths = []
for idx in range(len(unbatched_input)):
w, l = speed(unbatched_input[idx], input_lengths[idx])
unbatched_output.append(w)
unbatched_output_lengths.append(l)
self.assertEqual(output_lengths, torch.stack(unbatched_output_lengths))
for idx in range(len(unbatched_output)):
w, l = output[idx], output_lengths[idx]
self.assertEqual(unbatched_output[idx], w[:l])
def test_AddNoise(self):
leading_dims = (5, 2, 3)
L = 51
waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
noise = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device)
snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device) * 10
add_noise = T.AddNoise()
actual = add_noise(waveform, noise, snr, lengths)
expected = []
for i in range(leading_dims[0]):
for j in range(leading_dims[1]):
for k in range(leading_dims[2]):
expected.append(add_noise(waveform[i][j][k], noise[i][j][k], snr[i][j][k], lengths[i][j][k]))
self.assertEqual(torch.stack(expected), actual.reshape(-1, L))
def test_Preemphasis(self):
waveform = torch.rand((3, 5, 2, 100), dtype=self.dtype, device=self.device)
preemphasis = T.Preemphasis(coeff=0.97)
actual = preemphasis(waveform)
expected = []
for i in range(waveform.size(0)):
for j in range(waveform.size(1)):
for k in range(waveform.size(2)):
expected.append(preemphasis(waveform[i][j][k]))
self.assertEqual(torch.stack(expected), actual.reshape(-1, waveform.size(-1)))
def test_Deemphasis(self):
waveform = torch.rand((3, 5, 2, 100), dtype=self.dtype, device=self.device)
deemphasis = T.Deemphasis(coeff=0.97)
actual = deemphasis(waveform)
expected = []
for i in range(waveform.size(0)):
for j in range(waveform.size(1)):
for k in range(waveform.size(2)):
expected.append(deemphasis(waveform[i][j][k]))
self.assertEqual(torch.stack(expected), actual.reshape(-1, waveform.size(-1)))
import torch
from torchaudio_unittest.common_utils import PytorchTestCase
from .torchscript_consistency_impl import Transforms
class TestTransformsFloat32(Transforms, PytorchTestCase):
dtype = torch.float32
device = torch.device("cpu")
class TestTransformsFloat64(Transforms, PytorchTestCase):
dtype = torch.float64
device = torch.device("cpu")
import torch
from torchaudio_unittest.common_utils import PytorchTestCase, skipIfNoCuda
from .torchscript_consistency_impl import Transforms
@skipIfNoCuda
class TestTransformsFloat32(Transforms, PytorchTestCase):
dtype = torch.float32
device = torch.device("cuda")
@skipIfNoCuda
class TestTransformsFloat64(Transforms, PytorchTestCase):
dtype = torch.float64
device = torch.device("cuda")
import torch
import torchaudio.prototype.transforms as T
from torchaudio_unittest.common_utils import nested_params, TestBaseMixin, torch_script
class Transforms(TestBaseMixin):
@nested_params(
["Convolve", "FFTConvolve"],
["full", "valid", "same"],
)
def test_Convolve(self, cls, mode):
leading_dims = (2, 3, 2)
L_x, L_y = 32, 55
x = torch.rand(*leading_dims, L_x, dtype=self.dtype, device=self.device)
y = torch.rand(*leading_dims, L_y, dtype=self.dtype, device=self.device)
convolve = getattr(T, cls)(mode=mode).to(device=self.device, dtype=self.dtype)
output = convolve(x, y)
ts_output = torch_script(convolve)(x, y)
self.assertEqual(ts_output, output)
def test_Speed(self):
leading_dims = (3, 2)
time = 200
waveform = torch.rand(*leading_dims, time, dtype=self.dtype, device=self.device, requires_grad=True)
lengths = torch.randint(1, time, leading_dims, dtype=self.dtype, device=self.device)
speed = T.Speed(1000, 0.9).to(self.device, self.dtype)
output = speed(waveform, lengths)
ts_output = torch_script(speed)(waveform, lengths)
self.assertEqual(ts_output, output)
def test_SpeedPerturbation(self):
leading_dims = (3, 2)
time = 200
waveform = torch.rand(*leading_dims, time, dtype=self.dtype, device=self.device, requires_grad=True)
lengths = torch.randint(1, time, leading_dims, dtype=self.dtype, device=self.device)
speed = T.SpeedPerturbation(1000, [0.9]).to(self.device, self.dtype)
output = speed(waveform, lengths)
ts_output = torch_script(speed)(waveform, lengths)
self.assertEqual(ts_output, output)
@nested_params([True, False])
def test_AddNoise(self, use_lengths):
leading_dims = (2, 3)
L = 31
waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device, requires_grad=True)
noise = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device, requires_grad=True)
if use_lengths:
lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device, requires_grad=True)
else:
lengths = None
snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device, requires_grad=True) * 10
add_noise = T.AddNoise().to(self.device, self.dtype)
output = add_noise(waveform, noise, snr, lengths)
ts_output = torch_script(add_noise)(waveform, noise, snr, lengths)
self.assertEqual(ts_output, output)
def test_Preemphasis(self):
waveform = torch.rand(3, 4, 10, dtype=self.dtype, device=self.device)
preemphasis = T.Preemphasis(coeff=0.97).to(dtype=self.dtype, device=self.device)
output = preemphasis(waveform)
ts_output = torch_script(preemphasis)(waveform)
self.assertEqual(ts_output, output)
def test_Deemphasis(self):
waveform = torch.rand(3, 4, 10, dtype=self.dtype, device=self.device)
deemphasis = T.Deemphasis(coeff=0.97).to(dtype=self.dtype, device=self.device)
output = deemphasis(waveform)
ts_output = torch_script(deemphasis)(waveform)
self.assertEqual(ts_output, output)
import math
import random
from unittest.mock import patch
import numpy as np
import torch
import torchaudio.prototype.transforms as T
from parameterized import parameterized
from scipy import signal
from torchaudio.functional import lfilter, preemphasis
from torchaudio_unittest.common_utils import get_spectrogram, get_whitenoise, nested_params, TestBaseMixin
from torchaudio_unittest.common_utils import get_spectrogram, get_whitenoise, TestBaseMixin
def _get_ratio(mat):
......@@ -16,53 +8,6 @@ def _get_ratio(mat):
class TransformsTestImpl(TestBaseMixin):
@nested_params(
[(10, 4), (4, 3, 1, 2), (2,), ()],
[(100, 43), (21, 45)],
["full", "valid", "same"],
)
def test_Convolve(self, leading_dims, lengths, mode):
"""Check that convolve returns values identical to those that SciPy produces."""
L_x, L_y = lengths
x = torch.rand(*(leading_dims + (L_x,)), dtype=self.dtype, device=self.device)
y = torch.rand(*(leading_dims + (L_y,)), dtype=self.dtype, device=self.device)
convolve = T.Convolve(mode=mode).to(self.device)
actual = convolve(x, y)
num_signals = torch.tensor(leading_dims).prod() if leading_dims else 1
x_reshaped = x.reshape((num_signals, L_x))
y_reshaped = y.reshape((num_signals, L_y))
expected = [
signal.convolve(x_reshaped[i].detach().cpu().numpy(), y_reshaped[i].detach().cpu().numpy(), mode=mode)
for i in range(num_signals)
]
expected = torch.tensor(np.array(expected))
expected = expected.reshape(leading_dims + (-1,))
self.assertEqual(expected, actual)
@nested_params(
[(10, 4), (4, 3, 1, 2), (2,), ()],
[(100, 43), (21, 45)],
["full", "valid", "same"],
)
def test_FFTConvolve(self, leading_dims, lengths, mode):
"""Check that fftconvolve returns values identical to those that SciPy produces."""
L_x, L_y = lengths
x = torch.rand(*(leading_dims + (L_x,)), dtype=self.dtype, device=self.device)
y = torch.rand(*(leading_dims + (L_y,)), dtype=self.dtype, device=self.device)
convolve = T.FFTConvolve(mode=mode).to(self.device)
actual = convolve(x, y)
expected = signal.fftconvolve(x.detach().cpu().numpy(), y.detach().cpu().numpy(), axes=-1, mode=mode)
expected = torch.tensor(expected)
self.assertEqual(expected, actual)
def test_InverseBarkScale(self):
"""Gauge the quality of InverseBarkScale transform.
......@@ -105,146 +50,3 @@ class TransformsTestImpl(TestBaseMixin):
print(f"Ratio of relative diff smaller than {tol:e} is " f"{_get_ratio(relative_diff < tol)}")
assert _get_ratio(relative_diff < 1e-1) > 0.2
assert _get_ratio(relative_diff < 1e-3) > 2e-3
def test_Speed_identity(self):
"""speed of 1.0 does not alter input waveform and length"""
leading_dims = (5, 4, 2)
time = 1000
waveform = torch.rand(*leading_dims, time)
lengths = torch.randint(1, 1000, leading_dims)
speed = T.Speed(1000, 1.0)
actual_waveform, actual_lengths = speed(waveform, lengths)
self.assertEqual(waveform, actual_waveform)
self.assertEqual(lengths, actual_lengths)
@nested_params(
[0.8, 1.1, 1.2],
)
def test_Speed_accuracy(self, factor):
"""sinusoidal waveform is properly compressed by factor"""
n_to_trim = 20
sample_rate = 1000
freq = 2
times = torch.arange(0, 5, 1.0 / sample_rate)
waveform = torch.cos(2 * math.pi * freq * times).unsqueeze(0).to(self.device, self.dtype)
lengths = torch.tensor([waveform.size(1)])
speed = T.Speed(sample_rate, factor).to(self.device, self.dtype)
output, output_lengths = speed(waveform, lengths)
self.assertEqual(output.size(1), output_lengths[0])
new_times = torch.arange(0, 5 / factor, 1.0 / sample_rate)
expected_waveform = torch.cos(2 * math.pi * freq * factor * new_times).unsqueeze(0).to(self.device, self.dtype)
self.assertEqual(
expected_waveform[..., n_to_trim:-n_to_trim], output[..., n_to_trim:-n_to_trim], atol=1e-1, rtol=1e-4
)
def test_SpeedPerturbation(self):
"""sinusoidal waveform is properly compressed by sampled factors"""
n_to_trim = 20
sample_rate = 1000
freq = 2
times = torch.arange(0, 5, 1.0 / sample_rate)
waveform = torch.cos(2 * math.pi * freq * times).unsqueeze(0).to(self.device, self.dtype)
lengths = torch.tensor([waveform.size(1)])
factors = [0.8, 1.1, 1.0]
indices = random.choices(range(len(factors)), k=5)
speed_perturb = T.SpeedPerturbation(sample_rate, factors).to(self.device, self.dtype)
with patch("torch.randint", side_effect=indices):
for idx in indices:
output, output_lengths = speed_perturb(waveform, lengths)
self.assertEqual(output.size(1), output_lengths[0])
factor = factors[idx]
new_times = torch.arange(0, 5 / factor, 1.0 / sample_rate)
expected_waveform = (
torch.cos(2 * math.pi * freq * factor * new_times).unsqueeze(0).to(self.device, self.dtype)
)
self.assertEqual(
expected_waveform[..., n_to_trim:-n_to_trim],
output[..., n_to_trim:-n_to_trim],
atol=1e-1,
rtol=1e-4,
)
def test_AddNoise_broadcast(self):
"""Check that add_noise produces correct outputs when broadcasting input dimensions."""
leading_dims = (5, 2, 3)
L = 51
waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
noise = torch.rand(5, 1, 1, L, dtype=self.dtype, device=self.device)
lengths = torch.rand(5, 1, 3, dtype=self.dtype, device=self.device)
snr = torch.rand(1, 1, 1, dtype=self.dtype, device=self.device) * 10
add_noise = T.AddNoise()
actual = add_noise(waveform, noise, snr, lengths)
noise_expanded = noise.expand(*leading_dims, L)
snr_expanded = snr.expand(*leading_dims)
lengths_expanded = lengths.expand(*leading_dims)
expected = add_noise(waveform, noise_expanded, snr_expanded, lengths_expanded)
self.assertEqual(expected, actual)
@parameterized.expand(
[((5, 2, 3), (2, 1, 1), (5, 2), (5, 2, 3)), ((2, 1), (5,), (5,), (5,)), ((3,), (5, 2, 3), (2, 1, 1), (5, 2))]
)
def test_AddNoise_leading_dim_check(self, waveform_dims, noise_dims, lengths_dims, snr_dims):
"""Check that add_noise properly rejects inputs with different leading dimension lengths."""
L = 51
waveform = torch.rand(*waveform_dims, L, dtype=self.dtype, device=self.device)
noise = torch.rand(*noise_dims, L, dtype=self.dtype, device=self.device)
lengths = torch.rand(*lengths_dims, dtype=self.dtype, device=self.device)
snr = torch.rand(*snr_dims, dtype=self.dtype, device=self.device) * 10
add_noise = T.AddNoise()
with self.assertRaisesRegex(ValueError, "Input leading dimensions"):
add_noise(waveform, noise, snr, lengths)
def test_AddNoise_length_check(self):
"""Check that add_noise properly rejects inputs that have inconsistent length dimensions."""
leading_dims = (5, 2, 3)
L = 51
waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
noise = torch.rand(*leading_dims, 50, dtype=self.dtype, device=self.device)
lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device)
snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device) * 10
add_noise = T.AddNoise()
with self.assertRaisesRegex(ValueError, "Length dimensions"):
add_noise(waveform, noise, snr, lengths)
@nested_params(
[(2, 1, 31)],
[0.97, 0.72],
)
def test_Preemphasis(self, input_shape, coeff):
waveform = torch.rand(*input_shape, dtype=self.dtype, device=self.device)
preemphasis = T.Preemphasis(coeff=coeff).to(dtype=self.dtype, device=self.device)
actual = preemphasis(waveform)
a_coeffs = torch.tensor([1.0, 0.0], device=self.device, dtype=self.dtype)
b_coeffs = torch.tensor([1.0, -coeff], device=self.device, dtype=self.dtype)
expected = lfilter(waveform, a_coeffs=a_coeffs, b_coeffs=b_coeffs)
self.assertEqual(actual, expected)
@nested_params(
[(2, 1, 31)],
[0.97, 0.72],
)
def test_Deemphasis(self, input_shape, coeff):
waveform = torch.rand(*input_shape, dtype=self.dtype, device=self.device)
preemphasized = preemphasis(waveform, coeff=coeff)
deemphasis = T.Deemphasis(coeff=coeff).to(dtype=self.dtype, device=self.device)
deemphasized = deemphasis(preemphasized)
self.assertEqual(deemphasized, waveform)
......@@ -28,6 +28,7 @@ class AutogradTestMixin(TestBaseMixin):
inputs: List[torch.Tensor],
*,
nondet_tol: float = 0.0,
# If False, leave each input's requires_grad flag untouched; needed for
# inputs such as integer lengths tensors, which cannot require grad.
enable_all_grad: bool = True,
):
transform = transform.to(dtype=torch.float64, device=self.device)
......@@ -37,7 +38,8 @@ class AutogradTestMixin(TestBaseMixin):
for i in inputs:
if torch.is_tensor(i):
i = i.to(dtype=torch.cdouble if i.is_complex() else torch.double, device=self.device)
i.requires_grad = True
if enable_all_grad:
i.requires_grad = True
inputs_.append(i)
assert gradcheck(transform, inputs_)
assert gradgradcheck(transform, inputs_, nondet_tol=nondet_tol)
......@@ -317,6 +319,61 @@ class AutogradTestMixin(TestBaseMixin):
reference_channel = 0
self.assert_grad(transform, [specgram, psd_s, psd_n, reference_channel])
@nested_params(
["Convolve", "FFTConvolve"],
["full", "valid", "same"],
)
def test_convolve(self, cls, mode):
leading_dims = (4, 3, 2)
L_x, L_y = 23, 40
x = torch.rand(*leading_dims, L_x)
y = torch.rand(*leading_dims, L_y)
convolve = getattr(T, cls)(mode=mode)
self.assert_grad(convolve, [x, y])
def test_speed(self):
leading_dims = (3, 2)
time = 200
waveform = torch.rand(*leading_dims, time, requires_grad=True)
lengths = torch.randint(1, time, leading_dims)
speed = T.Speed(1000, 1.1)
self.assert_grad(speed, (waveform, lengths), enable_all_grad=False)
def test_speed_perturbation(self):
leading_dims = (3, 2)
time = 200
waveform = torch.rand(*leading_dims, time, requires_grad=True)
lengths = torch.randint(1, time, leading_dims)
speed = T.SpeedPerturbation(1000, [0.9])
self.assert_grad(speed, (waveform, lengths), enable_all_grad=False)
@nested_params([True, False])
def test_add_noise(self, use_lengths):
leading_dims = (2, 3)
L = 31
waveform = torch.rand(*leading_dims, L)
noise = torch.rand(*leading_dims, L)
if use_lengths:
lengths = torch.rand(*leading_dims)
else:
lengths = None
snr = torch.rand(*leading_dims)
add_noise = T.AddNoise()
self.assert_grad(add_noise, (waveform, noise, snr, lengths))
def test_preemphasis(self):
waveform = torch.rand(3, 4, 10)
preemphasis = T.Preemphasis(coeff=0.97)
self.assert_grad(preemphasis, (waveform,))
def test_deemphasis(self):
waveform = torch.rand(3, 4, 10)
deemphasis = T.Deemphasis(coeff=0.97)
self.assert_grad(deemphasis, (waveform,))
class AutogradTestFloat32(TestBaseMixin):
def assert_grad(
......
......@@ -257,3 +257,122 @@ class TestTransforms(common_utils.TorchaudioTestCase):
computed = transform(specgram, psd_s, psd_n, reference_channel)
self.assertEqual(computed, expected)
@common_utils.nested_params(
["Convolve", "FFTConvolve"],
["full", "valid", "same"],
)
def test_convolve(self, cls, mode):
leading_dims = (2, 3)
L_x, L_y = 89, 43
x = torch.rand(*leading_dims, L_x, dtype=self.dtype, device=self.device)
y = torch.rand(*leading_dims, L_y, dtype=self.dtype, device=self.device)
convolve = getattr(T, cls)(mode=mode)
actual = convolve(x, y)
expected = torch.stack(
[
torch.stack(
[convolve(x[i, j].unsqueeze(0), y[i, j].unsqueeze(0)).squeeze(0) for j in range(leading_dims[1])]
)
for i in range(leading_dims[0])
]
)
self.assertEqual(expected, actual)
def test_speed(self):
B = 5
orig_freq = 100
factor = 0.8
input_lengths = torch.randint(1, 1000, (B,), dtype=torch.int32)
speed = T.Speed(orig_freq, factor)
unbatched_input = [torch.ones((int(length),)) * 1.0 for length in input_lengths]
batched_input = torch.nn.utils.rnn.pad_sequence(unbatched_input, batch_first=True)
output, output_lengths = speed(batched_input, input_lengths)
unbatched_output = []
unbatched_output_lengths = []
for idx in range(len(unbatched_input)):
w, l = speed(unbatched_input[idx], input_lengths[idx])
unbatched_output.append(w)
unbatched_output_lengths.append(l)
self.assertEqual(output_lengths, torch.stack(unbatched_output_lengths))
for idx in range(len(unbatched_output)):
w, l = output[idx], output_lengths[idx]
self.assertEqual(unbatched_output[idx], w[:l])
def test_speed_perturbation(self):
B = 5
orig_freq = 100
factor = 0.8
input_lengths = torch.randint(1, 1000, (B,), dtype=torch.int32)
speed = T.SpeedPerturbation(orig_freq, [factor])
unbatched_input = [torch.ones((int(length),)) * 1.0 for length in input_lengths]
batched_input = torch.nn.utils.rnn.pad_sequence(unbatched_input, batch_first=True)
output, output_lengths = speed(batched_input, input_lengths)
unbatched_output = []
unbatched_output_lengths = []
for idx in range(len(unbatched_input)):
w, l = speed(unbatched_input[idx], input_lengths[idx])
unbatched_output.append(w)
unbatched_output_lengths.append(l)
self.assertEqual(output_lengths, torch.stack(unbatched_output_lengths))
for idx in range(len(unbatched_output)):
w, l = output[idx], output_lengths[idx]
self.assertEqual(unbatched_output[idx], w[:l])
def test_add_noise(self):
leading_dims = (5, 2, 3)
L = 51
waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
noise = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device)
snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device) * 10
add_noise = T.AddNoise()
actual = add_noise(waveform, noise, snr, lengths)
expected = []
for i in range(leading_dims[0]):
for j in range(leading_dims[1]):
for k in range(leading_dims[2]):
expected.append(add_noise(waveform[i][j][k], noise[i][j][k], snr[i][j][k], lengths[i][j][k]))
self.assertEqual(torch.stack(expected), actual.reshape(-1, L))
def test_preemphasis(self):
waveform = torch.rand((3, 5, 2, 100), dtype=self.dtype, device=self.device)
preemphasis = T.Preemphasis(coeff=0.97)
actual = preemphasis(waveform)
expected = []
for i in range(waveform.size(0)):
for j in range(waveform.size(1)):
for k in range(waveform.size(2)):
expected.append(preemphasis(waveform[i][j][k]))
self.assertEqual(torch.stack(expected), actual.reshape(-1, waveform.size(-1)))
def test_deemphasis(self):
waveform = torch.rand((3, 5, 2, 100), dtype=self.dtype, device=self.device)
deemphasis = T.Deemphasis(coeff=0.97)
actual = deemphasis(waveform)
expected = []
for i in range(waveform.size(0)):
for j in range(waveform.size(1)):
for k in range(waveform.size(2)):
expected.append(deemphasis(waveform[i][j][k]))
self.assertEqual(torch.stack(expected), actual.reshape(-1, waveform.size(-1)))
......@@ -192,6 +192,75 @@ class Transforms(TestBaseMixin):
reference_channel = 0
self._assert_consistency_complex(T.SoudenMVDR(), specgram, psd_s, psd_n, reference_channel)
@common_utils.nested_params(
["Convolve", "FFTConvolve"],
["full", "valid", "same"],
)
def test_convolve(self, cls, mode):
leading_dims = (2, 3, 2)
L_x, L_y = 32, 55
x = torch.rand(*leading_dims, L_x, dtype=self.dtype, device=self.device)
y = torch.rand(*leading_dims, L_y, dtype=self.dtype, device=self.device)
convolve = getattr(T, cls)(mode=mode).to(device=self.device, dtype=self.dtype)
output = convolve(x, y)
ts_output = torch_script(convolve)(x, y)
self.assertEqual(ts_output, output)
def test_speed(self):
leading_dims = (3, 2)
time = 200
waveform = torch.rand(*leading_dims, time, dtype=self.dtype, device=self.device, requires_grad=True)
lengths = torch.randint(1, time, leading_dims, dtype=self.dtype, device=self.device)
speed = T.Speed(1000, 0.9).to(self.device, self.dtype)
output = speed(waveform, lengths)
ts_output = torch_script(speed)(waveform, lengths)
self.assertEqual(ts_output, output)
def test_speed_perturbation(self):
leading_dims = (3, 2)
time = 200
waveform = torch.rand(*leading_dims, time, dtype=self.dtype, device=self.device, requires_grad=True)
lengths = torch.randint(1, time, leading_dims, dtype=self.dtype, device=self.device)
speed = T.SpeedPerturbation(1000, [0.9]).to(self.device, self.dtype)
output = speed(waveform, lengths)
ts_output = torch_script(speed)(waveform, lengths)
self.assertEqual(ts_output, output)
@common_utils.nested_params([True, False])
def test_add_noise(self, use_lengths):
leading_dims = (2, 3)
L = 31
waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device, requires_grad=True)
noise = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device, requires_grad=True)
if use_lengths:
lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device, requires_grad=True)
else:
lengths = None
snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device, requires_grad=True) * 10
add_noise = T.AddNoise().to(self.device, self.dtype)
output = add_noise(waveform, noise, snr, lengths)
ts_output = torch_script(add_noise)(waveform, noise, snr, lengths)
self.assertEqual(ts_output, output)
def test_preemphasis(self):
waveform = torch.rand(3, 4, 10, dtype=self.dtype, device=self.device)
preemphasis = T.Preemphasis(coeff=0.97).to(dtype=self.dtype, device=self.device)
output = preemphasis(waveform)
ts_output = torch_script(preemphasis)(waveform)
self.assertEqual(ts_output, output)
def test_deemphasis(self):
waveform = torch.rand(3, 4, 10, dtype=self.dtype, device=self.device)
deemphasis = T.Deemphasis(coeff=0.97).to(dtype=self.dtype, device=self.device)
output = deemphasis(waveform)
ts_output = torch_script(deemphasis)(waveform)
self.assertEqual(ts_output, output)
class TransformsFloat32Only(TestBaseMixin):
def test_rnnt_loss(self):
......
import math
import random
from unittest.mock import patch
import numpy as np
import torch
import torchaudio.transforms as T
from parameterized import param, parameterized
from scipy import signal
from torchaudio.functional import lfilter, preemphasis
from torchaudio.functional.functional import _get_sinc_resample_kernel
from torchaudio_unittest.common_utils import get_spectrogram, get_whitenoise, nested_params, TestBaseMixin
from torchaudio_unittest.common_utils.psd_utils import psd_numpy
......@@ -158,3 +165,193 @@ class TransformsTestBase(TestBaseMixin):
trans.orig_freq, sample_rate, trans.gcd, device=self.device, dtype=self.dtype
)
self.assertEqual(trans.kernel, expected)
@nested_params(
[(10, 4), (4, 3, 1, 2), (2,), ()],
[(100, 43), (21, 45)],
["full", "valid", "same"],
)
def test_convolve(self, leading_dims, lengths, mode):
"""Check that Convolve returns values identical to those that SciPy produces."""
L_x, L_y = lengths
x = torch.rand(*(leading_dims + (L_x,)), dtype=self.dtype, device=self.device)
y = torch.rand(*(leading_dims + (L_y,)), dtype=self.dtype, device=self.device)
convolve = T.Convolve(mode=mode).to(self.device)
actual = convolve(x, y)
num_signals = torch.tensor(leading_dims).prod() if leading_dims else 1
x_reshaped = x.reshape((num_signals, L_x))
y_reshaped = y.reshape((num_signals, L_y))
expected = [
signal.convolve(x_reshaped[i].detach().cpu().numpy(), y_reshaped[i].detach().cpu().numpy(), mode=mode)
for i in range(num_signals)
]
expected = torch.tensor(np.array(expected))
expected = expected.reshape(leading_dims + (-1,))
self.assertEqual(expected, actual)
@nested_params(
[(10, 4), (4, 3, 1, 2), (2,), ()],
[(100, 43), (21, 45)],
["full", "valid", "same"],
)
def test_fftconvolve(self, leading_dims, lengths, mode):
"""Check that FFTConvolve returns values identical to those that SciPy produces."""
L_x, L_y = lengths
x = torch.rand(*(leading_dims + (L_x,)), dtype=self.dtype, device=self.device)
y = torch.rand(*(leading_dims + (L_y,)), dtype=self.dtype, device=self.device)
convolve = T.FFTConvolve(mode=mode).to(self.device)
actual = convolve(x, y)
expected = signal.fftconvolve(x.detach().cpu().numpy(), y.detach().cpu().numpy(), axes=-1, mode=mode)
expected = torch.tensor(expected)
self.assertEqual(expected, actual)
def test_speed_identity(self):
"""speed of 1.0 does not alter input waveform and length"""
leading_dims = (5, 4, 2)
time = 1000
waveform = torch.rand(*leading_dims, time)
lengths = torch.randint(1, 1000, leading_dims)
speed = T.Speed(1000, 1.0)
actual_waveform, actual_lengths = speed(waveform, lengths)
self.assertEqual(waveform, actual_waveform)
self.assertEqual(lengths, actual_lengths)
@nested_params(
[0.8, 1.1, 1.2],
)
def test_speed_accuracy(self, factor):
"""sinusoidal waveform is properly compressed by factor"""
n_to_trim = 20
sample_rate = 1000
freq = 2
times = torch.arange(0, 5, 1.0 / sample_rate)
waveform = torch.cos(2 * math.pi * freq * times).unsqueeze(0).to(self.device, self.dtype)
lengths = torch.tensor([waveform.size(1)])
speed = T.Speed(sample_rate, factor).to(self.device, self.dtype)
output, output_lengths = speed(waveform, lengths)
self.assertEqual(output.size(1), output_lengths[0])
new_times = torch.arange(0, 5 / factor, 1.0 / sample_rate)
expected_waveform = torch.cos(2 * math.pi * freq * factor * new_times).unsqueeze(0).to(self.device, self.dtype)
self.assertEqual(
expected_waveform[..., n_to_trim:-n_to_trim], output[..., n_to_trim:-n_to_trim], atol=1e-1, rtol=1e-4
)
def test_speed_perturbation(self):
"""sinusoidal waveform is properly compressed by sampled factors"""
n_to_trim = 20
sample_rate = 1000
freq = 2
times = torch.arange(0, 5, 1.0 / sample_rate)
waveform = torch.cos(2 * math.pi * freq * times).unsqueeze(0).to(self.device, self.dtype)
lengths = torch.tensor([waveform.size(1)])
factors = [0.8, 1.1, 1.0]
indices = random.choices(range(len(factors)), k=5)
speed_perturb = T.SpeedPerturbation(sample_rate, factors).to(self.device, self.dtype)
with patch("torch.randint", side_effect=indices):
for idx in indices:
output, output_lengths = speed_perturb(waveform, lengths)
self.assertEqual(output.size(1), output_lengths[0])
factor = factors[idx]
new_times = torch.arange(0, 5 / factor, 1.0 / sample_rate)
expected_waveform = (
torch.cos(2 * math.pi * freq * factor * new_times).unsqueeze(0).to(self.device, self.dtype)
)
self.assertEqual(
expected_waveform[..., n_to_trim:-n_to_trim],
output[..., n_to_trim:-n_to_trim],
atol=1e-1,
rtol=1e-4,
)
def test_add_noise_broadcast(self):
"""Check that AddNoise produces correct outputs when broadcasting input dimensions."""
leading_dims = (5, 2, 3)
L = 51
waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
noise = torch.rand(5, 1, 1, L, dtype=self.dtype, device=self.device)
lengths = torch.rand(5, 1, 3, dtype=self.dtype, device=self.device)
snr = torch.rand(1, 1, 1, dtype=self.dtype, device=self.device) * 10
add_noise = T.AddNoise()
actual = add_noise(waveform, noise, snr, lengths)
noise_expanded = noise.expand(*leading_dims, L)
snr_expanded = snr.expand(*leading_dims)
lengths_expanded = lengths.expand(*leading_dims)
expected = add_noise(waveform, noise_expanded, snr_expanded, lengths_expanded)
self.assertEqual(expected, actual)
@parameterized.expand(
[((5, 2, 3), (2, 1, 1), (5, 2), (5, 2, 3)), ((2, 1), (5,), (5,), (5,)), ((3,), (5, 2, 3), (2, 1, 1), (5, 2))]
)
def test_add_noise_leading_dim_check(self, waveform_dims, noise_dims, lengths_dims, snr_dims):
"""Check that AddNoise properly rejects inputs with different leading dimension lengths."""
L = 51
waveform = torch.rand(*waveform_dims, L, dtype=self.dtype, device=self.device)
noise = torch.rand(*noise_dims, L, dtype=self.dtype, device=self.device)
lengths = torch.rand(*lengths_dims, dtype=self.dtype, device=self.device)
snr = torch.rand(*snr_dims, dtype=self.dtype, device=self.device) * 10
add_noise = T.AddNoise()
with self.assertRaisesRegex(ValueError, "Input leading dimensions"):
add_noise(waveform, noise, snr, lengths)
def test_add_noise_length_check(self):
"""Check that add_noise properly rejects inputs that have inconsistent length dimensions."""
leading_dims = (5, 2, 3)
L = 51
waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
noise = torch.rand(*leading_dims, 50, dtype=self.dtype, device=self.device)
lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device)
snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device) * 10
add_noise = T.AddNoise()
with self.assertRaisesRegex(ValueError, "Length dimensions"):
add_noise(waveform, noise, snr, lengths)
@nested_params(
[(2, 1, 31)],
[0.97, 0.72],
)
def test_preemphasis(self, input_shape, coeff):
waveform = torch.rand(*input_shape, dtype=self.dtype, device=self.device)
preemphasis = T.Preemphasis(coeff=coeff).to(dtype=self.dtype, device=self.device)
actual = preemphasis(waveform)
a_coeffs = torch.tensor([1.0, 0.0], device=self.device, dtype=self.dtype)
b_coeffs = torch.tensor([1.0, -coeff], device=self.device, dtype=self.dtype)
expected = lfilter(waveform, a_coeffs=a_coeffs, b_coeffs=b_coeffs)
self.assertEqual(actual, expected)
@nested_params(
[(2, 1, 31)],
[0.97, 0.72],
)
def test_deemphasis(self, input_shape, coeff):
waveform = torch.rand(*input_shape, dtype=self.dtype, device=self.device)
preemphasized = preemphasis(waveform, coeff=coeff)
deemphasis = T.Deemphasis(coeff=coeff).to(dtype=self.dtype, device=self.device)
deemphasized = deemphasis(preemphasized)
self.assertEqual(deemphasized, waveform)
from ._transforms import (
AddNoise,
BarkScale,
BarkSpectrogram,
Convolve,
Deemphasis,
FFTConvolve,
InverseBarkScale,
Preemphasis,
Speed,
SpeedPerturbation,
)
from ._transforms import BarkScale, BarkSpectrogram, InverseBarkScale
__all__ = [
"AddNoise",
"BarkScale",
"BarkSpectrogram",
"Convolve",
"Deemphasis",
"FFTConvolve",
"InverseBarkScale",
"Preemphasis",
"SpeedPerturbation",
"Speed",
]
import math
from typing import Callable, Optional, Sequence, Tuple
from typing import Callable, Optional
import torch
from torchaudio.functional import add_noise, convolve, deemphasis, fftconvolve, preemphasis
from torchaudio.functional.functional import _check_convolve_mode
from torchaudio.prototype.functional import barkscale_fbanks
from torchaudio.transforms import Resample, Spectrogram
class Convolve(torch.nn.Module):
r"""
Convolves inputs along their last dimension using the direct method.
Note that, in contrast to :class:`torch.nn.Conv1d`, which actually applies the valid cross-correlation
operator, this module applies the true `convolution`_ operator.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
mode (str, optional): Must be one of ("full", "valid", "same").
* "full": Returns the full convolution result, with shape `(..., N + M - 1)`, where
`N` and `M` are the trailing dimensions of the two inputs. (Default)
* "valid": Returns the segment of the full convolution result corresponding to where
the two inputs overlap completely, with shape `(..., max(N, M) - min(N, M) + 1)`.
* "same": Returns the center segment of the full convolution result, with shape `(..., N)`.
.. _convolution:
https://en.wikipedia.org/wiki/Convolution
"""
def __init__(self, mode: str = "full") -> None:
_check_convolve_mode(mode)
super().__init__()
self.mode = mode
def forward(self, x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
r"""
Args:
x (torch.Tensor): First convolution operand, with shape `(..., N)`.
y (torch.Tensor): Second convolution operand, with shape `(..., M)`
(leading dimensions must match those of ``x``).
Returns:
torch.Tensor: Result of convolving ``x`` and ``y``, with shape `(..., L)`, where
the leading dimensions match those of ``x`` and `L` is dictated by ``mode``.
"""
return convolve(x, y, mode=self.mode)
class FFTConvolve(torch.nn.Module):
r"""
Convolves inputs along their last dimension using FFT. For inputs with large last dimensions, this module
is generally much faster than :class:`Convolve`.
Note that, in contrast to :class:`torch.nn.Conv1d`, which actually applies the valid cross-correlation
operator, this module applies the true `convolution`_ operator.
Also note that this module can only output float tensors (int tensor inputs will be cast to float).
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
mode (str, optional): Must be one of ("full", "valid", "same").
* "full": Returns the full convolution result, with shape `(..., N + M - 1)`, where
`N` and `M` are the trailing dimensions of the two inputs. (Default)
* "valid": Returns the segment of the full convolution result corresponding to where
the two inputs overlap completely, with shape `(..., max(N, M) - min(N, M) + 1)`.
* "same": Returns the center segment of the full convolution result, with shape `(..., N)`.
.. _convolution:
https://en.wikipedia.org/wiki/Convolution
"""
def __init__(self, mode: str = "full") -> None:
_check_convolve_mode(mode)
super().__init__()
self.mode = mode
def forward(self, x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
r"""
Args:
x (torch.Tensor): First convolution operand, with shape `(..., N)`.
y (torch.Tensor): Second convolution operand, with shape `(..., M)`
(leading dimensions must be broadcastable to those of ``x``).
Returns:
torch.Tensor: Result of convolving ``x`` and ``y``, with shape `(..., L)`, where
the leading dimensions match those of ``x`` and `L` is dictated by ``mode``.
"""
return fftconvolve(x, y, mode=self.mode)
from torchaudio.transforms import Spectrogram
class BarkScale(torch.nn.Module):
......@@ -386,185 +295,3 @@ class BarkSpectrogram(torch.nn.Module):
specgram = self.spectrogram(waveform)
bark_specgram = self.bark_scale(specgram)
return bark_specgram
def _source_target_sample_rate(orig_freq: int, speed: float) -> Tuple[int, int]:
source_sample_rate = int(speed * orig_freq)
target_sample_rate = int(orig_freq)
gcd = math.gcd(source_sample_rate, target_sample_rate)
return source_sample_rate // gcd, target_sample_rate // gcd
class Speed(torch.nn.Module):
r"""Adjusts waveform speed.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
orig_freq (int): Original frequency of the signals in ``waveform``.
factor (float): Factor by which to adjust speed of input. Values greater than 1.0
compress ``waveform`` in time, whereas values less than 1.0 stretch ``waveform`` in time.
"""
def __init__(self, orig_freq, factor) -> None:
super().__init__()
self.orig_freq = orig_freq
self.factor = factor
self.source_sample_rate, self.target_sample_rate = _source_target_sample_rate(orig_freq, factor)
self.resampler = Resample(orig_freq=self.source_sample_rate, new_freq=self.target_sample_rate)
def forward(self, waveform, lengths) -> Tuple[torch.Tensor, torch.Tensor]:
r"""
Args:
waveform (torch.Tensor): Input signals, with shape `(..., time)`.
lengths (torch.Tensor): Valid lengths of signals in ``waveform``, with shape `(...)`.
Returns:
(torch.Tensor, torch.Tensor):
torch.Tensor
Speed-adjusted waveform, with shape `(..., new_time)`.
torch.Tensor
Valid lengths of signals in speed-adjusted waveform, with shape `(...)`.
"""
return (
self.resampler(waveform),
torch.ceil(lengths * self.target_sample_rate / self.source_sample_rate).to(lengths.dtype),
)
class SpeedPerturbation(torch.nn.Module):
r"""Applies the speed perturbation augmentation introduced in
*Audio augmentation for speech recognition* :cite:`ko15_interspeech`. For a given input,
the module samples a speed-up factor from ``factors`` uniformly at random and adjusts
the speed of the input by that factor.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
orig_freq (int): Original frequency of the signals in ``waveform``.
factors (Sequence[float]): Factors by which to adjust speed of input. Values greater than 1.0
compress ``waveform`` in time, whereas values less than 1.0 stretch ``waveform`` in time.
Example
>>> speed_perturb = SpeedPerturbation(16000, [0.9, 1.1, 1.0, 1.0, 1.0])
>>> # waveform speed will be adjusted by factor 0.9 with 20% probability,
>>> # 1.1 with 20% probability, and 1.0 (i.e. kept the same) with 60% probability.
>>> speed_perturbed_waveform = speed_perturb(waveform, lengths)
"""
def __init__(self, orig_freq: int, factors: Sequence[float]) -> None:
super().__init__()
self.speeders = torch.nn.ModuleList([Speed(orig_freq=orig_freq, factor=factor) for factor in factors])
def forward(self, waveform: torch.Tensor, lengths: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
r"""
Args:
waveform (torch.Tensor): Input signals, with shape `(..., time)`.
lengths (torch.Tensor): Valid lengths of signals in ``waveform``, with shape `(...)`.
Returns:
(torch.Tensor, torch.Tensor):
torch.Tensor
Speed-adjusted waveform, with shape `(..., new_time)`.
torch.Tensor
Valid lengths of signals in speed-adjusted waveform, with shape `(...)`.
"""
idx = int(torch.randint(len(self.speeders), ()))
# NOTE: we do this because TorchScript doesn't allow for
# indexing ModuleList instances with non-literals.
for speeder_idx, speeder in enumerate(self.speeders):
if idx == speeder_idx:
return speeder(waveform, lengths)
raise RuntimeError("Speeder not found; execution should never have reached here.")
class AddNoise(torch.nn.Module):
r"""Scales and adds noise to waveform per signal-to-noise ratio.
See :meth:`torchaudio.prototype.functional.add_noise` for more details.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
"""
def forward(
self, waveform: torch.Tensor, noise: torch.Tensor, snr: torch.Tensor, lengths: Optional[torch.Tensor] = None
) -> torch.Tensor:
r"""
Args:
waveform (torch.Tensor): Input waveform, with shape `(..., L)`.
noise (torch.Tensor): Noise, with shape `(..., L)` (same shape as ``waveform``).
snr (torch.Tensor): Signal-to-noise ratios in dB, with shape `(...,)`.
lengths (torch.Tensor or None, optional): Valid lengths of signals in ``waveform`` and ``noise``,
with shape `(...,)` (leading dimensions must match those of ``waveform``). If ``None``, all
elements in ``waveform`` and ``noise`` are treated as valid. (Default: ``None``)
Returns:
torch.Tensor: Result of scaling and adding ``noise`` to ``waveform``, with shape `(..., L)`
(same shape as ``waveform``).
"""
return add_noise(waveform, noise, snr, lengths)
class Preemphasis(torch.nn.Module):
r"""Pre-emphasizes a waveform along its last dimension.
See :meth:`torchaudio.prototype.functional.preemphasis` for more details.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
coeff (float, optional): Pre-emphasis coefficient. Typically between 0.0 and 1.0.
(Default: 0.97)
"""
def __init__(self, coeff: float = 0.97) -> None:
super().__init__()
self.coeff = coeff
def forward(self, waveform: torch.Tensor) -> torch.Tensor:
r"""
Args:
waveform (torch.Tensor): Waveform, with shape `(..., N)`.
Returns:
torch.Tensor: Pre-emphasized waveform, with shape `(..., N)`.
"""
return preemphasis(waveform, coeff=self.coeff)
class Deemphasis(torch.nn.Module):
r"""De-emphasizes a waveform along its last dimension.
See :meth:`torchaudio.prototype.functional.deemphasis` for more details.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
coeff (float, optional): De-emphasis coefficient. Typically between 0.0 and 1.0.
(Default: 0.97)
"""
def __init__(self, coeff: float = 0.97) -> None:
super().__init__()
self.coeff = coeff
def forward(self, waveform: torch.Tensor) -> torch.Tensor:
r"""
Args:
waveform (torch.Tensor): Waveform, with shape `(..., N)`.
Returns:
torch.Tensor: De-emphasized waveform, with shape `(..., N)`.
"""
return deemphasis(waveform, coeff=self.coeff)
from ._multi_channel import MVDR, PSD, RTFMVDR, SoudenMVDR
from ._transforms import (
AddNoise,
AmplitudeToDB,
ComputeDeltas,
Convolve,
Deemphasis,
Fade,
FFTConvolve,
FrequencyMasking,
GriffinLim,
InverseMelScale,
......@@ -15,11 +19,14 @@ from ._transforms import (
MuLawDecoding,
MuLawEncoding,
PitchShift,
Preemphasis,
Resample,
RNNTLoss,
SlidingWindowCmn,
SpectralCentroid,
Spectrogram,
Speed,
SpeedPerturbation,
TimeMasking,
TimeStretch,
Vad,
......@@ -28,9 +35,13 @@ from ._transforms import (
__all__ = [
"AddNoise",
"AmplitudeToDB",
"ComputeDeltas",
"Convolve",
"Deemphasis",
"Fade",
"FFTConvolve",
"FrequencyMasking",
"GriffinLim",
"InverseMelScale",
......@@ -45,6 +56,7 @@ __all__ = [
"MuLawEncoding",
"PSD",
"PitchShift",
"Preemphasis",
"RNNTLoss",
"RTFMVDR",
"Resample",
......@@ -52,6 +64,8 @@ __all__ = [
"SoudenMVDR",
"SpectralCentroid",
"Spectrogram",
"Speed",
"SpeedPerturbation",
"TimeMasking",
"TimeStretch",
"Vad",
......
......@@ -2,7 +2,7 @@
import math
import warnings
from typing import Callable, Optional, Union
from typing import Callable, Optional, Sequence, Tuple, Union
import torch
from torch import Tensor
......@@ -12,6 +12,7 @@ from torch.nn.parameter import UninitializedParameter
from torchaudio import functional as F
from torchaudio.functional.functional import (
_apply_sinc_resample_kernel,
_check_convolve_mode,
_fix_waveform_shape,
_get_sinc_resample_kernel,
_stretch_waveform,
......@@ -1807,3 +1808,273 @@ class RNNTLoss(torch.nn.Module):
self.reduction,
self.fused_log_softmax,
)
class Convolve(torch.nn.Module):
r"""
Convolves inputs along their last dimension using the direct method.
Note that, in contrast to :class:`torch.nn.Conv1d`, which actually applies the valid cross-correlation
operator, this module applies the true `convolution`_ operator.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
mode (str, optional): Must be one of ("full", "valid", "same").
* "full": Returns the full convolution result, with shape `(..., N + M - 1)`, where
`N` and `M` are the trailing dimensions of the two inputs. (Default)
* "valid": Returns the segment of the full convolution result corresponding to where
the two inputs overlap completely, with shape `(..., max(N, M) - min(N, M) + 1)`.
* "same": Returns the center segment of the full convolution result, with shape `(..., N)`.
.. _convolution:
https://en.wikipedia.org/wiki/Convolution
"""
def __init__(self, mode: str = "full") -> None:
_check_convolve_mode(mode)
super().__init__()
self.mode = mode
def forward(self, x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
r"""
Args:
x (torch.Tensor): First convolution operand, with shape `(..., N)`.
y (torch.Tensor): Second convolution operand, with shape `(..., M)`
(leading dimensions must match those of ``x``).
Returns:
torch.Tensor: Result of convolving ``x`` and ``y``, with shape `(..., L)`, where
the leading dimensions match those of ``x`` and `L` is dictated by ``mode``.
"""
return F.convolve(x, y, mode=self.mode)
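As a rough illustration of how the three modes affect output shape (a sketch, not part of the diff; here N = 100 and M = 40, matching the shape rules in the docstring above):

import torch
from torchaudio.transforms import Convolve

x = torch.rand(3, 100)   # (..., N)
y = torch.rand(3, 40)    # (..., M)

print(Convolve(mode="full")(x, y).shape)   # torch.Size([3, 139]): N + M - 1
print(Convolve(mode="valid")(x, y).shape)  # torch.Size([3, 61]):  max(N, M) - min(N, M) + 1
print(Convolve(mode="same")(x, y).shape)   # torch.Size([3, 100]): N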
class FFTConvolve(torch.nn.Module):
r"""
Convolves inputs along their last dimension using FFT. For inputs with large last dimensions, this module
is generally much faster than :class:`Convolve`.
Note that, in contrast to :class:`torch.nn.Conv1d`, which actually applies the valid cross-correlation
operator, this module applies the true `convolution`_ operator.
Also note that this module can only output float tensors (int tensor inputs will be cast to float).
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
mode (str, optional): Must be one of ("full", "valid", "same").
* "full": Returns the full convolution result, with shape `(..., N + M - 1)`, where
`N` and `M` are the trailing dimensions of the two inputs. (Default)
* "valid": Returns the segment of the full convolution result corresponding to where
the two inputs overlap completely, with shape `(..., max(N, M) - min(N, M) + 1)`.
* "same": Returns the center segment of the full convolution result, with shape `(..., N)`.
.. _convolution:
https://en.wikipedia.org/wiki/Convolution
"""
def __init__(self, mode: str = "full") -> None:
_check_convolve_mode(mode)
super().__init__()
self.mode = mode
def forward(self, x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
r"""
Args:
x (torch.Tensor): First convolution operand, with shape `(..., N)`.
y (torch.Tensor): Second convolution operand, with shape `(..., M)`
(leading dimensions must be broadcastable to those of ``x``).
Returns:
torch.Tensor: Result of convolving ``x`` and ``y``, with shape `(..., L)`, where
the leading dimensions match those of ``x`` and `L` is dictated by ``mode``.
"""
return F.fftconvolve(x, y, mode=self.mode)
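Since both modules implement the same operator, their outputs agree up to floating-point error; a quick sanity check (illustrative, mirroring the SciPy-based tests earlier in this diff):

import torch
from torchaudio.transforms import Convolve, FFTConvolve

x, y = torch.rand(200), torch.rand(50)
direct = Convolve(mode="full")(x, y)
via_fft = FFTConvolve(mode="full")(x, y)
print(torch.allclose(direct, via_fft, atol=1e-6))  # True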
def _source_target_sample_rate(orig_freq: int, speed: float) -> Tuple[int, int]:
source_sample_rate = int(speed * orig_freq)
target_sample_rate = int(orig_freq)
gcd = math.gcd(source_sample_rate, target_sample_rate)
return source_sample_rate // gcd, target_sample_rate // gcd
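# Worked example (not in the diff): _source_target_sample_rate(16000, 1.1)
# gives source = 17600 and target = 16000, whose gcd is 1600, so it returns
# (11, 10); Speed then resamples from 11 to 10, compressing the waveform by ~9%.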
class Speed(torch.nn.Module):
r"""Adjusts waveform speed.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
orig_freq (int): Original frequency of the signals in ``waveform``.
factor (float): Factor by which to adjust speed of input. Values greater than 1.0
compress ``waveform`` in time, whereas values less than 1.0 stretch ``waveform`` in time.
"""
def __init__(self, orig_freq, factor) -> None:
super().__init__()
self.orig_freq = orig_freq
self.factor = factor
self.source_sample_rate, self.target_sample_rate = _source_target_sample_rate(orig_freq, factor)
self.resampler = Resample(orig_freq=self.source_sample_rate, new_freq=self.target_sample_rate)
def forward(self, waveform, lengths) -> Tuple[torch.Tensor, torch.Tensor]:
r"""
Args:
waveform (torch.Tensor): Input signals, with shape `(..., time)`.
lengths (torch.Tensor): Valid lengths of signals in ``waveform``, with shape `(...)`.
Returns:
(torch.Tensor, torch.Tensor):
torch.Tensor
Speed-adjusted waveform, with shape `(..., new_time)`.
torch.Tensor
Valid lengths of signals in speed-adjusted waveform, with shape `(...)`.
"""
return (
self.resampler(waveform),
torch.ceil(lengths * self.target_sample_rate / self.source_sample_rate).to(lengths.dtype),
)
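A minimal usage sketch (illustrative values; a factor of 2.0 halves the valid length, as the ceil-based length formula above implies):

import torch
from torchaudio.transforms import Speed

waveform = torch.rand(1, 16000)              # one second at 16 kHz
lengths = torch.tensor([16000])
speed = Speed(orig_freq=16000, factor=2.0)
out, out_lengths = speed(waveform, lengths)
print(out.shape, out_lengths)                # torch.Size([1, 8000]) tensor([8000])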
class SpeedPerturbation(torch.nn.Module):
r"""Applies the speed perturbation augmentation introduced in
*Audio augmentation for speech recognition* :cite:`ko15_interspeech`. For a given input,
the module samples a speed-up factor from ``factors`` uniformly at random and adjusts
the speed of the input by that factor.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
orig_freq (int): Original frequency of the signals in ``waveform``.
factors (Sequence[float]): Factors by which to adjust speed of input. Values greater than 1.0
compress ``waveform`` in time, whereas values less than 1.0 stretch ``waveform`` in time.
Example
>>> speed_perturb = SpeedPerturbation(16000, [0.9, 1.1, 1.0, 1.0, 1.0])
>>> # waveform speed will be adjusted by factor 0.9 with 20% probability,
>>> # 1.1 with 20% probability, and 1.0 (i.e. kept the same) with 60% probability.
>>> speed_perturbed_waveform = speed_perturb(waveform, lengths)
"""
def __init__(self, orig_freq: int, factors: Sequence[float]) -> None:
super().__init__()
self.speeders = torch.nn.ModuleList([Speed(orig_freq=orig_freq, factor=factor) for factor in factors])
def forward(self, waveform: torch.Tensor, lengths: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
r"""
Args:
waveform (torch.Tensor): Input signals, with shape `(..., time)`.
lengths (torch.Tensor): Valid lengths of signals in ``waveform``, with shape `(...)`.
Returns:
(torch.Tensor, torch.Tensor):
torch.Tensor
Speed-adjusted waveform, with shape `(..., new_time)`.
torch.Tensor
Valid lengths of signals in speed-adjusted waveform, with shape `(...)`.
"""
idx = int(torch.randint(len(self.speeders), ()))
# NOTE: we do this because TorchScript doesn't allow for
# indexing ModuleList instances with non-literals.
for speeder_idx, speeder in enumerate(self.speeders):
if idx == speeder_idx:
return speeder(waveform, lengths)
raise RuntimeError("Speeder not found; execution should never have reached here.")
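Usage sketch (illustrative; a factor is drawn uniformly at random on every forward call):

import torch
from torchaudio.transforms import SpeedPerturbation

perturb = SpeedPerturbation(16000, [0.9, 1.0, 1.1])
waveform, lengths = torch.rand(1, 16000), torch.tensor([16000])
out, out_lengths = perturb(waveform, lengths)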
class AddNoise(torch.nn.Module):
r"""Scales and adds noise to waveform per signal-to-noise ratio.
See :meth:`torchaudio.functional.add_noise` for more details.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
"""
def forward(
self, waveform: torch.Tensor, noise: torch.Tensor, snr: torch.Tensor, lengths: Optional[torch.Tensor] = None
) -> torch.Tensor:
r"""
Args:
waveform (torch.Tensor): Input waveform, with shape `(..., L)`.
noise (torch.Tensor): Noise, with shape `(..., L)` (same shape as ``waveform``).
snr (torch.Tensor): Signal-to-noise ratios in dB, with shape `(...,)`.
lengths (torch.Tensor or None, optional): Valid lengths of signals in ``waveform`` and ``noise``,
with shape `(...,)` (leading dimensions must match those of ``waveform``). If ``None``, all
elements in ``waveform`` and ``noise`` are treated as valid. (Default: ``None``)
Returns:
torch.Tensor: Result of scaling and adding ``noise`` to ``waveform``, with shape `(..., L)`
(same shape as ``waveform``).
"""
return F.add_noise(waveform, noise, snr, lengths)
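A mixing sketch (shapes follow the docstring above; values illustrative): each noise signal is scaled so that the resulting signal-to-noise ratio matches the corresponding entry of ``snr`` before being added.

import torch
from torchaudio.transforms import AddNoise

waveform = torch.rand(2, 1000)
noise = torch.rand(2, 1000)
snr = torch.tensor([20.0, 5.0])      # dB; the second signal gets relatively more noise
noisy = AddNoise()(waveform, noise, snr)
print(noisy.shape)                   # torch.Size([2, 1000])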
class Preemphasis(torch.nn.Module):
r"""Pre-emphasizes a waveform along its last dimension.
See :meth:`torchaudio.functional.preemphasis` for more details.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
coeff (float, optional): Pre-emphasis coefficient. Typically between 0.0 and 1.0.
(Default: 0.97)
"""
def __init__(self, coeff: float = 0.97) -> None:
super().__init__()
self.coeff = coeff
def forward(self, waveform: torch.Tensor) -> torch.Tensor:
r"""
Args:
waveform (torch.Tensor): Waveform, with shape `(..., N)`.
Returns:
torch.Tensor: Pre-emphasized waveform, with shape `(..., N)`.
"""
return F.preemphasis(waveform, coeff=self.coeff)
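Concretely, this computes y[n] = x[n] - coeff * x[n - 1] (the lfilter-based test earlier in this diff uses exactly the coefficients b = [1, -coeff], a = [1, 0]); a quick numeric check (illustrative):

import torch
from torchaudio.transforms import Preemphasis

x = torch.tensor([[1.0, 2.0, 3.0]])
print(Preemphasis(coeff=0.5)(x))   # tensor([[1.0000, 1.5000, 2.0000]])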
class Deemphasis(torch.nn.Module):
r"""De-emphasizes a waveform along its last dimension.
See :meth:`torchaudio.functional.deemphasis` for more details.
.. devices:: CPU CUDA
.. properties:: Autograd TorchScript
Args:
coeff (float, optional): De-emphasis coefficient. Typically between 0.0 and 1.0.
(Default: 0.97)
"""
def __init__(self, coeff: float = 0.97) -> None:
super().__init__()
self.coeff = coeff
def forward(self, waveform: torch.Tensor) -> torch.Tensor:
r"""
Args:
waveform (torch.Tensor): Waveform, with shape `(..., N)`.
Returns:
torch.Tensor: De-emphasized waveform, with shape `(..., N)`.
"""
return F.deemphasis(waveform, coeff=self.coeff)
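De-emphasis inverts pre-emphasis with the same coefficient, which the round-trip test earlier in this diff relies on; a sketch:

import torch
from torchaudio.transforms import Deemphasis, Preemphasis

x = torch.rand(1, 100)
roundtrip = Deemphasis(coeff=0.97)(Preemphasis(coeff=0.97)(x))
print(torch.allclose(roundtrip, x, atol=1e-6))  # True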