Commit 41b88314 authored by hwangjeff's avatar hwangjeff Committed by Facebook GitHub Bot
Browse files

Move data augmentation functions out of prototype (#3001)

Summary:
Moves `add_noise`, `fftconvolve`, `convolve`, `speed`, `preemphasis`, and `deemphasis` out of `torchaudio.prototype.functional` and into `torchaudio.functional`.

Pull Request resolved: https://github.com/pytorch/audio/pull/3001

Reviewed By: mthrok

Differential Revision: D42688971

Pulled By: hwangjeff

fbshipit-source-id: 43280bd3ffeccddae57f1092ac45afb64dd426cc
parent 09e7d818
......@@ -26,6 +26,12 @@ Utility
apply_codec
resample
loudness
convolve
fftconvolve
add_noise
preemphasis
deemphasis
speed
Filtering
......
......@@ -4,41 +4,11 @@ torchaudio.prototype.functional
.. py:module:: torchaudio.prototype.functional
.. currentmodule:: torchaudio.prototype.functional
add_noise
~~~~~~~~~
.. autofunction:: add_noise
barkscale_fbanks
~~~~~~~~~~~~~~~~
.. autofunction:: barkscale_fbanks
convolve
~~~~~~~~
.. autofunction:: convolve
deemphasis
~~~~~~~~~~
.. autofunction:: deemphasis
fftconvolve
~~~~~~~~~~~
.. autofunction:: fftconvolve
preemphasis
~~~~~~~~~~~
.. autofunction:: preemphasis
speed
~~~~~
.. autofunction:: speed
DSP
~~~
......
......@@ -6,7 +6,7 @@ import torchaudio.functional as F
from parameterized import parameterized
from torch import Tensor
from torch.autograd import gradcheck, gradgradcheck
from torchaudio_unittest.common_utils import get_spectrogram, get_whitenoise, rnnt_utils, TestBaseMixin
from torchaudio_unittest.common_utils import get_spectrogram, get_whitenoise, nested_params, rnnt_utils, TestBaseMixin
class Autograd(TestBaseMixin):
......@@ -335,6 +335,43 @@ class Autograd(TestBaseMixin):
beamform_weights = torch.rand(batch_size, n_fft_bin, num_channels, dtype=torch.cfloat)
self.assert_grad(F.apply_beamforming, (beamform_weights, specgram))
@nested_params(
    [F.convolve, F.fftconvolve],
    ["full", "valid", "same"],
)
def test_convolve(self, fn, mode):
    """Gradients of convolve/fftconvolve pass gradcheck for every padding mode."""
    leading_dims = (4, 3, 2)
    L_x, L_y = 23, 40
    x = torch.rand(*leading_dims, L_x, dtype=self.dtype, device=self.device)
    y = torch.rand(*leading_dims, L_y, dtype=self.dtype, device=self.device)
    self.assert_grad(fn, (x, y, mode))
def test_add_noise(self):
    """Gradients of add_noise pass gradcheck with batched waveform/noise/snr/lengths."""
    leading_dims = (5, 2, 3)
    L = 51
    waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
    noise = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
    lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device)
    # scale snr into [0, 10) dB
    snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device) * 10
    self.assert_grad(F.add_noise, (waveform, noise, snr, lengths))
def test_speed(self):
    """Gradients of speed pass gradcheck; only the waveform requires grad."""
    leading_dims = (3, 2)
    T = 200
    waveform = torch.rand(*leading_dims, T, dtype=self.dtype, device=self.device, requires_grad=True)
    # integer lengths must not participate in gradcheck, hence enable_all_grad=False
    lengths = torch.randint(1, T, leading_dims, dtype=self.dtype, device=self.device)
    self.assert_grad(F.speed, (waveform, lengths, 1000, 1.1), enable_all_grad=False)
def test_preemphasis(self):
    """Gradients of preemphasis pass gradcheck."""
    waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype, requires_grad=True)
    coeff = 0.9
    self.assert_grad(F.preemphasis, (waveform, coeff))
def test_deemphasis(self):
    """Gradients of deemphasis pass gradcheck."""
    waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype, requires_grad=True)
    coeff = 0.9
    self.assert_grad(F.deemphasis, (waveform, coeff))
class AutogradFloat32(TestBaseMixin):
def assert_grad(
......
......@@ -407,3 +407,89 @@ class TestFunctional(common_utils.TorchaudioTestCase):
specgram = specgram.view(batch_size, num_channels, n_fft_bin, specgram.size(-1))
beamform_weights = torch.rand(batch_size, n_fft_bin, num_channels, dtype=torch.cfloat)
self.assert_batch_consistency(F.apply_beamforming, (beamform_weights, specgram))
@common_utils.nested_params(
    [F.convolve, F.fftconvolve],
    ["full", "valid", "same"],
)
def test_convolve(self, fn, mode):
    """Batched convolve/fftconvolve matches per-signal application over leading dims."""
    leading_dims = (2, 3)
    L_x, L_y = 89, 43
    x = torch.rand(*leading_dims, L_x, dtype=self.dtype, device=self.device)
    y = torch.rand(*leading_dims, L_y, dtype=self.dtype, device=self.device)
    actual = fn(x, y, mode)
    # Re-run the op one (i, j) signal at a time and re-stack the results.
    expected = torch.stack(
        [
            torch.stack(
                [fn(x[i, j].unsqueeze(0), y[i, j].unsqueeze(0), mode).squeeze(0) for j in range(leading_dims[1])]
            )
            for i in range(leading_dims[0])
        ]
    )
    self.assertEqual(expected, actual)
def test_add_noise(self):
    """Batched add_noise matches per-element application over the leading dims."""
    leading_dims = (5, 2, 3)
    L = 51
    waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
    noise = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
    lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device)
    snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device) * 10
    actual = F.add_noise(waveform, noise, snr, lengths)
    # Apply add_noise to each 1-D signal individually, then compare against
    # the batched output flattened to (num_signals, L).
    expected = []
    for i in range(leading_dims[0]):
        for j in range(leading_dims[1]):
            for k in range(leading_dims[2]):
                expected.append(F.add_noise(waveform[i][j][k], noise[i][j][k], snr[i][j][k], lengths[i][j][k]))
    self.assertEqual(torch.stack(expected), actual.reshape(-1, L))
def test_speed(self):
    """Batched (padded) speed matches per-waveform application, up to each output length."""
    B = 5
    orig_freq = 100
    factor = 0.8
    input_lengths = torch.randint(1, 1000, (B,), dtype=torch.int32)
    unbatched_input = [torch.ones((int(length),)) * 1.0 for length in input_lengths]
    # Pad variable-length inputs into one (B, T_max) batch.
    batched_input = torch.nn.utils.rnn.pad_sequence(unbatched_input, batch_first=True)
    output, output_lengths = F.speed(batched_input, input_lengths, orig_freq=orig_freq, factor=factor)
    unbatched_output = []
    unbatched_output_lengths = []
    for idx in range(len(unbatched_input)):
        w, l = F.speed(unbatched_input[idx], input_lengths[idx], orig_freq=orig_freq, factor=factor)
        unbatched_output.append(w)
        unbatched_output_lengths.append(l)
    self.assertEqual(output_lengths, torch.stack(unbatched_output_lengths))
    # Only compare the valid (unpadded) portion of each batched output row.
    for idx in range(len(unbatched_output)):
        w, l = output[idx], output_lengths[idx]
        self.assertEqual(unbatched_output[idx], w[:l])
def test_preemphasis(self):
    """Batched preemphasis matches applying it per leading-dim slice."""
    waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype)
    coeff = 0.9
    actual = F.preemphasis(waveform, coeff=coeff)
    expected = []
    for i in range(waveform.size(0)):
        expected.append(F.preemphasis(waveform[i], coeff=coeff))
    self.assertEqual(torch.stack(expected), actual)
def test_deemphasis(self):
    """Batched deemphasis matches applying it per leading-dim slice."""
    waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype)
    coeff = 0.9
    actual = F.deemphasis(waveform, coeff=coeff)
    expected = []
    for i in range(waveform.size(0)):
        expected.append(F.deemphasis(waveform[i], coeff=coeff))
    self.assertEqual(torch.stack(expected), actual)
......@@ -892,6 +892,215 @@ class Functional(TestBaseMixin):
torch.tensor(specgram_enhanced, dtype=self.complex_dtype, device=self.device), specgram_enhanced_audio
)
@nested_params(
    [(10, 4), (4, 3, 1, 2), (2,), ()],
    [(100, 43), (21, 45)],
    ["full", "valid", "same"],
)
def test_convolve_numerics(self, leading_dims, lengths, mode):
    """Check that convolve returns values identical to those that SciPy produces."""
    L_x, L_y = lengths
    x = torch.rand(*(leading_dims + (L_x,)), dtype=self.dtype, device=self.device)
    y = torch.rand(*(leading_dims + (L_y,)), dtype=self.dtype, device=self.device)
    actual = F.convolve(x, y, mode=mode)
    # Flatten all leading dims so SciPy (1-D only) can be applied signal by signal.
    num_signals = torch.tensor(leading_dims).prod() if leading_dims else 1
    x_reshaped = x.reshape((num_signals, L_x))
    y_reshaped = y.reshape((num_signals, L_y))
    expected = [
        signal.convolve(x_reshaped[i].detach().cpu().numpy(), y_reshaped[i].detach().cpu().numpy(), mode=mode)
        for i in range(num_signals)
    ]
    expected = torch.tensor(np.array(expected))
    expected = expected.reshape(leading_dims + (-1,))
    self.assertEqual(expected, actual)
@nested_params(
    [(10, 4), (4, 3, 1, 2), (2,), ()],
    [(100, 43), (21, 45)],
    ["full", "valid", "same"],
)
def test_fftconvolve_numerics(self, leading_dims, lengths, mode):
    """Check that fftconvolve returns values identical to those that SciPy produces."""
    L_x, L_y = lengths
    x = torch.rand(*(leading_dims + (L_x,)), dtype=self.dtype, device=self.device)
    y = torch.rand(*(leading_dims + (L_y,)), dtype=self.dtype, device=self.device)
    actual = F.fftconvolve(x, y, mode=mode)
    # SciPy's fftconvolve supports N-D input directly; convolve along the last axis.
    expected = signal.fftconvolve(x.detach().cpu().numpy(), y.detach().cpu().numpy(), axes=-1, mode=mode)
    expected = torch.tensor(expected)
    self.assertEqual(expected, actual)
@parameterized.expand(
    [
        # fmt: off
        ((5, 2, 3), (5, 1, 3)),
        ((5, 2, 3), (1, 2, 3)),
        ((5, 2, 3), (1, 1, 3)),
        # fmt: on
    ]
)
def test_fftconvolve_broadcast(self, x_shape, y_shape):
    """fftconvolve works for Tensors for different shapes if they are broadcast-able"""
    # 1. Test broadcast case
    x = torch.rand(x_shape, dtype=self.dtype, device=self.device)
    y = torch.rand(y_shape, dtype=self.dtype, device=self.device)
    out1 = F.fftconvolve(x, y)
    # 2. Test without broadcast: materialize y at x's full shape
    y_clone = y.expand(x_shape).clone()
    assert y is not y_clone
    assert y_clone.shape == x.shape
    out2 = F.fftconvolve(x, y_clone)
    # check that they are same
    self.assertEqual(out1, out2)
@parameterized.expand(
    [
        # fmt: off
        # different ndim
        (0, F.convolve, (4, 3, 1, 2), (10, 4)),
        (0, F.convolve, (4, 3, 1, 2), (2, 2, 2)),
        (0, F.convolve, (1, ), (10, 4)),
        (0, F.convolve, (1, ), (2, 2, 2)),
        (0, F.fftconvolve, (4, 3, 1, 2), (10, 4)),
        (0, F.fftconvolve, (4, 3, 1, 2), (2, 2, 2)),
        (0, F.fftconvolve, (1, ), (10, 4)),
        (0, F.fftconvolve, (1, ), (2, 2, 2)),
        # incompatible shape except the last dim
        (1, F.convolve, (5, 2, 3), (5, 3, 3)),
        (1, F.convolve, (5, 2, 3), (5, 3, 4)),
        (1, F.convolve, (5, 2, 3), (5, 3, 5)),
        (2, F.fftconvolve, (5, 2, 3), (5, 3, 3)),
        (2, F.fftconvolve, (5, 2, 3), (5, 3, 4)),
        (2, F.fftconvolve, (5, 2, 3), (5, 3, 5)),
        # broadcast-able (only for convolve)
        (1, F.convolve, (5, 2, 3), (5, 1, 3)),
        (1, F.convolve, (5, 2, 3), (5, 1, 4)),
        (1, F.convolve, (5, 2, 3), (5, 1, 5)),
        # fmt: on
    ],
)
def test_convolve_input_leading_dim_check(self, case, fn, x_shape, y_shape):
    """Check that convolve properly rejects inputs with different leading dimensions."""
    x = torch.rand(*x_shape, dtype=self.dtype, device=self.device)
    y = torch.rand(*y_shape, dtype=self.dtype, device=self.device)
    # `case` indexes which error message variant the shapes should trigger.
    message = [
        "The operands must be the same dimension",
        "Leading dimensions of x and y don't match",
        "Leading dimensions of x and y are not broadcastable",
    ][case]
    with self.assertRaisesRegex(ValueError, message):
        fn(x, y)
def test_add_noise_broadcast(self):
    """Check that add_noise produces correct outputs when broadcasting input dimensions."""
    leading_dims = (5, 2, 3)
    L = 51
    waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
    # noise/lengths/snr use size-1 dims that should broadcast against waveform.
    noise = torch.rand(5, 1, 1, L, dtype=self.dtype, device=self.device)
    lengths = torch.rand(5, 1, 3, dtype=self.dtype, device=self.device)
    snr = torch.rand(1, 1, 1, dtype=self.dtype, device=self.device) * 10
    actual = F.add_noise(waveform, noise, snr, lengths)
    # Reference: expand everything explicitly and compare.
    noise_expanded = noise.expand(*leading_dims, L)
    snr_expanded = snr.expand(*leading_dims)
    lengths_expanded = lengths.expand(*leading_dims)
    expected = F.add_noise(waveform, noise_expanded, snr_expanded, lengths_expanded)
    self.assertEqual(expected, actual)
@parameterized.expand(
    [((5, 2, 3), (2, 1, 1), (5, 2), (5, 2, 3)), ((2, 1), (5,), (5,), (5,)), ((3,), (5, 2, 3), (2, 1, 1), (5, 2))]
)
def test_add_noise_leading_dim_check(self, waveform_dims, noise_dims, lengths_dims, snr_dims):
    """Check that add_noise properly rejects inputs with different leading dimension lengths."""
    L = 51
    waveform = torch.rand(*waveform_dims, L, dtype=self.dtype, device=self.device)
    noise = torch.rand(*noise_dims, L, dtype=self.dtype, device=self.device)
    lengths = torch.rand(*lengths_dims, dtype=self.dtype, device=self.device)
    snr = torch.rand(*snr_dims, dtype=self.dtype, device=self.device) * 10
    with self.assertRaisesRegex(ValueError, "Input leading dimensions"):
        F.add_noise(waveform, noise, snr, lengths)
def test_add_noise_length_check(self):
    """Check that add_noise properly rejects inputs that have inconsistent length dimensions."""
    leading_dims = (5, 2, 3)
    L = 51
    waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
    # noise is one sample shorter than waveform, which should be rejected.
    noise = torch.rand(*leading_dims, 50, dtype=self.dtype, device=self.device)
    lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device)
    snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device) * 10
    with self.assertRaisesRegex(ValueError, "Length dimensions"):
        F.add_noise(waveform, noise, snr, lengths)
def test_speed_identity(self):
    """speed of 1.0 does not alter input waveform and length"""
    leading_dims = (5, 4, 2)
    T = 1000
    waveform = torch.rand(*leading_dims, T)
    lengths = torch.randint(1, 1000, leading_dims)
    actual_waveform, actual_lengths = F.speed(waveform, lengths, orig_freq=1000, factor=1.0)
    self.assertEqual(waveform, actual_waveform)
    self.assertEqual(lengths, actual_lengths)
@nested_params(
    [0.8, 1.1, 1.2],
)
def test_speed_accuracy(self, factor):
    """sinusoidal waveform is properly compressed by factor"""
    # Trim the edges before comparing to ignore boundary artifacts.
    n_to_trim = 20
    sample_rate = 1000
    freq = 2
    times = torch.arange(0, 5, 1.0 / sample_rate)
    waveform = torch.cos(2 * math.pi * freq * times).unsqueeze(0).to(self.device, self.dtype)
    lengths = torch.tensor([waveform.size(1)])
    output, output_lengths = F.speed(waveform, lengths, orig_freq=sample_rate, factor=factor)
    self.assertEqual(output.size(1), output_lengths[0])
    # Time-compressed cosine: same waveform evaluated at `factor`-scaled frequency.
    new_times = torch.arange(0, 5 / factor, 1.0 / sample_rate)
    expected_waveform = torch.cos(2 * math.pi * freq * factor * new_times).unsqueeze(0).to(self.device, self.dtype)
    self.assertEqual(
        expected_waveform[..., n_to_trim:-n_to_trim], output[..., n_to_trim:-n_to_trim], atol=1e-1, rtol=1e-4
    )
@nested_params(
    [(3, 2, 100), (95,)],
    [0.97, 0.9, 0.68],
)
def test_preemphasis(self, input_shape, coeff):
    """preemphasis matches the equivalent first-order FIR filter via lfilter."""
    waveform = torch.rand(*input_shape, device=self.device, dtype=self.dtype)
    actual = F.preemphasis(waveform, coeff=coeff)
    # y[n] = x[n] - coeff * x[n-1], expressed as lfilter coefficients.
    a_coeffs = torch.tensor([1.0, 0.0], device=self.device, dtype=self.dtype)
    b_coeffs = torch.tensor([1.0, -coeff], device=self.device, dtype=self.dtype)
    expected = F.lfilter(waveform, a_coeffs=a_coeffs, b_coeffs=b_coeffs)
    self.assertEqual(actual, expected)
@nested_params(
    [(3, 2, 100), (95,)],
    [0.97, 0.9, 0.68],
)
def test_preemphasis_deemphasis_roundtrip(self, input_shape, coeff):
    """deemphasis inverts preemphasis with the same coefficient."""
    waveform = torch.rand(*input_shape, device=self.device, dtype=self.dtype)
    preemphasized = F.preemphasis(waveform, coeff=coeff)
    deemphasized = F.deemphasis(preemphasized, coeff=coeff)
    self.assertEqual(deemphasized, waveform)
class FunctionalCPUOnly(TestBaseMixin):
def test_melscale_fbanks_no_warning_high_n_freq(self):
......
......@@ -758,6 +758,50 @@ class Functional(TempDirMixin, TestBaseMixin):
specgram = torch.rand(num_channels, n_fft_bin, num_frames, dtype=self.complex_dtype, device=self.device)
self._assert_consistency_complex(F.apply_beamforming, (beamform_weights, specgram))
@common_utils.nested_params(
    ["convolve", "fftconvolve"],
    ["full", "valid", "same"],
)
def test_convolve(self, fn, mode):
    """TorchScript-ed convolve/fftconvolve agrees with eager execution."""
    leading_dims = (2, 3, 2)
    L_x, L_y = 32, 55
    x = torch.rand(*leading_dims, L_x, dtype=self.dtype, device=self.device)
    y = torch.rand(*leading_dims, L_y, dtype=self.dtype, device=self.device)
    self._assert_consistency(getattr(F, fn), (x, y, mode))
@common_utils.nested_params([True, False])
def test_add_noise(self, use_lengths):
    """TorchScript-ed add_noise agrees with eager, with and without `lengths`."""
    leading_dims = (2, 3)
    L = 31
    waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device, requires_grad=True)
    noise = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device, requires_grad=True)
    if use_lengths:
        lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device, requires_grad=True)
    else:
        # lengths is Optional; exercise the None path too.
        lengths = None
    snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device, requires_grad=True) * 10
    self._assert_consistency(F.add_noise, (waveform, noise, snr, lengths))
def test_speed(self):
    """TorchScript-ed speed agrees with eager execution."""
    leading_dims = (3, 2)
    T = 200
    waveform = torch.rand(*leading_dims, T, dtype=self.dtype, device=self.device, requires_grad=True)
    lengths = torch.randint(1, T, leading_dims, dtype=self.dtype, device=self.device)
    self._assert_consistency(F.speed, (waveform, lengths, 1000, 1.1))
def test_preemphasis(self):
    """TorchScript-ed preemphasis agrees with eager execution."""
    waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype)
    coeff = 0.9
    self._assert_consistency(F.preemphasis, (waveform, coeff))
def test_deemphasis(self):
    """TorchScript-ed deemphasis agrees with eager execution."""
    waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype)
    coeff = 0.9
    self._assert_consistency(F.deemphasis, (waveform, coeff))
class FunctionalFloat32Only(TestBaseMixin):
def test_rnnt_loss(self):
......
import torch
import torchaudio.prototype.functional as F
from parameterized import parameterized
from torch.autograd import gradcheck, gradgradcheck
from torchaudio_unittest.common_utils import nested_params, TestBaseMixin
from torch.autograd import gradcheck
from torchaudio_unittest.common_utils import TestBaseMixin
class AutogradTestImpl(TestBaseMixin):
@nested_params(
    [F.convolve, F.fftconvolve],
    ["full", "valid", "same"],
)
def test_convolve(self, fn, mode):
    """convolve/fftconvolve pass gradcheck and gradgradcheck for every mode."""
    leading_dims = (4, 3, 2)
    L_x, L_y = 23, 40
    x = torch.rand(*leading_dims, L_x, dtype=self.dtype, device=self.device, requires_grad=True)
    y = torch.rand(*leading_dims, L_y, dtype=self.dtype, device=self.device, requires_grad=True)
    self.assertTrue(gradcheck(fn, (x, y, mode)))
    self.assertTrue(gradgradcheck(fn, (x, y, mode)))
def test_add_noise(self):
    """add_noise passes gradcheck and gradgradcheck on batched inputs."""
    leading_dims = (5, 2, 3)
    L = 51
    waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device, requires_grad=True)
    noise = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device, requires_grad=True)
    lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device, requires_grad=True)
    snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device, requires_grad=True) * 10
    self.assertTrue(gradcheck(F.add_noise, (waveform, noise, snr, lengths)))
    self.assertTrue(gradgradcheck(F.add_noise, (waveform, noise, snr, lengths)))
@parameterized.expand(
[
(8000, (2, 3, 5, 7)),
......@@ -68,26 +44,6 @@ class AutogradTestImpl(TestBaseMixin):
assert gradcheck(F.sinc_impulse_response, (cutoff, 513, False))
assert gradcheck(F.sinc_impulse_response, (cutoff, 513, True))
def test_speed(self):
    """speed passes gradcheck and gradgradcheck w.r.t. the waveform."""
    leading_dims = (3, 2)
    T = 200
    waveform = torch.rand(*leading_dims, T, dtype=self.dtype, device=self.device, requires_grad=True)
    lengths = torch.randint(1, T, leading_dims, dtype=self.dtype, device=self.device)
    self.assertTrue(gradcheck(F.speed, (waveform, lengths, 1000, 1.1)))
    self.assertTrue(gradgradcheck(F.speed, (waveform, lengths, 1000, 1.1)))
def test_preemphasis(self):
    """preemphasis passes gradcheck and gradgradcheck."""
    waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype, requires_grad=True)
    coeff = 0.9
    self.assertTrue(gradcheck(F.preemphasis, (waveform, coeff)))
    self.assertTrue(gradgradcheck(F.preemphasis, (waveform, coeff)))
def test_deemphasis(self):
    """deemphasis passes gradcheck and gradgradcheck."""
    waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype, requires_grad=True)
    coeff = 0.9
    self.assertTrue(gradcheck(F.deemphasis, (waveform, coeff)))
    self.assertTrue(gradgradcheck(F.deemphasis, (waveform, coeff)))
def test_freq_ir(self):
    """frequency_impulse_response passes gradcheck."""
    mags = torch.tensor([0, 0.5, 1.0], device=self.device, dtype=self.dtype, requires_grad=True)
    assert gradcheck(F.frequency_impulse_response, (mags,))
......
import torch
import torchaudio.prototype.functional as F
from torchaudio_unittest.common_utils import nested_params, TorchaudioTestCase
class BatchConsistencyTest(TorchaudioTestCase):
    """Verify that batched calls agree with per-element calls for prototype functionals."""

    @nested_params(
        [F.convolve, F.fftconvolve],
        ["full", "valid", "same"],
    )
    def test_convolve(self, fn, mode):
        """Batched convolve/fftconvolve matches per-signal application."""
        leading_dims = (2, 3)
        L_x, L_y = 89, 43
        x = torch.rand(*leading_dims, L_x, dtype=self.dtype, device=self.device)
        y = torch.rand(*leading_dims, L_y, dtype=self.dtype, device=self.device)
        actual = fn(x, y, mode)
        # Re-run the op one (i, j) signal at a time and re-stack the results.
        expected = torch.stack(
            [
                torch.stack(
                    [fn(x[i, j].unsqueeze(0), y[i, j].unsqueeze(0), mode).squeeze(0) for j in range(leading_dims[1])]
                )
                for i in range(leading_dims[0])
            ]
        )
        self.assertEqual(expected, actual)

    def test_add_noise(self):
        """Batched add_noise matches per-element application."""
        leading_dims = (5, 2, 3)
        L = 51
        waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
        noise = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
        lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device)
        snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device) * 10
        actual = F.add_noise(waveform, noise, snr, lengths)
        expected = []
        for i in range(leading_dims[0]):
            for j in range(leading_dims[1]):
                for k in range(leading_dims[2]):
                    expected.append(F.add_noise(waveform[i][j][k], noise[i][j][k], snr[i][j][k], lengths[i][j][k]))
        # Compare against the batched output flattened to (num_signals, L).
        self.assertEqual(torch.stack(expected), actual.reshape(-1, L))

    def test_speed(self):
        """Batched (padded) speed matches per-waveform application up to each output length."""
        B = 5
        orig_freq = 100
        factor = 0.8
        input_lengths = torch.randint(1, 1000, (B,), dtype=torch.int32)
        unbatched_input = [torch.ones((int(length),)) * 1.0 for length in input_lengths]
        batched_input = torch.nn.utils.rnn.pad_sequence(unbatched_input, batch_first=True)
        output, output_lengths = F.speed(batched_input, input_lengths, orig_freq=orig_freq, factor=factor)
        unbatched_output = []
        unbatched_output_lengths = []
        for idx in range(len(unbatched_input)):
            w, l = F.speed(unbatched_input[idx], input_lengths[idx], orig_freq=orig_freq, factor=factor)
            unbatched_output.append(w)
            unbatched_output_lengths.append(l)
        self.assertEqual(output_lengths, torch.stack(unbatched_output_lengths))
        # Only the valid (unpadded) prefix of each batched row is compared.
        for idx in range(len(unbatched_output)):
            w, l = output[idx], output_lengths[idx]
            self.assertEqual(unbatched_output[idx], w[:l])

    def test_preemphasis(self):
        """Batched preemphasis matches applying it per leading-dim slice."""
        waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype)
        coeff = 0.9
        actual = F.preemphasis(waveform, coeff=coeff)
        expected = []
        for i in range(waveform.size(0)):
            expected.append(F.preemphasis(waveform[i], coeff=coeff))
        self.assertEqual(torch.stack(expected), actual)

    def test_deemphasis(self):
        """Batched deemphasis matches applying it per leading-dim slice."""
        waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype)
        coeff = 0.9
        actual = F.deemphasis(waveform, coeff=coeff)
        expected = []
        for i in range(waveform.size(0)):
            expected.append(F.deemphasis(waveform[i], coeff=coeff))
        self.assertEqual(torch.stack(expected), actual)
import math
import numpy as np
import torch
import torchaudio.prototype.functional as F
from parameterized import param, parameterized
from scipy import signal
from torchaudio.functional import lfilter
from torchaudio_unittest.common_utils import nested_params, TestBaseMixin
......@@ -19,159 +17,6 @@ def _prod(l):
class FunctionalTestImpl(TestBaseMixin):
@nested_params(
    [(10, 4), (4, 3, 1, 2), (2,), ()],
    [(100, 43), (21, 45)],
    ["full", "valid", "same"],
)
def test_convolve_numerics(self, leading_dims, lengths, mode):
    """Check that convolve returns values identical to those that SciPy produces."""
    L_x, L_y = lengths
    x = torch.rand(*(leading_dims + (L_x,)), dtype=self.dtype, device=self.device)
    y = torch.rand(*(leading_dims + (L_y,)), dtype=self.dtype, device=self.device)
    actual = F.convolve(x, y, mode=mode)
    # Flatten leading dims so SciPy (1-D only) can be applied signal by signal.
    num_signals = torch.tensor(leading_dims).prod() if leading_dims else 1
    x_reshaped = x.reshape((num_signals, L_x))
    y_reshaped = y.reshape((num_signals, L_y))
    expected = [
        signal.convolve(x_reshaped[i].detach().cpu().numpy(), y_reshaped[i].detach().cpu().numpy(), mode=mode)
        for i in range(num_signals)
    ]
    expected = torch.tensor(np.array(expected))
    expected = expected.reshape(leading_dims + (-1,))
    self.assertEqual(expected, actual)
@nested_params(
    [(10, 4), (4, 3, 1, 2), (2,), ()],
    [(100, 43), (21, 45)],
    ["full", "valid", "same"],
)
def test_fftconvolve_numerics(self, leading_dims, lengths, mode):
    """Check that fftconvolve returns values identical to those that SciPy produces."""
    L_x, L_y = lengths
    x = torch.rand(*(leading_dims + (L_x,)), dtype=self.dtype, device=self.device)
    y = torch.rand(*(leading_dims + (L_y,)), dtype=self.dtype, device=self.device)
    actual = F.fftconvolve(x, y, mode=mode)
    # SciPy's fftconvolve handles N-D input directly; convolve along the last axis.
    expected = signal.fftconvolve(x.detach().cpu().numpy(), y.detach().cpu().numpy(), axes=-1, mode=mode)
    expected = torch.tensor(expected)
    self.assertEqual(expected, actual)
@parameterized.expand(
    [
        # fmt: off
        ((5, 2, 3), (5, 1, 3)),
        ((5, 2, 3), (1, 2, 3)),
        ((5, 2, 3), (1, 1, 3)),
        # fmt: on
    ]
)
def test_fftconvolve_broadcast(self, x_shape, y_shape):
    """fftconvolve works for Tensors for different shapes if they are broadcast-able"""
    # 1. Test broadcast case
    x = torch.rand(x_shape, dtype=self.dtype, device=self.device)
    y = torch.rand(y_shape, dtype=self.dtype, device=self.device)
    out1 = F.fftconvolve(x, y)
    # 2. Test without broadcast: materialize y at x's full shape
    y_clone = y.expand(x_shape).clone()
    assert y is not y_clone
    assert y_clone.shape == x.shape
    out2 = F.fftconvolve(x, y_clone)
    # check that they are same
    self.assertEqual(out1, out2)
@parameterized.expand(
    [
        # fmt: off
        # different ndim
        (0, F.convolve, (4, 3, 1, 2), (10, 4)),
        (0, F.convolve, (4, 3, 1, 2), (2, 2, 2)),
        (0, F.convolve, (1, ), (10, 4)),
        (0, F.convolve, (1, ), (2, 2, 2)),
        (0, F.fftconvolve, (4, 3, 1, 2), (10, 4)),
        (0, F.fftconvolve, (4, 3, 1, 2), (2, 2, 2)),
        (0, F.fftconvolve, (1, ), (10, 4)),
        (0, F.fftconvolve, (1, ), (2, 2, 2)),
        # incompatible shape except the last dim
        (1, F.convolve, (5, 2, 3), (5, 3, 3)),
        (1, F.convolve, (5, 2, 3), (5, 3, 4)),
        (1, F.convolve, (5, 2, 3), (5, 3, 5)),
        (2, F.fftconvolve, (5, 2, 3), (5, 3, 3)),
        (2, F.fftconvolve, (5, 2, 3), (5, 3, 4)),
        (2, F.fftconvolve, (5, 2, 3), (5, 3, 5)),
        # broadcast-able (only for convolve)
        (1, F.convolve, (5, 2, 3), (5, 1, 3)),
        (1, F.convolve, (5, 2, 3), (5, 1, 4)),
        (1, F.convolve, (5, 2, 3), (5, 1, 5)),
        # fmt: on
    ],
)
def test_convolve_input_leading_dim_check(self, case, fn, x_shape, y_shape):
    """Check that convolve properly rejects inputs with different leading dimensions."""
    x = torch.rand(*x_shape, dtype=self.dtype, device=self.device)
    y = torch.rand(*y_shape, dtype=self.dtype, device=self.device)
    # `case` indexes which error message variant the shapes should trigger.
    message = [
        "The operands must be the same dimension",
        "Leading dimensions of x and y don't match",
        "Leading dimensions of x and y are not broadcastable",
    ][case]
    with self.assertRaisesRegex(ValueError, message):
        fn(x, y)
def test_add_noise_broadcast(self):
    """Check that add_noise produces correct outputs when broadcasting input dimensions."""
    leading_dims = (5, 2, 3)
    L = 51
    waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
    # noise/lengths/snr use size-1 dims that should broadcast against waveform.
    noise = torch.rand(5, 1, 1, L, dtype=self.dtype, device=self.device)
    lengths = torch.rand(5, 1, 3, dtype=self.dtype, device=self.device)
    snr = torch.rand(1, 1, 1, dtype=self.dtype, device=self.device) * 10
    actual = F.add_noise(waveform, noise, snr, lengths)
    # Reference: expand everything explicitly and compare.
    noise_expanded = noise.expand(*leading_dims, L)
    snr_expanded = snr.expand(*leading_dims)
    lengths_expanded = lengths.expand(*leading_dims)
    expected = F.add_noise(waveform, noise_expanded, snr_expanded, lengths_expanded)
    self.assertEqual(expected, actual)
@parameterized.expand(
    [((5, 2, 3), (2, 1, 1), (5, 2), (5, 2, 3)), ((2, 1), (5,), (5,), (5,)), ((3,), (5, 2, 3), (2, 1, 1), (5, 2))]
)
def test_add_noise_leading_dim_check(self, waveform_dims, noise_dims, lengths_dims, snr_dims):
    """Check that add_noise properly rejects inputs with different leading dimension lengths."""
    L = 51
    waveform = torch.rand(*waveform_dims, L, dtype=self.dtype, device=self.device)
    noise = torch.rand(*noise_dims, L, dtype=self.dtype, device=self.device)
    lengths = torch.rand(*lengths_dims, dtype=self.dtype, device=self.device)
    snr = torch.rand(*snr_dims, dtype=self.dtype, device=self.device) * 10
    with self.assertRaisesRegex(ValueError, "Input leading dimensions"):
        F.add_noise(waveform, noise, snr, lengths)
def test_add_noise_length_check(self):
    """Check that add_noise properly rejects inputs that have inconsistent length dimensions."""
    leading_dims = (5, 2, 3)
    L = 51
    waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device)
    # noise is one sample shorter than waveform, which should be rejected.
    noise = torch.rand(*leading_dims, 50, dtype=self.dtype, device=self.device)
    lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device)
    snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device) * 10
    with self.assertRaisesRegex(ValueError, "Length dimensions"):
        F.add_noise(waveform, noise, snr, lengths)
@nested_params(
[(2, 3), (2, 3, 5), (2, 3, 5, 7)],
["sum", "mean", "none"],
......@@ -414,62 +259,6 @@ class FunctionalTestImpl(TestBaseMixin):
self.assertEqual(hyp, ref)
def test_speed_identity(self):
    """speed of 1.0 does not alter input waveform and length"""
    leading_dims = (5, 4, 2)
    T = 1000
    waveform = torch.rand(*leading_dims, T)
    lengths = torch.randint(1, 1000, leading_dims)
    actual_waveform, actual_lengths = F.speed(waveform, lengths, orig_freq=1000, factor=1.0)
    self.assertEqual(waveform, actual_waveform)
    self.assertEqual(lengths, actual_lengths)
@nested_params(
    [0.8, 1.1, 1.2],
)
def test_speed_accuracy(self, factor):
    """sinusoidal waveform is properly compressed by factor"""
    # Trim the edges before comparing to ignore boundary artifacts.
    n_to_trim = 20
    sample_rate = 1000
    freq = 2
    times = torch.arange(0, 5, 1.0 / sample_rate)
    waveform = torch.cos(2 * math.pi * freq * times).unsqueeze(0).to(self.device, self.dtype)
    lengths = torch.tensor([waveform.size(1)])
    output, output_lengths = F.speed(waveform, lengths, orig_freq=sample_rate, factor=factor)
    self.assertEqual(output.size(1), output_lengths[0])
    # Time-compressed cosine: same waveform evaluated at `factor`-scaled frequency.
    new_times = torch.arange(0, 5 / factor, 1.0 / sample_rate)
    expected_waveform = torch.cos(2 * math.pi * freq * factor * new_times).unsqueeze(0).to(self.device, self.dtype)
    self.assertEqual(
        expected_waveform[..., n_to_trim:-n_to_trim], output[..., n_to_trim:-n_to_trim], atol=1e-1, rtol=1e-4
    )
@nested_params(
    [(3, 2, 100), (95,)],
    [0.97, 0.9, 0.68],
)
def test_preemphasis(self, input_shape, coeff):
    """preemphasis matches the equivalent first-order FIR filter via lfilter."""
    waveform = torch.rand(*input_shape, device=self.device, dtype=self.dtype)
    actual = F.preemphasis(waveform, coeff=coeff)
    # y[n] = x[n] - coeff * x[n-1], expressed as lfilter coefficients.
    a_coeffs = torch.tensor([1.0, 0.0], device=self.device, dtype=self.dtype)
    b_coeffs = torch.tensor([1.0, -coeff], device=self.device, dtype=self.dtype)
    expected = lfilter(waveform, a_coeffs=a_coeffs, b_coeffs=b_coeffs)
    self.assertEqual(actual, expected)
@nested_params(
    [(3, 2, 100), (95,)],
    [0.97, 0.9, 0.68],
)
def test_preemphasis_deemphasis_roundtrip(self, input_shape, coeff):
    """deemphasis inverts preemphasis with the same coefficient."""
    waveform = torch.rand(*input_shape, device=self.device, dtype=self.dtype)
    preemphasized = F.preemphasis(waveform, coeff=coeff)
    deemphasized = F.deemphasis(preemphasized, coeff=coeff)
    self.assertEqual(deemphasized, waveform)
def test_freq_ir_warns_negative_values(self):
"""frequency_impulse_response warns negative input value"""
magnitudes = -torch.ones((1, 30), device=self.device, dtype=self.dtype)
......
......@@ -2,7 +2,7 @@ import unittest
import torch
import torchaudio.prototype.functional as F
from torchaudio_unittest.common_utils import nested_params, TestBaseMixin, torch_script
from torchaudio_unittest.common_utils import TestBaseMixin, torch_script
class TorchScriptConsistencyTestImpl(TestBaseMixin):
......@@ -25,33 +25,6 @@ class TorchScriptConsistencyTestImpl(TestBaseMixin):
output = output.shape
self.assertEqual(ts_output, output)
@nested_params(
    ["convolve", "fftconvolve"],
    ["full", "valid", "same"],
)
def test_convolve(self, fn, mode):
    """TorchScript-ed convolve/fftconvolve agrees with eager execution."""
    leading_dims = (2, 3, 2)
    L_x, L_y = 32, 55
    x = torch.rand(*leading_dims, L_x, dtype=self.dtype, device=self.device)
    y = torch.rand(*leading_dims, L_y, dtype=self.dtype, device=self.device)
    self._assert_consistency(getattr(F, fn), (x, y, mode))
@nested_params([True, False])
def test_add_noise(self, use_lengths):
    """TorchScript-ed add_noise agrees with eager, with and without `lengths`."""
    leading_dims = (2, 3)
    L = 31
    waveform = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device, requires_grad=True)
    noise = torch.rand(*leading_dims, L, dtype=self.dtype, device=self.device, requires_grad=True)
    if use_lengths:
        lengths = torch.rand(*leading_dims, dtype=self.dtype, device=self.device, requires_grad=True)
    else:
        # lengths is Optional; exercise the None path too.
        lengths = None
    snr = torch.rand(*leading_dims, dtype=self.dtype, device=self.device, requires_grad=True) * 10
    self._assert_consistency(F.add_noise, (waveform, noise, snr, lengths))
def test_barkscale_fbanks(self):
if self.device != torch.device("cpu"):
raise unittest.SkipTest("No need to perform test on device other than CPU")
......@@ -86,23 +59,6 @@ class TorchScriptConsistencyTestImpl(TestBaseMixin):
self._assert_consistency(F.sinc_impulse_response, (cutoff, 513, False))
self._assert_consistency(F.sinc_impulse_response, (cutoff, 513, True))
def test_speed(self):
    """TorchScript-ed speed agrees with eager execution."""
    leading_dims = (3, 2)
    T = 200
    waveform = torch.rand(*leading_dims, T, dtype=self.dtype, device=self.device, requires_grad=True)
    lengths = torch.randint(1, T, leading_dims, dtype=self.dtype, device=self.device)
    self._assert_consistency(F.speed, (waveform, lengths, 1000, 1.1))
def test_preemphasis(self):
waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype)
coeff = 0.9
self._assert_consistency(F.preemphasis, (waveform, coeff))
def test_deemphasis(self):
waveform = torch.rand(3, 2, 100, device=self.device, dtype=self.dtype)
coeff = 0.9
self._assert_consistency(F.deemphasis, (waveform, coeff))
def test_freq_ir(self):
mags = torch.tensor([0, 0.5, 1.0], device=self.device, dtype=self.dtype)
self._assert_consistency(F.frequency_impulse_response, (mags,))
......@@ -7,8 +7,7 @@ import torch
import torchaudio.prototype.transforms as T
from parameterized import parameterized
from scipy import signal
from torchaudio.functional import lfilter
from torchaudio.prototype.functional import preemphasis
from torchaudio.functional import lfilter, preemphasis
from torchaudio_unittest.common_utils import get_spectrogram, get_whitenoise, nested_params, TestBaseMixin
......
......@@ -23,15 +23,19 @@ from .filtering import (
vad,
)
from .functional import (
add_noise,
amplitude_to_DB,
apply_beamforming,
apply_codec,
compute_deltas,
compute_kaldi_pitch,
convolve,
create_dct,
DB_to_amplitude,
deemphasis,
detect_pitch_frequency,
edit_distance,
fftconvolve,
griffinlim,
inverse_spectrogram,
linear_fbanks,
......@@ -45,6 +49,7 @@ from .functional import (
mvdr_weights_souden,
phase_vocoder,
pitch_shift,
preemphasis,
psd,
resample,
rnnt_loss,
......@@ -53,6 +58,7 @@ from .functional import (
sliding_window_cmn,
spectral_centroid,
spectrogram,
speed,
)
__all__ = [
......@@ -108,4 +114,10 @@ __all__ = [
"rtf_evd",
"rtf_power",
"apply_beamforming",
"fftconvolve",
"convolve",
"add_noise",
"speed",
"preemphasis",
"deemphasis",
]
......@@ -45,6 +45,12 @@ __all__ = [
"rtf_evd",
"rtf_power",
"apply_beamforming",
"fftconvolve",
"convolve",
"add_noise",
"speed",
"preemphasis",
"deemphasis",
]
......@@ -2287,3 +2293,283 @@ def apply_beamforming(beamform_weights: Tensor, specgram: Tensor) -> Tensor:
# (..., freq, channel) x (..., channel, freq, time) -> (..., freq, time)
specgram_enhanced = torch.einsum("...fc,...cft->...ft", [beamform_weights.conj(), specgram])
return specgram_enhanced
def _check_shape_compatible(x: torch.Tensor, y: torch.Tensor, allow_broadcast: bool) -> None:
if x.ndim != y.ndim:
raise ValueError(f"The operands must be the same dimension (got {x.ndim} and {y.ndim}).")
if not allow_broadcast:
if x.shape[:-1] != y.shape[:-1]:
raise ValueError(f"Leading dimensions of x and y don't match (got {x.shape} and {y.shape}).")
else:
for i in range(x.ndim - 1):
xi = x.size(i)
yi = y.size(i)
if xi == yi or xi == 1 or yi == 1:
continue
raise ValueError(f"Leading dimensions of x and y are not broadcastable (got {x.shape} and {y.shape}).")
def _check_convolve_mode(mode: str) -> None:
valid_convolve_modes = ["full", "valid", "same"]
if mode not in valid_convolve_modes:
raise ValueError(f"Unrecognized mode value '{mode}'. Please specify one of {valid_convolve_modes}.")
def _apply_convolve_mode(conv_result: torch.Tensor, x_length: int, y_length: int, mode: str) -> torch.Tensor:
valid_convolve_modes = ["full", "valid", "same"]
if mode == "full":
return conv_result
elif mode == "valid":
target_length = max(x_length, y_length) - min(x_length, y_length) + 1
start_idx = (conv_result.size(-1) - target_length) // 2
return conv_result[..., start_idx : start_idx + target_length]
elif mode == "same":
start_idx = (conv_result.size(-1) - x_length) // 2
return conv_result[..., start_idx : start_idx + x_length]
else:
raise ValueError(f"Unrecognized mode value '{mode}'. Please specify one of {valid_convolve_modes}.")
def fftconvolve(x: torch.Tensor, y: torch.Tensor, mode: str = "full") -> torch.Tensor:
    r"""Convolve two tensors along their last dimension using the FFT.

    For inputs with long last dimensions this is generally much faster than
    the direct method in :meth:`convolve`. Unlike
    :meth:`torch.nn.functional.conv1d`, which applies the valid
    cross-correlation operator, this computes a true `convolution`_.
    Output tensors are always float (integer inputs are cast).

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        x (torch.Tensor): First convolution operand, with shape `(..., N)`.
        y (torch.Tensor): Second convolution operand, with shape `(..., M)`
            (leading dimensions must be broadcast-able to those of ``x``).
        mode (str, optional): One of ``"full"`` (output length `N + M - 1`;
            default), ``"valid"`` (length `max(N, M) - min(N, M) + 1`), or
            ``"same"`` (center segment of length `N`).

    Returns:
        torch.Tensor: Result of convolving ``x`` and ``y``; the trailing
        length is dictated by ``mode``.

    .. _convolution:
        https://en.wikipedia.org/wiki/Convolution
    """
    _check_shape_compatible(x, y, allow_broadcast=True)
    _check_convolve_mode(mode)

    # Transform at the full linear-convolution length so the circular
    # convolution performed in the frequency domain equals the linear result.
    full_length = x.size(-1) + y.size(-1) - 1
    spectrum = torch.fft.rfft(x, n=full_length) * torch.fft.rfft(y, n=full_length)
    full_result = torch.fft.irfft(spectrum, n=full_length)
    return _apply_convolve_mode(full_result, x.size(-1), y.size(-1), mode)
def convolve(x: torch.Tensor, y: torch.Tensor, mode: str = "full") -> torch.Tensor:
    r"""Convolve two tensors along their last dimension using the direct method.

    Unlike :meth:`torch.nn.functional.conv1d`, which applies the valid
    cross-correlation operator, this computes a true `convolution`_.

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        x (torch.Tensor): First convolution operand, with shape `(..., N)`.
        y (torch.Tensor): Second convolution operand, with shape `(..., M)`
            (leading dimensions must match those of ``x``).
        mode (str, optional): One of ``"full"`` (output length `N + M - 1`;
            default), ``"valid"`` (length `max(N, M) - min(N, M) + 1`), or
            ``"same"`` (center segment of length `N`).

    Returns:
        torch.Tensor: Result of convolving ``x`` and ``y``; the trailing
        length is dictated by ``mode``.

    .. _convolution:
        https://en.wikipedia.org/wiki/Convolution
    """
    _check_shape_compatible(x, y, allow_broadcast=False)
    _check_convolve_mode(mode)

    x_size, y_size = x.size(-1), y.size(-1)
    # conv1d padding semantics expect the kernel (y) to be the shorter operand;
    # convolution is commutative, so swap if necessary.
    if x.size(-1) < y.size(-1):
        x, y = y, x

    num_signals = int(torch.tensor(x.shape[:-1]).prod())
    flat_x = x.reshape(num_signals, x.size(-1))
    flat_y = y.reshape(num_signals, y.size(-1))

    # Grouped conv1d applies each kernel to its own signal; flipping the
    # kernel turns cross-correlation into true convolution.
    flat_result = torch.nn.functional.conv1d(
        input=flat_x,
        weight=flat_y.flip(-1).unsqueeze(1),
        stride=1,
        groups=num_signals,
        padding=flat_y.size(-1) - 1,
    )
    result = flat_result.reshape(x.shape[:-1] + (-1,))
    return _apply_convolve_mode(result, x_size, y_size, mode)
def add_noise(
    waveform: torch.Tensor, noise: torch.Tensor, snr: torch.Tensor, lengths: Optional[torch.Tensor] = None
) -> torch.Tensor:
    r"""Scales and adds noise to waveform per signal-to-noise ratio.

    For each waveform vector :math:`x \in \mathbb{R}^L` and noise vector
    :math:`n \in \mathbb{R}^L`, computes :math:`y = x + a n` where

    .. math::
        a = \sqrt{ \frac{ ||x||_{2}^{2} }{ ||n||_{2}^{2} } \cdot 10^{-\frac{\text{SNR}}{10}} } \, \text{,}

    with :math:`\text{SNR}` the desired signal-to-noise ratio in dB. Singleton
    leading dimensions are broadcast per PyTorch's broadcasting semantics.

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        waveform (torch.Tensor): Input waveform, with shape `(..., L)`.
        noise (torch.Tensor): Noise, with shape `(..., L)` (same shape as ``waveform``).
        snr (torch.Tensor): Signal-to-noise ratios in dB, with shape `(...,)`.
        lengths (torch.Tensor or None, optional): Valid lengths of signals in
            ``waveform`` and ``noise``, with shape `(...,)`. If ``None``, all
            elements are treated as valid. (Default: ``None``)

    Returns:
        torch.Tensor: ``waveform`` with scaled ``noise`` added, shape `(..., L)`.

    Raises:
        ValueError: If leading dimensions are inconsistent or the last
            dimensions of ``waveform`` and ``noise`` differ.
    """
    leading_ok = waveform.ndim - 1 == noise.ndim - 1 == snr.ndim
    if lengths is not None:
        leading_ok = leading_ok and lengths.ndim == snr.ndim
    if not leading_ok:
        raise ValueError("Input leading dimensions don't match.")

    L = waveform.size(-1)
    if noise.size(-1) != L:
        raise ValueError(f"Length dimensions of waveform and noise don't match (got {L} and {noise.size(-1)}).")

    if lengths is None:
        masked_waveform, masked_noise = waveform, noise
    else:
        # Zero out samples beyond each signal's valid length before measuring energy.
        mask = torch.arange(0, L, device=lengths.device).expand(waveform.shape) < lengths.unsqueeze(-1)
        masked_waveform = waveform * mask
        masked_noise = noise * mask

    # Per-signal energies of the valid portions.
    energy_signal = torch.linalg.vector_norm(masked_waveform, ord=2, dim=-1) ** 2  # (*,)
    energy_noise = torch.linalg.vector_norm(masked_noise, ord=2, dim=-1) ** 2  # (*,)

    # Amplitude factor that moves the existing SNR to the requested SNR.
    original_snr_db = 10 * (torch.log10(energy_signal) - torch.log10(energy_noise))
    scale = 10 ** ((original_snr_db - snr) / 20.0)  # (*,)

    return waveform + scale.unsqueeze(-1) * noise  # (*, L)
def speed(
    waveform: torch.Tensor, lengths: torch.Tensor, orig_freq: int, factor: float
) -> Tuple[torch.Tensor, torch.Tensor]:
    r"""Adjusts waveform speed.

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        waveform (torch.Tensor): Input signals, with shape `(..., time)`.
        lengths (torch.Tensor): Valid lengths of signals in ``waveform``, with shape `(...)`.
        orig_freq (int): Original frequency of the signals in ``waveform``.
        factor (float): Speed-adjustment factor. Values greater than 1.0
            compress ``waveform`` in time; values less than 1.0 stretch it.

    Returns:
        (torch.Tensor, torch.Tensor):
            torch.Tensor
                Speed-adjusted waveform, with shape `(..., new_time).`
            torch.Tensor
                Valid lengths of the speed-adjusted signals, with shape `(...)`.
    """
    # Resampling from ``factor * orig_freq`` down/up to ``orig_freq`` shortens
    # (factor > 1) or lengthens (factor < 1) the signal by ``factor``.
    src_rate = int(factor * orig_freq)
    tgt_rate = int(orig_freq)

    # Reduce the ratio so resampling uses the smallest equivalent rates.
    common = math.gcd(src_rate, tgt_rate)
    src_rate //= common
    tgt_rate //= common

    adjusted = resample(waveform, src_rate, tgt_rate)
    new_lengths = torch.ceil(lengths * tgt_rate / src_rate).to(lengths.dtype)
    return adjusted, new_lengths
def preemphasis(waveform, coeff: float = 0.97) -> torch.Tensor:
    r"""Pre-emphasizes a waveform along its last dimension.

    For each signal :math:`x` in ``waveform``, computes output :math:`y` as

    .. math::
        y[i] = x[i] - \text{coeff} \cdot x[i - 1]

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        waveform (torch.Tensor): Waveform, with shape `(..., N)`.
        coeff (float, optional): Pre-emphasis coefficient. Typically between 0.0 and 1.0.
            (Default: 0.97)

    Returns:
        torch.Tensor: Pre-emphasized waveform, with shape `(..., N)`.
    """
    # Work on a copy so the caller's tensor is left untouched; the RHS is
    # evaluated before the in-place subtraction, so original samples are used.
    emphasized = waveform.clone()
    emphasized[..., 1:] -= coeff * emphasized[..., :-1]
    return emphasized
def deemphasis(waveform, coeff: float = 0.97) -> torch.Tensor:
    r"""De-emphasizes a waveform along its last dimension.

    Inverse of :meth:`preemphasis`. For each signal :math:`x` in ``waveform``,
    computes output :math:`y` as

    .. math::
        y[i] = x[i] + \text{coeff} \cdot y[i - 1]

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        waveform (torch.Tensor): Waveform, with shape `(..., N)`.
        coeff (float, optional): De-emphasis coefficient. Typically between 0.0 and 1.0.
            (Default: 0.97)

    Returns:
        torch.Tensor: De-emphasized waveform, with shape `(..., N)`.
    """
    # IIR filter y[i] = x[i] + coeff * y[i - 1]: denominator [1, -coeff],
    # numerator [1, 0] (identity feed-forward path).
    denominator = torch.tensor([1.0, -coeff], dtype=waveform.dtype, device=waveform.device)
    numerator = torch.tensor([1.0, 0.0], dtype=waveform.dtype, device=waveform.device)
    return torchaudio.functional.lfilter(waveform, a_coeffs=denominator, b_coeffs=numerator)
......@@ -6,21 +6,15 @@ from ._dsp import (
oscillator_bank,
sinc_impulse_response,
)
from .functional import add_noise, barkscale_fbanks, convolve, deemphasis, fftconvolve, preemphasis, speed
from .functional import barkscale_fbanks
__all__ = [
"add_noise",
"adsr_envelope",
"barkscale_fbanks",
"convolve",
"deemphasis",
"extend_pitch",
"fftconvolve",
"filter_waveform",
"frequency_impulse_response",
"oscillator_bank",
"preemphasis",
"sinc_impulse_response",
"speed",
]
......@@ -3,7 +3,7 @@ from typing import List, Optional, Union
import torch
from .functional import fftconvolve
from torchaudio.functional import fftconvolve
def oscillator_bank(
......
import math
import warnings
from typing import Optional, Tuple
import torch
from torchaudio.functional import lfilter, resample
from torchaudio.functional.functional import _create_triangular_filterbank
def _check_shape_compatible(x: torch.Tensor, y: torch.Tensor, allow_broadcast: bool) -> None:
if x.ndim != y.ndim:
raise ValueError(f"The operands must be the same dimension (got {x.ndim} and {y.ndim}).")
if not allow_broadcast:
if x.shape[:-1] != y.shape[:-1]:
raise ValueError(f"Leading dimensions of x and y don't match (got {x.shape} and {y.shape}).")
else:
for i in range(x.ndim - 1):
xi = x.size(i)
yi = y.size(i)
if xi == yi or xi == 1 or yi == 1:
continue
raise ValueError(f"Leading dimensions of x and y are not broadcastable (got {x.shape} and {y.shape}).")
def _check_convolve_mode(mode: str) -> None:
valid_convolve_modes = ["full", "valid", "same"]
if mode not in valid_convolve_modes:
raise ValueError(f"Unrecognized mode value '{mode}'. Please specify one of {valid_convolve_modes}.")
def _apply_convolve_mode(conv_result: torch.Tensor, x_length: int, y_length: int, mode: str) -> torch.Tensor:
valid_convolve_modes = ["full", "valid", "same"]
if mode == "full":
return conv_result
elif mode == "valid":
target_length = max(x_length, y_length) - min(x_length, y_length) + 1
start_idx = (conv_result.size(-1) - target_length) // 2
return conv_result[..., start_idx : start_idx + target_length]
elif mode == "same":
start_idx = (conv_result.size(-1) - x_length) // 2
return conv_result[..., start_idx : start_idx + x_length]
else:
raise ValueError(f"Unrecognized mode value '{mode}'. Please specify one of {valid_convolve_modes}.")
def fftconvolve(x: torch.Tensor, y: torch.Tensor, mode: str = "full") -> torch.Tensor:
    r"""Convolve two tensors along their last dimension using the FFT.

    For inputs with long last dimensions this is generally much faster than
    the direct method in :meth:`convolve`. Unlike
    :meth:`torch.nn.functional.conv1d`, which applies the valid
    cross-correlation operator, this computes a true `convolution`_.
    Output tensors are always float (integer inputs are cast).

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        x (torch.Tensor): First convolution operand, with shape `(..., N)`.
        y (torch.Tensor): Second convolution operand, with shape `(..., M)`
            (leading dimensions must be broadcast-able to those of ``x``).
        mode (str, optional): One of ``"full"`` (output length `N + M - 1`;
            default), ``"valid"`` (length `max(N, M) - min(N, M) + 1`), or
            ``"same"`` (center segment of length `N`).

    Returns:
        torch.Tensor: Result of convolving ``x`` and ``y``; the trailing
        length is dictated by ``mode``.

    .. _convolution:
        https://en.wikipedia.org/wiki/Convolution
    """
    _check_shape_compatible(x, y, allow_broadcast=True)
    _check_convolve_mode(mode)

    # Transform at the full linear-convolution length so the circular
    # convolution performed in the frequency domain equals the linear result.
    full_length = x.size(-1) + y.size(-1) - 1
    spectrum = torch.fft.rfft(x, n=full_length) * torch.fft.rfft(y, n=full_length)
    full_result = torch.fft.irfft(spectrum, n=full_length)
    return _apply_convolve_mode(full_result, x.size(-1), y.size(-1), mode)
def convolve(x: torch.Tensor, y: torch.Tensor, mode: str = "full") -> torch.Tensor:
    r"""Convolve two tensors along their last dimension using the direct method.

    Unlike :meth:`torch.nn.functional.conv1d`, which applies the valid
    cross-correlation operator, this computes a true `convolution`_.

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        x (torch.Tensor): First convolution operand, with shape `(..., N)`.
        y (torch.Tensor): Second convolution operand, with shape `(..., M)`
            (leading dimensions must match those of ``x``).
        mode (str, optional): One of ``"full"`` (output length `N + M - 1`;
            default), ``"valid"`` (length `max(N, M) - min(N, M) + 1`), or
            ``"same"`` (center segment of length `N`).

    Returns:
        torch.Tensor: Result of convolving ``x`` and ``y``; the trailing
        length is dictated by ``mode``.

    .. _convolution:
        https://en.wikipedia.org/wiki/Convolution
    """
    _check_shape_compatible(x, y, allow_broadcast=False)
    _check_convolve_mode(mode)

    x_size, y_size = x.size(-1), y.size(-1)
    # conv1d padding semantics expect the kernel (y) to be the shorter operand;
    # convolution is commutative, so swap if necessary.
    if x.size(-1) < y.size(-1):
        x, y = y, x

    num_signals = int(torch.tensor(x.shape[:-1]).prod())
    flat_x = x.reshape(num_signals, x.size(-1))
    flat_y = y.reshape(num_signals, y.size(-1))

    # Grouped conv1d applies each kernel to its own signal; flipping the
    # kernel turns cross-correlation into true convolution.
    flat_result = torch.nn.functional.conv1d(
        input=flat_x,
        weight=flat_y.flip(-1).unsqueeze(1),
        stride=1,
        groups=num_signals,
        padding=flat_y.size(-1) - 1,
    )
    result = flat_result.reshape(x.shape[:-1] + (-1,))
    return _apply_convolve_mode(result, x_size, y_size, mode)
def add_noise(
    waveform: torch.Tensor, noise: torch.Tensor, snr: torch.Tensor, lengths: Optional[torch.Tensor] = None
) -> torch.Tensor:
    r"""Scales and adds noise to waveform per signal-to-noise ratio.

    For each waveform vector :math:`x \in \mathbb{R}^L` and noise vector
    :math:`n \in \mathbb{R}^L`, computes :math:`y = x + a n` where

    .. math::
        a = \sqrt{ \frac{ ||x||_{2}^{2} }{ ||n||_{2}^{2} } \cdot 10^{-\frac{\text{SNR}}{10}} } \, \text{,}

    with :math:`\text{SNR}` the desired signal-to-noise ratio in dB. Singleton
    leading dimensions are broadcast per PyTorch's broadcasting semantics.

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        waveform (torch.Tensor): Input waveform, with shape `(..., L)`.
        noise (torch.Tensor): Noise, with shape `(..., L)` (same shape as ``waveform``).
        snr (torch.Tensor): Signal-to-noise ratios in dB, with shape `(...,)`.
        lengths (torch.Tensor or None, optional): Valid lengths of signals in
            ``waveform`` and ``noise``, with shape `(...,)`. If ``None``, all
            elements are treated as valid. (Default: ``None``)

    Returns:
        torch.Tensor: ``waveform`` with scaled ``noise`` added, shape `(..., L)`.

    Raises:
        ValueError: If leading dimensions are inconsistent or the last
            dimensions of ``waveform`` and ``noise`` differ.
    """
    leading_ok = waveform.ndim - 1 == noise.ndim - 1 == snr.ndim
    if lengths is not None:
        leading_ok = leading_ok and lengths.ndim == snr.ndim
    if not leading_ok:
        raise ValueError("Input leading dimensions don't match.")

    L = waveform.size(-1)
    if noise.size(-1) != L:
        raise ValueError(f"Length dimensions of waveform and noise don't match (got {L} and {noise.size(-1)}).")

    if lengths is None:
        masked_waveform, masked_noise = waveform, noise
    else:
        # Zero out samples beyond each signal's valid length before measuring energy.
        mask = torch.arange(0, L, device=lengths.device).expand(waveform.shape) < lengths.unsqueeze(-1)
        masked_waveform = waveform * mask
        masked_noise = noise * mask

    # Per-signal energies of the valid portions.
    energy_signal = torch.linalg.vector_norm(masked_waveform, ord=2, dim=-1) ** 2  # (*,)
    energy_noise = torch.linalg.vector_norm(masked_noise, ord=2, dim=-1) ** 2  # (*,)

    # Amplitude factor that moves the existing SNR to the requested SNR.
    original_snr_db = 10 * (torch.log10(energy_signal) - torch.log10(energy_noise))
    scale = 10 ** ((original_snr_db - snr) / 20.0)  # (*,)

    return waveform + scale.unsqueeze(-1) * noise  # (*, L)
def _hz_to_bark(freqs: float, bark_scale: str = "traunmuller") -> float:
r"""Convert Hz to Barks.
......@@ -318,89 +121,3 @@ def barkscale_fbanks(
)
return fb
def speed(
    waveform: torch.Tensor, lengths: torch.Tensor, orig_freq: int, factor: float
) -> Tuple[torch.Tensor, torch.Tensor]:
    r"""Adjusts waveform speed.

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        waveform (torch.Tensor): Input signals, with shape `(..., time)`.
        lengths (torch.Tensor): Valid lengths of signals in ``waveform``, with shape `(...)`.
        orig_freq (int): Original frequency of the signals in ``waveform``.
        factor (float): Speed-adjustment factor. Values greater than 1.0
            compress ``waveform`` in time; values less than 1.0 stretch it.

    Returns:
        (torch.Tensor, torch.Tensor):
            torch.Tensor
                Speed-adjusted waveform, with shape `(..., new_time).`
            torch.Tensor
                Valid lengths of the speed-adjusted signals, with shape `(...)`.
    """
    # Resampling from ``factor * orig_freq`` down/up to ``orig_freq`` shortens
    # (factor > 1) or lengthens (factor < 1) the signal by ``factor``.
    src_rate = int(factor * orig_freq)
    tgt_rate = int(orig_freq)

    # Reduce the ratio so resampling uses the smallest equivalent rates.
    common = math.gcd(src_rate, tgt_rate)
    src_rate //= common
    tgt_rate //= common

    adjusted = resample(waveform, src_rate, tgt_rate)
    new_lengths = torch.ceil(lengths * tgt_rate / src_rate).to(lengths.dtype)
    return adjusted, new_lengths
def preemphasis(waveform, coeff: float = 0.97) -> torch.Tensor:
    r"""Pre-emphasizes a waveform along its last dimension.

    For each signal :math:`x` in ``waveform``, computes output :math:`y` as

    .. math::
        y[i] = x[i] - \text{coeff} \cdot x[i - 1]

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        waveform (torch.Tensor): Waveform, with shape `(..., N)`.
        coeff (float, optional): Pre-emphasis coefficient. Typically between 0.0 and 1.0.
            (Default: 0.97)

    Returns:
        torch.Tensor: Pre-emphasized waveform, with shape `(..., N)`.
    """
    # Work on a copy so the caller's tensor is left untouched; the RHS is
    # evaluated before the in-place subtraction, so original samples are used.
    emphasized = waveform.clone()
    emphasized[..., 1:] -= coeff * emphasized[..., :-1]
    return emphasized
def deemphasis(waveform, coeff: float = 0.97) -> torch.Tensor:
    r"""De-emphasizes a waveform along its last dimension.

    Inverse of :meth:`preemphasis`. For each signal :math:`x` in ``waveform``,
    computes output :math:`y` as

    .. math::
        y[i] = x[i] + \text{coeff} \cdot y[i - 1]

    .. devices:: CPU CUDA

    .. properties:: Autograd TorchScript

    Args:
        waveform (torch.Tensor): Waveform, with shape `(..., N)`.
        coeff (float, optional): De-emphasis coefficient. Typically between 0.0 and 1.0.
            (Default: 0.97)

    Returns:
        torch.Tensor: De-emphasized waveform, with shape `(..., N)`.
    """
    # IIR filter y[i] = x[i] + coeff * y[i - 1]: denominator [1, -coeff],
    # numerator [1, 0] (identity feed-forward path).
    denominator = torch.tensor([1.0, -coeff], dtype=waveform.dtype, device=waveform.device)
    numerator = torch.tensor([1.0, 0.0], dtype=waveform.dtype, device=waveform.device)
    return lfilter(waveform, a_coeffs=denominator, b_coeffs=numerator)
......@@ -2,8 +2,9 @@ import math
from typing import Callable, Optional, Sequence, Tuple
import torch
from torchaudio.prototype.functional import add_noise, barkscale_fbanks, convolve, deemphasis, fftconvolve, preemphasis
from torchaudio.prototype.functional.functional import _check_convolve_mode
from torchaudio.functional import add_noise, convolve, deemphasis, fftconvolve, preemphasis
from torchaudio.functional.functional import _check_convolve_mode
from torchaudio.prototype.functional import barkscale_fbanks
from torchaudio.transforms import Resample, Spectrogram
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment