Unverified Commit 60a8e23d authored by moto's avatar moto Committed by GitHub
Browse files

Add Torchscript sox effects (#760)

* Add sox_utils module

* Make init/shutdown thread safe

* Add sox effects implementation

* Add test for sox effects

* Update docstrings and add examples
parent db8f2bf3
...@@ -13,6 +13,7 @@ The :mod:`torchaudio` package consists of I/O, popular datasets and common audio ...@@ -13,6 +13,7 @@ The :mod:`torchaudio` package consists of I/O, popular datasets and common audio
kaldi_io kaldi_io
transforms transforms
functional functional
utils
.. automodule:: torchaudio .. automodule:: torchaudio
:members: :members:
...@@ -4,10 +4,16 @@ ...@@ -4,10 +4,16 @@
torchaudio.sox_effects torchaudio.sox_effects
====================== ======================
Create SoX effects chain for preprocessing audio.
.. currentmodule:: torchaudio.sox_effects .. currentmodule:: torchaudio.sox_effects
Apply a SoX effects chain to a torch.Tensor, or to an audio file whose result is loaded as a torch.Tensor.
.. autofunction:: apply_effects_tensor
.. autofunction:: apply_effects_file
Create SoX effects chain for preprocessing audio.
:hidden:`SoxEffect` :hidden:`SoxEffect`
~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
.. role:: hidden
:class: hidden-section
torchaudio.utils.sox_utils
==========================
Utility module to configure libsox. These settings affect the behavior of the ``sox_io`` backend and of ``torchaudio.sox_effects``.
.. currentmodule:: torchaudio.utils.sox_utils
.. autofunction:: set_seed
.. autofunction:: set_verbosity
.. autofunction:: set_buffer_size
.. autofunction:: set_use_threads
.. autofunction:: list_effects
.. autofunction:: list_formats
{"effects": [["allpass", "300", "10"]]}
{"effects": [["band", "300", "10"]]}
{"effects": [["bandpass", "300", "10"]]}
{"effects": [["bandreject", "300", "10"]]}
{"effects": [["bass", "-10"]]}
{"effects": [["bend", ".35,180,.25", ".15,740,.53", "0,-520,.3"]]}
{"effects": [["biquad", "0.4", "0.2", "0.9", "0.7", "0.2", "0.6"]]}
{"effects": [["chorus", "0.7", "0.9", "55", "0.4", "0.25", "2", "-t"]]}
{"effects": [["chorus", "0.6", "0.9", "50", "0.4", "0.25", "2", "-t", "60", "0.32", "0.4", "1.3", "-s"]]}
{"effects": [["chorus", "0.5", "0.9", "50", "0.4", "0.25", "2", "-t", "60", "0.32", "0.4", "2.3", "-t", "40", "0.3", "0.3", "1.3", "-s"]]}
{"effects": [["channels", "1"]]}
{"effects": [["channels", "2"]]}
{"effects": [["channels", "3"]]}
{"effects": [["compand", "0.3,1", "6:-70,-60,-20", "-5", "-90", "0.2"]]}
{"effects": [["compand", ".1,.2", "-inf,-50.1,-inf,-50,-50", "0", "-90", ".1"]]}
{"effects": [["compand", ".1,.1", "-45.1,-45,-inf,0,-inf", "45", "-90", ".1"]]}
{"effects": [["contrast", "0"]]}
{"effects": [["contrast", "25"]]}
{"effects": [["contrast", "50"]]}
{"effects": [["contrast", "75"]]}
{"effects": [["contrast", "100"]]}
{"effects": [["dcshift", "1.0"]]}
{"effects": [["dcshift", "-1.0"]]}
{"effects": [["deemph"]], "input_sample_rate": 44100}
{"effects": [["delay", "1.5", "+1"]]}
{"effects": [["dither", "-s"]]}
{"effects": [["dither", "-S"]]}
{"effects": [["divide"]]}
{"effects": [["downsample", "2"]], "input_sample_rate": 8000, "output_sample_rate": 4000}
{"effects": [["earwax"]], "input_sample_rate": 44100}
{"effects": [["echo", "0.8", "0.88", "60", "0.4"]]}
{"effects": [["echo", "0.8", "0.88", "6", "0.4"]]}
{"effects": [["echo", "0.8", "0.9", "1000", "0.3"]]}
{"effects": [["echo", "0.8", "0.9", "1000", "0.3", "1800", "0.25"]]}
{"effects": [["echos", "0.8", "0.7", "700", "0.25", "700", "0.3"]]}
{"effects": [["echos", "0.8", "0.7", "700", "0.25", "900", "0.3"]]}
{"effects": [["echos", "0.8", "0.7", "40", "0.25", "63", "0.3"]]}
{"effects": [["equalizer", "300", "10", "5"]]}
{"effects": [["fade", "q", "3"]]}
{"effects": [["fade", "h", "3"]]}
{"effects": [["fade", "t", "3"]]}
{"effects": [["fade", "l", "3"]]}
{"effects": [["fade", "p", "3"]]}
{"effects": [["fir", "0.0195", "-0.082", "0.234", "0.891", "-0.145", "0.043"]]}
{"effects": [["fir", "test/assets/sox_effect_test_fir_coeffs.txt"]]}
{"effects": [["flanger"]]}
{"effects": [["gain", "-n"]]}
{"effects": [["gain", "-n", "-3"]]}
{"effects": [["gain", "-l", "-6"]]}
{"effects": [["highpass", "-1", "300"]]}
{"effects": [["highpass", "-2", "300"]]}
{"effects": [["hilbert"]]}
{"effects": [["loudness"]]}
{"effects": [["lowpass", "-1", "300"]]}
{"effects": [["lowpass", "-2", "300"]]}
{"effects": [["mcompand", "0.005,0.1 -47,-40,-34,-34,-17,-33", "100", "0.003,0.05 -47,-40,-34,-34,-17,-33", "400", "0.000625,0.0125 -47,-40,-34,-34,-15,-33", "1600", "0.0001,0.025 -47,-40,-34,-34,-31,-31,-0,-30", "6400", "0,0.025 -38,-31,-28,-28,-0,-25"]], "input_sample_rate": 44100}
{"effects": [["norm"]]}
{"effects": [["oops"]]}
{"effects": [["overdrive"]]}
{"effects": [["pad"]]}
{"effects": [["phaser"]]}
{"effects": [["pitch", "6.48"], ["rate", "8030"]], "output_sample_rate": 8030}
{"effects": [["pitch", "-6.50"], ["rate", "7970"]], "output_sample_rate": 7970}
{"effects": [["rate", "4567"]], "output_sample_rate": 4567}
{"effects": [["remix", "6", "7", "8", "0"]], "num_channels": 8}
{"effects": [["remix", "1-3,7", "3"]], "num_channels": 8}
{"effects": [["repeat"]]}
{"effects": [["reverb"]]}
{"effects": [["reverse"]]}
{"effects": [["riaa"]], "input_sample_rate": 44100}
{"effects": [["silence", "0"]]}
{"effects": [["sinc", "3k"]]}
{"effects": [["speed", "1.3"]], "input_sample_rate": 4000, "output_sample_rate": 5200}
{"effects": [["speed", "0.7"]], "input_sample_rate": 4000, "output_sample_rate": 2800}
{"effects": [["stat"]]}
{"effects": [["stats"]]}
{"effects": [["stretch"]]}
{"effects": [["swap"]]}
{"effects": [["synth"]]}
{"effects": [["tempo", "0.9"]]}
{"effects": [["tempo", "1.1"]]}
{"effects": [["treble", "3"]]}
{"effects": [["tremolo", "300", "40"]]}
{"effects": [["tremolo", "300", "50"]]}
{"effects": [["trim", "0", "0.1"]]}
{"effects": [["upsample", "2"]], "input_sample_rate": 8000, "output_sample_rate": 16000}
{"effects": [["vad"]]}
{"effects": [["vol", "3"]]}
0.0195 -0.082 0.234 0.891 -0.145 0.043
...@@ -72,6 +72,7 @@ def get_sinusoid( ...@@ -72,6 +72,7 @@ def get_sinusoid(
n_channels: int = 1, n_channels: int = 1,
dtype: Union[str, torch.dtype] = "float32", dtype: Union[str, torch.dtype] = "float32",
device: Union[str, torch.device] = "cpu", device: Union[str, torch.device] = "cpu",
channels_first: bool = True,
): ):
"""Generate pseudo audio data with sine wave. """Generate pseudo audio data with sine wave.
...@@ -91,4 +92,7 @@ def get_sinusoid( ...@@ -91,4 +92,7 @@ def get_sinusoid(
pie2 = 2 * 3.141592653589793 pie2 = 2 * 3.141592653589793
end = pie2 * frequency * duration end = pie2 * frequency * duration
theta = torch.linspace(0, end, sample_rate * duration, dtype=dtype, device=device) theta = torch.linspace(0, end, sample_rate * duration, dtype=dtype, device=device)
return torch.sin(theta, out=None).repeat([n_channels, 1]) sin = torch.sin(theta, out=None).repeat([n_channels, 1])
if not channels_first:
sin = sin.t()
return sin
...@@ -14,33 +14,28 @@ from .backend_utils import set_audio_backend ...@@ -14,33 +14,28 @@ from .backend_utils import set_audio_backend
class TempDirMixin: class TempDirMixin:
"""Mixin to provide easy access to temp dir""" """Mixin to provide easy access to temp dir"""
temp_dir_ = None temp_dir_ = None
base_temp_dir = None
temp_dir = None
@classmethod @property
def setUpClass(cls): def base_temp_dir(self):
super().setUpClass()
# If TORCHAUDIO_TEST_TEMP_DIR is set, use it instead of temporary directory. # If TORCHAUDIO_TEST_TEMP_DIR is set, use it instead of temporary directory.
# this is handy for debugging. # this is handy for debugging.
key = 'TORCHAUDIO_TEST_TEMP_DIR' key = 'TORCHAUDIO_TEST_TEMP_DIR'
if key in os.environ: if key in os.environ:
cls.base_temp_dir = os.environ[key] return os.environ[key]
else: if self.__class__.temp_dir_ is None:
cls.temp_dir_ = tempfile.TemporaryDirectory() self.__class__.temp_dir_ = tempfile.TemporaryDirectory()
cls.base_temp_dir = cls.temp_dir_.name return self.__class__.temp_dir_.name
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
super().tearDownClass() super().tearDownClass()
if isinstance(cls.temp_dir_, tempfile.TemporaryDirectory): if cls.temp_dir_ is not None:
cls.temp_dir_.cleanup() cls.temp_dir_.cleanup()
cls.temp_dir_ = None
def setUp(self):
super().setUp()
self.temp_dir = os.path.join(self.base_temp_dir, self.id())
def get_temp_path(self, *paths): def get_temp_path(self, *paths):
path = os.path.join(self.temp_dir, *paths) temp_dir = os.path.join(self.base_temp_dir, self.id())
path = os.path.join(temp_dir, *paths)
os.makedirs(os.path.dirname(path), exist_ok=True) os.makedirs(os.path.dirname(path), exist_ok=True)
return path return path
......
def name_func(func, _, params):
    """Build a readable test name for ``parameterized.expand``.

    Args:
        func: The test function being expanded; its ``__name__`` is the prefix.
        _: Index of the parameter set (unused).
        params: Object exposing the positional parameters as ``params.args``.

    Returns:
        str: ``"<func name>_<values joined with '_'>"``.

        * If the first parameter is a string, all positional parameters are
          joined.
        * If the first parameter is another iterable (e.g. a tuple bundling
          the whole parameter set), its elements are joined.
        * If the first parameter is a non-iterable scalar (e.g. an ``int``
          sample rate), all positional parameters are joined. This case
          previously raised ``TypeError``.
    """
    first = params.args[0]
    if isinstance(first, str):
        parts = params.args
    else:
        try:
            parts = list(first)
        except TypeError:
            # A scalar such as an int or bool: treat it like the string case.
            parts = params.args
    args = "_".join(str(part) for part in parts)
    return f'{func.__name__}_{args}'
from typing import List, Tuple
import numpy as np
import torch
import torchaudio
from ..common_utils import (
TempDirMixin,
PytorchTestCase,
skipIfNoExtension,
get_whitenoise,
load_wav,
save_wav,
)
class RandomPerturbationFile(torch.utils.data.Dataset):
    """Dataset that loads audio files with a random speed perturbation.

    Each item is the result of ``torchaudio.sox_effects.apply_effects_file``
    applied to ``flist[index]`` with a randomly drawn ``speed`` factor.

    Args:
        flist: Paths of the audio files.
        sample_rate: Value passed to the ``rate`` effect.

    Attributes:
        rng: Random generator used to draw the speed factor. It is ``None``
            after construction and is expected to be assigned by a
            ``worker_init_fn`` (see ``init_random_seed``). If it is still
            ``None`` when an item is requested, an unseeded
            ``np.random.RandomState`` is created so the dataset also works
            without DataLoader workers.
    """

    def __init__(self, flist: List[str], sample_rate: int):
        super().__init__()
        self.flist = flist
        self.sample_rate = sample_rate
        self.rng = None

    def _random_effects(self) -> List[List[str]]:
        """Return the effect chain with a freshly drawn speed factor."""
        if self.rng is None:
            # No worker_init_fn ran (e.g. num_workers=0 or direct indexing).
            self.rng = np.random.RandomState()
        speed = self.rng.uniform(0.5, 2.0)
        return [
            ['gain', '-n', '-10'],
            ['speed', f'{speed:.5f}'],  # duration of data is 0.5 ~ 2.0 seconds.
            ['rate', f'{self.sample_rate}'],
            ['pad', '0', '1.5'],  # add 1.5 seconds silence at the end
            ['trim', '0', '2'],  # get the first 2 seconds
        ]

    def __getitem__(self, index):
        effects = self._random_effects()
        data, _ = torchaudio.sox_effects.apply_effects_file(self.flist[index], effects)
        return data

    def __len__(self):
        return len(self.flist)
class RandomPerturbationTensor(torch.utils.data.Dataset):
    """Dataset that applies a random speed perturbation to (synthetic) Tensor data.

    Each item is the result of ``torchaudio.sox_effects.apply_effects_tensor``
    applied to ``signals[index]`` with a randomly drawn ``speed`` factor.

    Args:
        signals: ``(tensor, sample_rate)`` pairs.
        sample_rate: Value passed to the ``rate`` effect.

    Attributes:
        rng: Random generator used to draw the speed factor. It is ``None``
            after construction and is expected to be assigned by a
            ``worker_init_fn`` (see ``init_random_seed``). If it is still
            ``None`` when an item is requested, an unseeded
            ``np.random.RandomState`` is created so the dataset also works
            without DataLoader workers.
    """

    def __init__(self, signals: List[Tuple[torch.Tensor, int]], sample_rate: int):
        super().__init__()
        self.signals = signals
        self.sample_rate = sample_rate
        self.rng = None

    def _random_effects(self) -> List[List[str]]:
        """Return the effect chain with a freshly drawn speed factor."""
        if self.rng is None:
            # No worker_init_fn ran (e.g. num_workers=0 or direct indexing).
            self.rng = np.random.RandomState()
        speed = self.rng.uniform(0.5, 2.0)
        return [
            ['gain', '-n', '-10'],
            ['speed', f'{speed:.5f}'],  # duration of data is 0.5 ~ 2.0 seconds.
            ['rate', f'{self.sample_rate}'],
            ['pad', '0', '1.5'],  # add 1.5 seconds silence at the end
            ['trim', '0', '2'],  # get the first 2 seconds
        ]

    def __getitem__(self, index):
        effects = self._random_effects()
        tensor, sample_rate = self.signals[index]
        data, _ = torchaudio.sox_effects.apply_effects_tensor(tensor, sample_rate, effects)
        return data

    def __len__(self):
        return len(self.signals)
def init_random_seed(worker_id):
    """``worker_init_fn`` hook: give the worker's dataset copy its own RNG.

    The dataset object is looked up through
    ``torch.utils.data.get_worker_info()`` and its ``rng`` attribute is set to
    a ``np.random.RandomState`` seeded with ``worker_id``.
    """
    worker_info = torch.utils.data.get_worker_info()
    worker_info.dataset.rng = np.random.RandomState(worker_id)
@skipIfNoExtension
class TestSoxEffectsDataset(TempDirMixin, PytorchTestCase):
    """Run the sox effect functions from multi-process ``DataLoader`` workers"""

    def _generate_dataset(self, num_samples=128):
        """Write ``num_samples`` white-noise wav files and return their paths."""
        paths = []
        for idx in range(num_samples):
            # Sample rate and dtype are drawn at random for every file.
            sample_rate = np.random.choice([8000, 16000, 44100])
            dtype = np.random.choice(['float32', 'int32', 'int16', 'uint8'])
            waveform = get_whitenoise(n_channels=2, sample_rate=sample_rate, duration=1, dtype=dtype)
            wav_path = self.get_temp_path(f'{idx:03d}_{dtype}_{sample_rate}.wav')
            save_wav(wav_path, waveform, sample_rate)
            paths.append(wav_path)
        return paths

    def test_apply_effects_file(self):
        """Every batch loaded from files has the expected shape."""
        sample_rate = 12000
        expected_shape = (32, 2, 2 * sample_rate)
        dataset = RandomPerturbationFile(self._generate_dataset(), sample_rate)
        loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=32,
            num_workers=16,
            worker_init_fn=init_random_seed,
        )
        for batch in loader:
            assert batch.shape == expected_shape

    def _generate_signals(self, num_samples=128):
        """Return ``num_samples`` pairs of (white-noise tensor, sample rate)."""
        pairs = []
        for _ in range(num_samples):
            sample_rate = np.random.choice([8000, 16000, 44100])
            waveform = get_whitenoise(
                n_channels=2, sample_rate=sample_rate, duration=1, dtype='float32')
            pairs.append((waveform, sample_rate))
        return pairs

    def test_apply_effects_tensor(self):
        """Every batch produced from in-memory tensors has the expected shape."""
        sample_rate = 12000
        expected_shape = (32, 2, 2 * sample_rate)
        dataset = RandomPerturbationTensor(self._generate_signals(), sample_rate)
        loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=32,
            num_workers=16,
            worker_init_fn=init_random_seed,
        )
        for batch in loader:
            assert batch.shape == expected_shape
import itertools
from torchaudio import sox_effects
from parameterized import parameterized
from ..common_utils import (
TempDirMixin,
PytorchTestCase,
skipIfNoExtension,
get_sinusoid,
get_wav_data,
save_wav,
load_wav,
load_params,
sox_utils,
)
from .common import (
name_func,
)
@skipIfNoExtension
class TestSoxEffects(PytorchTestCase):
    def test_init(self):
        """Calling init_sox_effects repeatedly does not crash"""
        remaining = 3
        while remaining > 0:
            sox_effects.init_sox_effects()
            remaining -= 1
@skipIfNoExtension
class TestSoxEffectsTensor(TempDirMixin, PytorchTestCase):
    """Tests for the `apply_effects_tensor` function"""

    @parameterized.expand(
        list(itertools.product(
            ['float32', 'int32', 'int16', 'uint8'],
            [8000, 16000],
            [1, 2, 4, 8],
            [True, False],
        )),
        name_func=name_func,
    )
    def test_apply_no_effect(self, dtype, sample_rate, num_channels, channels_first):
        """An empty effect list returns data identical to the input"""
        reference = get_wav_data(dtype, num_channels, channels_first=channels_first)
        passed_in = reference.clone()
        result, result_sr = sox_effects.apply_effects_tensor(
            passed_in, sample_rate, [], channels_first)

        assert result_sr == sample_rate
        # The Tensor handed to the function must be left unmodified
        self.assertEqual(reference, passed_in)
        # The function must hand back a different Tensor object
        assert passed_in is not result
        # ...whose content equals the input
        self.assertEqual(passed_in, result)

    @parameterized.expand(
        load_params("sox_effect_test_args.json"),
        name_func=lambda f, i, p: '{}_{}_{}'.format(f.__name__, i, p.args[0]["effects"][0][0]),
    )
    def test_apply_effects(self, args):
        """`apply_effects_tensor` output matches the output of the sox command"""
        effects = args['effects']
        num_channels = args.get("num_channels", 2)
        input_sr = args.get("input_sample_rate", 8000)
        output_sr = args.get("output_sample_rate")

        input_path = self.get_temp_path('input.wav')
        reference_path = self.get_temp_path('reference.wav')

        # Reference: run the sox command on the same sinusoid saved as wav.
        waveform = get_sinusoid(
            frequency=800, sample_rate=input_sr,
            n_channels=num_channels, dtype='float32')
        save_wav(input_path, waveform, input_sr)
        sox_utils.run_sox_effect(
            input_path, reference_path, effects, output_sample_rate=output_sr)
        expected, expected_sr = load_wav(reference_path)

        found, sr = sox_effects.apply_effects_tensor(waveform, input_sr, effects)

        assert sr == expected_sr
        self.assertEqual(expected, found)
@skipIfNoExtension
class TestSoxEffectsFile(TempDirMixin, PytorchTestCase):
    """Tests for the `apply_effects_file` function"""

    @parameterized.expand(
        list(itertools.product(
            ['float32', 'int32', 'int16', 'uint8'],
            [8000, 16000],
            [1, 2, 4, 8],
            [False, True],
        )),
        name_func=name_func,
    )
    def test_apply_no_effect(self, dtype, sample_rate, num_channels, channels_first):
        """An empty effect list returns data identical to the saved file"""
        wav_path = self.get_temp_path('input.wav')
        waveform = get_wav_data(dtype, num_channels, channels_first=channels_first)
        save_wav(wav_path, waveform, sample_rate, channels_first=channels_first)

        result, result_sr = sox_effects.apply_effects_file(
            wav_path, [], normalize=False, channels_first=channels_first)

        assert result_sr == sample_rate
        self.assertEqual(waveform, result)

    @parameterized.expand(
        load_params("sox_effect_test_args.json"),
        name_func=lambda f, i, p: '{}_{}_{}'.format(f.__name__, i, p.args[0]["effects"][0][0]),
    )
    def test_apply_effects(self, args):
        """`apply_effects_file` output matches the output of the sox command"""
        dtype, channels_first = 'int32', True
        effects = args['effects']
        num_channels = args.get("num_channels", 2)
        input_sr = args.get("input_sample_rate", 8000)
        output_sr = args.get("output_sample_rate")

        input_path = self.get_temp_path('input.wav')
        reference_path = self.get_temp_path('reference.wav')

        # Reference: run the sox command on the saved input file.
        waveform = get_wav_data(dtype, num_channels, channels_first=channels_first)
        save_wav(input_path, waveform, input_sr, channels_first=channels_first)
        sox_utils.run_sox_effect(
            input_path, reference_path, effects, output_sample_rate=output_sr)
        expected, expected_sr = load_wav(reference_path)

        found, sr = sox_effects.apply_effects_file(
            input_path, effects, normalize=False, channels_first=channels_first)

        assert sr == expected_sr
        self.assertEqual(found, expected)
def _joined_args_name(func, _, params):
    """Name a parameterized test ``<func name>_<all args joined with '_'>``."""
    joined = "_".join(str(arg) for arg in params.args)
    return f'{func.__name__}_{joined}'


@skipIfNoExtension
class TestFileFormats(TempDirMixin, PytorchTestCase):
    """`apply_effects_file` gives the same result as sox on various file formats"""

    def _check_generated_format(
            self, extension, sample_rate, num_channels, sox_kwargs, tolerance):
        """Compare `apply_effects_file` with sox on a sox-generated input file.

        ``sox_kwargs`` are forwarded to ``sox_utils.run_sox_effect`` and
        ``tolerance`` to ``assertEqual``; both may be empty.
        """
        channels_first = True
        effects = [['band', '300', '10']]

        input_path = self.get_temp_path(f'input.{extension}')
        reference_path = self.get_temp_path('reference.wav')
        sox_utils.gen_audio_file(input_path, sample_rate, num_channels)
        sox_utils.run_sox_effect(input_path, reference_path, effects, **sox_kwargs)

        expected, expected_sr = load_wav(reference_path)
        found, sr = sox_effects.apply_effects_file(
            input_path, effects, channels_first=channels_first)
        # The result is written next to the inputs in the test temp directory.
        save_wav(self.get_temp_path('result.wav'), found, sr, channels_first=channels_first)

        assert sr == expected_sr
        self.assertEqual(found, expected, **tolerance)

    @parameterized.expand(
        list(itertools.product(
            ['float32', 'int32', 'int16', 'uint8'],
            [8000, 16000],
            [1, 2],
        )),
        name_func=_joined_args_name,
    )
    def test_wav(self, dtype, sample_rate, num_channels):
        """`apply_effects_file` works on various wav format"""
        channels_first = True
        effects = [['band', '300', '10']]

        input_path = self.get_temp_path('input.wav')
        reference_path = self.get_temp_path('reference.wav')
        waveform = get_wav_data(dtype, num_channels, channels_first=channels_first)
        save_wav(input_path, waveform, sample_rate, channels_first=channels_first)
        sox_utils.run_sox_effect(input_path, reference_path, effects)

        expected, expected_sr = load_wav(reference_path)
        found, sr = sox_effects.apply_effects_file(
            input_path, effects, normalize=False, channels_first=channels_first)

        assert sr == expected_sr
        self.assertEqual(found, expected)

    @parameterized.expand(
        list(itertools.product([8000, 16000], [1, 2])),
        name_func=_joined_args_name,
    )
    def test_mp3(self, sample_rate, num_channels):
        """`apply_effects_file` works on various mp3 format"""
        self._check_generated_format(
            'mp3', sample_rate, num_channels,
            sox_kwargs={}, tolerance={'atol': 1e-4, 'rtol': 1e-8})

    @parameterized.expand(
        list(itertools.product([8000, 16000], [1, 2])),
        name_func=_joined_args_name,
    )
    def test_flac(self, sample_rate, num_channels):
        """`apply_effects_file` works on various flac format"""
        self._check_generated_format(
            'flac', sample_rate, num_channels,
            sox_kwargs={'output_bitdepth': 32}, tolerance={})

    @parameterized.expand(
        list(itertools.product([8000, 16000], [1, 2])),
        name_func=_joined_args_name,
    )
    def test_vorbis(self, sample_rate, num_channels):
        """`apply_effects_file` works on various vorbis format"""
        self._check_generated_format(
            'vorbis', sample_rate, num_channels,
            sox_kwargs={'output_bitdepth': 32}, tolerance={})
from typing import List
import torch
from torchaudio import sox_effects
from parameterized import parameterized
from ..common_utils import (
TempDirMixin,
PytorchTestCase,
skipIfNoExtension,
get_sinusoid,
load_params,
save_wav,
)
class SoxEffectTensorTransform(torch.nn.Module):
    """Module wrapper around ``sox_effects.apply_effects_tensor``.

    The effect chain, sample rate and channel layout are fixed at construction
    time; ``forward`` only takes the Tensor to process.
    """
    # Explicit annotation of the nested-list attribute type.
    effects: List[List[str]]

    def __init__(self, effects: List[List[str]], sample_rate: int, channels_first: bool):
        super().__init__()
        self.channels_first = channels_first
        self.sample_rate = sample_rate
        self.effects = effects

    def forward(self, tensor: torch.Tensor):
        """Apply the stored effects to ``tensor`` and return the function's result."""
        result = sox_effects.apply_effects_tensor(
            tensor,
            self.sample_rate,
            self.effects,
            self.channels_first,
        )
        return result
class SoxEffectFileTransform(torch.nn.Module):
    """Module wrapper around ``sox_effects.apply_effects_file``.

    The effect chain and channel layout are fixed at construction time;
    ``forward`` only takes the path of the file to process.
    """
    effects: List[List[str]]
    channels_first: bool

    def __init__(self, effects: List[List[str]], channels_first: bool):
        super().__init__()
        self.effects = effects
        self.channels_first = channels_first

    def forward(self, path: str):
        """Load ``path``, apply the stored effects and return the function's result."""
        # ``channels_first`` must be passed by keyword: the third positional
        # parameter of ``apply_effects_file`` is ``normalize``.
        return sox_effects.apply_effects_file(
            path, self.effects, channels_first=self.channels_first)
@skipIfNoExtension
class TestTorchScript(TempDirMixin, PytorchTestCase):
    """Scripted, saved and reloaded transforms give the same result as direct calls"""

    @parameterized.expand(
        load_params("sox_effect_test_args.json"),
        name_func=lambda f, i, p: f'{f.__name__}_{i}_{p.args[0]["effects"][0][0]}',
    )
    def test_apply_effects_tensor(self, args):
        """Scripted `apply_effects_tensor` matches the non-scripted call"""
        effects = args['effects']
        channels_first = True
        num_channels = args.get("num_channels", 2)
        input_sr = args.get("input_sample_rate", 8000)

        # Round-trip the scripted module through serialization.
        trans = SoxEffectTensorTransform(effects, input_sr, channels_first)
        module_path = self.get_temp_path('sox_effect.zip')
        torch.jit.script(trans).save(module_path)
        trans = torch.jit.load(module_path)

        wav = get_sinusoid(
            frequency=800, sample_rate=input_sr,
            n_channels=num_channels, dtype='float32', channels_first=channels_first)
        found, sr_found = trans(wav)
        expected, sr_expected = sox_effects.apply_effects_tensor(
            wav, input_sr, effects, channels_first)

        assert sr_found == sr_expected
        self.assertEqual(expected, found)

    @parameterized.expand(
        load_params("sox_effect_test_args.json"),
        name_func=lambda f, i, p: f'{f.__name__}_{i}_{p.args[0]["effects"][0][0]}',
    )
    def test_apply_effects_file(self, args):
        """Scripted `apply_effects_file` matches the non-scripted call"""
        effects = args['effects']
        channels_first = True
        num_channels = args.get("num_channels", 2)
        input_sr = args.get("input_sample_rate", 8000)

        # Round-trip the scripted module through serialization.
        trans = SoxEffectFileTransform(effects, channels_first)
        module_path = self.get_temp_path('sox_effect.zip')
        torch.jit.script(trans).save(module_path)
        trans = torch.jit.load(module_path)

        wav_path = self.get_temp_path('input.wav')
        wav = get_sinusoid(
            frequency=800, sample_rate=input_sr,
            n_channels=num_channels, dtype='float32', channels_first=channels_first)
        save_wav(wav_path, wav, sample_rate=input_sr, channels_first=channels_first)

        found, sr_found = trans(wav_path)
        # ``channels_first`` is passed by keyword: the third positional
        # parameter of ``apply_effects_file`` is ``normalize``.
        expected, sr_expected = sox_effects.apply_effects_file(
            wav_path, effects, channels_first=channels_first)

        assert sr_found == sr_expected
        self.assertEqual(expected, found)
from torchaudio.utils import sox_utils
from ..common_utils import (
PytorchTestCase,
skipIfNoExtension,
)
@skipIfNoExtension
class TestSoxUtils(PytorchTestCase):
    """Smoke tests for the sox_utils module"""

    def test_set_seed(self):
        """`set_seed` does not crash"""
        sox_utils.set_seed(0)

    def test_set_verbosity(self):
        """`set_verbosity` does not crash"""
        # Walk the levels downwards: 6, 5, ..., 1.
        for level in reversed(range(1, 7)):
            sox_utils.set_verbosity(level)

    def test_set_buffer_size(self):
        """`set_buffer_size` does not crash"""
        enlarged, default = 131072, 8192
        sox_utils.set_buffer_size(enlarged)
        # restore the default
        sox_utils.set_buffer_size(default)

    def test_set_use_threads(self):
        """`set_use_threads` does not crash"""
        sox_utils.set_use_threads(True)
        # restore the default
        sox_utils.set_use_threads(False)

    def test_list_effects(self):
        """`list_effects` returns the list of available effects"""
        available = sox_utils.list_effects()
        # The full set of effects is not known here, so only spot-check a few.
        for effect_name in ('highpass', 'phaser', 'gain'):
            assert effect_name in available

    def test_list_formats(self):
        """`list_formats` returns the list of supported formats"""
        supported = sox_utils.list_formats()
        assert 'wav' in supported
...@@ -4,6 +4,7 @@ from torchaudio import ( ...@@ -4,6 +4,7 @@ from torchaudio import (
compliance, compliance,
datasets, datasets,
kaldi_io, kaldi_io,
utils,
sox_effects, sox_effects,
transforms transforms
) )
......
...@@ -18,6 +18,17 @@ static auto registerTensorSignal = ...@@ -18,6 +18,17 @@ static auto registerTensorSignal =
.def("get_sample_rate", &sox_utils::TensorSignal::getSampleRate) .def("get_sample_rate", &sox_utils::TensorSignal::getSampleRate)
.def("get_channels_first", &sox_utils::TensorSignal::getChannelsFirst); .def("get_channels_first", &sox_utils::TensorSignal::getChannelsFirst);
static auto registerSetSoxOptions =
torch::RegisterOperators()
.op("torchaudio::sox_utils_set_seed", &sox_utils::set_seed)
.op("torchaudio::sox_utils_set_verbosity", &sox_utils::set_verbosity)
.op("torchaudio::sox_utils_set_use_threads",
&sox_utils::set_use_threads)
.op("torchaudio::sox_utils_set_buffer_size",
&sox_utils::set_buffer_size)
.op("torchaudio::sox_utils_list_effects", &sox_utils::list_effects)
.op("torchaudio::sox_utils_list_formats", &sox_utils::list_formats);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// sox_io.h // sox_io.h
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
...@@ -53,12 +64,23 @@ static auto registerSaveAudioFile = torch::RegisterOperators().op( ...@@ -53,12 +64,23 @@ static auto registerSaveAudioFile = torch::RegisterOperators().op(
// sox_effects.h // sox_effects.h
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static auto registerSoxEffects = static auto registerSoxEffects =
torch::RegisterOperators( torch::RegisterOperators()
"torchaudio::sox_effects_initialize_sox_effects", .op("torchaudio::sox_effects_initialize_sox_effects",
&sox_effects::initialize_sox_effects) &sox_effects::initialize_sox_effects)
.op("torchaudio::sox_effects_shutdown_sox_effects", .op("torchaudio::sox_effects_shutdown_sox_effects",
&sox_effects::shutdown_sox_effects) &sox_effects::shutdown_sox_effects)
.op("torchaudio::sox_effects_list_effects", &sox_effects::list_effects); .op(torch::RegisterOperators::options()
.schema(
"torchaudio::sox_effects_apply_effects_tensor(__torch__.torch.classes.torchaudio.TensorSignal input_signal, str[][] effects) -> __torch__.torch.classes.torchaudio.TensorSignal output_signal")
.catchAllKernel<
decltype(sox_effects::apply_effects_tensor),
&sox_effects::apply_effects_tensor>())
.op(torch::RegisterOperators::options()
.schema(
"torchaudio::sox_effects_apply_effects_file(str path, str[][] effects, bool normalize, bool channels_first) -> __torch__.torch.classes.torchaudio.TensorSignal output_signal")
.catchAllKernel<
decltype(sox_effects::apply_effects_file),
&sox_effects::apply_effects_file>());
} // namespace } // namespace
} // namespace torchaudio } // namespace torchaudio
......
#include <sox.h> #include <sox.h>
#include <torchaudio/csrc/sox_effects.h> #include <torchaudio/csrc/sox_effects.h>
#include <torchaudio/csrc/sox_effects_chain.h>
#include <torchaudio/csrc/sox_utils.h>
using namespace torch::indexing; using namespace torchaudio::sox_utils;
namespace torchaudio { namespace torchaudio {
namespace sox_effects { namespace sox_effects {
...@@ -10,44 +12,125 @@ namespace { ...@@ -10,44 +12,125 @@ namespace {
enum SoxEffectsResourceState { NotInitialized, Initialized, ShutDown }; enum SoxEffectsResourceState { NotInitialized, Initialized, ShutDown };
SoxEffectsResourceState SOX_RESOURCE_STATE = NotInitialized; SoxEffectsResourceState SOX_RESOURCE_STATE = NotInitialized;
std::mutex SOX_RESOUCE_STATE_MUTEX;
} // namespace } // namespace
void initialize_sox_effects() { void initialize_sox_effects() {
if (SOX_RESOURCE_STATE == ShutDown) { const std::lock_guard<std::mutex> lock(SOX_RESOUCE_STATE_MUTEX);
throw std::runtime_error(
"SoX Effects has been shut down. Cannot initialize again."); switch (SOX_RESOURCE_STATE) {
} case NotInitialized:
if (SOX_RESOURCE_STATE == NotInitialized) {
if (sox_init() != SOX_SUCCESS) { if (sox_init() != SOX_SUCCESS) {
throw std::runtime_error("Failed to initialize sox effects."); throw std::runtime_error("Failed to initialize sox effects.");
}; };
SOX_RESOURCE_STATE = Initialized; SOX_RESOURCE_STATE = Initialized;
case Initialized:
break;
case ShutDown:
throw std::runtime_error(
"SoX Effects has been shut down. Cannot initialize again.");
} }
}; };
void shutdown_sox_effects() { void shutdown_sox_effects() {
if (SOX_RESOURCE_STATE == NotInitialized) { const std::lock_guard<std::mutex> lock(SOX_RESOUCE_STATE_MUTEX);
switch (SOX_RESOURCE_STATE) {
case NotInitialized:
throw std::runtime_error( throw std::runtime_error(
"SoX Effects is not initialized. Cannot shutdown."); "SoX Effects is not initialized. Cannot shutdown.");
} case Initialized:
if (SOX_RESOURCE_STATE == Initialized) {
if (sox_quit() != SOX_SUCCESS) { if (sox_quit() != SOX_SUCCESS) {
throw std::runtime_error("Failed to initialize sox effects."); throw std::runtime_error("Failed to initialize sox effects.");
}; };
SOX_RESOURCE_STATE = ShutDown; SOX_RESOURCE_STATE = ShutDown;
case ShutDown:
break;
} }
} }
std::vector<std::string> list_effects() { c10::intrusive_ptr<TensorSignal> apply_effects_tensor(
std::vector<std::string> names; const c10::intrusive_ptr<TensorSignal>& input_signal,
const sox_effect_fn_t* fns = sox_get_effect_fns(); std::vector<std::vector<std::string>> effects) {
for (int i = 0; fns[i]; ++i) { auto in_tensor = input_signal->getTensor();
const sox_effect_handler_t* handler = fns[i](); validate_input_tensor(in_tensor);
if (handler && handler->name)
names.push_back(handler->name); // Create SoxEffectsChain
const auto dtype = in_tensor.dtype();
torchaudio::sox_effects_chain::SoxEffectsChain chain(
/*input_encoding=*/get_encodinginfo("wav", dtype, 0.),
/*output_encoding=*/get_encodinginfo("wav", dtype, 0.));
// Prepare output buffer
std::vector<sox_sample_t> out_buffer;
out_buffer.reserve(in_tensor.numel());
// Build and run effects chain
chain.addInputTensor(input_signal.get());
for (const auto& effect : effects) {
chain.addEffect(effect);
} }
return names; chain.addOutputBuffer(&out_buffer);
chain.run();
// Create tensor from buffer
const auto channels_first = input_signal->getChannelsFirst();
auto out_tensor = convert_to_tensor(
/*buffer=*/out_buffer.data(),
/*num_samples=*/out_buffer.size(),
/*num_channels=*/chain.getOutputNumChannels(),
dtype,
/*noramlize=*/false,
channels_first);
return c10::make_intrusive<TensorSignal>(
out_tensor, chain.getOutputSampleRate(), channels_first);
}
c10::intrusive_ptr<TensorSignal> apply_effects_file(
const std::string path,
std::vector<std::vector<std::string>> effects,
const bool normalize,
const bool channels_first) {
// Open input file
SoxFormat sf(sox_open_read(
path.c_str(),
/*signal=*/nullptr,
/*encoding=*/nullptr,
/*filetype=*/nullptr));
validate_input_file(sf);
const auto dtype = get_dtype(sf->encoding.encoding, sf->signal.precision);
// Prepare output
std::vector<sox_sample_t> out_buffer;
out_buffer.reserve(sf->signal.length);
// Create and run SoxEffectsChain
torchaudio::sox_effects_chain::SoxEffectsChain chain(
/*input_encoding=*/sf->encoding,
/*output_encoding=*/get_encodinginfo("wav", dtype, 0.));
chain.addInputFile(sf);
for (const auto& effect : effects) {
chain.addEffect(effect);
}
chain.addOutputBuffer(&out_buffer);
chain.run();
// Create tensor from buffer
auto tensor = convert_to_tensor(
/*buffer=*/out_buffer.data(),
/*num_samples=*/out_buffer.size(),
/*num_channels=*/chain.getOutputNumChannels(),
dtype,
normalize,
channels_first);
return c10::make_intrusive<TensorSignal>(
tensor, chain.getOutputSampleRate(), channels_first);
} }
} // namespace sox_effects } // namespace sox_effects
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#define TORCHAUDIO_SOX_EFFECTS_H #define TORCHAUDIO_SOX_EFFECTS_H
#include <torch/script.h> #include <torch/script.h>
#include <torchaudio/csrc/sox_utils.h>
namespace torchaudio { namespace torchaudio {
namespace sox_effects { namespace sox_effects {
...@@ -10,7 +11,15 @@ void initialize_sox_effects(); ...@@ -10,7 +11,15 @@ void initialize_sox_effects();
void shutdown_sox_effects(); void shutdown_sox_effects();
std::vector<std::string> list_effects(); c10::intrusive_ptr<torchaudio::sox_utils::TensorSignal> apply_effects_tensor(
const c10::intrusive_ptr<torchaudio::sox_utils::TensorSignal>& input_signal,
std::vector<std::vector<std::string>> effects);
c10::intrusive_ptr<torchaudio::sox_utils::TensorSignal> apply_effects_file(
const std::string path,
std::vector<std::vector<std::string>> effects,
const bool normalize = true,
const bool channels_first = true);
} // namespace sox_effects } // namespace sox_effects
} // namespace torchaudio } // namespace torchaudio
......
#include <torchaudio/csrc/sox_effects_chain.h>
#include <torchaudio/csrc/sox_utils.h>
using namespace torch::indexing;
using namespace torchaudio::sox_utils;
namespace torchaudio {
namespace sox_effects_chain {
namespace {
/// Scoped owner of the `sox_effect_t*` returned by `sox_create_effect`.
///
/// The wrapped pointer is released with `free()` when the wrapper goes out of
/// scope. The wrapper is neither copyable nor movable, so the pointer is
/// released exactly once.
/// NOTE(review): releasing the handle with free() right after sox_add_effect
/// relies on libsox keeping its own copy of the effect — confirm against the
/// libsox documentation/examples.
struct SoxEffect {
  /// Takes ownership of `se` (may be null).
  explicit SoxEffect(sox_effect_t* se) noexcept : se_(se) {}
  SoxEffect(const SoxEffect& other) = delete;
  // A move constructor takes a non-const rvalue reference; the previous
  // `const SoxEffect&&` form declared a different, unrelated constructor.
  SoxEffect(SoxEffect&& other) = delete;
  SoxEffect& operator=(const SoxEffect& other) = delete;
  SoxEffect& operator=(SoxEffect&& other) = delete;
  ~SoxEffect() {
    // free() accepts a null pointer, so no explicit check is needed.
    free(se_);
  }
  /// Implicit conversion so the wrapper can be passed straight to libsox
  /// functions expecting `sox_effect_t*`.
  operator sox_effect_t*() const noexcept {
    return se_;
  }
  /// Member access on the wrapped effect (e.g. `e->priv`).
  sox_effect_t* operator->() const noexcept {
    return se_;
  }

 private:
  sox_effect_t* se_;
};
/// Helper structs for passing the location of the input tensor and the output
/// buffer to the callback functions.
///
/// drain/flow callback functions require a plain C-style function signature,
/// and the way to pass extra data is to attach it to the sox_effect_t::priv
/// pointer. The following structs are assigned to the sox_effect_t::priv
/// pointer, which gives sox_effect_t access to the input Tensor and the
/// output buffer object.
/// Per-effect state of the "input_tensor" effect, reached through
/// sox_effect_t::priv in tensor_input_drain.
struct TensorInputPriv {
/// Number of samples already handed to the chain. Initialised to 0 by
/// SoxEffectsChain::addInputTensor and advanced by tensor_input_drain.
size_t index;
/// Signal whose tensor is being fed into the chain. Raw pointer set by
/// SoxEffectsChain::addInputTensor; nothing in this file deletes it.
TensorSignal* signal;
};
/// Per-effect state of the "output_tensor" effect, reached through
/// sox_effect_t::priv in tensor_output_flow.
struct TensorOutputPriv {
/// Destination vector that tensor_output_flow appends samples to. Raw pointer
/// set by SoxEffectsChain::addOutputBuffer; nothing in this file deletes it.
std::vector<sox_sample_t>* buffer;
};
/// Drain callback of the "input_tensor" effect: copies the next chunk of the
/// input Tensor into the buffer provided by the effects chain.
///
/// @param effp   Effect whose `priv` points to a TensorInputPriv.
/// @param obuf   Destination buffer owned by the chain.
/// @param osamp  In: capacity of `obuf` in samples. Out: number of samples
///               actually written (always a multiple of the channel count).
/// @return SOX_EOF once every sample of the tensor has been handed over,
///         SOX_SUCCESS otherwise.
int tensor_input_drain(sox_effect_t* effp, sox_sample_t* obuf, size_t* osamp) {
  // Recover the feeding state attached to this effect.
  auto state = static_cast<TensorInputPriv*>(effp->priv);
  auto offset = state->index;
  auto source = state->signal;
  auto data = source->getTensor();
  auto channels = effp->out_signal.channels;

  // Never read past the end of the tensor ...
  const size_t total = data.numel();
  if (offset + *osamp > total) {
    *osamp = total - offset;
  }
  // ... and only hand over whole frames.
  *osamp -= *osamp % channels;

  // Cut out the frames for this call, laid out as (frame, channel), then
  // flatten and unnormalize them.
  auto first_frame = offset / channels;
  auto frame_count = *osamp / channels;
  auto frames = (source->getChannelsFirst())
      ? data.index({Slice(), Slice(first_frame, first_frame + frame_count)})
            .t()
      : data.index({Slice(first_frame, first_frame + frame_count), Slice()});
  const auto chunk = unnormalize_wav(frames.reshape({-1})).contiguous();

  // Hand the samples to the chain and remember how far we got.
  auto src = chunk.data_ptr<int32_t>();
  std::copy(src, src + *osamp, obuf);
  state->index += *osamp;

  if (state->index == total) {
    return SOX_EOF;
  }
  return SOX_SUCCESS;
}
/// Flow callback of the "output_tensor" effect: appends the samples produced
/// by the effects chain to the vector registered via TensorOutputPriv.
///
/// @param effp   Effect whose `priv` points to a TensorOutputPriv. (It is
///               dereferenced below, so it is not marked LSX_UNUSED.)
/// @param ibuf   Samples coming from the previous effect in the chain.
/// @param obuf   Unused; this effect writes nothing downstream.
/// @param isamp  Number of samples available in `ibuf`.
/// @param osamp  Set to 0, since no samples are written to `obuf`.
/// @return Always SOX_SUCCESS.
int tensor_output_flow(
    sox_effect_t* effp,
    sox_sample_t const* ibuf,
    sox_sample_t* obuf LSX_UNUSED,
    size_t* isamp,
    size_t* osamp) {
  // Nothing is forwarded to obuf.
  *osamp = 0;
  // Locate the destination vector attached to this effect.
  auto* out_buffer = static_cast<TensorOutputPriv*>(effp->priv)->buffer;
  // Append the whole input chunk at the end.
  out_buffer->insert(out_buffer->end(), ibuf, ibuf + *isamp);
  return SOX_SUCCESS;
}
/// Returns the handler describing the "input_tensor" effect.
///
/// Only the drain callback (tensor_input_drain) is provided; the private
/// storage size is that of TensorInputPriv. The handler is a function-local
/// static, so every call returns the same object.
sox_effect_handler_t* get_tensor_input_handler() {
  static sox_effect_handler_t input_handler{
      /*name=*/"input_tensor",
      /*usage=*/nullptr,
      /*flags=*/SOX_EFF_MCHAN,
      /*getopts=*/nullptr,
      /*start=*/nullptr,
      /*flow=*/nullptr,
      /*drain=*/tensor_input_drain,
      /*stop=*/nullptr,
      /*kill=*/nullptr,
      /*priv_size=*/sizeof(TensorInputPriv)};
  return &input_handler;
}
/// Returns the handler describing the "output_tensor" effect.
///
/// Only the flow callback (tensor_output_flow) is provided; the private
/// storage size is that of TensorOutputPriv. The handler is a function-local
/// static, so every call returns the same object.
sox_effect_handler_t* get_tensor_output_handler() {
  static sox_effect_handler_t output_handler{
      /*name=*/"output_tensor",
      /*usage=*/nullptr,
      /*flags=*/SOX_EFF_MCHAN,
      /*getopts=*/nullptr,
      /*start=*/nullptr,
      /*flow=*/tensor_output_flow,
      /*drain=*/nullptr,
      /*stop=*/nullptr,
      /*kill=*/nullptr,
      /*priv_size=*/sizeof(TensorOutputPriv)};
  return &output_handler;
}
} // namespace
SoxEffectsChain::SoxEffectsChain(
sox_encodinginfo_t input_encoding,
sox_encodinginfo_t output_encoding)
: in_enc_(input_encoding),
out_enc_(output_encoding),
in_sig_(),
interm_sig_(),
sec_(sox_create_effects_chain(&in_enc_, &out_enc_)) {
if (!sec_) {
throw std::runtime_error("Failed to create effect chain.");
}
}
/// Releases the underlying libsox chain, if one was created.
SoxEffectsChain::~SoxEffectsChain() {
  if (!sec_) {
    return;
  }
  sox_delete_effects_chain(sec_);
}
/// Pumps data through all effects added so far.
///
/// No progress callback or client data is supplied, and the status returned
/// by sox_flow_effects is not inspected.
void SoxEffectsChain::run() {
  sox_flow_effects(sec_, /*callback=*/nullptr, /*client_data=*/nullptr);
}
/// Registers `signal` as the source of the chain.
///
/// Derives the input signal info from `signal`, then adds the "input_tensor"
/// effect whose private state points at `signal` and starts reading at
/// sample 0.
///
/// @param signal  Tensor to feed; stored as a raw pointer in the effect's
///                private state.
/// @throws std::runtime_error if the effect cannot be added to the chain.
void SoxEffectsChain::addInputTensor(TensorSignal* signal) {
  in_sig_ = get_signalinfo(signal, "wav");
  interm_sig_ = in_sig_;

  SoxEffect source(sox_create_effect(get_tensor_input_handler()));
  auto* state = static_cast<TensorInputPriv*>(source->priv);
  state->signal = signal;
  state->index = 0;

  const auto status = sox_add_effect(sec_, source, &interm_sig_, &in_sig_);
  if (status != SOX_SUCCESS) {
    throw std::runtime_error("Failed to add effect: input_tensor");
  }
}
void SoxEffectsChain::addOutputBuffer(
std::vector<sox_sample_t>* output_buffer) {
SoxEffect e(sox_create_effect(get_tensor_output_handler()));
static_cast<TensorOutputPriv*>(e->priv)->buffer = output_buffer;
if (sox_add_effect(sec_, e, &interm_sig_, &in_sig_) != SOX_SUCCESS) {
throw std::runtime_error("Failed to add effect: output_tensor");
}
}
/// Registers an opened audio file as the source of the chain.
///
/// Takes the input signal info from `sf` and adds libsox's built-in "input"
/// effect configured with `sf`.
///
/// @param sf  Opened input file; its `signal` and `filename` members are read.
/// @throws std::runtime_error if the "input" effect rejects its option or
///         cannot be added to the chain.
void SoxEffectsChain::addInputFile(sox_format_t* sf) {
  in_sig_ = sf->signal;
  interm_sig_ = in_sig_;
  SoxEffect e(sox_create_effect(sox_find_effect("input")));
  // The "input" effect receives the file handle disguised as its single
  // string option.
  char* opts[] = {reinterpret_cast<char*>(sf)};
  // Check the status like addEffect does instead of silently continuing with
  // an unconfigured effect.
  if (sox_effect_options(e, 1, opts) != SOX_SUCCESS) {
    std::ostringstream stream;
    stream << "Failed to configure effect: input " << sf->filename;
    throw std::runtime_error(stream.str());
  }
  if (sox_add_effect(sec_, e, &interm_sig_, &in_sig_) != SOX_SUCCESS) {
    std::ostringstream stream;
    stream << "Failed to add effect: input " << sf->filename;
    throw std::runtime_error(stream.str());
  }
}
void SoxEffectsChain::addEffect(const std::vector<std::string> effect) {
const auto num_args = effect.size();
if (num_args == 0) {
throw std::runtime_error("Invalid argument: empty effect.");
}
const auto name = effect[0];
if (UNSUPPORTED_EFFECTS.find(name) != UNSUPPORTED_EFFECTS.end()) {
std::ostringstream stream;
stream << "Unsupported effect: " << name;
throw std::runtime_error(stream.str());
}
SoxEffect e(sox_create_effect(sox_find_effect(name.c_str())));
const auto num_options = num_args - 1;
std::vector<char*> opts;
for (size_t i = 1; i < num_args; ++i) {
opts.push_back((char*)effect[i].c_str());
}
if (sox_effect_options(e, num_options, num_options ? opts.data() : nullptr) !=
SOX_SUCCESS) {
std::ostringstream stream;
stream << "Invalid effect option:";
for (const auto& v : effect) {
stream << " " << v;
}
throw std::runtime_error(stream.str());
}
if (sox_add_effect(sec_, e, &interm_sig_, &in_sig_) != SOX_SUCCESS) {
std::ostringstream stream;
stream << "Failed to add effect: \"" << name;
for (size_t i = 1; i < num_args; ++i) {
stream << " " << effect[i];
}
stream << "\"";
throw std::runtime_error(stream.str());
}
}
/// Channel count of the signal after the effects added so far, as recorded
/// in the intermediate signal info.
int64_t SoxEffectsChain::getOutputNumChannels() {
  const auto channels = static_cast<int64_t>(interm_sig_.channels);
  return channels;
}
/// Sample rate of the signal after the effects added so far, as recorded in
/// the intermediate signal info and converted to an integer.
int64_t SoxEffectsChain::getOutputSampleRate() {
  const auto rate = static_cast<int64_t>(interm_sig_.rate);
  return rate;
}
} // namespace sox_effects_chain
} // namespace torchaudio
#ifndef TORCHAUDIO_SOX_EFFECTS_CHAIN_H
#define TORCHAUDIO_SOX_EFFECTS_CHAIN_H
#include <sox.h>
#include <torch/script.h>
#include <torchaudio/csrc/sox_utils.h>
namespace torchaudio {
namespace sox_effects_chain {
// Helper class that owns a sox_effects_chain_t, releases it on destruction,
// and offers convenience methods for assembling and running the chain.
//
// Usage as seen in apply_effects_file: construct, add one input
// (addInputTensor or addInputFile), add any number of effects (addEffect),
// add the output (addOutputBuffer), call run(), then query the output
// properties.
class SoxEffectsChain {
// Encodings the chain is created with. Their addresses are handed to
// sox_create_effects_chain, so they are declared before sec_.
const sox_encodinginfo_t in_enc_;
const sox_encodinginfo_t out_enc_;
// Signal info of the chain input; set by addInputTensor / addInputFile.
sox_signalinfo_t in_sig_;
// Signal info passed to every sox_add_effect call alongside in_sig_; read by
// the getOutput* accessors.
sox_signalinfo_t interm_sig_;
// The owned libsox chain; deleted in the destructor.
sox_effects_chain_t* sec_;
public:
// Creates the chain; throws std::runtime_error if libsox fails to create it.
explicit SoxEffectsChain(
sox_encodinginfo_t input_encoding,
sox_encodinginfo_t output_encoding);
// Not copyable or movable.
// NOTE(review): the second declaration takes `const SoxEffectsChain&&`, which
// is not the usual move-constructor signature — confirm whether intended.
SoxEffectsChain(const SoxEffectsChain& other) = delete;
SoxEffectsChain(const SoxEffectsChain&& other) = delete;
SoxEffectsChain& operator=(const SoxEffectsChain& other) = delete;
SoxEffectsChain& operator=(SoxEffectsChain&& other) = delete;
~SoxEffectsChain();
// Runs the assembled chain via sox_flow_effects.
void run();
// Uses `signal` as the chain input; throws std::runtime_error on failure.
void addInputTensor(torchaudio::sox_utils::TensorSignal* signal);
// Uses the opened file `sf` as the chain input; throws std::runtime_error on
// failure.
void addInputFile(sox_format_t* sf);
// Collects the chain output into `output_buffer`; throws std::runtime_error
// on failure.
void addOutputBuffer(std::vector<sox_sample_t>* output_buffer);
// Adds one effect given as {name, option...}; throws std::runtime_error for
// empty, unsupported or invalid effects.
void addEffect(const std::vector<std::string> effect);
// Channel count / sample rate recorded in interm_sig_.
int64_t getOutputNumChannels();
int64_t getOutputSampleRate();
};
} // namespace sox_effects_chain
} // namespace torchaudio
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment