"...text-generation-inference.git" did not exist on "bd405e035b9e05d4b1e74e029ff1d5de86854ea0"
Commit 572cd2e2 authored by moto, committed by Facebook GitHub Bot

Add StreamProcessor class (#2045)

Summary:
Part of https://github.com/pytorch/audio/issues/1986. Splitting the PR for easier review.

Add StreamProcessor class that bundles `Buffer`, `FilterGraph` and `Decoder`.
Note: The API to retrieve the buffered Tensors is tentative.
For the overall architecture, see https://github.com/mthrok/audio/blob/ffmpeg/torchaudio/csrc/ffmpeg/README.md.
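(For orientation: each source stream flows roughly as AVPacket -> Decoder -> AVFrame -> per-sink FilterGraph -> Buffer, and buffered results are retrieved as torch::Tensor chunks via `pop_chunk`.)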

Note: Without a change to the build process, the code added here won't be compiled; the build process will be updated in a later PR.
This needs to be imported after https://github.com/pytorch/audio/issues/2044.

Pull Request resolved: https://github.com/pytorch/audio/pull/2045

Reviewed By: carolineechen

Differential Revision: D33299858

Pulled By: mthrok

fbshipit-source-id: d85bececed475f45622743f137dd59cb1390ceed
parent 5cc4765a
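// torchaudio/csrc/ffmpeg/stream_processor.cpp (new file)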
#include <torchaudio/csrc/ffmpeg/stream_processor.h>
#include "libavutil/frame.h"
#include <iostream> // for the debug printing helper below (std::cerr)

namespace torchaudio {
namespace ffmpeg {

using KeyType = StreamProcessor::KeyType;

StreamProcessor::StreamProcessor(AVCodecParameters* codecpar)
    : media_type(codecpar->codec_type), decoder(codecpar) {}
////////////////////////////////////////////////////////////////////////////////
// Configurations
////////////////////////////////////////////////////////////////////////////////
KeyType StreamProcessor::add_stream(
    AVRational input_time_base,
    AVCodecParameters* codecpar,
    int frames_per_chunk,
    int num_chunks,
    double output_rate,
    std::string filter_description) {
  switch (codecpar->codec_type) {
    case AVMEDIA_TYPE_AUDIO:
    case AVMEDIA_TYPE_VIDEO:
      break;
    default:
      throw std::runtime_error("Only Audio and Video are supported");
  }
  KeyType key = current_key++;
  sinks.emplace(
      std::piecewise_construct,
      std::forward_as_tuple(key),
      std::forward_as_tuple(
          input_time_base,
          codecpar,
          frames_per_chunk,
          num_chunks,
          // Sink time base: 1/output_rate seconds per frame when an output
          // rate is requested, otherwise the input stream's own time base.
          (output_rate > 0) ? 1 / output_rate : av_q2d(input_time_base),
          std::move(filter_description)));
  decoder_time_base = av_q2d(input_time_base);
  return key;
}
void StreamProcessor::remove_stream(KeyType key) {
  sinks.erase(key);
}
////////////////////////////////////////////////////////////////////////////////
// Query methods
////////////////////////////////////////////////////////////////////////////////
std::string StreamProcessor::get_filter_description(KeyType key) const {
  return sinks.at(key).filter.get_description();
}

bool StreamProcessor::is_buffer_ready() const {
  for (const auto& it : sinks) {
    if (!it.second.is_buffer_ready()) {
      return false;
    }
  }
  return true;
}
////////////////////////////////////////////////////////////////////////////////
// The streaming process
////////////////////////////////////////////////////////////////////////////////
namespace {
void debug_print_frame(AVFrame* pFrame, double time_rate) {
  if (pFrame->sample_rate)
    std::cerr << " ---- format: "
              << av_get_sample_fmt_name(
                     static_cast<AVSampleFormat>(pFrame->format))
              << ", num_frames: " << pFrame->nb_samples
              << ", num_channels: " << pFrame->channels
              << ", num_samples: " << pFrame->nb_samples * pFrame->channels
              << ", sample_rate: " << pFrame->sample_rate
              << ", pts: " << pFrame->pts << ", pts/sample_rate: "
              << pFrame->pts / (double)pFrame->sample_rate
              << ", time: " << pFrame->pts * time_rate << std::endl;
  else
    std::cerr << " -------- format: "
              << av_get_pix_fmt_name(static_cast<AVPixelFormat>(pFrame->format))
              << ", width: " << pFrame->width << ", height: " << pFrame->height
              << ", pts: " << pFrame->pts
              << ", time: " << pFrame->pts * time_rate << std::endl;
}
} // namespace
// Returns 0 on success (including the case where the decoder needs more
// input), or a negative error code on failure.
int StreamProcessor::process_packet(AVPacket* packet) {
  int ret = decoder.process_packet(packet);
  while (ret >= 0) {
    ret = decoder.get_frame(pFrame1);
    // AVERROR(EAGAIN) means that new input data is required to return new
    // output.
    if (ret == AVERROR(EAGAIN))
      return 0;
    if (ret == AVERROR_EOF)
      return send_frame(NULL);
    if (ret < 0)
      return ret;
    send_frame(pFrame1);
    av_frame_unref(pFrame1);
  }
  return ret;
}
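
// Reference sketch, illustration only (not part of this commit): the loop in
// process_packet above follows FFmpeg's standard send/receive decoding
// protocol, which the Decoder class is assumed to wrap. The helper below only
// exists to show where AVERROR(EAGAIN) and AVERROR_EOF come from; it assumes
// <libavcodec/avcodec.h> is in scope (e.g. via ffmpeg.h) and that `codec_ctx`
// is an already-opened decoder context.
int decode_packet_reference(
    AVCodecContext* codec_ctx,
    AVPacket* packet, // passing NULL puts the decoder in draining (flush) mode
    AVFrame* frame) {
  int ret = avcodec_send_packet(codec_ctx, packet);
  while (ret >= 0) {
    ret = avcodec_receive_frame(codec_ctx, frame);
    if (ret == AVERROR(EAGAIN))
      return 0; // decoder needs more input before it can emit another frame
    if (ret == AVERROR_EOF)
      return 0; // decoder is fully drained
    if (ret < 0)
      return ret; // decoding error
    // ... hand `frame` to the consumer here ...
    av_frame_unref(frame);
  }
  return ret;
}
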
// Returns 0 on success, or a negative error code if any sink failed to
// process the frame.
int StreamProcessor::send_frame(AVFrame* pFrame) {
  int ret = 0;
  for (auto& ite : sinks) {
    int ret2 = ite.second.process_frame(pFrame);
    if (ret2 < 0)
      ret = ret2;
  }
  return ret;
}
////////////////////////////////////////////////////////////////////////////////
// Retrieval
////////////////////////////////////////////////////////////////////////////////
c10::optional<torch::Tensor> StreamProcessor::pop_chunk(KeyType key) {
  return sinks.at(key).buffer->pop_chunk();
}
} // namespace ffmpeg
} // namespace torchaudio
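
// torchaudio/csrc/ffmpeg/stream_processor.h (new file)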
#pragma once
#include <torch/torch.h>
#include <torchaudio/csrc/ffmpeg/decoder.h>
#include <torchaudio/csrc/ffmpeg/ffmpeg.h>
#include <torchaudio/csrc/ffmpeg/sink.h>
#include <map>
namespace torchaudio {
namespace ffmpeg {
class StreamProcessor {
 public:
  using KeyType = int;

 private:
  AVMediaType media_type = AVMEDIA_TYPE_UNKNOWN;
  AVFramePtr pFrame1;
  AVFramePtr pFrame2;

  // Components for decoding source media
  double decoder_time_base; // for debug
  Decoder decoder;

  KeyType current_key = 0;
  std::map<KeyType, Sink> sinks;
 public:
  StreamProcessor(AVCodecParameters* codecpar);
  ~StreamProcessor() = default;
  // Non-copyable
  StreamProcessor(const StreamProcessor&) = delete;
  StreamProcessor& operator=(const StreamProcessor&) = delete;
  // Movable
  StreamProcessor(StreamProcessor&&) = default;
  StreamProcessor& operator=(StreamProcessor&&) = default;

  //////////////////////////////////////////////////////////////////////////////
  // Configurations
  //////////////////////////////////////////////////////////////////////////////
  // 1. Initialize the decoder (if not initialized yet).
  // 2. Configure a new audio/video filter.
  //    If a custom filter description is provided, it performs resizing,
  //    resampling, etc.; otherwise, the filter only converts the sample type.
  // 3. Configure a buffer.
  // 4. Return the filter ID.
  KeyType add_stream(
      AVRational input_time_base,
      AVCodecParameters* codecpar,
      int frames_per_chunk,
      int num_chunks,
      double output_rate,
      std::string filter_description);

  // Remove the stream
  void remove_stream(KeyType key);

  //////////////////////////////////////////////////////////////////////////////
  // Query methods
  //////////////////////////////////////////////////////////////////////////////
  std::string get_filter_description(KeyType key) const;
  bool is_buffer_ready() const;

  //////////////////////////////////////////////////////////////////////////////
  // The streaming process
  //////////////////////////////////////////////////////////////////////////////
  // 1. Decode the input packet.
  // 2. Pass the decoded data to the filters.
  // 3. Each filter stores the result in its corresponding buffer.
  // Passing NULL drains (flushes) the internal decoder.
  int process_packet(AVPacket* packet);

 private:
  int send_frame(AVFrame* pFrame);

  //////////////////////////////////////////////////////////////////////////////
  // Retrieval
  //////////////////////////////////////////////////////////////////////////////
 public:
  // Get a chunk from the buffer corresponding to the given filter.
  c10::optional<torch::Tensor> pop_chunk(KeyType key);
};
} // namespace ffmpeg
} // namespace torchaudio
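
For readers following the PR series, here is a minimal, hypothetical sketch of how the API declared above could be driven. It is not part of this commit: the libavformat demuxing boilerplate, the frames_per_chunk/num_chunks values, and the "anull" pass-through filter description are assumptions for illustration, and it presumes the first stream of the input is audio. In the overall architecture (see the README linked in the summary), this driving role belongs to a higher-level streamer component.

#include <torchaudio/csrc/ffmpeg/stream_processor.h>

extern "C" {
#include <libavformat/avformat.h>
}

int stream_first_audio_track(const char* src) {
  // Open the input and read stream metadata with libavformat.
  AVFormatContext* fmt_ctx = nullptr;
  if (avformat_open_input(&fmt_ctx, src, nullptr, nullptr) < 0)
    return -1;
  if (avformat_find_stream_info(fmt_ctx, nullptr) < 0)
    return -1;

  // Assume stream 0 is the audio stream we want to process.
  const int index = 0;
  AVStream* stream = fmt_ctx->streams[index];

  torchaudio::ffmpeg::StreamProcessor processor(stream->codecpar);
  auto key = processor.add_stream(
      stream->time_base,
      stream->codecpar,
      /*frames_per_chunk=*/4096,
      /*num_chunks=*/3,
      /*output_rate=*/-1, // <= 0 keeps the input stream's time base
      "anull"); // pass-through audio filter

  // Feed packets of the chosen stream and pop chunks as they become ready.
  AVPacket* packet = av_packet_alloc();
  while (av_read_frame(fmt_ctx, packet) >= 0) {
    if (packet->stream_index == index)
      processor.process_packet(packet);
    av_packet_unref(packet);
    if (processor.is_buffer_ready()) {
      c10::optional<torch::Tensor> chunk = processor.pop_chunk(key);
      // ... consume *chunk here ...
    }
  }
  processor.process_packet(nullptr); // drain (flush) the decoder

  av_packet_free(&packet);
  avformat_close_input(&fmt_ctx);
  return 0;
}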