Commit db0da559 authored by David Pollack's avatar David Pollack Committed by Soumith Chintala
Browse files

sox effects test commit

parent 7314b36d
...@@ -10,31 +10,9 @@ class Test_LoadSave(unittest.TestCase): ...@@ -10,31 +10,9 @@ class Test_LoadSave(unittest.TestCase):
test_filepath = os.path.join(test_dirpath, "assets", test_filepath = os.path.join(test_dirpath, "assets",
"steam-train-whistle-daniel_simon.mp3") "steam-train-whistle-daniel_simon.mp3")
def test_load(self): def test_1_save(self):
# check normal loading
x, sr = torchaudio.load(self.test_filepath)
self.assertEqual(sr, 44100)
self.assertEqual(x.size(), (278756, 2))
self.assertGreater(x.sum(), 0)
# check normalizing
x, sr = torchaudio.load(self.test_filepath, normalization=True)
self.assertEqual(x.dtype, torch.float32)
self.assertTrue(x.min() >= -1.0)
self.assertTrue(x.max() <= 1.0)
# check raising errors
with self.assertRaises(OSError):
torchaudio.load("file-does-not-exist.mp3")
with self.assertRaises(OSError):
tdir = os.path.join(
os.path.dirname(self.test_dirpath), "torchaudio")
torchaudio.load(tdir)
def test_save(self):
# load signal # load signal
x, sr = torchaudio.load(self.test_filepath) x, sr = torchaudio.load(self.test_filepath, normalization=False)
# check save # check save
new_filepath = os.path.join(self.test_dirpath, "test.wav") new_filepath = os.path.join(self.test_dirpath, "test.wav")
...@@ -49,7 +27,8 @@ class Test_LoadSave(unittest.TestCase): ...@@ -49,7 +27,8 @@ class Test_LoadSave(unittest.TestCase):
os.unlink(new_filepath) os.unlink(new_filepath)
# test save 1d tensor # test save 1d tensor
x = x[:, 0] # get mono signal #x = x[:, 0] # get mono signal
x = x[0, :] # get mono signal
x.squeeze_() # remove channel dim x.squeeze_() # remove channel dim
torchaudio.save(new_filepath, x, sr) torchaudio.save(new_filepath, x, sr)
self.assertTrue(os.path.isfile(new_filepath)) self.assertTrue(os.path.isfile(new_filepath))
...@@ -57,7 +36,7 @@ class Test_LoadSave(unittest.TestCase): ...@@ -57,7 +36,7 @@ class Test_LoadSave(unittest.TestCase):
# don't allow invalid sizes as inputs # don't allow invalid sizes as inputs
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
x.unsqueeze_(0) # N x L not L x N x.unsqueeze_(1) # L x C not C x L
torchaudio.save(new_filepath, x, sr) torchaudio.save(new_filepath, x, sr)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
...@@ -66,18 +45,6 @@ class Test_LoadSave(unittest.TestCase): ...@@ -66,18 +45,6 @@ class Test_LoadSave(unittest.TestCase):
x.unsqueeze_(0) # 1 x L x 1 x.unsqueeze_(0) # 1 x L x 1
torchaudio.save(new_filepath, x, sr) torchaudio.save(new_filepath, x, sr)
# automatically convert sr from floating point to int
x.squeeze_(0)
torchaudio.save(new_filepath, x, float(sr))
self.assertTrue(os.path.isfile(new_filepath))
os.unlink(new_filepath)
# don't allow uneven integers
with self.assertRaises(TypeError):
torchaudio.save(new_filepath, x, float(sr) + 0.5)
self.assertTrue(os.path.isfile(new_filepath))
os.unlink(new_filepath)
# don't save to folders that don't exist # don't save to folders that don't exist
with self.assertRaises(OSError): with self.assertRaises(OSError):
new_filepath = os.path.join(self.test_dirpath, "no-path", new_filepath = os.path.join(self.test_dirpath, "no-path",
...@@ -93,22 +60,44 @@ class Test_LoadSave(unittest.TestCase): ...@@ -93,22 +60,44 @@ class Test_LoadSave(unittest.TestCase):
y = (torch.cos( y = (torch.cos(
2 * math.pi * torch.arange(0, 4 * sr).float() * freq / sr)) 2 * math.pi * torch.arange(0, 4 * sr).float() * freq / sr))
y.unsqueeze_(1) y.unsqueeze_(0)
# y is between -1 and 1, so must scale # y is between -1 and 1, so must scale
y = (y * volume * 2**31).long() y = (y * volume * (2**31)).long()
torchaudio.save(sinewave_filepath, y, sr) torchaudio.save(sinewave_filepath, y, sr)
self.assertTrue(os.path.isfile(sinewave_filepath)) self.assertTrue(os.path.isfile(sinewave_filepath))
# test precision # test precision
new_precision = 32
new_filepath = os.path.join(self.test_dirpath, "test.wav") new_filepath = os.path.join(self.test_dirpath, "test.wav")
_, _, _, bp = torchaudio.info(sinewave_filepath) si, ei = torchaudio.info(sinewave_filepath)
torchaudio.save(new_filepath, y, sr, precision=16) torchaudio.save(new_filepath, y, sr, new_precision)
_, _, _, bp16 = torchaudio.info(new_filepath) si32, ei32 = torchaudio.info(new_filepath)
self.assertEqual(bp, 32) self.assertEqual(si.precision, 16)
self.assertEqual(bp16, 16) self.assertEqual(si32.precision, new_precision)
os.unlink(new_filepath) os.unlink(new_filepath)
def test_load_and_save_is_identity(self): def test_2_load(self):
# check normal loading
x, sr = torchaudio.load(self.test_filepath)
self.assertEqual(sr, 44100)
self.assertEqual(x.size(), (2, 278756))
# check normalizing
x, sr = torchaudio.load(self.test_filepath, normalization=True)
self.assertEqual(x.dtype, torch.float32)
self.assertTrue(x.min() >= -1.0)
self.assertTrue(x.max() <= 1.0)
# check raising errors
with self.assertRaises(OSError):
torchaudio.load("file-does-not-exist.mp3")
with self.assertRaises(OSError):
tdir = os.path.join(
os.path.dirname(self.test_dirpath), "torchaudio")
torchaudio.load(tdir)
def test_3_load_and_save_is_identity(self):
input_path = os.path.join(self.test_dirpath, 'assets', 'sinewave.wav') input_path = os.path.join(self.test_dirpath, 'assets', 'sinewave.wav')
tensor, sample_rate = torchaudio.load(input_path) tensor, sample_rate = torchaudio.load(input_path)
output_path = os.path.join(self.test_dirpath, 'test.wav') output_path = os.path.join(self.test_dirpath, 'test.wav')
...@@ -118,48 +107,50 @@ class Test_LoadSave(unittest.TestCase): ...@@ -118,48 +107,50 @@ class Test_LoadSave(unittest.TestCase):
self.assertEqual(sample_rate, sample_rate2) self.assertEqual(sample_rate, sample_rate2)
os.unlink(output_path) os.unlink(output_path)
def test_load_partial(self): def test_4_load_partial(self):
num_frames = 100 num_frames = 100
offset = 200 offset = 200
# load entire mono sinewave wav file, load a partial copy and then compare # load entire mono sinewave wav file, load a partial copy and then compare
input_sine_path = os.path.join(self.test_dirpath, 'assets', 'sinewave.wav') input_sine_path = os.path.join(self.test_dirpath, 'assets', 'sinewave.wav')
x_sine_full, sr_sine = torchaudio.load(input_sine_path) x_sine_full, sr_sine = torchaudio.load(input_sine_path)
x_sine_part, _ = torchaudio.load(input_sine_path, num_frames=num_frames, offset=offset) x_sine_part, _ = torchaudio.load(input_sine_path, num_frames=num_frames, offset=offset)
l1_error = x_sine_full[offset:(num_frames+offset)].sub(x_sine_part).abs().sum().item() l1_error = x_sine_full[:, offset:(num_frames+offset)].sub(x_sine_part).abs().sum().item()
# test for the correct number of samples and that the correct portion was loaded # test for the correct number of samples and that the correct portion was loaded
self.assertEqual(x_sine_part.size(0), num_frames) self.assertEqual(x_sine_part.size(1), num_frames)
self.assertEqual(l1_error, 0.) self.assertEqual(l1_error, 0.)
# create a two channel version of this wavefile # create a two channel version of this wavefile
x_2ch_sine = x_sine_full.repeat(1, 2) x_2ch_sine = x_sine_full.repeat(1, 2)
out_2ch_sine_path = os.path.join(self.test_dirpath, 'assets', '2ch_sinewave.wav') out_2ch_sine_path = os.path.join(self.test_dirpath, 'assets', '2ch_sinewave.wav')
torchaudio.save(out_2ch_sine_path, x_2ch_sine, sr_sine) torchaudio.save(out_2ch_sine_path, x_2ch_sine, sr_sine)
x_2ch_sine_load, _ = torchaudio.load(out_2ch_sine_path, num_frames=num_frames, offset=offset) x_2ch_sine_load, _ = torchaudio.load(out_2ch_sine_path, num_frames=num_frames, offset=offset)
os.unlink(out_2ch_sine_path) os.unlink(out_2ch_sine_path)
l1_error = x_2ch_sine_load.sub(x_2ch_sine[offset:(offset + num_frames)]).abs().sum().item() l1_error = x_2ch_sine_load.sub(x_2ch_sine[:, offset:(offset + num_frames)]).abs().sum().item()
self.assertEqual(l1_error, 0.) self.assertEqual(l1_error, 0.)
# test with two channel mp3 # test with two channel mp3
x_2ch_full, sr_2ch = torchaudio.load(self.test_filepath, normalization=True) x_2ch_full, sr_2ch = torchaudio.load(self.test_filepath, normalization=True)
x_2ch_part, _ = torchaudio.load(self.test_filepath, normalization=True, num_frames=num_frames, offset=offset) x_2ch_part, _ = torchaudio.load(self.test_filepath, normalization=True, num_frames=num_frames, offset=offset)
l1_error = x_2ch_full[offset:(offset+num_frames)].sub(x_2ch_part).abs().sum().item() l1_error = x_2ch_full[:, offset:(offset+num_frames)].sub(x_2ch_part).abs().sum().item()
self.assertEqual(x_2ch_part.size(0), num_frames) self.assertEqual(x_2ch_part.size(1), num_frames)
self.assertEqual(l1_error, 0.) self.assertEqual(l1_error, 0.)
# check behavior if number of samples would exceed file length # check behavior if number of samples would exceed file length
offset_ns = 300 offset_ns = 300
x_ns, _ = torchaudio.load(input_sine_path, num_frames=100000, offset=offset_ns) x_ns, _ = torchaudio.load(input_sine_path, num_frames=100000, offset=offset_ns)
self.assertEqual(x_ns.size(0), x_sine_full.size(0) - offset_ns) self.assertEqual(x_ns.size(1), x_sine_full.size(1) - offset_ns)
# check when offset is beyond the end of the file # check when offset is beyond the end of the file
with self.assertRaises(RuntimeError): with self.assertRaises(RuntimeError):
torchaudio.load(input_sine_path, offset=100000) torchaudio.load(input_sine_path, offset=100000)
def test_get_info(self): def test_5_get_info(self):
input_path = os.path.join(self.test_dirpath, 'assets', 'sinewave.wav') input_path = os.path.join(self.test_dirpath, 'assets', 'sinewave.wav')
info_expected = (1, 64000, 16000, 32) channels, samples, rate, precision = (1, 64000, 16000, 16)
info_load = torchaudio.info(input_path) si, ei = torchaudio.info(input_path)
self.assertEqual(info_load, info_expected) self.assertEqual(si.channels, channels)
self.assertEqual(si.length, samples)
self.assertEqual(si.rate, rate)
self.assertEqual(ei.bits_per_sample, precision)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
import unittest
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchaudio
import math
import os
class TORCHAUDIODS(Dataset):
    """Dataset that serves every file in the test ``assets`` directory.

    Each item is produced by pushing the file through a sox effects chain
    that is configured once in ``__init__``: resample to the rate of
    ``sinewave.wav``, convert to that file's channel count, and keep only
    the first second of audio.
    """

    test_dirpath = os.path.dirname(os.path.realpath(__file__))

    def __init__(self):
        self.asset_dirpath = os.path.join(self.test_dirpath, "assets")

        # Absolute path of every entry in the assets directory, in listdir order.
        asset_paths = []
        for entry in os.listdir(self.asset_dirpath):
            asset_paths.append(os.path.join(self.asset_dirpath, entry))
        self.data = asset_paths

        # The sinewave asset supplies the target rate and channel count.
        reference_path = os.path.join(self.asset_dirpath, "sinewave.wav")
        self.si, self.ei = torchaudio.info(reference_path)
        self.si.precision = 16

        self.E = torchaudio.sox_effects.SoxEffects()
        self.E.sox_append_effect_to_chain("rate", [self.si.rate])  # resample to the reference rate
        self.E.sox_append_effect_to_chain("channels", [self.si.channels])  # match reference channel count (mono signal)
        self.E.sox_append_effect_to_chain("trim", [0, 1])  # first sec of audio

    def __getitem__(self, index):
        """Run the effects chain on the ``index``-th asset and return the tensor."""
        self.E.set_input_file(self.data[index])
        signal, _ = self.E.sox_build_flow_effects()
        return signal

    def __len__(self):
        """Number of files found in the assets directory."""
        return len(self.data)
class Test_LoadSave(unittest.TestCase):
    """Checks that ``TORCHAUDIODS`` can be batched by a ``DataLoader``."""

    def test_1(self):
        expected_size = (2, 1, 16000)
        loader = DataLoader(TORCHAUDIODS(), batch_size=2)

        # Exhaust the loader; only the final batch is inspected afterwards.
        for batch in loader:
            pass

        self.assertTrue(batch.size() == expected_size)
if __name__ == '__main__':
    # unittest.main() finishes by raising SystemExit, so anything placed
    # after it on the normal path never runs. The finally clause guarantees
    # that sox is shut down exactly once while the exit status still
    # propagates unchanged.
    torchaudio.initialize_sox()
    try:
        unittest.main()
    finally:
        torchaudio.shutdown_sox()
import unittest
import torch
import torchaudio
from torchaudio.legacy import save, load
import math
import os
class Test_LoadSave(unittest.TestCase):
    """Tests for the ``load``/``save`` wrappers from ``torchaudio.legacy``.

    The shape checks below index frames on dim 0 and channels on dim 1,
    i.e. the ``[L x C]`` layout the legacy wrappers return.
    """

    test_dirpath = os.path.dirname(os.path.realpath(__file__))
    test_filepath = os.path.join(test_dirpath, "assets",
                                 "steam-train-whistle-daniel_simon.mp3")

    def _save_and_remove(self, path, tensor, sample_rate):
        """Save ``tensor`` to ``path``, assert the file exists, then delete it."""
        save(path, tensor, sample_rate)
        self.assertTrue(os.path.isfile(path))
        os.unlink(path)

    def test_load(self):
        # check normal loading
        waveform, rate = load(self.test_filepath)
        self.assertEqual(rate, 44100)
        self.assertEqual(waveform.size(), (278756, 2))
        self.assertGreater(waveform.sum(), 0)

        # check normalizing
        waveform, rate = load(self.test_filepath, normalization=True)
        self.assertEqual(waveform.dtype, torch.float32)
        self.assertTrue(waveform.min() >= -1.0)
        self.assertTrue(waveform.max() <= 1.0)

        # check raising errors: missing file
        with self.assertRaises(OSError):
            load("file-does-not-exist.mp3")

        # check raising errors: path is a directory, not a file
        with self.assertRaises(OSError):
            tdir = os.path.join(
                os.path.dirname(self.test_dirpath), "torchaudio")
            load(tdir)

    def test_save(self):
        # load signal
        signal, rate = load(self.test_filepath)

        # check save
        out_path = os.path.join(self.test_dirpath, "test.wav")
        self._save_and_remove(out_path, signal, rate)

        # check automatic normalization
        signal /= 1 << 31
        self._save_and_remove(out_path, signal, rate)

        # test save 1d tensor
        signal = signal[:, 0]  # get mono signal
        signal.squeeze_()  # remove channel dim
        self._save_and_remove(out_path, signal, rate)

        # don't allow invalid sizes as inputs
        # (the in-place reshapes below build on each other; keep their order)
        with self.assertRaises(ValueError):
            signal.unsqueeze_(0)  # N x L not L x N
            save(out_path, signal, rate)

        with self.assertRaises(ValueError):
            signal.squeeze_()
            signal.unsqueeze_(1)
            signal.unsqueeze_(0)  # 1 x L x 1
            save(out_path, signal, rate)

        # automatically convert sr from floating point to int
        signal.squeeze_(0)
        self._save_and_remove(out_path, signal, float(rate))

        # don't save to folders that don't exist
        with self.assertRaises(OSError):
            out_path = os.path.join(self.test_dirpath, "no-path",
                                    "test.wav")
            save(out_path, signal, rate)

        # save created file (left on disk: other tests read this asset)
        sinewave_filepath = os.path.join(self.test_dirpath, "assets",
                                         "sinewave.wav")
        rate = 16000
        freq = 440
        volume = 0.3

        tone = (torch.cos(
            2 * math.pi * torch.arange(0, 4 * rate).float() * freq / rate))
        tone.unsqueeze_(1)
        # tone is between -1 and 1, so must scale
        tone = (tone * volume * 2**31).long()
        save(sinewave_filepath, tone, rate)
        self.assertTrue(os.path.isfile(sinewave_filepath))

        # test precision
        out_path = os.path.join(self.test_dirpath, "test.wav")
        si, ei = torchaudio.info(sinewave_filepath)
        save(out_path, tone, rate, precision=16)
        si16, ei16 = torchaudio.info(out_path)
        self.assertEqual(si.precision, 32)
        self.assertEqual(si16.precision, 16)
        os.unlink(out_path)

    def test_load_and_save_is_identity(self):
        source_path = os.path.join(self.test_dirpath, 'assets', 'sinewave.wav')
        original, original_rate = load(source_path)

        copy_path = os.path.join(self.test_dirpath, 'test.wav')
        save(copy_path, original, original_rate, 32)
        reloaded, reloaded_rate = load(copy_path)

        self.assertTrue(original.allclose(reloaded))
        self.assertEqual(original_rate, reloaded_rate)
        os.unlink(copy_path)

    def test_load_partial(self):
        num_frames = 100
        offset = 200
        stop = offset + num_frames

        # load entire mono sinewave wav file, load a partial copy and then compare
        sine_path = os.path.join(self.test_dirpath, 'assets', 'sinewave.wav')
        sine_full, sine_rate = load(sine_path)
        sine_part, _ = load(sine_path, num_frames=num_frames, offset=offset)
        l1_error = sine_full[offset:stop].sub(sine_part).abs().sum().item()
        # test for the correct number of samples and that the correct portion was loaded
        self.assertEqual(sine_part.size(0), num_frames)
        self.assertEqual(l1_error, 0.)

        # create a two channel version of this wavefile
        sine_2ch = sine_full.repeat(1, 2)
        sine_2ch_path = os.path.join(self.test_dirpath, 'assets', '2ch_sinewave.wav')
        save(sine_2ch_path, sine_2ch, sine_rate)
        sine_2ch_part, _ = load(sine_2ch_path, num_frames=num_frames, offset=offset)
        os.unlink(sine_2ch_path)
        l1_error = sine_2ch_part.sub(sine_2ch[offset:stop]).abs().sum().item()
        self.assertEqual(l1_error, 0.)

        # test with two channel mp3
        mp3_full, mp3_rate = load(self.test_filepath, normalization=True)
        mp3_part, _ = load(self.test_filepath, normalization=True,
                           num_frames=num_frames, offset=offset)
        l1_error = mp3_full[offset:stop].sub(mp3_part).abs().sum().item()
        self.assertEqual(mp3_part.size(0), num_frames)
        self.assertEqual(l1_error, 0.)

        # check behavior if number of samples would exceed file length
        offset_ns = 300
        tail, _ = load(sine_path, num_frames=100000, offset=offset_ns)
        self.assertEqual(tail.size(0), sine_full.size(0) - offset_ns)

        # check when offset is beyond the end of the file
        with self.assertRaises(RuntimeError):
            load(sine_path, offset=100000)

    def test_z_get_info(self):
        # Reads the sinewave asset that test_save writes.
        sine_path = os.path.join(self.test_dirpath, 'assets', 'sinewave.wav')
        channels, samples, rate, precision = (1, 64000, 16000, 32)
        si, ei = torchaudio.info(sine_path)
        self.assertEqual(si.channels, channels)
        self.assertEqual(si.length, samples)
        self.assertEqual(si.rate, rate)
        self.assertEqual(ei.bits_per_sample, precision)
# Run the legacy load/save test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()
import unittest
import torch
import torchaudio
import math
import os
class Test_SoxEffects(unittest.TestCase):
    """Tests for building and running sox effect chains via ``SoxEffects``.

    Effect options are always passed as a list so that every call to
    ``sox_append_effect_to_chain`` in the test suite uses the same form.
    """

    test_dirpath = os.path.dirname(os.path.realpath(__file__))
    test_filepath = os.path.join(test_dirpath, "assets",
                                 "steam-train-whistle-daniel_simon.mp3")

    def test_rate_channels(self):
        target_rate = 16000
        target_channels = 1
        E = torchaudio.sox_effects.SoxEffects()
        E.set_input_file(self.test_filepath)
        E.sox_append_effect_to_chain("rate", [target_rate])
        E.sox_append_effect_to_chain("channels", [target_channels])
        x, sr = E.sox_build_flow_effects()
        # check if effects worked
        self.assertEqual(sr, target_rate)
        self.assertEqual(x.size(0), target_channels)

    def test_other(self):
        speed = .8
        si, _ = torchaudio.info(self.test_filepath)
        E = torchaudio.sox_effects.SoxEffects()
        E.set_input_file(self.test_filepath)
        E.sox_append_effect_to_chain("lowpass", [100])
        E.sox_append_effect_to_chain("speed", [speed])
        # NOTE(review): presumably "rate" restores the original sample rate
        # after "speed" alters it - confirm against the sox documentation.
        E.sox_append_effect_to_chain("rate", [si.rate])
        x, _ = E.sox_build_flow_effects()
        # check if effects worked: the per-channel length scales by 1 / speed
        self.assertEqual(x.size(1), int((si.length / si.channels) / speed))

    def test_ulaw_and_siginfo(self):
        si_out = torchaudio.sox_signalinfo_t()
        ei_out = torchaudio.sox_encodinginfo_t()
        si_out.rate = 16000
        si_out.channels = 1
        si_out.precision = 8
        # NOTE(review): 9 is assumed to be the u-law member of sox_encoding_t
        # (per the test name) - confirm against sox.h.
        ei_out.encoding = torchaudio.get_sox_encoding_t(9)
        ei_out.bits_per_sample = 8
        si_in, _ = torchaudio.info(self.test_filepath)
        E = torchaudio.sox_effects.SoxEffects(out_siginfo=si_out, out_encinfo=ei_out)
        E.set_input_file(self.test_filepath)
        x, sr = E.sox_build_flow_effects()
        # Note: the sample rate is reported as "changed", but no downsampling occurred
        #       also the number of channels has not changed.  Run rate and channels effects
        #       to make those changes
        self.assertLess(x.unique().size(0), 2**8)
        self.assertEqual(x.size(0), si_in.channels)
        self.assertEqual(sr, si_out.rate)
        self.assertEqual(x.numel(), si_in.length)
if __name__ == '__main__':
    # unittest.main() finishes by raising SystemExit, so anything placed
    # after it on the normal path never runs. The finally clause guarantees
    # that sox is shut down exactly once while the exit status still
    # propagates unchanged.
    torchaudio.initialize_sox()
    try:
        unittest.main()
    finally:
        torchaudio.shutdown_sox()
...@@ -3,12 +3,7 @@ import os.path ...@@ -3,12 +3,7 @@ import os.path
import torch import torch
import _torch_sox import _torch_sox
from torchaudio import transforms from torchaudio import transforms, datasets, sox_effects
from torchaudio import datasets
def get_tensor_type_name(tensor):
return tensor.type().replace('torch.', '').replace('Tensor', '')
def check_input(src): def check_input(src):
...@@ -18,17 +13,33 @@ def check_input(src): ...@@ -18,17 +13,33 @@ def check_input(src):
raise TypeError('Expected a CPU based tensor, got %s' % type(src)) raise TypeError('Expected a CPU based tensor, got %s' % type(src))
def load(filepath, out=None, normalization=None, num_frames=-1, offset=0): def load(filepath,
out=None,
normalization=True,
channels_first=True,
num_frames=-1,
offset=0,
signalinfo=None,
encodinginfo=None,
filetype=None):
"""Loads an audio file from disk into a Tensor """Loads an audio file from disk into a Tensor
Args: Args:
filepath (string): path to audio file filepath (string): path to audio file
out (Tensor, optional): an output Tensor to use instead of creating one out (Tensor, optional): an output Tensor to use instead of creating one
normalization (bool or number, optional): If boolean `True`, then output is divided by `1 << 31` normalization (bool, number, or function, optional): If boolean `True`, then output is divided by `1 << 31`
(assumes 16-bit depth audio, and normalizes to `[0, 1]`. (assumes 16-bit depth audio, and normalizes to `[0, 1]`.
If `number`, then output is divided by that number If `number`, then output is divided by that number
If `function`, then the output is passed as a parameter
to the given function, then the output is divided by
the result.
num_frames (int, optional): number of frames to load. -1 to load everything after the offset. num_frames (int, optional): number of frames to load. -1 to load everything after the offset.
offset (int, optional): number of frames from the start of the file to begin data loading. offset (int, optional): number of frames from the start of the file to begin data loading.
signalinfo (sox_signalinfo_t, optional): a sox_signalinfo_t type, which could be helpful if the
audio type cannot be automatically determine
encodinginfo (sox_encodinginfo_t, optional): a sox_encodinginfo_t type, which could be set if the
audio type cannot be automatically determined
filetype (str, optional): a filetype or extension to be set if sox cannot determine it automatically
Returns: tuple(Tensor, int) Returns: tuple(Tensor, int)
- Tensor: output Tensor of size `[L x C]` where L is the number of audio frames, C is the number of channels - Tensor: output Tensor of size `[L x C]` where L is the number of audio frames, C is the number of channels
...@@ -41,6 +52,9 @@ def load(filepath, out=None, normalization=None, num_frames=-1, offset=0): ...@@ -41,6 +52,9 @@ def load(filepath, out=None, normalization=None, num_frames=-1, offset=0):
torch.Size([278756, 2]) torch.Size([278756, 2])
>>> print(sample_rate) >>> print(sample_rate)
44100 44100
>>> data_volume_normalized, _ = torchaudio.load('foo.mp3', normalization=lambda x: torch.abs(x).max())
>>> print(data_volume_normalized.abs().max())
1.
""" """
# check if valid file # check if valid file
...@@ -57,26 +71,44 @@ def load(filepath, out=None, normalization=None, num_frames=-1, offset=0): ...@@ -57,26 +71,44 @@ def load(filepath, out=None, normalization=None, num_frames=-1, offset=0):
raise ValueError("Expected value for num_samples -1 (entire file) or >=0") raise ValueError("Expected value for num_samples -1 (entire file) or >=0")
if offset < 0: if offset < 0:
raise ValueError("Expected positive offset value") raise ValueError("Expected positive offset value")
sample_rate = _torch_sox.read_audio_file(filepath, out, num_frames, offset)
sample_rate = _torch_sox.read_audio_file(filepath,
out,
channels_first,
num_frames,
offset,
signalinfo,
encodinginfo,
filetype)
# normalize if needed # normalize if needed
if isinstance(normalization, bool) and normalization: _audio_normalization(out, normalization)
out /= 1 << 31 # assuming 16-bit depth
elif isinstance(normalization, (float, int)):
out /= normalization # normalize with custom value
return out, sample_rate return out, sample_rate
def save(filepath, src, sample_rate, precision=32): def save(filepath, src, sample_rate, precision=16, channels_first=True):
si = sox_signalinfo_t()
ch_idx = 0 if channels_first else 1
si.rate = sample_rate
si.channels = 1 if src.dim() == 1 else src.size(ch_idx)
si.length = src.numel()
si.precision = precision
return save_encinfo(filepath, src, channels_first, si)
def save_encinfo(filepath, src, channels_first=True, signalinfo=None, encodinginfo=None, filetype=None):
"""Saves a Tensor with audio signal to disk as a standard format like mp3, wav, etc. """Saves a Tensor with audio signal to disk as a standard format like mp3, wav, etc.
Args: Args:
filepath (string): path to audio file filepath (string): path to audio file
src (Tensor): an input 2D Tensor of shape `[L x C]` where L is src (Tensor): an input 2D Tensor of shape `[L x C]` where L is
the number of audio frames, C is the number of channels the number of audio frames, C is the number of channels
sample_rate (int): the sample-rate of the audio to be saved signalinfo (sox_signalinfo_t): a sox_signalinfo_t type, which could be helpful if the
precision (int, optional): the bit-precision of the audio to be saved audio type cannot be automatically determine
encodinginfo (sox_encodinginfo_t, optional): a sox_encodinginfo_t type, which could be set if the
audio type cannot be automatically determined
filetype (str, optional): a filetype or extension to be set if sox cannot determine it automatically
Example:: Example::
...@@ -84,37 +116,49 @@ def save(filepath, src, sample_rate, precision=32): ...@@ -84,37 +116,49 @@ def save(filepath, src, sample_rate, precision=32):
>>> torchaudio.save('foo.wav', data, sample_rate) >>> torchaudio.save('foo.wav', data, sample_rate)
""" """
ch_idx = 0 if channels_first else 1
len_idx = 1 if channels_first else 0
# check if save directory exists # check if save directory exists
abs_dirpath = os.path.dirname(os.path.abspath(filepath)) abs_dirpath = os.path.dirname(os.path.abspath(filepath))
if not os.path.isdir(abs_dirpath): if not os.path.isdir(abs_dirpath):
raise OSError("Directory does not exist: {}".format(abs_dirpath)) raise OSError("Directory does not exist: {}".format(abs_dirpath))
# check that src is a CPU tensor
check_input(src)
# Check/Fix shape of source data # Check/Fix shape of source data
if len(src.size()) == 1: if src.dim() == 1:
# 1d tensors as assumed to be mono signals # 1d tensors as assumed to be mono signals
src.unsqueeze_(1) src.unsqueeze_(ch_idx)
elif len(src.size()) > 2 or src.size(1) > 2: elif src.dim() > 2 or src.size(ch_idx) > src.size(len_idx):
# assumes num_samples > num_channels
raise ValueError( raise ValueError(
"Expected format (L x N), N = 1 or 2, but found {}".format(src.size())) "Expected format (L x C), C < L, but found {}".format(src.size()))
# check if sample_rate is an integer # sox stores the sample rate as a float, though practically sample rates are almost always integers
if not isinstance(sample_rate, int): # convert integers to floats
if int(sample_rate) == sample_rate: if not isinstance(signalinfo.rate, float):
sample_rate = int(sample_rate) if float(signalinfo.rate) == signalinfo.rate:
signalinfo.rate = float(signalinfo.rate)
else: else:
raise TypeError('Sample rate should be a integer') raise TypeError('Sample rate should be a float or int')
# check if bit_rate is an integer # check if the bit precision (i.e. bits per sample) is an integer
if not isinstance(precision, int): if not isinstance(signalinfo.precision, int):
if int(precision) == precision: if int(signalinfo.precision) == signalinfo.precision:
precision = int(precision) signalinfo.precision = int(signalinfo.precision)
else: else:
raise TypeError('Bit precision should be a integer') raise TypeError('Bit precision should be an integer')
# programs such as librosa normalize the signal, unnormalize if detected # programs such as librosa normalize the signal, unnormalize if detected
if src.min() >= -1.0 and src.max() <= 1.0: if src.min() >= -1.0 and src.max() <= 1.0:
src = src * (1 << 31) # assuming 16-bit depth src = src * (1 << 31)
src = src.long() src = src.long()
# save data to file # set filetype and allow for files with no extensions
extension = os.path.splitext(filepath)[1] extension = os.path.splitext(filepath)[1]
check_input(src) filetype = extension[1:] if len(extension) > 0 else filetype
_torch_sox.write_audio_file(filepath, src, extension[1:], sample_rate, precision) # transpose from C x L -> L x C
if channels_first:
src = src.transpose(1, 0)
# save data to file
src = src.contiguous()
_torch_sox.write_audio_file(filepath, src, signalinfo, encodinginfo, filetype)
def info(filepath): def info(filepath):
...@@ -123,14 +167,155 @@ def info(filepath): ...@@ -123,14 +167,155 @@ def info(filepath):
Args: Args:
filepath (string): path to audio file filepath (string): path to audio file
Returns: tuple(C, L, sr, precision) Returns: tuple(si, ei)
- C (int): number of audio channels - si (sox_signalinfo_t): signal info as a python object
- L (int): length of each channel in frames (samples / channels) - ei (sox_encodinginfo_t): encoding info as a python object
- sr (int): sample rate i.e. samples per second
- precision (float): bit precision i.e. 32-bit or 16-bit audio
Example:: Example::
>>> num_channels, length, sample_rate, precision = torchaudio.info('foo.wav') >>> si, ei = torchaudio.info('foo.wav')
>>> rate, channels, encoding = si.rate, si.channels, ei.encoding
""" """
C, L, sr, bp = _torch_sox.get_info(filepath) return _torch_sox.get_info(filepath)
return C, L, sr, bp
def effect_names():
    """Gets list of valid sox effect names

    Thin wrapper around ``_torch_sox.get_effect_names()``.

    Returns: list[str]

    Example::
        >>> EFFECT_NAMES = torchaudio.effect_names()
    """
    return _torch_sox.get_effect_names()
def SoxEffect():
    """Create an object to hold a sox effect and its options to pass between python and c++

    Returns: SoxEffect(object)
      - ename (str), name of effect
      - eopts (list[str]), list of effect options
    """
    return _torch_sox.SoxEffect()
def sox_signalinfo_t():
    """Create a sox_signalinfo_t object.  This object can be used to set the sample
    rate, number of channels, length, bit precision and headroom multiplier
    primarily for effects

    Returns: sox_signalinfo_t(object)
      - rate (float), sample rate as a float, practically will likely be an integer float
      - channels (int), number of audio channels
      - precision (int), bit precision
      - length (int), length of audio, 0 for unspecified and -1 for unknown
      - mult (float, optional), headroom multiplier for effects and None for no multiplier
    """
    return _torch_sox.sox_signalinfo_t()
def sox_encodinginfo_t():
    """Create a sox_encodinginfo_t object whose reverse_* options are preset.

    The object carries the encoding type, bit precision, compression factor,
    reverse bytes, reverse nibbles, reverse bits and endianness.  It can be
    used in an effects chain to encode the final output or to save a file
    with a specific encoding, e.g. sox ulaw encoding for 8-bit ulaw audio.
    Note that in a tensor output the result will be a 32-bit number, but the
    number of unique values will be determined by the bit precision.

    Returns: sox_encodinginfo_t(object)
      - encoding (sox_encoding_t), output encoding
      - bits_per_sample (int), bit precision, same as `precision` in sox_signalinfo_t
      - compression (float), compression for lossy formats, 0.0 for default compression
      - reverse_bytes (sox_option_t), reverse bytes, use sox_option_default
      - reverse_nibbles (sox_option_t), reverse nibbles, use sox_option_default
      - reverse_bits (sox_option_t), reverse bits, use sox_option_default
      - opposite_endian (sox_bool), change endianness, use sox_false
    """
    encinfo = _torch_sox.sox_encodinginfo_t()
    default_option = get_sox_option_t(2)  # sox_default_option
    # All three reverse_* fields share the same default option value.
    for field in ("reverse_bytes", "reverse_nibbles", "reverse_bits"):
        setattr(encinfo, field, default_option)
    return encinfo
def get_sox_encoding_t(i=None):
    """Get the sox_encoding_t enum, or one of its members, for sox encodings.

    Args:
        i (int, optional): integer value of the member to construct.  When
          omitted (``None``) the enum type itself is returned; use its
          ``.__members__`` attribute to see all options.

    Returns:
        sox_encoding_t: the member for value ``i``, or the enum type when
          ``i`` is ``None``
    """
    encoding_enum = _torch_sox.sox_encoding_t
    # With no index the caller gets the enum type itself.
    return encoding_enum if i is None else encoding_enum(i)
def get_sox_option_t(i=2):
    """Get the sox_option_t enum, or one of its members, for sox encodinginfo options.

    Args:
        i (int or None, optional): integer value of the member to construct.
          Defaults to 2 (sox_option_default).  Pass ``None`` explicitly to
          get the enum type itself; use its ``.__members__`` attribute to
          see all options.

    Returns:
        sox_option_t: the member for value ``i``, or the enum type when
          ``i`` is ``None``
    """
    option_enum = _torch_sox.sox_option_t
    if i is not None:
        return option_enum(i)
    return option_enum
def get_sox_bool(i=0):
    """Get the sox_bool enum, or one of its members, for sox encodinginfo options.

    Args:
        i (int or None, optional): integer value of the member to construct.
          Defaults to 0 (sox_false).  Pass ``None`` explicitly to get the
          enum type itself; use its ``.__members__`` attribute to see all
          options.

    Returns:
        sox_bool: the member for value ``i``, or the enum type when ``i``
          is ``None``
    """
    bool_enum = _torch_sox.sox_bool
    if i is not None:
        return bool_enum(i)
    return bool_enum
def initialize_sox():
    """Initialize sox for use with an effects chain.  Not required for simple loading.

    Importantly, only initialize sox once, and do not shut it down until all
    effect chain calls are finished, even when loading multiple files.

    Returns whatever ``_torch_sox.initialize_sox()`` returns.
    """
    return _torch_sox.initialize_sox()
def shutdown_sox():
    """Shut down sox after effects chains are no longer needed.

    Plain loading of audio files does not need this. Call it only once:
    attempting to re-initialize sox afterwards will result in seg faults.

    Returns:
        whatever `_torch_sox.shutdown_sox()` returns
    """
    status = _torch_sox.shutdown_sox()
    return status
def _audio_normalization(signal, normalization):
# assumes signed 32-bit depth, which is what sox uses internally
if not normalization:
return
if isinstance(normalization, bool):
normalization = 1 << 31
if isinstance(normalization, (float, int)):
# normalize with custom value
a = normalization
signal /= a
elif callable(normalization):
a = normalization(signal)
signal /= a
import os.path
import torch
import _torch_sox
from torchaudio import save as save_new, load as load_new
def load(filepath, out=None, normalization=None, num_frames=-1, offset=0):
    """Load an audio file from disk into a Tensor using the torchaudio 0.1 defaults.

    The default options changed in torchaudio 0.2; this wrapper forwards to the
    new `torchaudio.load` while keeping the option defaults of version 0.1.

    Args:
        filepath (string): path to audio file
        out (Tensor, optional): an output Tensor to use instead of creating one
        normalization (bool or number, optional): if boolean `True`, the output
            is divided by `1 << 31`; if a number, the output is divided by that
            number
        num_frames (int, optional): number of frames to load. -1 to load
            everything after the offset.
        offset (int, optional): number of frames from the start of the file at
            which data loading begins

    Returns: tuple(Tensor, int)
        - Tensor: output Tensor of size `[L x C]` where L is the number of audio
          frames, C is the number of channels
        - int: the sample-rate of the audio (as listed in the metadata of the file)

    Example::
        >>> data, sample_rate = torchaudio.load('foo.mp3')
        >>> print(data.size())
        torch.Size([278756, 2])
        >>> print(sample_rate)
        44100
    """
    # the fixed `False` is the fourth positional argument of the new loader
    # (presumably its channels-first flag, which keeps the `[L x C]` layout)
    legacy_args = (filepath, out, normalization, False, num_frames, offset)
    return load_new(*legacy_args)
def save(filepath, src, sample_rate, precision=32):
    """Save a Tensor holding an audio signal to disk using the torchaudio 0.1 defaults.

    The audio is written in a standard format like mp3, wav, etc. The default
    options changed in torchaudio 0.2; this wrapper forwards to the new
    `torchaudio.save` while keeping the option defaults of version 0.1.

    Args:
        filepath (string): path to audio file
        src (Tensor): an input 2D Tensor of shape `[L x C]` where L is
            the number of audio frames, C is the number of channels
        sample_rate (int): the sample-rate of the audio to be saved
        precision (int, optional): the bit-precision of the audio to be saved

    Example::
        >>> data, sample_rate = torchaudio.load('foo.mp3')
        >>> torchaudio.save('foo.wav', data, sample_rate)
    """
    # the fixed `False` is the fifth positional argument of the new saver
    # (presumably its channels-first flag, matching the `[L x C]` input layout)
    legacy_args = (filepath, src, sample_rate, precision, False)
    save_new(*legacy_args)
import torch
import _torch_sox
import torchaudio
# Names of every effect reported by `_torch_sox.get_effect_names()`, collected once
# at import time; `SoxEffects.sox_check_effect` validates effect names against it.
EFFECT_NAMES = set(_torch_sox.get_effect_names())
"""
Notes:
sox_signalinfo_t {
sox_rate_t rate; /**< samples per second, 0 if unknown */
unsigned channels; /**< number of sound channels, 0 if unknown */
unsigned precision; /**< bits per sample, 0 if unknown */
sox_uint64_t length; /**< samples * chans in file, 0 if unspecified, -1 if unknown */
double * mult; /**< Effects headroom multiplier; may be null */
}
typedef struct sox_encodinginfo_t {
sox_encoding_t encoding; /**< format of sample numbers */
unsigned bits_per_sample; /**< 0 if unknown or variable; uncompressed value if lossless; compressed value if lossy */
double compression; /**< compression factor (where applicable) */
sox_option_t reverse_bytes; /** use sox_option_default */
sox_option_t reverse_nibbles; /** use sox_option_default */
sox_option_t reverse_bits; /** use sox_option_default */
sox_bool opposite_endian; /** use sox_false */
}
sox_encodings_t = {
"SOX_ENCODING_UNKNOWN",
"SOX_ENCODING_SIGN2",
"SOX_ENCODING_UNSIGNED",
"SOX_ENCODING_FLOAT",
"SOX_ENCODING_FLOAT_TEXT",
"SOX_ENCODING_FLAC",
"SOX_ENCODING_HCOM",
"SOX_ENCODING_WAVPACK",
"SOX_ENCODING_WAVPACKF",
"SOX_ENCODING_ULAW",
"SOX_ENCODING_ALAW",
"SOX_ENCODING_G721",
"SOX_ENCODING_G723",
"SOX_ENCODING_CL_ADPCM",
"SOX_ENCODING_CL_ADPCM16",
"SOX_ENCODING_MS_ADPCM",
"SOX_ENCODING_IMA_ADPCM",
"SOX_ENCODING_OKI_ADPCM",
"SOX_ENCODING_DPCM",
"SOX_ENCODING_DWVW",
"SOX_ENCODING_DWVWN",
"SOX_ENCODING_GSM",
"SOX_ENCODING_MP3",
"SOX_ENCODING_VORBIS",
"SOX_ENCODING_AMR_WB",
"SOX_ENCODING_AMR_NB",
"SOX_ENCODING_CVSD",
"SOX_ENCODING_LPC10",
"SOX_ENCODING_OPUS",
"SOX_ENCODINGS"
}
"""
class SoxEffects(object):
    """Collects SoX effects into a chain and applies them to an audio file.

    Args:
        normalization (bool, number, or callable, optional): handed to
            `torchaudio._audio_normalization` together with the output tensor
            after the effects have run. Default: `True`
        channels_first (bool, optional): forwarded to
            `_torch_sox.build_flow_effects` as its channels-first flag.
            Default: `True`
        out_siginfo (sox_signalinfo_t, optional): target signal info forwarded
            to `_torch_sox.build_flow_effects`. Default: `None`
        out_encinfo (sox_encodinginfo_t, optional): target encoding info
            forwarded to `_torch_sox.build_flow_effects`. Default: `None`
        filetype (str, optional): file type forwarded to
            `_torch_sox.build_flow_effects`. Default: `"raw"`
    """

    def __init__(self, normalization=True, channels_first=True, out_siginfo=None, out_encinfo=None, filetype="raw"):
        self.input_file = None
        self.chain = []
        # upper bound on the number of options a single effect may carry
        self.MAX_EFFECT_OPTS = 20
        self.out_siginfo = out_siginfo
        self.out_encinfo = out_encinfo
        self.filetype = filetype
        self.normalization = normalization
        self.channels_first = channels_first

    def sox_check_effect(self, e):
        """Return the lower-cased effect name `e`.

        Raises:
            LookupError: if the lower-cased name is not in `EFFECT_NAMES`
        """
        name = e.lower()
        if name not in EFFECT_NAMES:
            raise LookupError("Effect name, {}, not valid".format(name))
        return name

    def sox_append_effect_to_chain(self, ename, eargs=None):
        """Append one effect to the chain.

        Args:
            ename (str): effect name; matched case-insensitively against
                `EFFECT_NAMES`
            eargs (list or scalar, optional): options for the effect. Nested
                lists are flattened and every option is converted to `str`.
                `None` or an empty list means "no options".

        Raises:
            LookupError: if `ename` is not a valid effect name
            RuntimeError: if more than `MAX_EFFECT_OPTS` options are given
        """
        e = torchaudio.SoxEffect()
        # check if we have a valid effect
        ename = self.sox_check_effect(ename)
        if eargs is None or eargs == []:
            eargs = [""]
        elif not isinstance(eargs, list):
            eargs = [eargs]
        # A nested-but-empty option list (e.g. [[]]) flattens to nothing; fall
        # back to the "no options" marker because the extension reads eopts[0].
        eargs = self._flatten(eargs) or [""]
        if len(eargs) > self.MAX_EFFECT_OPTS:
            raise RuntimeError("Number of effect options ({}) is greater than max "
                               "suggested number of options {}.  Increase MAX_EFFECT_OPTS "
                               "or lower the number of effect options".format(len(eargs), self.MAX_EFFECT_OPTS))
        e.ename = ename
        e.eopts = eargs
        self.chain.append(e)

    def sox_build_flow_effects(self, out=None):
        """Run the input file through the effects chain.

        Args:
            out (Tensor, optional): tensor that receives the result; validated
                with `torchaudio.check_input`. A new `torch.FloatTensor` is
                created when omitted.

        Returns: tuple(Tensor, int)
            - Tensor: the output tensor, after `torchaudio._audio_normalization`
              has been applied with `self.normalization`
            - int: the value returned by `_torch_sox.build_flow_effects` (the
              sample rate of the output)
        """
        # initialize output tensor
        if out is not None:
            torchaudio.check_input(out)
        else:
            out = torch.FloatTensor()
        chain = self.chain
        if not chain:
            # The extension needs at least one entry.  Use a throw-away
            # placeholder instead of storing it in self.chain: the extension
            # stops at "no_effects", so a stored placeholder would silently
            # disable every effect appended after this call.
            placeholder = torchaudio.SoxEffect()
            placeholder.ename = "no_effects"
            placeholder.eopts = [""]
            chain = [placeholder]

        sr = _torch_sox.build_flow_effects(self.input_file,
                                           out,
                                           self.channels_first,
                                           self.out_siginfo,
                                           self.out_encinfo,
                                           self.filetype,
                                           chain,
                                           self.MAX_EFFECT_OPTS)

        torchaudio._audio_normalization(out, self.normalization)

        return out, sr

    def clear_chain(self):
        """Remove every effect from the chain."""
        self.chain = []

    def set_input_file(self, input_file):
        """Set the path of the audio file the chain is applied to."""
        self.input_file = input_file

    def _flatten(self, x):
        """Flatten arbitrarily nested lists into one flat list of strings.

        Only `list` instances are descended into; every other element is
        converted with `str`. Element order is preserved.
        """
        flat = []
        for item in x:
            if isinstance(item, list):
                flat.extend(self._flatten(item))
            else:
                flat.append(str(item))
        return flat
...@@ -31,16 +31,27 @@ struct SoxDescriptor { ...@@ -31,16 +31,27 @@ struct SoxDescriptor {
sox_format_t* fd_; sox_format_t* fd_;
}; };
/// Converts every element of `tensor` to `sox_sample_t` and writes the samples
/// to the SoX handle held by `fd`. Returns the value reported by `sox_write`.
int64_t write_audio(SoxDescriptor& fd, at::Tensor tensor) {
  const auto sample_count = tensor.numel();
  std::vector<sox_sample_t> staging(sample_count);

  // Dispatch on the tensor's scalar type so that any element type can be
  // copied (and thereby converted) into the sox sample buffer.
  AT_DISPATCH_ALL_TYPES(tensor.type(), "write_audio_buffer", [&] {
    auto* first = tensor.data<scalar_t>();
    std::copy(first, first + sample_count, staging.begin());
  });

  return sox_write(fd.get(), staging.data(), staging.size());
}
void read_audio( void read_audio(
SoxDescriptor& fd, SoxDescriptor& fd,
at::Tensor output, at::Tensor output,
int64_t number_of_channels, int64_t buffer_length) {
int64_t buffer_length,
int64_t offset) {
std::vector<sox_sample_t> buffer(buffer_length); std::vector<sox_sample_t> buffer(buffer_length);
if (sox_seek(fd.get(), offset, 0) == SOX_EOF) {
throw std::runtime_error("sox_seek reached EOF, try reducing offset or num_samples"); int number_of_channels = fd->signal.channels;
}
const int64_t samples_read = sox_read(fd.get(), buffer.data(), buffer_length); const int64_t samples_read = sox_read(fd.get(), buffer.data(), buffer_length);
if (samples_read == 0) { if (samples_read == 0) {
throw std::runtime_error( throw std::runtime_error(
...@@ -55,50 +66,74 @@ void read_audio( ...@@ -55,50 +66,74 @@ void read_audio(
std::copy(buffer.begin(), buffer.begin() + samples_read, data); std::copy(buffer.begin(), buffer.begin() + samples_read, data);
}); });
} }
} // namespace
int64_t write_audio(SoxDescriptor& fd, at::Tensor tensor) { struct SoxEffect {
std::vector<sox_sample_t> buffer(tensor.numel()); SoxEffect() : ename(""), eopts({""}) { }
std::string ename;
AT_DISPATCH_ALL_TYPES(tensor.type(), "write_audio_buffer", [&] { std::vector<std::string> eopts;
auto* data = tensor.data<scalar_t>(); };
std::copy(data, data + tensor.numel(), buffer.begin());
});
const auto samples_written = std::tuple<sox_signalinfo_t, sox_encodinginfo_t> get_info(
sox_write(fd.get(), buffer.data(), buffer.size()); const std::string& file_name
) {
SoxDescriptor fd(sox_open_read(
file_name.c_str(),
/*signal=*/nullptr,
/*encoding=*/nullptr,
/*filetype=*/nullptr));
if (fd.get() == nullptr) {
throw std::runtime_error("Error opening audio file");
}
return std::make_tuple(fd->signal, fd->encoding);
}
return samples_written; std::vector<std::string> get_effect_names() {
sox_effect_fn_t const * fns = sox_get_effect_fns();
std::vector<std::string> sv;
for(int i = 0; fns[i]; ++i) {
const sox_effect_handler_t *eh = fns[i] ();
if(eh && eh->name)
sv.push_back(eh->name);
}
return sv;
} }
} // namespace
int read_audio_file( int read_audio_file(
const std::string& file_name, const std::string& file_name,
at::Tensor output, at::Tensor output,
bool ch_first,
int64_t nframes, int64_t nframes,
int64_t offset) { int64_t offset,
sox_signalinfo_t* si,
sox_encodinginfo_t* ei,
const char* ft) {
SoxDescriptor fd(sox_open_read( SoxDescriptor fd(sox_open_read(
file_name.c_str(), file_name.c_str(),
/*signal=*/nullptr, /*signal=*/si,
/*encoding=*/nullptr, /*encoding=*/ei,
/*filetype=*/nullptr)); /*filetype=*/ft));
if (fd.get() == nullptr) { if (fd.get() == nullptr) {
throw std::runtime_error("Error opening audio file"); throw std::runtime_error("Error opening audio file");
} }
const int64_t number_of_channels = fd->signal.channels; const int number_of_channels = fd->signal.channels;
const int sample_rate = fd->signal.rate; const int sample_rate = fd->signal.rate;
const int64_t total_length = fd->signal.length; const int64_t total_length = fd->signal.length;
if (total_length == 0) { if (total_length == 0) {
throw std::runtime_error("Error reading audio file: unknown length"); throw std::runtime_error("Error reading audio file: unknown length");
} }
if (offset > total_length) {
throw std::runtime_error("Offset past EOF");
}
// calculate buffer length // calculate buffer length
int64_t buffer_length = total_length; int64_t buffer_length = total_length;
if (offset > 0 && offset < total_length) { if (offset > 0) {
buffer_length -= offset; buffer_length -= offset;
} }
if (nframes != -1 && buffer_length > nframes) { if (nframes != -1 && buffer_length > nframes) {
// get requested number of frames
buffer_length = nframes; buffer_length = nframes;
} }
...@@ -106,7 +141,17 @@ int read_audio_file( ...@@ -106,7 +141,17 @@ int read_audio_file(
buffer_length *= number_of_channels; buffer_length *= number_of_channels;
offset *= number_of_channels; offset *= number_of_channels;
read_audio(fd, output, number_of_channels, buffer_length, offset); // seek to offset point before reading data
if (sox_seek(fd.get(), offset, 0) == SOX_EOF) {
throw std::runtime_error("sox_seek reached EOF, try reducing offset or num_samples");
}
// read data and fill output tensor
read_audio(fd, output, buffer_length);
if (ch_first) {
output.transpose_(1, 0);
}
return sample_rate; return sample_rate;
} }
...@@ -114,31 +159,26 @@ int read_audio_file( ...@@ -114,31 +159,26 @@ int read_audio_file(
void write_audio_file( void write_audio_file(
const std::string& file_name, const std::string& file_name,
at::Tensor tensor, at::Tensor tensor,
const std::string& extension, sox_signalinfo_t* si,
int sample_rate, sox_encodinginfo_t* ei,
int precision) { const char* file_type) {
if (!tensor.is_contiguous()) { if (!tensor.is_contiguous()) {
throw std::runtime_error( throw std::runtime_error(
"Error writing audio file: input tensor must be contiguous"); "Error writing audio file: input tensor must be contiguous");
} }
sox_signalinfo_t signal; // remove ?
signal.rate = sample_rate;
signal.channels = tensor.size(1);
signal.length = tensor.numel();
signal.precision = precision; // precision in bits
#if SOX_LIB_VERSION_CODE >= 918272 // >= 14.3.0 #if SOX_LIB_VERSION_CODE >= 918272 // >= 14.3.0
signal.mult = nullptr; si->mult = nullptr;
#endif #endif
SoxDescriptor fd(sox_open_write( SoxDescriptor fd(sox_open_write(
file_name.c_str(), file_name.c_str(),
&signal, si,
/*encoding=*/nullptr, ei,
extension.c_str(), file_type,
/*filetype=*/nullptr, /*oob=*/nullptr,
/*oob=*/nullptr)); /*overwrite=*/nullptr));
if (fd.get() == nullptr) { if (fd.get() == nullptr) {
throw std::runtime_error( throw std::runtime_error(
...@@ -153,27 +193,279 @@ void write_audio_file( ...@@ -153,27 +193,279 @@ void write_audio_file(
} }
} }
std::tuple<int64_t, int64_t, int64_t, int64_t> get_info( int initialize_sox() {
const std::string& file_name /* Initializion for sox effects. Only initialize once */
) { return sox_init();
SoxDescriptor fd(sox_open_read( }
file_name.c_str(),
/*signal=*/nullptr, int shutdown_sox() {
/*encoding=*/nullptr, /* Shutdown for sox effects. Do not shutdown between multiple calls */
/*filetype=*/nullptr)); return sox_quit();
if (fd.get() == nullptr) { }
int build_flow_effects(const std::string& file_name,
at::Tensor otensor,
bool ch_first,
sox_signalinfo_t* target_signal,
sox_encodinginfo_t* target_encoding,
const char* file_type,
std::vector<SoxEffect> pyeffs,
int max_num_eopts) {
/* This function builds an effects flow and puts the results into a tensor.
It can also be used to re-encode audio using any of the available encoding
options in SoX including sample rate and channel re-encoding. */
// open input
sox_format_t* input = sox_open_read(file_name.c_str(), nullptr, nullptr, nullptr);
if (input == nullptr) {
throw std::runtime_error("Error opening audio file"); throw std::runtime_error("Error opening audio file");
} }
int64_t nchannels = fd->signal.channels;
int64_t length = fd->signal.length; // only used if target signal or encoding are null
int64_t sample_rate = fd->signal.rate; sox_signalinfo_t empty_signal;
int64_t precision = fd->signal.precision; sox_encodinginfo_t empty_encoding;
return std::make_tuple(nchannels, length, sample_rate, precision);
// set signalinfo and encodinginfo if blank
if(target_signal == nullptr) {
target_signal = &empty_signal;
target_signal->rate = input->signal.rate;
target_signal->channels = input->signal.channels;
target_signal->length = SOX_UNSPEC;
target_signal->precision = input->signal.precision;
#if SOX_LIB_VERSION_CODE >= 918272 // >= 14.3.0
target_signal->mult = nullptr;
#endif
}
if(target_encoding == nullptr) {
target_encoding = &empty_encoding;
target_encoding->encoding = SOX_ENCODING_SIGN2; // Sample format
target_encoding->bits_per_sample = input->signal.precision; // Bits per sample
target_encoding->compression = 0.0; // Compression factor
target_encoding->reverse_bytes = sox_option_default; // Should bytes be reversed
target_encoding->reverse_nibbles = sox_option_default; // Should nibbles be reversed
target_encoding->reverse_bits = sox_option_default; // Should bits be reversed (pairs of bits?)
target_encoding->opposite_endian = sox_false; // Reverse endianness
}
// set target precision / bits_per_sample if it's still 0
//if (target_signal->precision == 0)
// target_signal->precision = input->signal.precision;
//if (target_encoding->bits_per_sample == 0)
// target_encoding->bits_per_sample = input->signal.precision;
// check for rate or channels effect and change the output signalinfo accordingly
for (SoxEffect se : pyeffs) {
if (se.ename == "rate") {
target_signal->rate = std::stod(se.eopts[0]);
//se.eopts[0] = "";
} else if (se.ename == "channels") {
target_signal->channels = std::stoi(se.eopts[0]);
//se.eopts[0] = "";
}
}
// create interm_signal for effects, intermediate steps change this in-place
sox_signalinfo_t interm_signal = input->signal;
// create buffer and buffer_size for output in memwrite
char* buffer;
size_t buffer_size;
//const char* otype = (file_type.empty()) ? (const char*) "raw" : file_type.c_str();
#ifdef __APPLE__
// According to Mozilla Deepspeech sox_open_memstream_write doesn't work
// with OSX
char* tmp_name = tmpnam(NULL);
assert(tmp_name);
sox_format_t* output = sox_open_write(tmp_name, &target_signal,
&target_encoding, file_type, nullptr, nullptr);
#else
// in-memory descriptor (this may not work for OSX)
sox_format_t* output = sox_open_memstream_write(&buffer,
&buffer_size,
target_signal,
target_encoding,
file_type, nullptr);
#endif
assert(output);
// Setup the effects chain to decode/resample
sox_effects_chain_t* chain =
sox_create_effects_chain(&input->encoding, &output->encoding);
sox_effect_t* e = sox_create_effect(sox_find_effect("input"));
char* io_args[1];
io_args[0] = (char*)input;
sox_effect_options(e, 1, io_args);
sox_add_effect(chain, e, &interm_signal, &input->signal);
free(e);
for(SoxEffect tae : pyeffs) {
if(tae.ename == "no_effects") break;
e = sox_create_effect(sox_find_effect(tae.ename.c_str()));
if(tae.eopts[0] == "") {
sox_effect_options(e, 0, nullptr);
} else {
int num_opts = tae.eopts.size();
char* sox_args[max_num_eopts];
//for(std::string s : tae.eopts) {
for(std::vector<std::string>::size_type i = 0; i != tae.eopts.size(); i++) {
sox_args[i] = (char*) tae.eopts[i].c_str();
}
sox_effect_options(e, num_opts, sox_args);
}
sox_add_effect(chain, e, &interm_signal, &input->signal);
free(e);
}
e = sox_create_effect(sox_find_effect("output"));
io_args[0] = (char*)output;
sox_effect_options(e, 1, io_args);
sox_add_effect(chain, e, &interm_signal, &output->signal);
free(e);
// Finally run the effects chain
sox_flow_effects(chain, nullptr, nullptr);
sox_delete_effects_chain(chain);
// Close sox handles, buffer does not get properly sized until these are closed
sox_close(output);
sox_close(input);
// Resize output tensor to desired dimensions
int nc = interm_signal.channels;
int ns = interm_signal.length;
otensor.resize_({ns/nc, nc});
otensor = otensor.contiguous();
// Read the in-memory audio buffer or temp file that we just wrote.
#ifdef __APPLE__
buffer_size = (size_t) ns * 2; // sizeof(char)? dependent on bit precision?
input = sox_open_read(tmp_name, target_signal, target_encoding, file_type);
#else
input = sox_open_mem_read(buffer, buffer_size, target_signal, target_encoding, file_type);
#endif
std::vector<sox_sample_t> samples(buffer_size);
const int64_t samples_read = sox_read(input, samples.data(), buffer_size);
// buffer size is twice signal length, but half the buffer is empty so correct
// number of samples should be read
assert(samples_read != nc * ns && samples_read != 0);
AT_DISPATCH_ALL_TYPES(otensor.type(), "effects_buffer", [&] {
auto* data = otensor.data<scalar_t>();
std::copy(samples.begin(), samples.begin() + samples_read, data);
});
// free buffer and quit sox
sox_close(input);
#ifdef __APPLE__
unlink(tmp_name)
#endif
free(buffer);
if (ch_first) {
otensor.transpose_(1, 0);
}
return (int) target_signal->rate;
} }
} // namespace audio } // namespace audio
} // namespace torch } // namespace torch
PYBIND11_MODULE(_torch_sox, m) { PYBIND11_MODULE(_torch_sox, m) {
py::class_<torch::audio::SoxEffect>(m, "SoxEffect")
.def(py::init<>())
.def("__repr__", [](const torch::audio::SoxEffect &self) {
std::stringstream ss;
std::string sep;
ss << "SoxEffect (" << self.ename << " ,[";
for(std::string s : self.eopts) {
ss << sep << "\"" << s << "\"";
sep = ", ";
}
ss << "])\n";
return ss.str();
})
.def_readwrite("ename", &torch::audio::SoxEffect::ename)
.def_readwrite("eopts", &torch::audio::SoxEffect::eopts);
py::class_<sox_signalinfo_t>(m, "sox_signalinfo_t")
.def(py::init<>())
.def("__repr__", [](const sox_signalinfo_t &self) {
std::stringstream ss;
ss << "sox_signalinfo_t {\n"
<< " rate-> " << self.rate << "\n"
<< " channels-> " << self.channels << "\n"
<< " precision-> " << self.precision << "\n"
<< " length-> " << self.length << "\n"
<< " mult-> " << self.mult << "\n"
<< "}\n";
return ss.str();
})
.def_readwrite("rate", &sox_signalinfo_t::rate)
.def_readwrite("channels", &sox_signalinfo_t::channels)
.def_readwrite("precision", &sox_signalinfo_t::precision)
.def_readwrite("length", &sox_signalinfo_t::length)
.def_readwrite("mult", &sox_signalinfo_t::mult);
py::class_<sox_encodinginfo_t>(m, "sox_encodinginfo_t")
.def(py::init<>())
.def("__repr__", [](const sox_encodinginfo_t &self) {
std::stringstream ss;
ss << "sox_encodinginfo_t {\n"
<< " encoding-> " << self.encoding << "\n"
<< " bits_per_sample-> " << self.bits_per_sample << "\n"
<< " compression-> " << self.compression << "\n"
<< " reverse_bytes-> " << self.reverse_bytes << "\n"
<< " reverse_nibbles-> " << self.reverse_nibbles << "\n"
<< " reverse_bits-> " << self.reverse_bits << "\n"
<< " opposite_endian-> " << self.opposite_endian << "\n"
<< "}\n";
return ss.str();
})
.def_readwrite("encoding", &sox_encodinginfo_t::encoding)
.def_readwrite("bits_per_sample", &sox_encodinginfo_t::bits_per_sample)
.def_readwrite("compression", &sox_encodinginfo_t::compression)
.def_readwrite("reverse_bytes", &sox_encodinginfo_t::reverse_bytes)
.def_readwrite("reverse_nibbles", &sox_encodinginfo_t::reverse_nibbles)
.def_readwrite("reverse_bits", &sox_encodinginfo_t::reverse_bits)
.def_readwrite("opposite_endian", &sox_encodinginfo_t::opposite_endian);
py::enum_<sox_encoding_t>(m, "sox_encoding_t")
.value("SOX_ENCODING_UNKNOWN", sox_encoding_t::SOX_ENCODING_UNKNOWN)
.value("SOX_ENCODING_SIGN2", sox_encoding_t::SOX_ENCODING_SIGN2)
.value("SOX_ENCODING_UNSIGNED", sox_encoding_t::SOX_ENCODING_UNSIGNED)
.value("SOX_ENCODING_FLOAT", sox_encoding_t::SOX_ENCODING_FLOAT)
.value("SOX_ENCODING_FLOAT_TEXT", sox_encoding_t::SOX_ENCODING_FLOAT_TEXT)
.value("SOX_ENCODING_FLAC", sox_encoding_t::SOX_ENCODING_FLAC)
.value("SOX_ENCODING_HCOM", sox_encoding_t::SOX_ENCODING_HCOM)
.value("SOX_ENCODING_WAVPACK", sox_encoding_t::SOX_ENCODING_WAVPACK)
.value("SOX_ENCODING_WAVPACKF", sox_encoding_t::SOX_ENCODING_WAVPACKF)
.value("SOX_ENCODING_ULAW", sox_encoding_t::SOX_ENCODING_ULAW)
.value("SOX_ENCODING_ALAW", sox_encoding_t::SOX_ENCODING_ALAW)
.value("SOX_ENCODING_G721", sox_encoding_t::SOX_ENCODING_G721)
.value("SOX_ENCODING_G723", sox_encoding_t::SOX_ENCODING_G723)
.value("SOX_ENCODING_CL_ADPCM", sox_encoding_t::SOX_ENCODING_CL_ADPCM)
.value("SOX_ENCODING_CL_ADPCM16", sox_encoding_t::SOX_ENCODING_CL_ADPCM16)
.value("SOX_ENCODING_MS_ADPCM", sox_encoding_t::SOX_ENCODING_MS_ADPCM)
.value("SOX_ENCODING_IMA_ADPCM", sox_encoding_t::SOX_ENCODING_IMA_ADPCM)
.value("SOX_ENCODING_OKI_ADPCM", sox_encoding_t::SOX_ENCODING_OKI_ADPCM)
.value("SOX_ENCODING_DPCM", sox_encoding_t::SOX_ENCODING_DPCM)
.value("SOX_ENCODING_DWVW", sox_encoding_t::SOX_ENCODING_DWVW)
.value("SOX_ENCODING_DWVWN", sox_encoding_t::SOX_ENCODING_DWVWN)
.value("SOX_ENCODING_GSM", sox_encoding_t::SOX_ENCODING_GSM)
.value("SOX_ENCODING_MP3", sox_encoding_t::SOX_ENCODING_MP3)
.value("SOX_ENCODING_VORBIS", sox_encoding_t::SOX_ENCODING_VORBIS)
.value("SOX_ENCODING_AMR_WB", sox_encoding_t::SOX_ENCODING_AMR_WB)
.value("SOX_ENCODING_AMR_NB", sox_encoding_t::SOX_ENCODING_AMR_NB)
.value("SOX_ENCODING_LPC10", sox_encoding_t::SOX_ENCODING_LPC10)
//.value("SOX_ENCODING_OPUS", sox_encoding_t::SOX_ENCODING_OPUS) // creates a compile error
.value("SOX_ENCODINGS", sox_encoding_t::SOX_ENCODINGS)
.export_values();
py::enum_<sox_option_t>(m, "sox_option_t")
.value("sox_option_no", sox_option_t::sox_option_no)
.value("sox_option_yes", sox_option_t::sox_option_yes)
.value("sox_option_default", sox_option_t::sox_option_default)
.export_values();
py::enum_<sox_bool>(m, "sox_bool")
.value("sox_false", sox_bool::sox_false)
.value("sox_true", sox_bool::sox_true)
.export_values();
m.def( m.def(
"read_audio_file", "read_audio_file",
&torch::audio::read_audio_file, &torch::audio::read_audio_file,
...@@ -186,4 +478,20 @@ PYBIND11_MODULE(_torch_sox, m) { ...@@ -186,4 +478,20 @@ PYBIND11_MODULE(_torch_sox, m) {
"get_info", "get_info",
&torch::audio::get_info, &torch::audio::get_info,
"Gets information about an audio file"); "Gets information about an audio file");
m.def(
"get_effect_names",
&torch::audio::get_effect_names,
"Gets the names of all available effects");
m.def(
"build_flow_effects",
&torch::audio::build_flow_effects,
"build effects and flow chain into tensors");
m.def(
"initialize_sox",
&torch::audio::initialize_sox,
"initialize sox for effects");
m.def(
"shutdown_sox",
&torch::audio::shutdown_sox,
"shutdown sox for effects");
} }
...@@ -11,26 +11,53 @@ namespace torch { namespace audio { ...@@ -11,26 +11,53 @@ namespace torch { namespace audio {
/// Throws `std::runtime_error` if the audio file could not be opened, or an /// Throws `std::runtime_error` if the audio file could not be opened, or an
/// error ocurred during reading of the audio data. /// error ocurred during reading of the audio data.
int read_audio_file( int read_audio_file(
const std::string& path, const std::string& file_name,
at::Tensor output, at::Tensor output,
int64_t number_of_samples, bool ch_first,
int64_t offset); int64_t nframes,
int64_t offset,
sox_signalinfo_t* si,
sox_encodinginfo_t* ei,
const char* ft)
/// Writes the data of a `Tensor` into an audio file at the given `path`, with /// Writes the data of a `Tensor` into an audio file at the given `path`, with
/// a certain extension (e.g. `wav`or `mp3`) and sample rate. /// a certain extension (e.g. `wav`or `mp3`) and sample rate.
/// Throws `std::runtime_error` when the audio file could not be opened for /// Throws `std::runtime_error` when the audio file could not be opened for
/// writing, or an error ocurred during writing of the audio data. /// writing, or an error ocurred during writing of the audio data.
void write_audio_file( void write_audio_file(
const std::string& path, const std::string& file_name,
at::Tensor tensor, at::Tensor tensor,
const std::string& extension, bool ch_first,
int sample_rate, sox_signalinfo_t* si,
int precision); sox_encodinginfo_t* ei,
const char* extension)
/// Reads an audio file from the given `path` and returns a tuple of /// Reads an audio file from the given `path` and returns a tuple of
/// the number of channels, length in samples, sample rate, and bits / sec. /// sox_signalinfo_t and sox_encodinginfo_t, which contain information about
/// the audio file such as sample rate, length, bit precision, encoding and more.
/// Throws `std::runtime_error` if the audio file could not be opened, or an /// Throws `std::runtime_error` if the audio file could not be opened, or an
/// error ocurred during reading of the audio data. /// error ocurred during reading of the audio data.
std::tuple<int64_t, int64_t, int64_t, int64_t> get_info( std::tuple<sox_signalinfo_t, sox_encodinginfo_t> get_info(
const std::string& file_name); const std::string& file_name);
// get names of all sox effects
std::vector<std::string> get_effect_names();
// Initialize and Shutdown SoX effects chain. These functions should only be run once.
int initialize_sox();
int shutdown_sox();
/// Build a SoX chain, flow the effects, and capture the results in a tensor.
/// An audio file from the given `file_name` flows through an effects chain given
/// by a list of effects and effect options to an output buffer which is encoded
/// into memory to a target signal type and target signal encoding. The resulting
/// buffer is then placed into `otensor`, which is transposed in place when
/// `ch_first` is true. This function returns the sample rate of the output.
int build_flow_effects(const std::string& file_name,
                       at::Tensor otensor,
                       bool ch_first,
                       sox_signalinfo_t* target_signal,
                       sox_encodinginfo_t* target_encoding,
                       const char* file_type,
                       std::vector<SoxEffect> pyeffs,
                       int max_num_eopts);
}} // namespace torch::audio }} // namespace torch::audio
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment