Commit 1e6a8f93 authored by moto, committed by Facebook GitHub Bot
Browse files

Refactor _backend module (#3547)

Summary:
* Move Backend implementations to separate files

Pull Request resolved: https://github.com/pytorch/audio/pull/3547

Reviewed By: hwangjeff

Differential Revision: D48233538

Pulled By: mthrok

fbshipit-source-id: bcc63fc07a5dfcd48929f0a2fb64bfcb3282eb92
parent 06301c0a
import os
from abc import ABC, abstractmethod
from typing import BinaryIO, Optional, Tuple, Union
from torch import Tensor
from torchaudio.backend.common import AudioMetaData
class Backend(ABC):
    """Abstract interface shared by the audio I/O backends.

    Every operation is an abstract static method, so a concrete backend must
    override all five before it can be instantiated.
    """

    @staticmethod
    @abstractmethod
    def info(
        uri: Union[BinaryIO, str, os.PathLike],
        format: Optional[str],
        buffer_size: int = 4096,
    ) -> AudioMetaData:
        """Return the metadata describing the audio at ``uri``."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def load(
        uri: Union[BinaryIO, str, os.PathLike],
        frame_offset: int = 0,
        num_frames: int = -1,
        normalize: bool = True,
        channels_first: bool = True,
        format: Optional[str] = None,
        buffer_size: int = 4096,
    ) -> Tuple[Tensor, int]:
        """Load audio from ``uri``.

        Returns a ``(Tensor, int)`` pair -- presumably the waveform and its
        sample rate; confirm against the concrete implementations.
        """
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def save(
        uri: Union[BinaryIO, str, os.PathLike],
        src: Tensor,
        sample_rate: int,
        channels_first: bool = True,
        format: Optional[str] = None,
        encoding: Optional[str] = None,
        bits_per_sample: Optional[int] = None,
        buffer_size: int = 4096,
    ) -> None:
        """Write the tensor ``src`` to ``uri``."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def can_decode(
        uri: Union[BinaryIO, str, os.PathLike],
        format: Optional[str],
    ) -> bool:
        """Tell whether this backend is able to read from ``uri``."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def can_encode(
        uri: Union[BinaryIO, str, os.PathLike],
        format: Optional[str],
    ) -> bool:
        """Tell whether this backend is able to write to ``uri``."""
        raise NotImplementedError
import os
import re
import sys
from typing import BinaryIO, Optional, Tuple, Union
......@@ -7,6 +8,8 @@ import torchaudio
from torchaudio.backend.common import AudioMetaData
from torchaudio.io import StreamWriter
from .backend import Backend
if torchaudio._extension._FFMPEG_EXT is not None:
StreamReaderFileObj = torchaudio._extension._FFMPEG_EXT.StreamReaderFileObj
else:
......@@ -276,3 +279,87 @@ def save_audio(
)
with s.open():
s.write_audio_chunk(0, src)
def _map_encoding(encoding: str) -> str:
for dst in ["PCM_S", "PCM_U", "PCM_F"]:
if dst in encoding:
return dst
if encoding == "PCM_MULAW":
return "ULAW"
elif encoding == "PCM_ALAW":
return "ALAW"
return encoding
def _get_bits_per_sample(encoding: str, bits_per_sample: int) -> str:
if m := re.search(r"PCM_\w(\d+)\w*", encoding):
return int(m.group(1))
elif encoding in ["PCM_ALAW", "PCM_MULAW"]:
return 8
return bits_per_sample
class FFmpegBackend(Backend):
    """Backend that forwards audio I/O to this module's FFmpeg helper functions.

    An object exposing a ``read`` attribute is treated as file-like; anything
    else is handled as a filesystem path and normalized first.
    """

    @staticmethod
    def info(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str], buffer_size: int = 4096) -> AudioMetaData:
        """Fetch metadata for ``uri`` and rewrite its encoding fields.

        ``buffer_size`` is only forwarded when ``uri`` is file-like.
        """
        if not hasattr(uri, "read"):
            metadata = info_audio(os.path.normpath(uri), format)
        else:
            metadata = info_audio_fileobj(uri, format, buffer_size=buffer_size)

        # The bit depth is derived from the raw encoding name, so compute it
        # before that name is replaced by its short form.
        raw_encoding = metadata.encoding
        metadata.bits_per_sample = _get_bits_per_sample(raw_encoding, metadata.bits_per_sample)
        metadata.encoding = _map_encoding(raw_encoding)
        return metadata

    @staticmethod
    def load(
        uri: Union[BinaryIO, str, os.PathLike],
        frame_offset: int = 0,
        num_frames: int = -1,
        normalize: bool = True,
        channels_first: bool = True,
        format: Optional[str] = None,
        buffer_size: int = 4096,
    ) -> Tuple[torch.Tensor, int]:
        """Load audio from a path or a file-like object.

        ``buffer_size`` is only forwarded when ``uri`` is file-like.
        """
        if not hasattr(uri, "read"):
            path = os.path.normpath(uri)
            return load_audio(path, frame_offset, num_frames, normalize, channels_first, format)
        return load_audio_fileobj(
            uri,
            frame_offset,
            num_frames,
            normalize,
            channels_first,
            format,
            buffer_size,
        )

    @staticmethod
    def save(
        uri: Union[BinaryIO, str, os.PathLike],
        src: torch.Tensor,
        sample_rate: int,
        channels_first: bool = True,
        format: Optional[str] = None,
        encoding: Optional[str] = None,
        bits_per_sample: Optional[int] = None,
        buffer_size: int = 4096,
    ) -> None:
        """Write ``src`` to ``uri`` by handing every argument to ``save_audio``."""
        save_audio(uri, src, sample_rate, channels_first, format, encoding, bits_per_sample, buffer_size)

    @staticmethod
    def can_decode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Always ``True``: this backend places no restriction on what it reads."""
        return True

    @staticmethod
    def can_encode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Always ``True``: this backend places no restriction on what it writes."""
        return True
import os
from typing import BinaryIO, Optional, Tuple, Union
import torch
from torchaudio.backend import soundfile_backend
from torchaudio.backend.common import AudioMetaData
from .backend import Backend
class SoundfileBackend(Backend):
    """Backend that forwards audio I/O to ``torchaudio.backend.soundfile_backend``.

    ``buffer_size`` is accepted on every method for signature compatibility with
    ``Backend`` but is never forwarded to the soundfile functions.
    """

    @staticmethod
    def info(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str], buffer_size: int = 4096) -> AudioMetaData:
        """Return the metadata reported by ``soundfile_backend.info``."""
        return soundfile_backend.info(uri, format)

    @staticmethod
    def load(
        uri: Union[BinaryIO, str, os.PathLike],
        frame_offset: int = 0,
        num_frames: int = -1,
        normalize: bool = True,
        channels_first: bool = True,
        format: Optional[str] = None,
        buffer_size: int = 4096,
    ) -> Tuple[torch.Tensor, int]:
        """Load audio through ``soundfile_backend.load``."""
        return soundfile_backend.load(uri, frame_offset, num_frames, normalize, channels_first, format)

    @staticmethod
    def save(
        uri: Union[BinaryIO, str, os.PathLike],
        src: torch.Tensor,
        sample_rate: int,
        channels_first: bool = True,
        format: Optional[str] = None,
        encoding: Optional[str] = None,
        bits_per_sample: Optional[int] = None,
        buffer_size: int = 4096,
    ) -> None:
        """Write ``src`` through ``soundfile_backend.save``."""
        soundfile_backend.save(
            uri, src, sample_rate, channels_first, format=format, encoding=encoding, bits_per_sample=bits_per_sample
        )

    @staticmethod
    def can_decode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Always ``True``: this backend places no restriction on what it reads."""
        return True

    @staticmethod
    def can_encode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Always ``True``: this backend places no restriction on what it writes."""
        return True
import os
from typing import BinaryIO, Optional, Tuple, Union
import torch
from torchaudio.backend.common import AudioMetaData
from .backend import Backend
class SoXBackend(Backend):
    """Backend that performs audio I/O through the ``torch.ops.torchaudio.sox_io_*`` ops.

    File-like objects are rejected: reading refuses anything with a ``read``
    attribute and writing refuses anything with a ``write`` attribute.
    """

    @staticmethod
    def info(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str], buffer_size: int = 4096) -> AudioMetaData:
        """Return metadata for the audio at ``uri``.

        Raises:
            ValueError: If ``uri`` is a file-like object.
            RuntimeError: If the SoX op returns a falsy result.
        """
        if hasattr(uri, "read"):
            # Adjacent literals concatenate into ONE message; a comma between
            # them would hand ValueError two arguments and print a tuple.
            raise ValueError(
                "SoX backend does not support reading from file-like objects. "
                "Please use an alternative backend that does support reading from file-like objects, e.g. FFmpeg."
            )
        sinfo = torch.ops.torchaudio.sox_io_get_info(uri, format)
        if not sinfo:
            raise RuntimeError(f"Failed to fetch metadata for {uri}.")
        return AudioMetaData(*sinfo)

    @staticmethod
    def load(
        uri: Union[BinaryIO, str, os.PathLike],
        frame_offset: int = 0,
        num_frames: int = -1,
        normalize: bool = True,
        channels_first: bool = True,
        format: Optional[str] = None,
        buffer_size: int = 4096,
    ) -> Tuple[torch.Tensor, int]:
        """Load audio from the file at ``uri``.

        Raises:
            ValueError: If ``uri`` is a file-like object.
            RuntimeError: If the SoX op returns a falsy result.
        """
        if hasattr(uri, "read"):
            raise ValueError(
                "SoX backend does not support loading from file-like objects. "
                "Please use an alternative backend that does support loading from file-like objects, e.g. FFmpeg."
            )
        ret = torch.ops.torchaudio.sox_io_load_audio_file(
            uri, frame_offset, num_frames, normalize, channels_first, format
        )
        if not ret:
            raise RuntimeError(f"Failed to load audio from {uri}.")
        return ret

    @staticmethod
    def save(
        uri: Union[BinaryIO, str, os.PathLike],
        src: torch.Tensor,
        sample_rate: int,
        channels_first: bool = True,
        format: Optional[str] = None,
        encoding: Optional[str] = None,
        bits_per_sample: Optional[int] = None,
        buffer_size: int = 4096,
    ) -> None:
        """Write ``src`` to the file at ``uri``.

        Raises:
            ValueError: If ``uri`` is a file-like object.
        """
        if hasattr(uri, "write"):
            raise ValueError(
                "SoX backend does not support writing to file-like objects. "
                "Please use an alternative backend that does support writing to file-like objects, e.g. FFmpeg."
            )
        torch.ops.torchaudio.sox_io_save_audio_file(
            uri,
            src,
            sample_rate,
            channels_first,
            # NOTE(review): fifth positional argument is pinned to None; presumably
            # a compression setting -- confirm against the op's schema.
            None,
            format,
            encoding,
            bits_per_sample,
        )

    @staticmethod
    def can_decode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Return ``True`` unless ``uri`` is a file-like object (has ``read``)."""
        return not hasattr(uri, "read")

    @staticmethod
    def can_encode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Return ``True`` unless ``uri`` is a file-like object (has ``write``)."""
        return not hasattr(uri, "write")
import os
import re
from abc import ABC, abstractmethod
from functools import lru_cache
from typing import BinaryIO, Dict, Optional, Tuple, Union
from typing import BinaryIO, Dict, Optional, Tuple, Type, Union
import torch
......@@ -10,258 +8,15 @@ from torchaudio._extension import _FFMPEG_EXT, _SOX_INITIALIZED
from torchaudio.backend import soundfile_backend
from torchaudio.backend.common import AudioMetaData
from . import ffmpeg
class Backend(ABC):
    """Abstract interface shared by the audio I/O backends.

    Every operation is an abstract static method, so a concrete backend must
    override all five before it can be instantiated.
    """

    @staticmethod
    @abstractmethod
    def info(
        uri: Union[BinaryIO, str, os.PathLike],
        format: Optional[str],
        buffer_size: int = 4096,
    ) -> AudioMetaData:
        """Return the metadata describing the audio at ``uri``."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def load(
        uri: Union[BinaryIO, str, os.PathLike],
        frame_offset: int = 0,
        num_frames: int = -1,
        normalize: bool = True,
        channels_first: bool = True,
        format: Optional[str] = None,
        buffer_size: int = 4096,
    ) -> Tuple[torch.Tensor, int]:
        """Load audio from ``uri``.

        Returns a ``(Tensor, int)`` pair -- presumably the waveform and its
        sample rate; confirm against the concrete implementations.
        """
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def save(
        uri: Union[BinaryIO, str, os.PathLike],
        src: torch.Tensor,
        sample_rate: int,
        channels_first: bool = True,
        format: Optional[str] = None,
        encoding: Optional[str] = None,
        bits_per_sample: Optional[int] = None,
        buffer_size: int = 4096,
    ) -> None:
        """Write the tensor ``src`` to ``uri``."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def can_decode(
        uri: Union[BinaryIO, str, os.PathLike],
        format: Optional[str],
    ) -> bool:
        """Tell whether this backend is able to read from ``uri``."""
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def can_encode(
        uri: Union[BinaryIO, str, os.PathLike],
        format: Optional[str],
    ) -> bool:
        """Tell whether this backend is able to write to ``uri``."""
        raise NotImplementedError
def _map_encoding(encoding: str) -> str:
for dst in ["PCM_S", "PCM_U", "PCM_F"]:
if dst in encoding:
return dst
if encoding == "PCM_MULAW":
return "ULAW"
elif encoding == "PCM_ALAW":
return "ALAW"
return encoding
def _get_bits_per_sample(encoding: str, bits_per_sample: int) -> str:
if m := re.search(r"PCM_\w(\d+)\w*", encoding):
return int(m.group(1))
elif encoding in ["PCM_ALAW", "PCM_MULAW"]:
return 8
return bits_per_sample
class FFmpegBackend(Backend):
    """Backend that forwards audio I/O to the sibling ``ffmpeg`` module.

    An object exposing a ``read`` attribute is treated as file-like; anything
    else is handled as a filesystem path and normalized first.
    """

    @staticmethod
    def info(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str], buffer_size: int = 4096) -> AudioMetaData:
        """Fetch metadata for ``uri`` and rewrite its encoding fields.

        ``buffer_size`` is only forwarded when ``uri`` is file-like.
        """
        if not hasattr(uri, "read"):
            metadata = ffmpeg.info_audio(os.path.normpath(uri), format)
        else:
            metadata = ffmpeg.info_audio_fileobj(uri, format, buffer_size=buffer_size)

        # The bit depth is derived from the raw encoding name, so compute it
        # before that name is replaced by its short form.
        raw_encoding = metadata.encoding
        metadata.bits_per_sample = _get_bits_per_sample(raw_encoding, metadata.bits_per_sample)
        metadata.encoding = _map_encoding(raw_encoding)
        return metadata

    @staticmethod
    def load(
        uri: Union[BinaryIO, str, os.PathLike],
        frame_offset: int = 0,
        num_frames: int = -1,
        normalize: bool = True,
        channels_first: bool = True,
        format: Optional[str] = None,
        buffer_size: int = 4096,
    ) -> Tuple[torch.Tensor, int]:
        """Load audio from a path or a file-like object.

        ``buffer_size`` is only forwarded when ``uri`` is file-like.
        """
        if not hasattr(uri, "read"):
            path = os.path.normpath(uri)
            return ffmpeg.load_audio(path, frame_offset, num_frames, normalize, channels_first, format)
        return ffmpeg.load_audio_fileobj(
            uri,
            frame_offset,
            num_frames,
            normalize,
            channels_first,
            format,
            buffer_size,
        )

    @staticmethod
    def save(
        uri: Union[BinaryIO, str, os.PathLike],
        src: torch.Tensor,
        sample_rate: int,
        channels_first: bool = True,
        format: Optional[str] = None,
        encoding: Optional[str] = None,
        bits_per_sample: Optional[int] = None,
        buffer_size: int = 4096,
    ) -> None:
        """Write ``src`` to ``uri`` by handing every argument to ``ffmpeg.save_audio``."""
        ffmpeg.save_audio(uri, src, sample_rate, channels_first, format, encoding, bits_per_sample, buffer_size)

    @staticmethod
    def can_decode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Always ``True``: this backend places no restriction on what it reads."""
        return True

    @staticmethod
    def can_encode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Always ``True``: this backend places no restriction on what it writes."""
        return True
class SoXBackend(Backend):
    """Backend that performs audio I/O through the ``torch.ops.torchaudio.sox_io_*`` ops.

    File-like objects are rejected: reading refuses anything with a ``read``
    attribute and writing refuses anything with a ``write`` attribute.
    """

    @staticmethod
    def info(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str], buffer_size: int = 4096) -> AudioMetaData:
        """Return metadata for the audio at ``uri``.

        Raises:
            ValueError: If ``uri`` is a file-like object.
            RuntimeError: If the SoX op returns a falsy result.
        """
        if hasattr(uri, "read"):
            # Adjacent literals concatenate into ONE message; a comma between
            # them would hand ValueError two arguments and print a tuple.
            raise ValueError(
                "SoX backend does not support reading from file-like objects. "
                "Please use an alternative backend that does support reading from file-like objects, e.g. FFmpeg."
            )
        sinfo = torch.ops.torchaudio.sox_io_get_info(uri, format)
        if not sinfo:
            raise RuntimeError(f"Failed to fetch metadata for {uri}.")
        return AudioMetaData(*sinfo)

    @staticmethod
    def load(
        uri: Union[BinaryIO, str, os.PathLike],
        frame_offset: int = 0,
        num_frames: int = -1,
        normalize: bool = True,
        channels_first: bool = True,
        format: Optional[str] = None,
        buffer_size: int = 4096,
    ) -> Tuple[torch.Tensor, int]:
        """Load audio from the file at ``uri``.

        Raises:
            ValueError: If ``uri`` is a file-like object.
            RuntimeError: If the SoX op returns a falsy result.
        """
        if hasattr(uri, "read"):
            raise ValueError(
                "SoX backend does not support loading from file-like objects. "
                "Please use an alternative backend that does support loading from file-like objects, e.g. FFmpeg."
            )
        ret = torch.ops.torchaudio.sox_io_load_audio_file(
            uri, frame_offset, num_frames, normalize, channels_first, format
        )
        if not ret:
            raise RuntimeError(f"Failed to load audio from {uri}.")
        return ret

    @staticmethod
    def save(
        uri: Union[BinaryIO, str, os.PathLike],
        src: torch.Tensor,
        sample_rate: int,
        channels_first: bool = True,
        format: Optional[str] = None,
        encoding: Optional[str] = None,
        bits_per_sample: Optional[int] = None,
        buffer_size: int = 4096,
    ) -> None:
        """Write ``src`` to the file at ``uri``.

        Raises:
            ValueError: If ``uri`` is a file-like object.
        """
        if hasattr(uri, "write"):
            raise ValueError(
                "SoX backend does not support writing to file-like objects. "
                "Please use an alternative backend that does support writing to file-like objects, e.g. FFmpeg."
            )
        torch.ops.torchaudio.sox_io_save_audio_file(
            uri,
            src,
            sample_rate,
            channels_first,
            # NOTE(review): fifth positional argument is pinned to None; presumably
            # a compression setting -- confirm against the op's schema.
            None,
            format,
            encoding,
            bits_per_sample,
        )

    @staticmethod
    def can_decode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Return ``True`` unless ``uri`` is a file-like object (has ``read``)."""
        return not hasattr(uri, "read")

    @staticmethod
    def can_encode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Return ``True`` unless ``uri`` is a file-like object (has ``write``)."""
        return not hasattr(uri, "write")
class SoundfileBackend(Backend):
    """Backend that forwards audio I/O to ``torchaudio.backend.soundfile_backend``.

    ``buffer_size`` is accepted on every method for signature compatibility with
    ``Backend`` but is never forwarded to the soundfile functions.
    """

    @staticmethod
    def info(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str], buffer_size: int = 4096) -> AudioMetaData:
        """Return the metadata reported by ``soundfile_backend.info``."""
        return soundfile_backend.info(uri, format)

    @staticmethod
    def load(
        uri: Union[BinaryIO, str, os.PathLike],
        frame_offset: int = 0,
        num_frames: int = -1,
        normalize: bool = True,
        channels_first: bool = True,
        format: Optional[str] = None,
        buffer_size: int = 4096,
    ) -> Tuple[torch.Tensor, int]:
        """Load audio through ``soundfile_backend.load``."""
        return soundfile_backend.load(uri, frame_offset, num_frames, normalize, channels_first, format)

    @staticmethod
    def save(
        uri: Union[BinaryIO, str, os.PathLike],
        src: torch.Tensor,
        sample_rate: int,
        channels_first: bool = True,
        format: Optional[str] = None,
        encoding: Optional[str] = None,
        bits_per_sample: Optional[int] = None,
        buffer_size: int = 4096,
    ) -> None:
        """Write ``src`` through ``soundfile_backend.save``."""
        soundfile_backend.save(
            uri, src, sample_rate, channels_first, format=format, encoding=encoding, bits_per_sample=bits_per_sample
        )

    @staticmethod
    def can_decode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Always ``True``: this backend places no restriction on what it reads."""
        return True

    @staticmethod
    def can_encode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        """Always ``True``: this backend places no restriction on what it writes."""
        return True
from .backend import Backend
from .ffmpeg import FFmpegBackend
from .soundfile import SoundfileBackend
from .sox import SoXBackend
@lru_cache(None)
def get_available_backends() -> Dict[str, Backend]:
backend_specs = {}
def get_available_backends() -> Dict[str, Type[Backend]]:
backend_specs: Dict[str, Type[Backend]] = {}
if _FFMPEG_EXT is not None:
backend_specs["ffmpeg"] = FFmpegBackend
if _SOX_INITIALIZED:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment