Commit 4699ef21 authored by moto's avatar moto Committed by Facebook GitHub Bot
Browse files

Refactor Buffer implementation in StreamReader (#2939)

Summary:
The `Buffer` class is responsible for converting `AVFrame` into `torch::Tensor` and storing the frames in accordance to `frames_per_chunk` and `buffer_chunk_size`.

There are four operating modes of Buffer: [audio|video] x [chunked|unchunked]. Audio and video have separate class implementations, but the chunked/unchunked behavior depends on whether `frames_per_chunk < 0`.

Chunked mode is where frames are returned in chunks of a fixed number of frames, while unchunked mode is where frames are returned as-is.

When frames are accumulated, in chunked mode, old frames are dropped, while in unchunked mode all the frames are retained.

Currently, the underlying buffer implementations are the same `std::deque<torch::Tensor>`. As we plan to make the chunked-mode behavior more efficient by changing the underlying buffer container, it will be easier if the unchunked-mode behavior is kept as-is in a separate class.

This commit makes the following changes.

* Change `Buffer` class into pure virtual class (interface).
* Split `AudioBuffer` into `UnchunkedAudioBuffer` and `ChunkedAudioBuffer`.
* Split `VideoBuffer` into `UnchunkedVideoBuffer` and `ChunkedVideoBuffer`.

Pull Request resolved: https://github.com/pytorch/audio/pull/2939

Reviewed By: carolineechen

Differential Revision: D42247509

Pulled By: mthrok

fbshipit-source-id: 7363e442a5b2db5dcbaaf0ffbfa702e088726d1b
parent 69e8dbb2
......@@ -134,6 +134,8 @@ if(USE_FFMPEG)
ffmpeg/ffmpeg.cpp
ffmpeg/filter_graph.cpp
ffmpeg/stream_reader/buffer.cpp
ffmpeg/stream_reader/buffer/chunked_buffer.cpp
ffmpeg/stream_reader/buffer/unchunked_buffer.cpp
ffmpeg/stream_reader/decoder.cpp
ffmpeg/stream_reader/sink.cpp
ffmpeg/stream_reader/stream_processor.cpp
......
......@@ -9,31 +9,9 @@
namespace torchaudio {
namespace ffmpeg {
Buffer::Buffer(int frames_per_chunk, int num_chunks)
: frames_per_chunk(frames_per_chunk), num_chunks(num_chunks) {}
AudioBuffer::AudioBuffer(int frames_per_chunk, int num_chunks)
: Buffer(frames_per_chunk, num_chunks) {}
VideoBuffer::VideoBuffer(
int frames_per_chunk,
int num_chunks,
const torch::Device& device_)
: Buffer(frames_per_chunk, num_chunks), device(device_) {}
////////////////////////////////////////////////////////////////////////////////
// Query
////////////////////////////////////////////////////////////////////////////////
bool Buffer::is_ready() const {
if (frames_per_chunk < 0)
return num_buffered_frames > 0;
return num_buffered_frames >= frames_per_chunk;
}
////////////////////////////////////////////////////////////////////////////////
// Modifiers - Push Audio
////////////////////////////////////////////////////////////////////////////////
namespace {
//////////////////////////////////////////////////////////////////////////////
// Helper functions - audio
//////////////////////////////////////////////////////////////////////////////
torch::Tensor convert_audio_tensor(AVFrame* pFrame) {
// ref: https://ffmpeg.org/doxygen/4.1/filter__audio_8c_source.html#l00215
AVSampleFormat format = static_cast<AVSampleFormat>(pFrame->format);
......@@ -107,70 +85,10 @@ torch::Tensor convert_audio_tensor(AVFrame* pFrame) {
t = t.t();
return t;
}
} // namespace
void AudioBuffer::push_tensor(torch::Tensor t) {
// If frames_per_chunk < 0, users want to fetch all frames.
// Just push back to chunks and that's it.
if (frames_per_chunk < 0) {
chunks.push_back(t);
num_buffered_frames += t.size(0);
return;
}
// Push
// Note:
// For audio, the incoming tensor contains multiple of samples.
// For small `frames_per_chunk` value, it might be more than `max_frames`.
// If we push the tensor as-is, then, the whole frame might be popped at
// trimming stage, resulting buffer always empty. So we slice push the
// incoming Tensor.
// Check the last inserted Tensor and if the numbe of frames is not
// frame_per_chunk, reprocess it again with the incomping tensor
if (num_buffered_frames % frames_per_chunk) {
torch::Tensor prev = chunks.back();
chunks.pop_back();
num_buffered_frames -= prev.size(0);
t = torch::cat({prev, t}, 0);
}
while (true) {
int num_input_frames = t.size(0);
if (num_input_frames <= frames_per_chunk) {
chunks.push_back(t);
num_buffered_frames += num_input_frames;
break;
}
// The input tensor contains more frames than frames_per_chunk
auto splits = torch::tensor_split(t, {frames_per_chunk, num_input_frames});
chunks.push_back(splits[0]);
num_buffered_frames += frames_per_chunk;
t = splits[1];
}
// Trim
// If frames_per_chunk > 0, we only retain the following number of frames and
// Discard older frames.
int max_frames = num_chunks * frames_per_chunk;
while (num_buffered_frames > max_frames) {
TORCH_WARN_ONCE(
"The number of buffered frames exceeded the buffer size. "
"Dropping the old frames. "
"To avoid this, you can set a higher buffer_chunk_size value.");
torch::Tensor& t = chunks.front();
num_buffered_frames -= t.size(0);
chunks.pop_front();
}
}
void AudioBuffer::push_frame(AVFrame* frame) {
push_tensor(convert_audio_tensor(frame));
}
////////////////////////////////////////////////////////////////////////////////
// Modifiers - Push Video
////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////
// Helper functions - video
//////////////////////////////////////////////////////////////////////////////
namespace {
torch::Tensor convert_interlaced_video(AVFrame* pFrame) {
int width = pFrame->width;
......@@ -359,6 +277,7 @@ torch::Tensor convert_nv12_cuda(AVFrame* pFrame, const torch::Device& device) {
return t.permute({0, 3, 1, 2}); // NCHW
}
#endif
} // namespace
torch::Tensor convert_image_tensor(
AVFrame* pFrame,
......@@ -414,89 +333,6 @@ torch::Tensor convert_image_tensor(
std::string(av_get_pix_fmt_name(format)));
}
}
} // namespace
void VideoBuffer::push_tensor(torch::Tensor t) {
// the video frames is expected to contain only one frame
chunks.push_back(t);
num_buffered_frames += t.size(0);
if (frames_per_chunk < 0) {
return;
}
// Trim
int max_frames = num_chunks * frames_per_chunk;
if (num_buffered_frames > max_frames) {
TORCH_WARN_ONCE(
"The number of buffered frames exceeded the buffer size. "
"Dropping the old frames. "
"To avoid this, you can set a higher buffer_chunk_size value.");
torch::Tensor& t = chunks.front();
num_buffered_frames -= t.size(0);
chunks.pop_front();
}
}
void VideoBuffer::push_frame(AVFrame* frame) {
push_tensor(convert_image_tensor(frame, device));
}
////////////////////////////////////////////////////////////////////////////////
// Modifiers - Pop
////////////////////////////////////////////////////////////////////////////////
using namespace torch::indexing;
c10::optional<torch::Tensor> Buffer::pop_chunk() {
if (!num_buffered_frames) {
return c10::optional<torch::Tensor>{};
}
if (frames_per_chunk < 0) {
return c10::optional<torch::Tensor>{pop_all()};
}
return c10::optional<torch::Tensor>{pop_one_chunk()};
}
torch::Tensor AudioBuffer::pop_one_chunk() {
// Audio deque are aligned with `frames_per_chunk`
torch::Tensor ret = chunks.front();
chunks.pop_front();
num_buffered_frames -= ret.size(0);
return ret;
}
torch::Tensor VideoBuffer::pop_one_chunk() {
// Video deque contains one frame par one tensor
std::vector<torch::Tensor> ret;
while (num_buffered_frames > 0 && ret.size() < frames_per_chunk) {
torch::Tensor& t = chunks.front();
ret.push_back(t);
chunks.pop_front();
num_buffered_frames -= 1;
}
return torch::cat(ret, 0);
}
torch::Tensor Buffer::pop_all() {
// Note:
// This method is common to audio/video.
// In audio case, each Tensor contains multiple frames
// In video case, each Tensor contains one frame,
std::vector<torch::Tensor> ret;
while (chunks.size()) {
torch::Tensor& t = chunks.front();
int n_frames = t.size(0);
ret.push_back(t);
chunks.pop_front();
num_buffered_frames -= n_frames;
}
return torch::cat(ret, 0);
}
void Buffer::flush() {
chunks.clear();
}
} // namespace ffmpeg
} // namespace torchaudio
......@@ -6,90 +6,34 @@
namespace torchaudio {
namespace ffmpeg {
//////////////////////////////////////////////////////////////////////////////
// Buffer Interface
//////////////////////////////////////////////////////////////////////////////
class Buffer {
protected:
// Each AVFrame is converted to a Tensor and stored here.
std::deque<torch::Tensor> chunks;
// The number of frames to return as a chunk
// If <0, then user wants to receive all the frames
const int frames_per_chunk;
// The numbe of chunks to retain
const int num_chunks;
// The number of currently stored chunks
// For video, one Tensor corresponds to one frame, but for audio,
// one Tensor contains multiple samples, so we track here.
int num_buffered_frames = 0;
public:
Buffer(int frames_per_chunk, int num_chunks);
virtual ~Buffer() = default;
//////////////////////////////////////////////////////////////////////////////
// Query
//////////////////////////////////////////////////////////////////////////////
// Check if buffeer has enoough number of frames for a chunk
// If frame_per_chunk <0, returns true if there is >0 frames.
// Otherwise, returns if num_frames >= frame_per_chunk.
bool is_ready() const;
virtual bool is_ready() const = 0;
//////////////////////////////////////////////////////////////////////////////
// Modifiers
//////////////////////////////////////////////////////////////////////////////
virtual void push_frame(AVFrame* frame) = 0;
c10::optional<torch::Tensor> pop_chunk();
void flush();
private:
virtual torch::Tensor pop_one_chunk() = 0;
torch::Tensor pop_all();
};
// Specialization of the handling around push/pop for audio/video.
////////////////////////////////////////////////////////////////////////////////
// AudioBuffer specialization
////////////////////////////////////////////////////////////////////////////////
// For audio, input AVFrame contains multiple frames.
// When popping the buffered frames chunk-by-chunk, it is easier if they are
// organized by chunk when pushed to deque object.
// Therefore, audio implements pushing mechanism that makes sure that
// each Tensor in deque consists Tensors with `frames_per_chunk` frames.
class AudioBuffer : public Buffer {
public:
AudioBuffer(int frames_per_chunk, int num_chunks);
void push_frame(AVFrame* frame);
virtual c10::optional<torch::Tensor> pop_chunk() = 0;
private:
void push_tensor(torch::Tensor tensor);
torch::Tensor pop_one_chunk();
virtual void flush() = 0;
};
////////////////////////////////////////////////////////////////////////////////
// VideoBuffer specialization
////////////////////////////////////////////////////////////////////////////////
// For video, input AVFrame contains one frame.
// Contraty to audio, it is simple to push one frame each time to deque.
// But this mean that chunks consisting of multiple frames have to be created
// at popping time.
class VideoBuffer : public Buffer {
const torch::Device device;
public:
VideoBuffer(
int frames_per_chunk,
int num_chunks,
const torch::Device& device);
void push_frame(AVFrame* frame);
private:
void push_tensor(torch::Tensor tensor);
torch::Tensor pop_one_chunk();
};
//////////////////////////////////////////////////////////////////////////////
// Helper functions
//////////////////////////////////////////////////////////////////////////////
torch::Tensor convert_audio_tensor(AVFrame* frame);
torch::Tensor convert_image_tensor(AVFrame* frame, const torch::Device& device);
} // namespace ffmpeg
} // namespace torchaudio
#include <torchaudio/csrc/ffmpeg/stream_reader/buffer/chunked_buffer.h>
namespace torchaudio {
namespace ffmpeg {
// Base: store the chunking parameters shared by audio and video.
ChunkedBuffer::ChunkedBuffer(int frames_per_chunk, int num_chunks)
    : frames_per_chunk(frames_per_chunk), num_chunks(num_chunks) {}
// Audio: no state beyond the base class.
ChunkedAudioBuffer::ChunkedAudioBuffer(int frames_per_chunk, int num_chunks)
    : ChunkedBuffer(frames_per_chunk, num_chunks) {}
// Video: additionally remembers the device used when converting
// incoming AVFrames to Tensors (see push_frame).
ChunkedVideoBuffer::ChunkedVideoBuffer(
    int frames_per_chunk,
    int num_chunks,
    const torch::Device& device_)
    : ChunkedBuffer(frames_per_chunk, num_chunks), device(device_) {}
// A chunk is ready once at least `frames_per_chunk` frames are buffered.
bool ChunkedBuffer::is_ready() const {
  return num_buffered_frames >= frames_per_chunk;
}
// Append an audio Tensor (dim 0 is the frame dimension) to the buffer,
// re-slicing it so that every deque entry holds exactly `frames_per_chunk`
// frames (except possibly the last one), then drop the oldest chunks when
// the total exceeds `num_chunks * frames_per_chunk`.
void ChunkedAudioBuffer::push_tensor(torch::Tensor frame) {
  // Push
  // Note:
  // For audio, the incoming tensor contains multiple frames (samples).
  // For a small `frames_per_chunk` value, that might be more than
  // `max_frames`. If we pushed the tensor as-is, the whole tensor could be
  // popped at the trimming stage, leaving the buffer always empty. So we
  // slice the incoming Tensor and push it chunk by chunk.
  // If the last inserted Tensor is not yet a full chunk, pull it back out
  // and re-process it together with the incoming tensor.
  if (num_buffered_frames % frames_per_chunk) {
    torch::Tensor prev = chunks.back();
    chunks.pop_back();
    num_buffered_frames -= prev.size(0);
    frame = torch::cat({prev, frame}, 0);
  }
  // Slice the (possibly merged) tensor into `frames_per_chunk`-sized pieces.
  while (true) {
    int64_t num_input_frames = frame.size(0);
    if (num_input_frames <= frames_per_chunk) {
      chunks.push_back(frame);
      num_buffered_frames += num_input_frames;
      break;
    }
    // The input tensor contains more frames than frames_per_chunk
    auto splits =
        torch::tensor_split(frame, {frames_per_chunk, num_input_frames});
    chunks.push_back(splits[0]);
    num_buffered_frames += frames_per_chunk;
    frame = splits[1];
  }
  // Trim
  // Only `num_chunks * frames_per_chunk` frames are retained;
  // older frames are discarded (with a one-time warning).
  int64_t max_frames = num_chunks * frames_per_chunk;
  while (num_buffered_frames > max_frames) {
    TORCH_WARN_ONCE(
        "The number of buffered frames exceeded the buffer size. "
        "Dropping the old frames. "
        "To avoid this, you can set a higher buffer_chunk_size value.");
    torch::Tensor& t = chunks.front();
    num_buffered_frames -= t.size(0);
    chunks.pop_front();
  }
}
// Convert the incoming AVFrame to a Tensor and buffer it.
void ChunkedAudioBuffer::push_frame(AVFrame* frame) {
  push_tensor(convert_audio_tensor(frame));
}
// Append one video Tensor and evict the oldest entry once the capacity
// `num_chunks * frames_per_chunk` is exceeded.
void ChunkedVideoBuffer::push_tensor(const torch::Tensor& frame) {
  // the incoming Tensor is expected to contain only one frame
  chunks.push_back(frame);
  num_buffered_frames += frame.size(0);
  // Trim
  // Assuming each push adds one frame, a single eviction restores the
  // invariant `num_buffered_frames <= max_frames`.
  int64_t max_frames = num_chunks * frames_per_chunk;
  if (num_buffered_frames > max_frames) {
    TORCH_WARN_ONCE(
        "The number of buffered frames exceeded the buffer size. "
        "Dropping the old frames. "
        "To avoid this, you can set a higher buffer_chunk_size value.");
    torch::Tensor& t = chunks.front();
    num_buffered_frames -= t.size(0);
    chunks.pop_front();
  }
}
// Convert the incoming AVFrame to a Tensor (on `device`) and buffer it.
void ChunkedVideoBuffer::push_frame(AVFrame* frame) {
  push_tensor(convert_image_tensor(frame, device));
}
// Pop one chunk of audio frames, or an empty optional when nothing is
// buffered.
c10::optional<torch::Tensor> ChunkedAudioBuffer::pop_chunk() {
  // Nothing buffered -> no chunk to return.
  if (num_buffered_frames == 0) {
    return c10::optional<torch::Tensor>{};
  }
  // Deque entries are aligned to `frames_per_chunk` at push time, so the
  // front Tensor is exactly one chunk (possibly partial for the last one).
  torch::Tensor chunk = chunks.front();
  chunks.pop_front();
  num_buffered_frames -= chunk.size(0);
  return c10::optional<torch::Tensor>{chunk};
}
// Pop one chunk of video frames, or an empty optional when nothing is
// buffered. The video deque stores one frame per Tensor, so a chunk is
// assembled here by concatenating up to `frames_per_chunk` front entries.
c10::optional<torch::Tensor> ChunkedVideoBuffer::pop_chunk() {
  if (!num_buffered_frames) {
    return {};
  }
  // Video deque contains one frame per one tensor
  std::vector<torch::Tensor> ret;
  // Cast avoids a signed/unsigned comparison between size_t and int64_t
  // (the loop bound `frames_per_chunk` is non-negative in chunked mode).
  while (num_buffered_frames > 0 &&
         static_cast<int64_t>(ret.size()) < frames_per_chunk) {
    torch::Tensor& t = chunks.front();
    ret.push_back(t);
    chunks.pop_front();
    num_buffered_frames -= 1;
  }
  return c10::optional<torch::Tensor>{torch::cat(ret, 0)};
}
void ChunkedBuffer::flush() {
chunks.clear();
}
} // namespace ffmpeg
} // namespace torchaudio
#pragma once
#include <torchaudio/csrc/ffmpeg/ffmpeg.h>
#include <torchaudio/csrc/ffmpeg/stream_reader/buffer.h>
namespace torchaudio {
namespace ffmpeg {
//////////////////////////////////////////////////////////////////////////////
// Chunked Buffer Implementation
//////////////////////////////////////////////////////////////////////////////
// Common to both audio and video
// Buffer implementation for chunked mode: frames are returned in chunks of
// `frames_per_chunk` frames, and at most `num_chunks` chunks are retained
// (older frames are dropped). Base class shared by audio and video.
class ChunkedBuffer : public Buffer {
 protected:
  ChunkedBuffer(int frames_per_chunk, int num_chunks);
  // Each AVFrame is converted to a Tensor and stored here.
  std::deque<torch::Tensor> chunks;
  // The number of frames to return as one chunk.
  // Chunked mode assumes a non-negative value; the negative
  // (fetch-everything) case is handled by a separate unchunked buffer.
  const int64_t frames_per_chunk;
  // The number of chunks to retain
  const int64_t num_chunks;
  // The number of currently buffered frames.
  // For video, one Tensor corresponds to one frame, but for audio,
  // one Tensor contains multiple frames, so we track the count here.
  int64_t num_buffered_frames = 0;

 public:
  bool is_ready() const override;
  void flush() override;
};
// Chunked buffer for audio. Incoming Tensors carry multiple frames, so they
// are re-sliced at push time so that each deque entry holds
// `frames_per_chunk` frames (the last entry may be partial).
class ChunkedAudioBuffer : public ChunkedBuffer {
  // Slice `frame` into chunk-sized pieces and evict the oldest frames when
  // over capacity.
  void push_tensor(torch::Tensor frame);

 public:
  ChunkedAudioBuffer(int frames_per_chunk, int num_chunks);
  void push_frame(AVFrame* frame) override;
  c10::optional<torch::Tensor> pop_chunk() override;
};
// Chunked buffer for video. Each deque entry holds one frame; chunks are
// assembled at pop time by concatenating `frames_per_chunk` entries.
class ChunkedVideoBuffer : public ChunkedBuffer {
  // Device used when converting incoming AVFrames to Tensors.
  const torch::Device device;

  // Append one converted frame and evict the oldest when over capacity.
  void push_tensor(const torch::Tensor& frame);

 public:
  ChunkedVideoBuffer(
      int frames_per_chunk,
      int num_chunks,
      const torch::Device& device);
  void push_frame(AVFrame* frame) override;
  c10::optional<torch::Tensor> pop_chunk() override;
};
} // namespace ffmpeg
} // namespace torchaudio
#include <torchaudio/csrc/ffmpeg/stream_reader/buffer/unchunked_buffer.h>
namespace torchaudio {
namespace ffmpeg {
// Store the device used when converting incoming AVFrames to Tensors
// (see push_frame).
UnchunkedVideoBuffer::UnchunkedVideoBuffer(const torch::Device& device)
    : device(device) {}
// In unchunked mode, any buffered frame makes the buffer ready.
bool UnchunkedBuffer::is_ready() const {
  return num_buffered_frames > 0;
}
// Buffer a Tensor as-is. Unchunked mode returns all frames at once,
// so no slicing or trimming is performed here.
void UnchunkedBuffer::push_tensor(const torch::Tensor& t) {
  chunks.push_back(t);
  num_buffered_frames += t.size(0);
}
// Convert the incoming AVFrame to a Tensor and buffer it.
void UnchunkedAudioBuffer::push_frame(AVFrame* frame) {
  push_tensor(convert_audio_tensor(frame));
}
// Convert the incoming AVFrame to a Tensor (on `device`) and buffer it.
void UnchunkedVideoBuffer::push_frame(AVFrame* frame) {
  push_tensor(convert_image_tensor(frame, device));
}
// Return every buffered frame as one Tensor, or an empty optional when
// nothing is buffered.
// Works for both audio (many frames per Tensor) and video (one frame per
// Tensor).
c10::optional<torch::Tensor> UnchunkedBuffer::pop_chunk() {
  if (num_buffered_frames == 0) {
    return c10::optional<torch::Tensor>{};
  }
  // Drain the deque into a vector, keeping the frame counter consistent,
  // then concatenate along the frame dimension.
  std::vector<torch::Tensor> parts;
  parts.reserve(chunks.size());
  for (torch::Tensor& t : chunks) {
    num_buffered_frames -= t.size(0);
    parts.push_back(t);
  }
  chunks.clear();
  return c10::optional<torch::Tensor>{torch::cat(parts, 0)};
}
void UnchunkedBuffer::flush() {
chunks.clear();
}
} // namespace ffmpeg
} // namespace torchaudio
#pragma once
#include <torch/torch.h>
#include <torchaudio/csrc/ffmpeg/ffmpeg.h>
#include <torchaudio/csrc/ffmpeg/stream_reader/buffer.h>
#include <deque>
namespace torchaudio {
namespace ffmpeg {
//////////////////////////////////////////////////////////////////////////////
// Unchunked Buffer Interface
//////////////////////////////////////////////////////////////////////////////
// Partial implementation for unchunked buffer common to both audio and video
// Used for buffering audio/video streams without chunking
// Buffer implementation for unchunked mode: every pushed frame is retained,
// and pop_chunk() returns all of them at once. Common to audio and video.
class UnchunkedBuffer : public Buffer {
  // Each AVFrame is converted to a Tensor and stored here.
  std::deque<torch::Tensor> chunks;

 protected:
  // The number of currently buffered frames.
  // For video, one Tensor corresponds to one frame, but for audio,
  // one Tensor contains multiple frames, so we track the count here.
  int64_t num_buffered_frames = 0;
  // Append a Tensor as-is (no slicing or trimming).
  void push_tensor(const torch::Tensor& t);

 public:
  bool is_ready() const override;
  c10::optional<torch::Tensor> pop_chunk() override;
  void flush() override;
};
// Unchunked buffer for audio: converts AVFrames with convert_audio_tensor.
class UnchunkedAudioBuffer : public UnchunkedBuffer {
 public:
  void push_frame(AVFrame* frame) override;
};
// Unchunked buffer for video: converts AVFrames with convert_image_tensor
// on the given device.
class UnchunkedVideoBuffer : public UnchunkedBuffer {
  // Device used when converting incoming AVFrames to Tensors.
  const torch::Device device;

 public:
  explicit UnchunkedVideoBuffer(const torch::Device& device);
  void push_frame(AVFrame* frame) override;
};
} // namespace ffmpeg
} // namespace torchaudio
#include <torchaudio/csrc/ffmpeg/stream_reader/buffer/chunked_buffer.h>
#include <torchaudio/csrc/ffmpeg/stream_reader/buffer/unchunked_buffer.h>
#include <torchaudio/csrc/ffmpeg/stream_reader/sink.h>
#include <stdexcept>
......@@ -11,12 +13,22 @@ std::unique_ptr<Buffer> get_buffer(
int num_chunks,
const torch::Device& device) {
switch (type) {
case AVMEDIA_TYPE_AUDIO:
return std::unique_ptr<Buffer>(
new AudioBuffer(frames_per_chunk, num_chunks));
case AVMEDIA_TYPE_VIDEO:
return std::unique_ptr<Buffer>(
new VideoBuffer(frames_per_chunk, num_chunks, device));
case AVMEDIA_TYPE_AUDIO: {
if (frames_per_chunk < 0) {
return std::unique_ptr<Buffer>(new UnchunkedAudioBuffer());
} else {
return std::unique_ptr<Buffer>(
new ChunkedAudioBuffer(frames_per_chunk, num_chunks));
}
}
case AVMEDIA_TYPE_VIDEO: {
if (frames_per_chunk < 0) {
return std::unique_ptr<Buffer>(new UnchunkedVideoBuffer(device));
} else {
return std::unique_ptr<Buffer>(
new ChunkedVideoBuffer(frames_per_chunk, num_chunks, device));
}
}
default:
TORCH_CHECK(
false,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment