Commit bf085b1f authored by moto's avatar moto Committed by Facebook GitHub Bot
Browse files

Make fill_buffer a public API and move the impl to C++ (#2954)

Summary:
Currently, when iterating media data with StreamReader, using the for-loop is the only way with public API.

This does not support usecases like "Fetch one chunk after seek" well.

```python
s = StreamReader
s.add_audio_stream(...)
s.seek(10)
chunk = None
for chunk, in s.stream():
    break
```

This commit make the `fill_buffer` used in iterative method public API so that one acn do

```python
s.seek(10)
s.fill_buffer()
chunk, = s.pop_chunks()
```

 ---

Also this commit moves the implementation to C++ so that it reduces the number of FFI boundary crossing.
This improves the performance when the iteration is longer.

AVI (generated with `ffmpeg -hide_banner -f lavfi -t ${duration} -i testsrc "${file}.avi"`)

| Video Duration [sec] | Original [msec] | Fill Buffer C++ | One Go  (reference) |
|----------------------|----------|-----------------|--------|
|                    1 |       18 |            18.4 |   16.6 |
|                    5 |       44 |            42.6 |   35.1 |
|                   10 |     75.3 |            74.4 |   60.9 |
|                   30 |      200 |             195 |    158 |
|                   60 |      423 |             382 |    343 |

MP4 (generated with `ffmpeg -hide_banner -f lavfi -t ${duration} -i testsrc "${file}.mp4"`)

| Video Duration [sec] | Original [msec] | Fill Buffer C++ | One Go |
|----------------------|-----------------|-----------------|--------|
|                    1 |            18.7 |            18.1 |   10.3 |
|                    5 |            42.2 |            40.6 |   25.2 |
|                   10 |            73.9 |            71.8 |   43.6 |
|                   30 |             202 |             194 |    116 |
|                   60 |             396 |             386 |    227 |
* Original (Python implementation)

```python
r = StreamReader(src)
r.add_video_stream(1, decoder_option={"threads": "1"})
for chunk, in r.stream():
    pass
```

* This (C++)

```python
r = StreamReader(src)
r.add_video_stream(1, decoder_option={"threads": "1"})
for chunk, in r.stream():
    pass
```

* Using `process_all_packets` (process all in one go)

```python
r = StreamReader(src)
r.add_video_stream(1, decoder_option={"threads": "1"})
r.process_all_packets()
```

Pull Request resolved: https://github.com/pytorch/audio/pull/2954

Reviewed By: carolineechen

Differential Revision: D42349446

Pulled By: mthrok

fbshipit-source-id: 9e4e37923e46299c3f43f4ad17a2a2b938b2b197
parent 7f778fc9
......@@ -44,6 +44,7 @@ PYBIND11_MODULE(_torchaudio_ffmpeg, m) {
.def("remove_stream", &StreamReaderFileObj::remove_stream)
.def("process_packet", &StreamReaderFileObj::process_packet)
.def("process_all_packets", &StreamReaderFileObj::process_all_packets)
.def("fill_buffer", &StreamReaderFileObj::fill_buffer)
.def("is_buffer_ready", &StreamReaderFileObj::is_buffer_ready)
.def("pop_chunks", &StreamReaderFileObj::pop_chunks);
}
......
......@@ -86,6 +86,11 @@ TORCH_LIBRARY_FRAGMENT(torchaudio, m) {
return s->process_packet(timeout, backoff);
})
.def("process_all_packets", [](S s) { s->process_all_packets(); })
.def(
"fill_buffer",
[](S s, const c10::optional<double>& timeout, const double backoff) {
return s->fill_buffer(timeout, backoff);
})
.def("is_buffer_ready", [](S s) { return s->is_buffer_ready(); })
.def("pop_chunks", [](S s) { return s->pop_chunks(); });
}
......
......@@ -167,6 +167,11 @@ TORCH_LIBRARY_FRAGMENT(torchaudio, m) {
return s->process_packet(timeout, backoff);
})
.def("process_all_packets", [](S s) { s->process_all_packets(); })
.def(
"fill_buffer",
[](S s, const c10::optional<double>& timeout, const double backoff) {
return s->fill_buffer(timeout, backoff);
})
.def("is_buffer_ready", [](S s) { return s->is_buffer_ready(); })
.def("pop_chunks", [](S s) { return s->pop_chunks(); });
}
......
......@@ -91,5 +91,17 @@ void StreamReaderBinding::process_all_packets() {
} while (!ret);
}
int64_t StreamReaderBinding::fill_buffer(
const c10::optional<double>& timeout,
const double backoff) {
while (!is_buffer_ready()) {
int code = process_packet(timeout, backoff);
if (code != 0) {
return code;
}
}
return 0;
}
} // namespace ffmpeg
} // namespace torchaudio
......@@ -74,6 +74,10 @@ struct StreamReaderBinding : public StreamReader,
const double backoff = 10.);
void process_all_packets();
int64_t fill_buffer(
const c10::optional<double>& timeout = c10::optional<double>(),
const double backoff = 10.);
};
} // namespace ffmpeg
......
......@@ -749,7 +749,7 @@ class StreamReader:
"""
return self._be.pop_chunks()
def _fill_buffer(self, timeout: Optional[float], backoff: float) -> int:
def fill_buffer(self, timeout: Optional[float], backoff: float) -> int:
"""Keep processing packets until all buffers have at least one chunk
Returns:
......@@ -763,11 +763,7 @@ class StreamReader:
flushed the pending frames. The caller should stop calling
this method.
"""
while not self.is_buffer_ready():
code = self.process_packet(timeout, backoff)
if code != 0:
return code
return 0
return self._be.fill_buffer(timeout, backoff)
def stream(
self, timeout: Optional[float] = None, backoff: float = 10.0
......@@ -793,7 +789,7 @@ class StreamReader:
raise RuntimeError("No output stream is configured.")
while True:
if self._fill_buffer(timeout, backoff):
if self.fill_buffer(timeout, backoff):
break
yield self.pop_chunks()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment