Commit 365313ed authored by moto's avatar moto Committed by Facebook GitHub Bot

Improve device streaming (#2202)

Summary:
This commit adds a tutorial for device ASR and updates the API for device streaming.

The interface changes are:
1. Add `timeout` and `backoff` parameters to the `process_packet` and `stream` methods.
2. Make the `fill_buffer` method private.

When dealing with a device stream, there are situations where the device buffer is not
ready and the system returns `EAGAIN`. In such cases, the previous implementation of
the `process_packet` method raised an exception in the Python layer, but for device ASR,
this is inefficient. A better approach is to retry within the C++ layer in a blocking manner.
The new `timeout` parameter serves this purpose.
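
For example, a sketch of the new interface (the parameter values here are
illustrative only):

    # Retry inside the C++ layer until a packet is ready,
    # waiting 10 ms between attempts.
    streamer.process_packet(timeout=-1, backoff=10.0)

    # Previous behavior: raise immediately on EAGAIN.
    streamer.process_packet()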

Pull Request resolved: https://github.com/pytorch/audio/pull/2202

Reviewed By: nateanl

Differential Revision: D34475829

Pulled By: mthrok

fbshipit-source-id: bb6d0b125d800f87d189db40815af06fbd4cab59
parent ea74813d
......@@ -16,7 +16,7 @@
#}
{%- block content %}
{% if pagename.endswith('tutorial') %}
{% if 'tutorial' in pagename %}
<div class="pytorch-call-to-action-links">
<div id="tutorial-type">{{ pagename }}</div>
......
......@@ -85,6 +85,7 @@ Advanced Usages
tutorials/speech_recognition_pipeline_tutorial
tutorials/online_asr_tutorial
tutorials/device_asr
tutorials/forced_alignment_tutorial
tutorials/tacotron2_pipeline_tutorial
tutorials/mvdr_tutorial
......
"""
Device ASR with Emformer RNN-T
==============================
**Author**: `Moto Hira <moto@fb.com>`__, `Jeff Hwang <jeffhwang@fb.com>`__.

This tutorial shows how to use Emformer RNN-T and the streaming API
to perform speech recognition on a streaming device input, e.g., the
microphone on a laptop.

.. note::

   This tutorial requires the prototype Streaming API and ffmpeg>=4.1.

   Prototype features are not part of binary releases, but are available
   in nightly builds. Please refer to https://pytorch.org for instructions
   on installing nightly builds.

   If you are using the Anaconda Python distribution,
   ``conda install -c anaconda ffmpeg`` will install
   the required libraries.

.. note::

   This tutorial was tested on a MacBook Pro and a Dynabook running Windows 10.

   This tutorial does NOT work on Google Colab because the server running
   this tutorial does not have a microphone that you can talk to.
"""
######################################################################
# 1. Overview
# -----------
#
# We use the streaming API to fetch audio from an audio device (microphone)
# chunk by chunk, then run inference on it using Emformer RNN-T.
#
# For the basic usage of the streaming API and Emformer RNN-T,
# please refer to the
# `Media Stream API tutorial <./streaming_api_tutorial.html>`__ and
# `Online ASR with Emformer RNN-T <./online_asr_tutorial.html>`__.
#
######################################################################
# 2. Checking the supported devices
# ---------------------------------
#
# First, we need to check the devices that the Streaming API can access,
# and figure out the arguments (``src`` and ``format``) we need to pass
# to the :py:func:`~torchaudio.prototype.io.Streamer` class.
#
# We use the ``ffmpeg`` command for this. ``ffmpeg`` abstracts away the
# differences between the underlying hardware implementations, but the
# expected value for ``format`` varies across OSes, and each ``format``
# defines a different syntax for ``src``.
#
# The details of the supported ``format`` values and ``src`` syntax can
# be found at https://ffmpeg.org/ffmpeg-devices.html.
#
# For macOS, the following command will list the available devices.
#
# .. code::
#
#    $ ffmpeg -f avfoundation -list_devices true -i dummy
#    ...
#    [AVFoundation indev @ 0x126e049d0] AVFoundation video devices:
#    [AVFoundation indev @ 0x126e049d0] [0] FaceTime HD Camera
#    [AVFoundation indev @ 0x126e049d0] [1] Capture screen 0
#    [AVFoundation indev @ 0x126e049d0] AVFoundation audio devices:
#    [AVFoundation indev @ 0x126e049d0] [0] ZoomAudioDevice
#    [AVFoundation indev @ 0x126e049d0] [1] MacBook Pro Microphone
#
# We will use the following values for the Streaming API.
#
# .. code::
#
#    Streamer(
#        src=":1",  # no video, audio from device 1, "MacBook Pro Microphone"
#        format="avfoundation",
#    )
######################################################################
#
# For Windows, the ``dshow`` device should work.
#
# .. code::
#
#    > ffmpeg -f dshow -list_devices true -i dummy
#    ...
#    [dshow @ 000001adcabb02c0] DirectShow video devices (some may be both video and audio devices)
#    [dshow @ 000001adcabb02c0] "TOSHIBA Web Camera - FHD"
#    [dshow @ 000001adcabb02c0] Alternative name "@device_pnp_\\?\usb#vid_10f1&pid_1a42&mi_00#7&27d916e6&0&0000#{65e8773d-8f56-11d0-a3b9-00a0c9223196}\global"
#    [dshow @ 000001adcabb02c0] DirectShow audio devices
#    [dshow @ 000001adcabb02c0] "... (Realtek High Definition Audio)"
#    [dshow @ 000001adcabb02c0] Alternative name "@device_cm_{33D9A762-90C8-11D0-BD43-00A0C911CE86}\wave_{BF2B8AE1-10B8-4CA4-A0DC-D02E18A56177}"
#
# In the above case, the following value can be used to stream from the microphone.
#
# .. code::
#
#    Streamer(
#        src="audio=@device_cm_{33D9A762-90C8-11D0-BD43-00A0C911CE86}\wave_{BF2B8AE1-10B8-4CA4-A0DC-D02E18A56177}",
#        format="dshow",
#    )
#
######################################################################
# 3. Data acquisition
# -------------------
#
# Streaming audio from a microphone requires properly timing the data
# acquisition from the hardware. Failing to do so introduces
# discontinuities into the data stream.
#
# For this reason, we will run the data acquisition in a subprocess.
#
# First, we create a helper function that encapsulates the whole
# process executed in the subprocess.
#
# This function initializes the streaming API, acquires data, and then
# puts it in a queue, which the main process watches.
#
import torch
import torchaudio
# The data acquisition process will stop after this number of steps.
# This eliminates the need for process synchronization and keeps this
# tutorial simple.
NUM_ITER = 100
def stream(q, format, src, segment_length, sample_rate):
    from torchaudio.prototype.io import Streamer

    print("Building Streamer...")
    streamer = Streamer(src, format=format)
    streamer.add_basic_audio_stream(frames_per_chunk=segment_length, sample_rate=sample_rate)

    print(streamer.get_src_stream_info(0))
    print(streamer.get_out_stream_info(0))

    print("Streaming...")
    print()
    stream_iterator = streamer.stream(timeout=-1, backoff=1.0)
    for _ in range(NUM_ITER):
        (chunk,) = next(stream_iterator)
        q.put(chunk)
######################################################################
#
# The notable difference from non-device streaming is that
# we provide the ``timeout`` and ``backoff`` parameters to the ``stream`` method.
#
# When acquiring data, if the rate of acquisition requests is higher
# than the rate at which the hardware can prepare the data, then the
# underlying implementation reports a special error code and expects the
# client code to retry.
#
# Precise timing is the key to smooth streaming. Reporting this error
# from the low-level implementation all the way back to the Python layer
# before retrying adds undesired overhead.
# For this reason, the retry behavior is implemented in the C++ layer, and
# the ``timeout`` and ``backoff`` parameters allow the client code to
# control the behavior.
#
# For the details of the ``timeout`` and ``backoff`` parameters, please refer
# to the documentation of the
# :py:meth:`~torchaudio.prototype.io.Streamer.stream` method.
#
# .. note::
#
#    The proper value of ``backoff`` depends on the system configuration.
#    One way to see if the ``backoff`` value is appropriate is to save the
#    series of acquired chunks as continuous audio and listen to it;
#    a minimal sketch of this check follows this note.
#    If the ``backoff`` value is too large, then the data stream is
#    discontinuous, and the resulting audio sounds sped up.
#    If the ``backoff`` value is too small or zero, the audio stream is fine,
#    but the data acquisition process enters a busy-waiting state, which
#    increases the CPU consumption.
#
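######################################################################
#
# The following is a minimal sketch of such a check. It is hypothetical
# and not executed in this tutorial: ``q`` is the queue filled by the
# acquisition subprocess, and the file name and sample rate are
# illustrative only.
#
# .. code::
#
#    # Collect the acquired chunks and save them as one audio file.
#    # If the saved audio sounds sped up, ``backoff`` is too large.
#    chunks = [q.get() for _ in range(NUM_ITER)]
#    # Each chunk has shape (frames_per_chunk, num_channels);
#    # concatenate along time, then transpose to (num_channels, num_frames).
#    waveform = torch.cat(chunks).T
#    torchaudio.save("backoff_check.wav", waveform, sample_rate=16000)
#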
######################################################################
# 4. Building inference pipeline
# ------------------------------
#
# The next step is to create components required for inference.
#
# This is the same process as in
# `Online ASR with Emformer RNN-T <./online_asr_tutorial.html>`__.
#
class Pipeline:
    """Build inference pipeline from RNNTBundle.

    Args:
        bundle (torchaudio.pipelines.RNNTBundle): Bundle object
        beam_width (int): Beam size of beam search decoder.
    """

    def __init__(self, bundle: torchaudio.pipelines.RNNTBundle, beam_width: int = 10):
        self.bundle = bundle
        self.feature_extractor = bundle.get_streaming_feature_extractor()
        self.decoder = bundle.get_decoder()
        self.token_processor = bundle.get_token_processor()

        self.beam_width = beam_width

        self.state = None
        self.hypothesis = None

    def infer(self, segment: torch.Tensor) -> str:
        """Perform streaming inference"""
        features, length = self.feature_extractor(segment)
        hypos, self.state = self.decoder.infer(
            features, length, self.beam_width, state=self.state, hypothesis=self.hypothesis
        )
        self.hypothesis = hypos[0]
        transcript = self.token_processor(self.hypothesis.tokens, lstrip=False)
        return transcript
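######################################################################
#
# To make the interface concrete, here is a minimal sketch of driving the
# pipeline with pre-segmented audio. It is hypothetical and not executed
# in this tutorial (``segments`` is an assumed iterable of Tensors of the
# expected segment size); the actual tutorial feeds microphone chunks
# to the pipeline instead.
#
# .. code::
#
#    pipeline = Pipeline(torchaudio.pipelines.EMFORMER_RNNT_BASE_LIBRISPEECH)
#    for segment in segments:
#        # Each call consumes one segment and extends the transcript.
#        print(pipeline.infer(segment), end="", flush=True)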
######################################################################
#
class ContextCacher:
    """Cache the end of input data and prepend the next input data with it.

    Args:
        segment_length (int): The size of the main segment.
            If the incoming segment is shorter, then the segment is padded.
        context_length (int): The size of the context, cached and prepended.
    """

    def __init__(self, segment_length: int, context_length: int):
        self.segment_length = segment_length
        self.context_length = context_length
        self.context = torch.zeros([context_length])

    def __call__(self, chunk: torch.Tensor):
        if chunk.size(0) < self.segment_length:
            chunk = torch.nn.functional.pad(chunk, (0, self.segment_length - chunk.size(0)))
        chunk_with_context = torch.cat((self.context, chunk))
        self.context = chunk[-self.context_length:]
        return chunk_with_context
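######################################################################
#
# As a quick illustration of the caching behavior, here is a toy example
# with hypothetical sizes (the actual pipeline derives the sizes from the
# model bundle):
#

_cacher = ContextCacher(segment_length=4, context_length=2)
# The first call prepends the zero-initialized context.
print(_cacher(torch.arange(4.0)))  # tensor([0., 0., 0., 1., 2., 3.])
# Subsequent calls prepend the tail of the previous chunk.
print(_cacher(torch.arange(4.0, 8.0)))  # tensor([2., 3., 4., 5., 6., 7.])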
######################################################################
# 5. The main process
# -------------------
#
# The execution flow of the main process is as follows:
#
# 1. Initialize the inference pipeline.
# 2. Launch the data acquisition subprocess.
# 3. Run inference.
# 4. Clean up.
#
# .. note::
#
#    As the data acquisition subprocess will be launched with the ``"spawn"``
#    method, all the code in the global scope is executed in the subprocess
#    as well.
#
#    We want to instantiate the pipeline only in the main process,
#    so we put it in a function and invoke it within the
#    ``__name__ == "__main__"`` guard.
#
def main(device, src, bundle):
    print(torch.__version__)
    print(torchaudio.__version__)

    print("Building pipeline...")
    pipeline = Pipeline(bundle)

    sample_rate = bundle.sample_rate
    segment_length = bundle.segment_length * bundle.hop_length
    context_length = bundle.right_context_length * bundle.hop_length

    print(f"Sample rate: {sample_rate}")
    print(f"Main segment: {segment_length} frames ({segment_length / sample_rate} seconds)")
    print(f"Right context: {context_length} frames ({context_length / sample_rate} seconds)")

    cacher = ContextCacher(segment_length, context_length)

    @torch.inference_mode()
    def infer():
        for _ in range(NUM_ITER):
            chunk = q.get()
            segment = cacher(chunk[:, 0])
            transcript = pipeline.infer(segment)
            print(transcript, end="", flush=True)

    import torch.multiprocessing as mp

    ctx = mp.get_context("spawn")
    q = ctx.Queue()
    p = ctx.Process(target=stream, args=(q, device, src, segment_length, sample_rate))
    p.start()
    infer()
    p.join()


if __name__ == "__main__":
    main(
        device="avfoundation",
        src=":1",
        bundle=torchaudio.pipelines.EMFORMER_RNNT_BASE_LIBRISPEECH,
    )
######################################################################
#
# .. code::
#
#    Building pipeline...
#    Sample rate: 16000
#    Main segment: 2560 frames (0.16 seconds)
#    Right context: 640 frames (0.04 seconds)
#    Building Streamer...
#    SourceAudioStream(media_type='audio', codec='pcm_f32le', codec_long_name='PCM 32-bit floating point little-endian', format='flt', bit_rate=1536000, sample_rate=48000.0, num_channels=1)
#    OutputStream(source_index=0, filter_description='aresample=16000,aformat=sample_fmts=fltp')
#    Streaming...
#
#    hello world
#
......@@ -521,6 +521,10 @@ plt.show(block=False)
# 1. Audio / Video device input
# -----------------------------
#
# .. seealso::
#
# `Device ASR with Emformer RNN-T <./device_asr.html>`__.
#
# Given that the system has proper media devices and that libavdevice is
# configured to use them, the streaming API can
# pull media streams from these devices.
......
......@@ -256,8 +256,16 @@ void remove_stream(S s, int64_t i) {
  s->s.remove_stream(i);
}
int64_t process_packet(Streamer& s) {
  int64_t code = s.process_packet();
int64_t process_packet(
    Streamer& s,
    const c10::optional<double>& timeout = c10::optional<double>(),
    const double backoff = 10.) {
  int64_t code = [&]() {
    if (timeout.has_value()) {
      return s.process_packet_block(timeout.value(), backoff);
    }
    return s.process_packet();
  }();
  if (code < 0) {
    throw std::runtime_error(
        "Failed to process a packet. (" + av_err2string(code) + "). ");
......@@ -320,9 +328,11 @@ TORCH_LIBRARY_FRAGMENT(torchaudio, m) {
m.def("torchaudio::ffmpeg_streamer_add_audio_stream", add_audio_stream);
m.def("torchaudio::ffmpeg_streamer_add_video_stream", add_video_stream);
m.def("torchaudio::ffmpeg_streamer_remove_stream", remove_stream);
m.def("torchaudio::ffmpeg_streamer_process_packet", [](S s) {
return process_packet(s->s);
});
m.def(
"torchaudio::ffmpeg_streamer_process_packet",
[](S s, const c10::optional<double>& timeout, double backoff) {
return process_packet(s->s, timeout, backoff);
});
m.def("torchaudio::ffmpeg_streamer_process_all_packets", [](S s) {
return process_all_packets(s->s);
});
......
#include <torchaudio/csrc/ffmpeg/ffmpeg.h>
#include <torchaudio/csrc/ffmpeg/streamer.h>
#include <chrono>
#include <sstream>
#include <stdexcept>
#include <thread>
namespace torchaudio {
namespace ffmpeg {
......@@ -229,6 +231,45 @@ int Streamer::process_packet() {
  return (ret < 0) ? ret : 0;
}
// Similar to `process_packet()`, but in case `process_packet()` returns
// EAGAIN, it keeps retrying until the timeout happens.
//
// `timeout` and `backoff` are given in milliseconds.
int Streamer::process_packet_block(double timeout, double backoff) {
  auto dead_line = [&]() {
    // If timeout < 0, then it repeats forever
    if (timeout < 0) {
      return std::chrono::time_point<std::chrono::steady_clock>::max();
    }
    auto timeout_ = static_cast<int64_t>(1000 * timeout);
    return std::chrono::steady_clock::now() +
        std::chrono::microseconds{timeout_};
  }();
  std::chrono::microseconds sleep{static_cast<int64_t>(1000 * backoff)};
  while (true) {
    int ret = process_packet();
    if (ret != AVERROR(EAGAIN)) {
      return ret;
    }
    if (dead_line < std::chrono::steady_clock::now()) {
      return ret;
    }
    // FYI: ffmpeg sleeps 10 milliseconds if the read happens in a separate
    // thread:
    // https://github.com/FFmpeg/FFmpeg/blob/b0f8dbb0cacc45a19f18c043afc706d7d26bef74/fftools/ffmpeg.c#L3952
    // https://github.com/FFmpeg/FFmpeg/blob/b0f8dbb0cacc45a19f18c043afc706d7d26bef74/fftools/ffmpeg.c#L4542
    //
    // But it does not seem to sleep when running in a single thread.
    // Empirically, we observed that the streaming result is worse with a
    // long sleep, while a near-zero backoff (effectively busy-waiting) did
    // not show a noticeable difference in CPU utilization in simple tests.
    // So instead of hard-coding a sleep duration here, we sleep for the
    // client-specified backoff duration before retrying.
    std::this_thread::sleep_for(sleep);
  }
}
// <0: Some error happened.
int Streamer::drain() {
  int ret = 0, tmp = 0;
......
......@@ -87,6 +87,7 @@ class Streamer {
  // Stream methods
  //////////////////////////////////////////////////////////////////////////////
  int process_packet();
  int process_packet_block(double timeout, double backoff);
  int drain();
......
......@@ -420,11 +420,11 @@ class Streamer:
"""
torch.ops.torchaudio.ffmpeg_streamer_remove_stream(self._s, i)
def process_packet(self) -> int:
def process_packet(self, timeout: Optional[float] = None, backoff: float = 10.0) -> int:
"""Read the source media and process one packet.
The data in the packet will be decoded and passed to corresponding
output stream processors.
If a packet is read successfuly, then the data in the packet will
be decoded and passed to corresponding output stream processors.
If the packet belongs to a source stream that is not connected to
an output stream, then the data are discarded.
......@@ -433,6 +433,39 @@ class Streamer:
        processors to enter drain mode. All the output stream processors
        flush the pending frames.

        Args:
            timeout (float or None, optional): Timeout in milliseconds.

                This argument changes the retry behavior when the function
                fails to process a packet because the underlying media resource
                is temporarily unavailable.

                When using a media device such as a microphone, there are cases
                where the underlying buffer is not ready.
                Calling this function in such a case causes the system to report
                ``EAGAIN (resource temporarily unavailable)``.

                * ``>=0``: Keep retrying until the given time passes.
                * ``<0``: Keep retrying forever.
                * ``None``: No retrying and raise an exception immediately.

                Default: ``None``.

                Note:
                    The retry behavior applies only when the failure is caused
                    by the resource being unavailable. It is not invoked when
                    the failure is caused by something else.
            backoff (float, optional): Time to wait before retrying in milliseconds.

                This option is effective only when ``timeout`` is not ``None``.
                In that case, ``backoff`` controls how long the function
                should wait before retrying. Default: ``10.0``.

        Returns:
            int:
                ``0``
......@@ -444,7 +477,7 @@ class Streamer:
                flushed the pending frames. The caller should stop calling
                this method.
        """
        return torch.ops.torchaudio.ffmpeg_streamer_process_packet(self._s)
        return torch.ops.torchaudio.ffmpeg_streamer_process_packet(self._s, timeout, backoff)
    def process_all_packets(self):
        """Process packets until it reaches EOF."""
......@@ -457,14 +490,14 @@ class Streamer:
    def pop_chunks(self) -> Tuple[Optional[torch.Tensor]]:
        """Pop one chunk from all the output stream buffers.

        Returns
        Returns:
            Tuple[Optional[Tensor]]:
                Buffer contents.
                If a buffer does not contain any frame, then `None` is returned instead.
        """
        return torch.ops.torchaudio.ffmpeg_streamer_pop_chunks(self._s)
    def fill_buffer(self) -> int:
    def _fill_buffer(self, timeout: Optional[float], backoff: float) -> int:
        """Keep processing packets until all buffers have at least one chunk

        Returns:
......@@ -479,19 +512,36 @@ class Streamer:
            this method.
        """
        while not self.is_buffer_ready():
            for _ in range(3):
                code = self.process_packet()
                if code != 0:
                    return code
            code = self.process_packet(timeout, backoff)
            if code != 0:
                return code
        return 0
    def stream(self) -> Iterator[Tuple[Optional[torch.Tensor]]]:
        """Return an iterator that generates output tensors"""
    def stream(
        self, timeout: Optional[float] = None, backoff: float = 10.0
    ) -> Iterator[Tuple[Optional[torch.Tensor], ...]]:
        """Return an iterator that generates output tensors

        Arguments:
            timeout (float or None, optional): See
                :py:func:`~Streamer.process_packet`. (Default: ``None``)
            backoff (float, optional): See
                :py:func:`~Streamer.process_packet`. (Default: ``10.0``)

        Returns:
            Iterator[Tuple[Optional[torch.Tensor], ...]]:
                Iterator that yields a tuple of chunks that correspond to the
                output streams defined by client code.
                If an output stream is exhausted, then the chunk Tensor is
                substituted with ``None``.
                The iterator stops if all the output streams are exhausted.
        """
        if self.num_out_streams == 0:
            raise RuntimeError("No output stream is configured.")

        while True:
            if self.fill_buffer():
            if self._fill_buffer(timeout, backoff):
                break
            yield self.pop_chunks()
......