"docs/vscode:/vscode.git/clone" did not exist on "9658305f1c2c1e17da6ea985a5d79ee657b4a5de"
Unverified Commit 0076ab07 authored by moto's avatar moto Committed by GitHub
Browse files

Remove legacy sox effects (#977)

parent fa2e4fd4
...@@ -5,10 +5,6 @@ torchaudio.sox_effects ...@@ -5,10 +5,6 @@ torchaudio.sox_effects
.. currentmodule:: torchaudio.sox_effects .. currentmodule:: torchaudio.sox_effects
.. warning::
The :py:class:`SoxEffect` and :py:class:`SoxEffectsChain` classes are deprecated. Please migrate to :func:`apply_effects_tensor` and :func:`apply_effects_file`.
Resource initialization / shutdown Resource initialization / shutdown
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
...@@ -35,18 +31,3 @@ Applying effects on file ...@@ -35,18 +31,3 @@ Applying effects on file
------------------------ ------------------------
.. autofunction:: apply_effects_file .. autofunction:: apply_effects_file
Legacy
~~~~~~
SoxEffect
---------
.. autoclass:: SoxEffect
:members:
SoxEffectsChain
---------------
.. autoclass:: SoxEffectsChain
:members: append_effect_to_chain, sox_build_flow_effects, clear_chain, set_input_file
import unittest
import torchaudio
from torch.utils.data import Dataset, DataLoader
from torchaudio_unittest import common_utils
class TORCHAUDIODS(Dataset):
    """Dataset that decodes two bundled audio assets through one shared SoX effects chain."""

    def __init__(self):
        asset_names = ["sinewave.wav", "steam-train-whistle-daniel_simon.mp3"]
        self.data = list(map(common_utils.get_asset_path, asset_names))
        # The sine wave's signal info parameterizes the effects appended below.
        self.si, self.ei = torchaudio.info(common_utils.get_asset_path("sinewave.wav"))
        self.si.precision = 16
        self.E = torchaudio.sox_effects.SoxEffectsChain()
        chain_spec = (
            ("rate", [self.si.rate]),  # resample to the sine wave's sample rate
            ("channels", [self.si.channels]),  # match the sine wave's channel count
            ("trim", [0, "16000s"]),  # first 16000 samples of audio
        )
        for effect_name, effect_options in chain_spec:
            self.E.append_effect_to_chain(effect_name, effect_options)

    def __getitem__(self, index):
        # Point the shared chain at the requested file and decode it;
        # the sample rate returned by the chain is discarded.
        self.E.set_input_file(self.data[index])
        waveform, _ = self.E.sox_build_flow_effects()
        return waveform

    def __len__(self):
        return len(self.data)
class Test_DataLoader(common_utils.TorchaudioTestCase):
    # Runs TORCHAUDIODS through a DataLoader and checks the batch shape.
    backend = 'sox'

    @common_utils.skipIfNoSoxBackend
    def test_1(self):
        loader = DataLoader(TORCHAUDIODS(), batch_size=2)
        expected_size = (2, 1, 16000)
        for batch in loader:
            self.assertTrue(batch.size() == expected_size)
import sys
import math
import unittest
import torch
import torchaudio
from .. import common_utils
@common_utils.skipIfNoSoxBackend
class Test_SoxEffectsChain(common_utils.TorchaudioTestCase):
backend = 'sox'
test_filepath = common_utils.get_asset_path("steam-train-whistle-daniel_simon.wav")
def test_single_channel(self):
    # Smoke test: apply "echos" to the sine-wave asset. There is no assertion
    # on the output; the test passes as long as the chain runs without raising.
    sine_path = common_utils.get_asset_path("sinewave.wav")
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(sine_path)
    echo_options = [0.8, 0.7, 40, 0.25, 63, 0.3]
    chain.append_effect_to_chain("echos", echo_options)
    _waveform, _sample_rate = chain.sox_build_flow_effects()
def test_rate_channels(self):
    # Apply "rate" and "channels" and confirm both show up in the result.
    expected_rate, expected_channels = 16000, 1
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(self.test_filepath)
    chain.append_effect_to_chain("rate", [expected_rate])
    chain.append_effect_to_chain("channels", [expected_channels])
    waveform, sample_rate = chain.sox_build_flow_effects()
    self.assertEqual(sample_rate, expected_rate)
    self.assertEqual(waveform.size(0), expected_channels)
@unittest.skipIf(sys.platform == 'darwin', 'This test is known to fail on macOS')
def test_lowpass_speed(self):
    # Slow the audio down and check the frame count scales by 1 / speed.
    speed_factor = .8
    info, _ = torchaudio.info(self.test_filepath)
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(self.test_filepath)
    # Options are deliberately passed as bare scalars rather than lists here.
    for effect_name, option in (("lowpass", 100), ("speed", speed_factor), ("rate", info.rate)):
        chain.append_effect_to_chain(effect_name, option)
    waveform, _ = chain.sox_build_flow_effects()
    # Allow one frame of slack for rounding.
    frames_in = info.length / info.channels
    self.assertEqual(waveform.size(1), int(frames_in / speed_factor), atol=1, rtol=1e-8)
def test_ulaw_and_siginfo(self):
    # Re-encode the file with explicit output signal/encoding info and no effects.
    out_signal = torchaudio.sox_signalinfo_t()
    out_encoding = torchaudio.sox_encodinginfo_t()
    out_signal.precision = 8
    out_encoding.encoding = torchaudio.get_sox_encoding_t(9)
    out_encoding.bits_per_sample = 8
    in_signal, _in_encoding = torchaudio.info(self.test_filepath)
    out_signal.rate = 44100
    out_signal.channels = 2
    chain = torchaudio.sox_effects.SoxEffectsChain(out_siginfo=out_signal, out_encinfo=out_encoding)
    chain.set_input_file(self.test_filepath)
    waveform, _ = chain.sox_build_flow_effects()
    # An 8-bit (ulaw) encoding can produce at most 256 distinct sample values.
    distinct_values = waveform.unique().size(0)
    self.assertLess(distinct_values, 2**8 + 1)
    self.assertEqual(waveform.numel(), in_signal.length)
def test_band_chorus(self):
    # Chain "band" and "chorus" while keeping the input's rate and channel count.
    in_signal, in_encoding = torchaudio.info(self.test_filepath)
    in_encoding.encoding = torchaudio.get_sox_encoding_t(1)
    chain = torchaudio.sox_effects.SoxEffectsChain(out_encinfo=in_encoding, out_siginfo=in_signal)
    chain.set_input_file(self.test_filepath)
    effects = [
        ("band", ["-n", "10k", "3.5k"]),
        ("chorus", [.5, .7, 55, 0.4, .25, 2, '-s']),
        ("rate", [in_signal.rate]),
        ("channels", [in_signal.channels]),
    ]
    for effect_name, effect_options in effects:
        chain.append_effect_to_chain(effect_name, effect_options)
    waveform, _ = chain.sox_build_flow_effects()
    self.assertEqual(waveform.size(0), in_signal.channels)
    # Chorus lengthens the signal, so the output holds at least as many samples.
    self.assertGreaterEqual(waveform.size(1) * waveform.size(0), in_signal.length)
def test_synth(self):
    # Mix synthesized pink noise over the whole input; the length must not change.
    in_signal, in_encoding = torchaudio.info(self.test_filepath)
    duration_seconds = in_signal.length / in_signal.channels / in_signal.rate
    in_encoding.encoding = torchaudio.get_sox_encoding_t(1)
    chain = torchaudio.sox_effects.SoxEffectsChain(out_encinfo=in_encoding, out_siginfo=in_signal)
    chain.set_input_file(self.test_filepath)
    effects = [
        ("synth", [str(duration_seconds), "pinknoise", "mix"]),
        ("rate", [44100]),
        ("channels", [2]),
    ]
    for effect_name, effect_options in effects:
        chain.append_effect_to_chain(effect_name, effect_options)
    waveform, _ = chain.sox_build_flow_effects()
    self.assertEqual(waveform.size(0), in_signal.channels)
    self.assertEqual(in_signal.length, waveform.size(0) * waveform.size(1))
def test_gain(self):
    # Exercise four "gain" variants and check the peak absolute sample value.
    #
    # The earlier version used ``assertTrue(peak, 1.)`` for the boosting
    # variants. ``assertTrue`` treats its second argument as the failure
    # message, so ``1.`` was never compared and the check only verified that
    # the peak was truthy. The checks below keep that non-zero requirement
    # and add the explicit upper bound for normalized audio.
    # NOTE(review): the original intent may have been "peak == 1" (clipping);
    # confirm against the asset before tightening further.
    chain = torchaudio.sox_effects.SoxEffectsChain()

    def peak_after_gain(options):
        # Run a single "gain" effect and return max(|x|); the chain is
        # cleared afterwards so the next variant starts empty.
        chain.set_input_file(self.test_filepath)
        chain.append_effect_to_chain("gain", options)
        waveform, _ = chain.sox_build_flow_effects()
        chain.clear_chain()
        return waveform.abs().max().item()

    peak = peak_after_gain(["5"])
    self.assertGreater(peak, 0.)
    self.assertLessEqual(peak, 1.)

    self.assertLess(peak_after_gain(["-e", "-5"]), 1.)

    peak = peak_after_gain(["-b", "8"])
    self.assertGreater(peak, 0.)
    self.assertLessEqual(peak, 1.)

    self.assertLess(peak_after_gain(["-n", "-10"]), 1.)
def test_tempo_or_speed(self):
    # "tempo" and "speed" should both scale the frame count by 1 / factor.
    info, _ = torchaudio.info(self.test_filepath)
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(self.test_filepath)

    def check_length(effects, factor):
        # Append the given effects, run the chain, and compare the frame
        # count with the scaled input length (one frame of slack).
        for effect_name, effect_options in effects:
            chain.append_effect_to_chain(effect_name, effect_options)
        waveform, _ = chain.sox_build_flow_effects()
        expected_frames = math.ceil((info.length / info.channels) / factor)
        self.assertAlmostEqual(waveform.size(1), expected_frames, delta=1)

    # tempo < 1
    check_length([("tempo", ["-s", .8])], .8)
    # tempo > 1
    chain.clear_chain()
    check_length([("tempo", ["-s", 1.2])], 1.2)
    # speed > 1
    chain.clear_chain()
    check_length([("speed", [1.2]), ("rate", [info.rate])], 1.2)
    # speed < 1
    chain.clear_chain()
    check_length([("speed", [0.8]), ("rate", [info.rate])], 0.8)
def test_trim(self):
    # "trim" must return exactly the requested slice of the loaded waveform.
    reference, _ = torchaudio.load(self.test_filepath)
    trim_start, trim_length = "10000s", "20000s"
    # Drop the trailing "s" (samples) suffix to get integer frame counts.
    first_frame = int(trim_start[:-1])
    frame_count = int(trim_length[:-1])
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(self.test_filepath)
    chain.append_effect_to_chain("trim", [trim_start, trim_length])
    waveform, _ = chain.sox_build_flow_effects()
    expected = reference[:, first_frame:(first_frame + frame_count)]
    self.assertTrue(waveform.allclose(expected, rtol=1e-4, atol=1e-4))
def test_silence_contrast(self):
    # After "silence" + "contrast" the output must hold fewer samples than the input.
    info, _ = torchaudio.info(self.test_filepath)
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(self.test_filepath)
    for effect_name, effect_options in (("silence", [1, 100, 1]), ("contrast", [])):
        chain.append_effect_to_chain(effect_name, effect_options)
    waveform, _ = chain.sox_build_flow_effects()
    self.assertLess(waveform.numel(), info.length)
def test_reverse(self):
    # Reversing with SoX and flipping back by index should recover the original.
    original, _ = torchaudio.load(self.test_filepath)
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(self.test_filepath)
    chain.append_effect_to_chain("reverse", "")
    reversed_wave, _ = chain.sox_build_flow_effects()
    # Frame indices in descending order: n-1, n-2, ..., 0.
    flip_index = torch.LongTensor(range(original.size(1) - 1, -1, -1))
    self.assertTrue(original.allclose(reversed_wave[:, flip_index], rtol=1e-5, atol=2e-5))
def test_compand_fade(self):
    # Smoke test for "compand" followed by "fade": no assertion on the
    # output, the chain only has to run without raising.
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(self.test_filepath)
    compand_options = ["0.3,1", "6:-70,-60,-20", "-5", "-90", "0.2"]
    fade_options = ["q", "0.25", "0", "0.33"]
    chain.append_effect_to_chain("compand", compand_options)
    chain.append_effect_to_chain("fade", fade_options)
    _waveform, _ = chain.sox_build_flow_effects()
def test_biquad_delay(self):
    # A 15000-sample "delay" must extend the output by exactly 15000 frames.
    info, _ = torchaudio.info(self.test_filepath)
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(self.test_filepath)
    biquad_coefficients = [
        "0.25136437", "0.50272873", "0.25136437",
        "1.0", "-0.17123075", "0.17668821",
    ]
    chain.append_effect_to_chain("biquad", biquad_coefficients)
    chain.append_effect_to_chain("delay", ["15000s"])
    waveform, _ = chain.sox_build_flow_effects()
    expected_frames = (info.length / info.channels) + 15000
    self.assertTrue(waveform.size(1) == expected_frames)
def test_invalid_effect_name(self):
    # "special" is not a SoX effect name, so appending it must raise LookupError.
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(self.test_filepath)
    self.assertRaises(LookupError, chain.append_effect_to_chain, "special", [""])
def test_unimplemented_effect(self):
    # The sox "spectrogram" effect is not implemented in torchaudio,
    # so appending it must raise NotImplementedError.
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(self.test_filepath)
    self.assertRaises(NotImplementedError, chain.append_effect_to_chain, "spectrogram", [""])
def test_invalid_effect_options(self):
    # Malformed "compand" options: the first two entries should have been
    # the single option "0.3,1". Appending succeeds; running the chain fails.
    chain = torchaudio.sox_effects.SoxEffectsChain()
    chain.set_input_file(self.test_filepath)
    malformed_options = ["0.3", "1", "6:-70,-60,-20", "-5", "-90", "0.2"]
    chain.append_effect_to_chain("compand", malformed_options)
    self.assertRaises(RuntimeError, chain.sox_build_flow_effects)
...@@ -24,36 +24,8 @@ from torchaudio.backend import ( ...@@ -24,36 +24,8 @@ from torchaudio.backend import (
SignalInfo, SignalInfo,
EncodingInfo, EncodingInfo,
) )
from torchaudio.sox_effects import (
init_sox_effects as _init_sox_effects,
shutdown_sox_effects as _shutdown_sox_effects,
)
try: try:
from .version import __version__, git_version # noqa: F401 from .version import __version__, git_version # noqa: F401
except ImportError: except ImportError:
pass pass
@_mod_utils.deprecated(
    "Please remove the function call to initialize_sox. Resource initialization is "
    "now automatically handled.")
def initialize_sox():
    """Initialize sox effects (deprecated).

    Thin wrapper that forwards to :py:func:`torchaudio.sox_effects.init_sox_effects`.
    """
    _init_sox_effects()
@_mod_utils.deprecated(
    "Please remove the function call to torchaudio.shutdown_sox. Resource clean up is "
    "now automatically handled. In the unlikely event that you need to manually "
    "shutdown sox, please use torchaudio.sox_effects.shutdown_sox_effects.")
def shutdown_sox():
    """Shutdown sox effects (deprecated).

    Thin wrapper that forwards to :py:func:`torchaudio.sox_effects.shutdown_sox_effects`.
    """
    _shutdown_sox_effects()
...@@ -175,207 +175,10 @@ void write_audio_file( ...@@ -175,207 +175,10 @@ void write_audio_file(
} }
} }
// Runs the SoX effects listed in `pyeffs` over the audio file `file_name`,
// stores the resulting samples in `otensor` and returns the sample rate.
//
// - `target_signal` / `target_encoding` may be nullptr; defaults are then
//   derived from the opened input file.
// - A "rate" or "channels" entry in `pyeffs` overrides the corresponding
//   field of the target signal.
// - On __APPLE__ the output goes through a temporary wav file; elsewhere it
//   goes through an in-memory stream opened with `file_type`.
// - `ch_first` transposes the result on the in-memory path; on the
//   __APPLE__ path it is forwarded to read_audio_file.
// - Throws std::runtime_error if the input or output cannot be opened, or
//   if an effect rejects its options.
int build_flow_effects(const std::string& file_name,
at::Tensor otensor,
bool ch_first,
sox_signalinfo_t* target_signal,
sox_encodinginfo_t* target_encoding,
const char* file_type,
std::vector<SoxEffect> pyeffs,
int max_num_eopts) {
/* This function builds an effects flow and puts the results into a tensor.
It can also be used to re-encode audio using any of the available encoding
options in SoX including sample rate and channel re-encoding. */
// open input
sox_format_t* input = sox_open_read(file_name.c_str(), nullptr, nullptr, nullptr);
if (input == nullptr) {
throw std::runtime_error("Error opening audio file");
}
// only used if target signal or encoding are null
sox_signalinfo_t empty_signal;
sox_encodinginfo_t empty_encoding;
// set signalinfo and encodinginfo if blank
if(target_signal == nullptr) {
target_signal = &empty_signal;
target_signal->rate = input->signal.rate;
target_signal->channels = input->signal.channels;
target_signal->length = SOX_UNSPEC;
target_signal->precision = input->signal.precision;
#if SOX_LIB_VERSION_CODE >= 918272 // >= 14.3.0
target_signal->mult = nullptr;
#endif
}
if(target_encoding == nullptr) {
target_encoding = &empty_encoding;
target_encoding->encoding = SOX_ENCODING_SIGN2; // Sample format
target_encoding->bits_per_sample = input->signal.precision; // Bits per sample
target_encoding->compression = 0.0; // Compression factor
target_encoding->reverse_bytes = sox_option_default; // Should bytes be reversed
target_encoding->reverse_nibbles = sox_option_default; // Should nibbles be reversed
target_encoding->reverse_bits = sox_option_default; // Should bits be reversed (pairs of bits?)
target_encoding->opposite_endian = sox_false; // Reverse endianness
}
// check for rate or channels effect and change the output signalinfo accordingly
// NOTE(review): eopts[0] is read without checking that eopts is non-empty.
for (SoxEffect se : pyeffs) {
if (se.ename == "rate") {
target_signal->rate = std::stod(se.eopts[0]);
} else if (se.ename == "channels") {
target_signal->channels = std::stoi(se.eopts[0]);
}
}
// create interm_signal for effects, intermediate steps change this in-place
sox_signalinfo_t interm_signal = input->signal;
#ifdef __APPLE__
// According to Mozilla Deepspeech sox_open_memstream_write doesn't work
// with OSX
char tmp_name[] = "/tmp/fileXXXXXX";
int tmp_fd = mkstemp(tmp_name);
close(tmp_fd);
sox_format_t* output = sox_open_write(tmp_name, target_signal,
target_encoding, "wav", nullptr, nullptr);
#else
// create buffer and buffer_size for output in memwrite
char* buffer;
size_t buffer_size;
// in-memory descriptor (this may not work for OSX)
sox_format_t* output = sox_open_memstream_write(&buffer,
&buffer_size,
target_signal,
target_encoding,
file_type, nullptr);
#endif
// NOTE(review): `input` is still open when this throws (and on __APPLE__
// the temporary file is not unlinked) — looks like a leak; confirm.
if (output == nullptr) {
throw std::runtime_error("Error opening output memstream/temporary file");
}
// Setup the effects chain to decode/resample
sox_effects_chain_t* chain =
sox_create_effects_chain(&input->encoding, &output->encoding);
// The "input" effect receives the sox_format_t handle through its
// single option slot, hence the cast to char*.
sox_effect_t* e = sox_create_effect(sox_find_effect("input"));
char* io_args[1];
io_args[0] = (char*)input;
sox_effect_options(e, 1, io_args);
sox_add_effect(chain, e, &interm_signal, &input->signal);
free(e);
for(SoxEffect tae : pyeffs) {
// "no_effects" is a sentinel entry: stop adding effects when it is seen.
if(tae.ename == "no_effects") break;
// NOTE(review): the result of sox_find_effect is not checked here;
// presumably the caller validates effect names beforehand — confirm.
e = sox_create_effect(sox_find_effect(tae.ename.c_str()));
e->global_info->global_info->verbosity = 1;
// A first option equal to "" means "no options".
if(tae.eopts[0] == "") {
sox_effect_options(e, 0, nullptr);
} else {
int num_opts = tae.eopts.size();
// NOTE(review): the array holds max_num_eopts entries but the loop
// below writes eopts.size() entries; this relies on the caller keeping
// eopts.size() <= max_num_eopts — confirm.
char* sox_args[max_num_eopts];
for(std::vector<std::string>::size_type i = 0; i != tae.eopts.size(); i++) {
sox_args[i] = (char*) tae.eopts[i].c_str();
}
if(sox_effect_options(e, num_opts, sox_args) != SOX_SUCCESS) {
#ifdef __APPLE__
unlink(tmp_name);
#endif
// NOTE(review): e, chain, input and output are not released on this path.
throw std::runtime_error("invalid effect options, see SoX docs for details");
}
}
sox_add_effect(chain, e, &interm_signal, &output->signal);
free(e);
}
// The "output" effect receives the output handle the same way as "input".
e = sox_create_effect(sox_find_effect("output"));
io_args[0] = (char*)output;
sox_effect_options(e, 1, io_args);
sox_add_effect(chain, e, &interm_signal, &output->signal);
free(e);
// Finally run the effects chain
sox_flow_effects(chain, nullptr, nullptr);
sox_delete_effects_chain(chain);
// Close sox handles, buffer does not get properly sized until these are closed
sox_close(output);
sox_close(input);
int sr;
// Read the in-memory audio buffer or temp file that we just wrote.
#ifdef __APPLE__
/*
Temporary filetype must have a valid header. Wav seems to work here while
raw does not. Certain effects like chorus caused strange behavior on the mac.
*/
// read_audio_file reads the temporary file and returns the sr and otensor
sr = read_audio_file(tmp_name, otensor, ch_first, 0, 0,
target_signal, target_encoding, "wav");
// delete temporary audio file
unlink(tmp_name);
#else
// Resize output tensor to desired dimensions, different effects result in output->signal.length,
// interm_signal.length and buffer size being inconsistent with the result of the file output.
// We prioritize in the order: output->signal.length > interm_signal.length > buffer_size
// Could be related to: https://sourceforge.net/p/sox/bugs/314/
// NOTE(review): `output` was already passed to sox_close above, yet
// output->signal is read below — confirm this is safe.
int nc, ns;
if (output->signal.length == 0) {
// sometimes interm_signal length is extremely large, but the buffer_size
// is double the length of the output signal
if (interm_signal.length > (buffer_size * 10)) {
ns = buffer_size / 2;
} else {
ns = interm_signal.length;
}
nc = interm_signal.channels;
} else {
nc = output->signal.channels;
ns = output->signal.length;
}
// ns is used as the total sample count, so the tensor is (ns / nc) x nc.
otensor.resize_({ns/nc, nc});
otensor = otensor.contiguous();
// Re-open the encoded buffer for reading and decode it into `samples`.
input = sox_open_mem_read(buffer, buffer_size, target_signal, target_encoding, file_type);
std::vector<sox_sample_t> samples(buffer_size);
const int64_t samples_read = sox_read(input, samples.data(), buffer_size);
// NOTE(review): this asserts that samples_read is NOT equal to nc * ns;
// confirm that this is the intended check.
assert(samples_read != nc * ns && samples_read != 0);
AT_DISPATCH_ALL_TYPES(otensor.scalar_type(), "effects_buffer", [&] {
auto* data = otensor.data_ptr<scalar_t>();
std::copy(samples.begin(), samples.begin() + samples_read, data);
});
// free buffer and close mem_read
sox_close(input);
free(buffer);
if (ch_first) {
otensor.transpose_(1, 0);
}
sr = target_signal->rate;
#endif
// return sample rate, output tensor modified in-place
return sr;
}
} // namespace audio } // namespace audio
} // namespace torch } // namespace torch
PYBIND11_MODULE(_torchaudio, m) { PYBIND11_MODULE(_torchaudio, m) {
py::class_<torch::audio::SoxEffect>(m, "SoxEffect")
.def(py::init<>())
.def("__repr__", [](const torch::audio::SoxEffect &self) {
std::stringstream ss;
std::string sep;
ss << "SoxEffect (" << self.ename << " ,[";
for(std::string s : self.eopts) {
ss << sep << "\"" << s << "\"";
sep = ", ";
}
ss << "])\n";
return ss.str();
})
.def_readwrite("ename", &torch::audio::SoxEffect::ename)
.def_readwrite("eopts", &torch::audio::SoxEffect::eopts);
py::class_<sox_signalinfo_t>(m, "sox_signalinfo_t") py::class_<sox_signalinfo_t>(m, "sox_signalinfo_t")
.def(py::init<>()) .def(py::init<>())
.def("__repr__", [](const sox_signalinfo_t &self) { .def("__repr__", [](const sox_signalinfo_t &self) {
...@@ -468,8 +271,4 @@ PYBIND11_MODULE(_torchaudio, m) { ...@@ -468,8 +271,4 @@ PYBIND11_MODULE(_torchaudio, m) {
"get_info", "get_info",
&torch::audio::get_info, &torch::audio::get_info,
"Gets information about an audio file"); "Gets information about an audio file");
m.def(
"build_flow_effects",
&torch::audio::build_flow_effects,
"build effects and flow chain into tensors");
} }
...@@ -44,26 +44,4 @@ void write_audio_file( ...@@ -44,26 +44,4 @@ void write_audio_file(
/// error occurred during reading of the audio data. /// error occurred during reading of the audio data.
std::tuple<sox_signalinfo_t, sox_encodinginfo_t> get_info( std::tuple<sox_signalinfo_t, sox_encodinginfo_t> get_info(
const std::string& file_name); const std::string& file_name);
// Describes one effect for build_flow_effects: the effect name plus its
// options. A default-constructed instance has an empty name and a single
// empty-string option.
struct SoxEffect {
  std::string ename;
  std::vector<std::string> eopts;

  SoxEffect() : ename(), eopts(1, std::string()) {}
};
/// Build a SoX chain, flow the effects, and capture the results in a tensor.
/// The audio file named by `file_name` flows through the effects chain given
/// by `pyeffs` (a list of effects and effect options) to an output buffer,
/// which is encoded to the target signal type `target_signal` and the target
/// encoding `target_encoding`. The resulting samples are placed into
/// `otensor`, which is modified in place; the return value is the sample
/// rate of the output.
///
/// `ch_first` selects a channels-first layout for `otensor`. `file_type`
/// names the format used for the intermediate output buffer.
/// `max_num_eopts` is the maximum number of options a single effect in
/// `pyeffs` may carry.
int build_flow_effects(const std::string& file_name,
at::Tensor otensor,
bool ch_first,
sox_signalinfo_t* target_signal,
sox_encodinginfo_t* target_encoding,
const char* file_type,
std::vector<SoxEffect> pyeffs,
int max_num_eopts);
}} // namespace torch::audio }} // namespace torch::audio
...@@ -5,8 +5,6 @@ from .sox_effects import ( ...@@ -5,8 +5,6 @@ from .sox_effects import (
effect_names, effect_names,
apply_effects_tensor, apply_effects_tensor,
apply_effects_file, apply_effects_file,
SoxEffect,
SoxEffectsChain,
) )
......
from typing import Any, Callable, List, Optional, Tuple, Union from typing import List, Tuple
import torch import torch
from torch import Tensor
from torchaudio._internal import ( from torchaudio._internal import module_utils as _mod_utils
module_utils as _mod_utils,
misc_ops as _misc_ops,
)
from torchaudio.utils.sox_utils import list_effects from torchaudio.utils.sox_utils import list_effects
if _mod_utils.is_module_available('torchaudio._torchaudio'):
from torchaudio import _torchaudio
@_mod_utils.requires_module('torchaudio._torchaudio') @_mod_utils.requires_module('torchaudio._torchaudio')
def init_sox_effects(): def init_sox_effects():
"""Initialize resources required to use sox effects. """Initialize resources required to use sox effects.
...@@ -257,189 +249,3 @@ def apply_effects_file( ...@@ -257,189 +249,3 @@ def apply_effects_file(
""" """
signal = torch.ops.torchaudio.sox_effects_apply_effects_file(path, effects, normalize, channels_first) signal = torch.ops.torchaudio.sox_effects_apply_effects_file(path, effects, normalize, channels_first)
return signal.get_tensor(), signal.get_sample_rate() return signal.get_tensor(), signal.get_sample_rate()
@_mod_utils.requires_module('torchaudio._torchaudio')
@_mod_utils.deprecated('Please migrate to `apply_effects_file` or `apply_effects_tensor`.')
def SoxEffect():
    r"""Build the object used to hand sox effect information from Python to C++.

    Warning:
        This function is deprecated.
        Please migrate to :func:`apply_effects_file` or :func:`apply_effects_tensor`.

    Returns:
        SoxEffect: An object exposing ``ename`` (str), the effect name, and
        ``eopts`` (List[str]), the list of effect options.
    """
    effect = _torchaudio.SoxEffect()
    return effect
@_mod_utils.deprecated('Please migrate to `apply_effects_file` or `apply_effects_tensor`.')
class SoxEffectsChain(object):
r"""SoX effects chain class.
Warning:
This class is deprecated.
Please migrate to :func:`apply_effects_file` or :func:`apply_effects_tensor`.
Args:
normalization (bool, number, or callable, optional):
If boolean ``True``, then output is divided by ``1 << 31``
(assumes signed 32-bit audio), and normalizes to ``[-1, 1]``.
If ``number``, then output is divided by that number.
If ``callable``, then the output is passed as a parameter to the given function, then
the output is divided by the result. (Default: ``True``)
channels_first (bool, optional):
Set channels first or length first in result. (Default: ``True``)
out_siginfo (sox_signalinfo_t, optional):
a sox_signalinfo_t type, which could be helpful if the audio type cannot be
automatically determined. (Default: ``None``)
out_encinfo (sox_encodinginfo_t, optional):
a sox_encodinginfo_t type, which could be set if the audio type cannot be
automatically determined. (Default: ``None``)
filetype (str, optional):
a filetype or extension to be set if sox cannot determine it automatically.
(Default: ``'raw'``)
Returns:
Tuple[Tensor, int]:
An output Tensor of size ``[C x L]`` or ``[L x C]`` where L is the number
of audio frames and C is the number of channels. An integer which is the sample rate of the
audio (as listed in the metadata of the file)
Example
>>> class MyDataset(Dataset):
... def __init__(self, audiodir_path):
... self.data = [
... os.path.join(audiodir_path, fn)
... for fn in os.listdir(audiodir_path)]
... self.E = torchaudio.sox_effects.SoxEffectsChain()
... self.E.append_effect_to_chain("rate", [16000]) # resample to 16000hz
... self.E.append_effect_to_chain("channels", ["1"]) # mono signal
... def __getitem__(self, index):
... fn = self.data[index]
... self.E.set_input_file(fn)
... x, sr = self.E.sox_build_flow_effects()
... return x, sr
...
... def __len__(self):
... return len(self.data)
...
>>> ds = MyDataset(path_to_audio_files)
>>> for sig, sr in ds:
... pass
"""
EFFECTS_UNIMPLEMENTED = {"spectrogram", "splice", "noiseprof", "fir"}
def __init__(self,
             normalization: Union[bool, float, Callable] = True,
             channels_first: bool = True,
             out_siginfo: Any = None,
             out_encinfo: Any = None,
             filetype: str = "raw") -> None:
    # Output handling requested by the caller.
    self.normalization = normalization
    self.channels_first = channels_first
    self.out_siginfo = out_siginfo
    self.out_encinfo = out_encinfo
    self.filetype = filetype
    # Chain state: nothing queued and no input file selected yet.
    self.input_file: Optional[str] = None
    self.chain: List[str] = []
    # Upper bound on the number of options accepted for a single effect.
    self.MAX_EFFECT_OPTS = 20
    # Looked up per instance so that nothing is queried at import time.
    self.EFFECTS_AVAILABLE = set(effect_names())
def append_effect_to_chain(self,
                           ename: str,
                           eargs: Optional[Union[List[str], str]] = None) -> None:
    r"""Validate an effect and queue it at the end of the chain.

    Args:
        ename (str): Name of the effect.
        eargs (List[str] or str, optional): Options for the effect; a single
            non-list value is treated as a one-element list. (Default: ``None``)

    Raises:
        NotImplementedError: If the effect is not implemented in torchaudio.
        LookupError: If the effect name is not a known effect.
        RuntimeError: If more than ``MAX_EFFECT_OPTS`` options are given.
    """
    effect = SoxEffect()

    # Normalizes the name to lower case and rejects unknown/unsupported effects.
    ename = self._check_effect(ename)

    # An empty option list is represented as a single empty string.
    if eargs is None or eargs == []:
        options = [""]
    elif isinstance(eargs, list):
        options = eargs
    else:
        options = [eargs]
    options = self._flatten(options)

    option_count = len(options)
    if option_count > self.MAX_EFFECT_OPTS:
        raise RuntimeError("Number of effect options ({}) is greater than max "
                           "suggested number of options {}.  Increase MAX_EFFECT_OPTS "
                           "or lower the number of effect options".format(option_count, self.MAX_EFFECT_OPTS))

    effect.ename = ename
    effect.eopts = options
    self.chain.append(effect)
@_mod_utils.requires_module('torchaudio._torchaudio')
def sox_build_flow_effects(self,
                           out: Optional[Tensor] = None) -> Tuple[Tensor, int]:
    r"""Run the queued effects on the input file and collect the audio in a tensor.

    Args:
        out (Tensor, optional): Tensor that receives the output; a new
            ``torch.FloatTensor`` is created when omitted. (Default: ``None``)

    Returns:
        Tuple[Tensor, int]: An output Tensor of size `[C x L]` or `[L x C]` where
        L is the number of audio frames and C is the number of channels,
        and an integer which is the sample rate of the audio.
    """
    if out is None:
        out = torch.FloatTensor()
    else:
        _misc_ops.check_input(out)

    # With nothing queued, a "no_effects" placeholder entry is added to the chain.
    if len(self.chain) == 0:
        placeholder = SoxEffect()
        placeholder.ename = "no_effects"
        placeholder.eopts = [""]
        self.chain.append(placeholder)

    sample_rate = _torchaudio.build_flow_effects(
        self.input_file,
        out,
        self.channels_first,
        self.out_siginfo,
        self.out_encinfo,
        self.filetype,
        self.chain,
        self.MAX_EFFECT_OPTS,
    )

    _misc_ops.normalize_audio(out, self.normalization)
    return out, sample_rate
def clear_chain(self) -> None:
    r"""Drop every queued effect by rebinding ``chain`` to a new empty list.
    """
    self.chain = list()
def set_input_file(self, input_file: str) -> None:
    r"""Select the file the chain will read from.

    Args:
        input_file (str): Path of the audio file to use as input.
    """
    self.input_file = input_file
def _check_effect(self, e: str) -> str:
    """Return the lower-cased effect name after validating it.

    Raises:
        NotImplementedError: If the name is listed in ``EFFECTS_UNIMPLEMENTED``.
        LookupError: If the name is not listed in ``EFFECTS_AVAILABLE``.
    """
    name = e.lower()
    if name in self.EFFECTS_UNIMPLEMENTED:
        raise NotImplementedError("This effect ({}) is not implement in torchaudio".format(e))
    if name not in self.EFFECTS_AVAILABLE:
        raise LookupError("Effect name, {}, not valid".format(name))
    return name
# Based on https://stackoverflow.com/questions/12472338/flattening-a-list-recursively
def _flatten(self, x: list) -> list:
    """Flatten arbitrarily nested lists into one flat list of strings.

    Every non-list element is converted with ``str``; nested lists are
    expanded in place, preserving order.

    The previous implementation recursed on ``x[:1]`` for both halves when
    the first element was a list. That slice never shrinks, so any nested
    input recursed without end (and the tail would have been dropped).

    Args:
        x (list): Possibly nested list of effect options.

    Returns:
        list: Flat list of ``str`` values; empty if ``x`` holds no leaves.
    """
    flat = []

    def _collect(items):
        # Depth-first walk: only ``list`` instances are descended into.
        for item in items:
            if isinstance(item, list):
                _collect(item)
            else:
                flat.append(str(item))

    _collect(x)
    return flat
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment