Unverified Commit 28b7f8ae authored by Francisco Massa, committed by GitHub

Base decoder for video. (#1747) (#1793)

Summary:
Pull Request resolved: https://github.com/pytorch/vision/pull/1747

Pull Request resolved: https://github.com/pytorch/vision/pull/1746



Added the implementation of an FFmpeg-based decoder with functionality that can be used in VUE and TorchVision.

Reviewed By: fmassa

Differential Revision: D19358914

fbshipit-source-id: abb672f89bfaca6351dda2354f0d35cf8e47fa0f
Co-authored-by: Yuri Putivsky <yuri@fb.com>
parent 59324a9f
@@ -127,17 +127,6 @@ def get_extensions():
include_dirs = [extensions_dir]
ffmpeg_exe = distutils.spawn.find_executable('ffmpeg')
has_ffmpeg = ffmpeg_exe is not None
if has_ffmpeg:
ffmpeg_bin = os.path.dirname(ffmpeg_exe)
ffmpeg_root = os.path.dirname(ffmpeg_bin)
ffmpeg_include_dir = os.path.join(ffmpeg_root, 'include')
# TorchVision video reader
video_reader_src_dir = os.path.join(this_dir, 'torchvision', 'csrc', 'cpu', 'video_reader')
video_reader_src = glob.glob(os.path.join(video_reader_src_dir, "*.cpp"))
ext_modules = [
extension(
'torchvision._C',
@@ -157,7 +146,19 @@ def get_extensions():
extra_compile_args=extra_compile_args,
)
)
ffmpeg_exe = distutils.spawn.find_executable('ffmpeg')
has_ffmpeg = ffmpeg_exe is not None
if has_ffmpeg:
ffmpeg_bin = os.path.dirname(ffmpeg_exe)
ffmpeg_root = os.path.dirname(ffmpeg_bin)
ffmpeg_include_dir = os.path.join(ffmpeg_root, 'include')
# TorchVision video reader
video_reader_src_dir = os.path.join(this_dir, 'torchvision', 'csrc', 'cpu', 'video_reader')
video_reader_src = glob.glob(os.path.join(video_reader_src_dir, "*.cpp"))
ext_modules.append(
CppExtension(
'torchvision.video_reader',
@@ -179,6 +180,31 @@ def get_extensions():
)
)
# TorchVision base decoder
base_decoder_src_dir = os.path.join(this_dir, 'torchvision', 'csrc', 'cpu', 'decoder')
base_decoder_src = glob.glob(os.path.join(base_decoder_src_dir, "[!sync_decoder_test]*.cpp"))
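# NOTE (editorial assumption, not in the original commit): fnmatch's
# "[!...]" is a character class matching a single character, so this
# pattern keeps only files whose first letter is outside the set
# {s, y, n, c, _, d, e, o, r, t}; it is broader than excluding just
# sync_decoder_test.cpp.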
ext_modules.append(
CppExtension(
'torchvision.base_decoder',
base_decoder_src,
include_dirs=[
base_decoder_src_dir,
ffmpeg_include_dir,
extensions_dir,
],
libraries=[
'avcodec',
'avformat',
'avutil',
'swresample',
'swscale',
],
extra_compile_args=["-std=c++14"],
extra_link_args=["-std=c++14"],
)
)
return ext_modules
// Copyright 2004-present Facebook. All Rights Reserved.
#include "audio_sampler.h"
#include <c10/util/Logging.h>
#include "util.h"
// www.ffmpeg.org/doxygen/1.1/doc_2examples_2resampling_audio_8c-example.html#a24
#ifndef SWR_CH_MAX
#define SWR_CH_MAX 32
#endif
namespace ffmpeg {
namespace {
int preparePlanes(
const AudioFormat& fmt,
const uint8_t* buffer,
int numSamples,
uint8_t** planes) {
int result;
if ((result = av_samples_fill_arrays(
planes,
nullptr, // linesize is not needed
buffer,
fmt.channels,
numSamples,
(AVSampleFormat)fmt.format,
1)) < 0) {
LOG(ERROR) << "av_samples_fill_arrays failed, err: "
<< Util::generateErrorDesc(result)
<< ", numSamples: " << numSamples << ", fmt: " << fmt.format;
}
return result;
}
} // namespace
AudioSampler::AudioSampler(void* logCtx) : logCtx_(logCtx) {}
AudioSampler::~AudioSampler() {
cleanUp();
}
void AudioSampler::shutdown() {
cleanUp();
}
bool AudioSampler::init(const SamplerParameters& params) {
cleanUp();
if (params.type != MediaType::TYPE_AUDIO) {
LOG(ERROR) << "Invalid media type, expected MediaType::TYPE_AUDIO";
return false;
}
swrContext_ = swr_alloc_set_opts(
nullptr,
av_get_default_channel_layout(params.out.audio.channels),
(AVSampleFormat)params.out.audio.format,
params.out.audio.samples,
av_get_default_channel_layout(params.in.audio.channels),
(AVSampleFormat)params.in.audio.format,
params.in.audio.samples,
0,
logCtx_);
if (swrContext_ == nullptr) {
LOG(ERROR) << "Cannot allocate SwrContext";
return false;
}
int result;
if ((result = swr_init(swrContext_)) < 0) {
LOG(ERROR) << "swr_init faield, err: " << Util::generateErrorDesc(result)
<< ", in -> format: " << params.in.audio.format
<< ", channels: " << params.in.audio.channels
<< ", samples: " << params.in.audio.samples
<< ", out -> format: " << params.out.audio.format
<< ", channels: " << params.out.audio.channels
<< ", samples: " << params.out.audio.samples;
return false;
}
// set formats
params_ = params;
return true;
}
int AudioSampler::numOutputSamples(int inSamples) const {
return av_rescale_rnd(
swr_get_delay(swrContext_, params_.in.audio.samples) + inSamples,
params_.out.audio.samples,
params_.in.audio.samples,
AV_ROUND_UP);
}
int AudioSampler::getSamplesBytes(AVFrame* frame) const {
return av_get_bytes_per_sample((AVSampleFormat)params_.out.audio.format) *
numOutputSamples(frame ? frame->nb_samples : 0) *
params_.out.audio.channels;
}
int AudioSampler::sample(
const uint8_t* inPlanes[],
int inNumSamples,
ByteStorage* out,
int outNumSamples) {
uint8_t* outPlanes[SWR_CH_MAX] = {nullptr};
int result;
if ((result = preparePlanes(
params_.out.audio, out->writableTail(), outNumSamples, outPlanes)) <
0) {
return result;
}
if ((result = swr_convert(
swrContext_, &outPlanes[0], outNumSamples, inPlanes, inNumSamples)) <
0) {
LOG(ERROR) << "swr_convert faield, err: "
<< Util::generateErrorDesc(result);
return result;
}
CHECK_LE(result, outNumSamples);
if ((result = av_samples_get_buffer_size(
nullptr,
params_.out.audio.channels,
result,
(AVSampleFormat)params_.out.audio.format,
1)) > 0) {
out->append(result);
}
return result;
}
int AudioSampler::sample(AVFrame* frame, ByteStorage* out) {
const auto outNumSamples = numOutputSamples(frame ? frame->nb_samples : 0);
if (!outNumSamples) {
return 0;
}
const auto samplesBytes =
av_get_bytes_per_sample((AVSampleFormat)params_.out.audio.format) *
outNumSamples * params_.out.audio.channels;
// bytes must be allocated
CHECK_LE(samplesBytes, out->tail());
return sample(
frame ? (const uint8_t**)&frame->data[0] : nullptr,
frame ? frame->nb_samples : 0,
out,
outNumSamples);
}
int AudioSampler::sample(const ByteStorage* in, ByteStorage* out) {
const auto inSampleSize =
av_get_bytes_per_sample((AVSampleFormat)params_.in.audio.format);
const auto inNumSamples =
!in ? 0 : in->length() / inSampleSize / params_.in.audio.channels;
const auto outNumSamples = numOutputSamples(inNumSamples);
if (!outNumSamples) {
return 0;
}
const auto samplesBytes =
av_get_bytes_per_sample((AVSampleFormat)params_.out.audio.format) *
outNumSamples * params_.out.audio.channels;
out->clear();
out->ensure(samplesBytes);
uint8_t* inPlanes[SWR_CH_MAX] = {nullptr};
int result;
if (in &&
(result = preparePlanes(
params_.in.audio, in->data(), inNumSamples, inPlanes)) < 0) {
return result;
}
return sample(
in ? (const uint8_t**)inPlanes : nullptr,
inNumSamples,
out,
outNumSamples);
}
void AudioSampler::cleanUp() {
if (swrContext_) {
swr_free(&swrContext_);
swrContext_ = nullptr;
}
}
} // namespace ffmpeg
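For orientation, a minimal sketch of how the sampler above might be driven. It is not part of this commit: the helper name and the stereo/44.1 kHz target are illustrative assumptions, ByteStorage is any implementation of the interface declared in defs.h, and AVFrame comes from the FFmpeg headers already pulled in by audio_sampler.h.

// Hypothetical helper (not in this commit): convert one decoded frame to
// 16-bit interleaved stereo at 44.1 kHz with the AudioSampler above.
int resampleTo16BitStereo(AVFrame* frame, ffmpeg::ByteStorage* out) {
  ffmpeg::SamplerParameters params;
  params.type = ffmpeg::MediaType::TYPE_AUDIO;
  params.in.audio.samples = frame->sample_rate; // source sample rate
  params.in.audio.channels = frame->channels;   // source channel count
  params.in.audio.format = frame->format;       // source AVSampleFormat
  params.out.audio.samples = 44100;             // target sample rate
  params.out.audio.channels = 2;                // target channel count
  params.out.audio.format = AV_SAMPLE_FMT_S16;  // target sample format
  ffmpeg::AudioSampler sampler(nullptr /* logCtx */);
  if (!sampler.init(params)) {
    return -1;
  }
  // sample() checks the capacity with CHECK_LE, so reserve the estimate first
  out->ensure(sampler.getSamplesBytes(frame));
  return sampler.sample(frame, out); // bytes appended, or < 0 on error
}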
// Copyright 2004-present Facebook. All Rights Reserved.
#pragma once
#include "defs.h"
extern "C" {
#include <libswresample/swresample.h>
}
namespace ffmpeg {
/**
* Class transcodes audio frames from one format into another
*/
class AudioSampler : public MediaSampler {
public:
explicit AudioSampler(void* logCtx);
~AudioSampler() override;
// MediaSampler overrides
bool init(const SamplerParameters& params) override;
int sample(const ByteStorage* in, ByteStorage* out) override;
void shutdown() override;
int getSamplesBytes(AVFrame* frame) const;
int sample(AVFrame* frame, ByteStorage* out);
private:
// close resources
void cleanUp();
// helper functions for rescaling, cropping, etc.
int numOutputSamples(int inSamples) const;
int sample(
const uint8_t* inPlanes[],
int inNumSamples,
ByteStorage* out,
int outNumSamples);
private:
SwrContext* swrContext_{nullptr};
void* logCtx_{nullptr};
};
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#include "audio_stream.h"
#include <c10/util/Logging.h>
#include <limits>
#include "util.h"
namespace ffmpeg {
namespace {
bool operator==(const AudioFormat& x, const AVCodecContext& y) {
return x.samples == y.sample_rate && x.channels == y.channels &&
x.format == y.sample_fmt;
}
AudioFormat& toAudioFormat(AudioFormat& x, const AVCodecContext& y) {
x.samples = y.sample_rate;
x.channels = y.channels;
x.format = y.sample_fmt;
return x;
}
} // namespace
AudioStream::AudioStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const AudioFormat& format)
: Stream(
inputCtx,
MediaFormat::makeMediaFormat(format, index),
convertPtsToWallTime) {}
AudioStream::~AudioStream() {
if (sampler_) {
sampler_->shutdown();
sampler_.reset();
}
}
void AudioStream::ensureSampler() {
if (!sampler_) {
sampler_ = std::make_unique<AudioSampler>(codecCtx_);
}
}
int AudioStream::initFormat() {
// set output format
if (format_.format.audio.samples == 0) {
format_.format.audio.samples = codecCtx_->sample_rate;
}
if (format_.format.audio.channels == 0) {
format_.format.audio.channels = codecCtx_->channels;
}
if (format_.format.audio.format == AV_SAMPLE_FMT_NONE) {
format_.format.audio.format = codecCtx_->sample_fmt;
}
return format_.format.audio.samples != 0 &&
format_.format.audio.channels != 0 &&
format_.format.audio.format != AV_SAMPLE_FMT_NONE
? 0
: -1;
}
int AudioStream::estimateBytes(bool flush) {
ensureSampler();
if (!(sampler_->getInputFormat().audio == *codecCtx_)) {
// - reinit sampler
SamplerParameters params;
params.type = format_.type;
params.out = format_.format;
toAudioFormat(params.in.audio, *codecCtx_);
if (flush || !sampler_->init(params)) {
return -1;
}
VLOG(1) << "Set input audio sampler format"
<< ", samples: " << params.in.audio.samples
<< ", channels: " << params.in.audio.channels
<< ", format: " << params.in.audio.format
<< " : output audio sampler format"
<< ", samples: " << format_.format.audio.samples
<< ", channels: " << format_.format.audio.channels
<< ", format: " << format_.format.audio.format;
}
return sampler_->getSamplesBytes(frame_);
}
int AudioStream::copyFrameBytes(ByteStorage* out, bool flush) {
ensureSampler();
return sampler_->sample(flush ? nullptr : frame_, out);
}
void AudioStream::setHeader(DecoderHeader* header) {
header->seqno = numGenerator_++;
if (codecCtx_->time_base.num != 0) {
header->pts = av_rescale_q(
av_frame_get_best_effort_timestamp(frame_),
codecCtx_->time_base,
AV_TIME_BASE_Q);
} else {
// If the codec time_base is missing then we would've skipped the
// rescalePackage step to rescale to codec time_base, so here we can
// rescale straight from the stream time_base into AV_TIME_BASE_Q.
header->pts = av_rescale_q(
av_frame_get_best_effort_timestamp(frame_),
inputCtx_->streams[format_.stream]->time_base,
AV_TIME_BASE_Q);
}
if (convertPtsToWallTime_) {
keeper_.adjust(header->pts);
}
header->keyFrame = 1;
header->fps = std::numeric_limits<double>::quiet_NaN();
header->format = format_;
}
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#pragma once
#include "audio_sampler.h"
#include "stream.h"
#include "time_keeper.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode one audio stream.
*/
class AudioStream : public Stream {
public:
AudioStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const AudioFormat& format);
~AudioStream() override;
private:
int initFormat() override;
int estimateBytes(bool flush) override;
int copyFrameBytes(ByteStorage* out, bool flush) override;
void setHeader(DecoderHeader* header) override;
void ensureSampler();
private:
std::unique_ptr<AudioSampler> sampler_;
TimeKeeper keeper_;
};
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#include "cc_stream.h"
namespace ffmpeg {
CCStream::CCStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const SubtitleFormat& format)
: SubtitleStream(inputCtx, index, convertPtsToWallTime, format) {
format_.type = TYPE_CC;
}
AVCodec* CCStream::findCodec(AVCodecContext* ctx) {
if (ctx->codec_id == AV_CODEC_ID_BIN_DATA &&
ctx->codec_type == AVMEDIA_TYPE_DATA) {
// obtain subtitles codec
ctx->codec_id = AV_CODEC_ID_MOV_TEXT;
ctx->codec_type = AVMEDIA_TYPE_SUBTITLE;
}
return Stream::findCodec(ctx);
}
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#pragma once
#include "subtitle_stream.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode one closed captions stream.
*/
class CCStream : public SubtitleStream {
public:
CCStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const SubtitleFormat& format);
private:
AVCodec* findCodec(AVCodecContext* ctx) override;
};
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#include "decoder.h"
#include <c10/util/Logging.h>
#include <future>
#include <iostream>
#include <mutex>
#include "audio_stream.h"
#include "cc_stream.h"
#include "subtitle_stream.h"
#include "util.h"
#include "video_stream.h"
namespace ffmpeg {
namespace {
constexpr ssize_t kMinSeekBufferSize = 1024;
constexpr ssize_t kMaxSeekBufferSize = 4 * 1024;
constexpr size_t kIoBufferSize = 4 * 1024;
constexpr size_t kLogBufferSize = 1024;
int ffmpeg_lock(void** mutex, enum AVLockOp op) {
std::mutex** handle = (std::mutex**)mutex;
switch (op) {
case AV_LOCK_CREATE:
*handle = new std::mutex();
break;
case AV_LOCK_OBTAIN:
(*handle)->lock();
break;
case AV_LOCK_RELEASE:
(*handle)->unlock();
break;
case AV_LOCK_DESTROY:
delete *handle;
break;
}
return 0;
}
bool mapFfmpegType(AVMediaType media, MediaType* type) {
switch (media) {
case AVMEDIA_TYPE_AUDIO:
*type = TYPE_AUDIO;
return true;
case AVMEDIA_TYPE_VIDEO:
*type = TYPE_VIDEO;
return true;
case AVMEDIA_TYPE_SUBTITLE:
*type = TYPE_SUBTITLE;
return true;
case AVMEDIA_TYPE_DATA:
*type = TYPE_CC;
return true;
default:
return false;
}
}
std::unique_ptr<Stream> createStream(
MediaType type,
AVFormatContext* ctx,
int idx,
bool convertPtsToWallTime,
const FormatUnion& format,
int64_t loggingUuid) {
switch (type) {
case TYPE_AUDIO:
return std::make_unique<AudioStream>(
ctx, idx, convertPtsToWallTime, format.audio);
case TYPE_VIDEO:
return std::make_unique<VideoStream>(
// negative loggingUuid indicates video streams.
ctx,
idx,
convertPtsToWallTime,
format.video,
-loggingUuid);
case TYPE_SUBTITLE:
return std::make_unique<SubtitleStream>(
ctx, idx, convertPtsToWallTime, format.subtitle);
case TYPE_CC:
return std::make_unique<CCStream>(
ctx, idx, convertPtsToWallTime, format.subtitle);
default:
return nullptr;
}
}
} // namespace
/* static */
void Decoder::logFunction(void* avcl, int level, const char* cfmt, va_list vl) {
if (!avcl) {
// Nothing can be done here
return;
}
AVClass* avclass = *reinterpret_cast<AVClass**>(avcl);
if (!avclass) {
// Nothing can be done here
return;
}
Decoder* decoder = nullptr;
if (strcmp(avclass->class_name, "AVFormatContext") == 0) {
AVFormatContext* context = reinterpret_cast<AVFormatContext*>(avcl);
if (context) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
} else if (strcmp(avclass->class_name, "AVCodecContext") == 0) {
AVCodecContext* context = reinterpret_cast<AVCodecContext*>(avcl);
if (context) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
} else if (strcmp(avclass->class_name, "AVIOContext") == 0) {
AVIOContext* context = reinterpret_cast<AVIOContext*>(avcl);
// only if opaque was assigned to Decoder pointer
if (context && context->read_packet == Decoder::readFunction) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
} else if (strcmp(avclass->class_name, "SWResampler") == 0) {
// expect AVCodecContext as parent
if (avclass->parent_log_context_offset) {
AVClass** parent =
*(AVClass***)(((uint8_t*)avcl) + avclass->parent_log_context_offset);
AVCodecContext* context = reinterpret_cast<AVCodecContext*>(parent);
if (context) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
}
} else if (strcmp(avclass->class_name, "SWScaler") == 0) {
// cannot find a way to pass context pointer through SwsContext struct
} else {
VLOG(2) << "Unknown context class: " << avclass->class_name;
}
if (decoder != nullptr && decoder->enableLogLevel(level)) {
char buf[kLogBufferSize] = {0};
// Format the line
int* prefix = decoder->getPrintPrefix();
*prefix = 1;
av_log_format_line(avcl, level, cfmt, vl, buf, sizeof(buf) - 1, prefix);
// pass message to the decoder instance
std::string msg(buf);
decoder->logCallback(level, msg);
}
}
bool Decoder::enableLogLevel(int level) const {
return ssize_t(level) <= params_.logLevel;
}
void Decoder::logCallback(int level, const std::string& message) {
LOG(INFO) << "Msg, level: " << level << ", msg: " << message;
}
/* static */
int Decoder::shutdownFunction(void* ctx) {
Decoder* decoder = (Decoder*)ctx;
if (decoder == nullptr) {
return 1;
}
return decoder->shutdownCallback();
}
int Decoder::shutdownCallback() {
return interrupted_ ? 1 : 0;
}
/* static */
int Decoder::readFunction(void* opaque, uint8_t* buf, int size) {
Decoder* decoder = reinterpret_cast<Decoder*>(opaque);
if (decoder == nullptr) {
return 0;
}
return decoder->readCallback(buf, size);
}
/* static */
int64_t Decoder::seekFunction(void* opaque, int64_t offset, int whence) {
Decoder* decoder = reinterpret_cast<Decoder*>(opaque);
if (decoder == nullptr) {
return -1;
}
return decoder->seekCallback(offset, whence);
}
int Decoder::readCallback(uint8_t* buf, int size) {
return seekableBuffer_.read(buf, size, params_.timeoutMs);
}
int64_t Decoder::seekCallback(int64_t offset, int whence) {
return seekableBuffer_.seek(offset, whence, params_.timeoutMs);
}
/* static */
void Decoder::initOnce() {
static std::once_flag flagInit;
std::call_once(flagInit, []() {
av_register_all();
avcodec_register_all();
avformat_network_init();
// register ffmpeg lock manager
av_lockmgr_register(&ffmpeg_lock);
av_log_set_callback(Decoder::logFunction);
av_log_set_level(AV_LOG_ERROR);
LOG(INFO) << "Registered ffmpeg libs";
});
}
Decoder::Decoder() {
initOnce();
}
Decoder::~Decoder() {
cleanUp();
}
bool Decoder::init(const DecoderParameters& params, DecoderInCallback&& in) {
cleanUp();
if ((params.uri.empty() || in) && (!params.uri.empty() || !in)) {
LOG(ERROR) << "Either external URI gets provided"
<< " or explicit input callback";
return false;
}
// set callback and params
params_ = params;
auto tmpCtx = avformat_alloc_context();
if (!tmpCtx) {
LOG(ERROR) << "Cannot allocate format context";
return false;
}
AVInputFormat* fmt = nullptr;
if (in) {
const size_t avioCtxBufferSize = kIoBufferSize;
uint8_t* avioCtxBuffer = (uint8_t*)av_malloc(avioCtxBufferSize);
if (!avioCtxBuffer) {
LOG(ERROR) << "av_malloc cannot allocate " << avioCtxBufferSize
<< " bytes";
avformat_close_input(&tmpCtx);
cleanUp();
return false;
}
bool canSeek = in(nullptr, 0, 0) == 0;
if (!seekableBuffer_.init(
std::forward<DecoderInCallback>(in),
kMinSeekBufferSize,
kMaxSeekBufferSize,
params_.timeoutMs)) {
LOG(ERROR) << "seekable buffer initialization failed";
av_free(avioCtxBuffer);
avformat_close_input(&tmpCtx);
cleanUp();
return false;
}
if (params_.isImage) {
const char* fmtName = "image2";
switch (seekableBuffer_.getImageType()) {
case ImageType::JPEG:
fmtName = "jpeg_pipe";
break;
case ImageType::PNG:
fmtName = "png_pipe";
break;
case ImageType::TIFF:
fmtName = "tiff_pipe";
break;
default:
break;
}
fmt = av_find_input_format(fmtName);
}
if (!(avioCtx_ = avio_alloc_context(
avioCtxBuffer,
avioCtxBufferSize,
0,
reinterpret_cast<void*>(this),
&Decoder::readFunction,
nullptr,
canSeek ? &Decoder::seekFunction : nullptr))) {
LOG(ERROR) << "avio_alloc_context failed";
av_free(avioCtxBuffer);
avformat_close_input(&tmpCtx);
cleanUp();
return false;
}
tmpCtx->pb = avioCtx_;
}
interrupted_ = false;
// the ffmpeg avformat_open_input call can hang if the media source doesn't
// respond; set a guard to handle such situations
std::promise<bool> p;
std::future<bool> f = p.get_future();
std::thread guard([&f, this]() {
auto timeout = std::chrono::milliseconds(params_.timeoutMs);
if (std::future_status::timeout == f.wait_for(timeout)) {
LOG(ERROR) << "Cannot open stream within " << params_.timeoutMs << " ms";
interrupted_ = true;
}
});
tmpCtx->opaque = reinterpret_cast<void*>(this);
tmpCtx->interrupt_callback.callback = Decoder::shutdownFunction;
tmpCtx->interrupt_callback.opaque = reinterpret_cast<void*>(this);
// add network timeout
tmpCtx->flags |= AVFMT_FLAG_NONBLOCK;
AVDictionary* options = nullptr;
av_dict_set_int(&options, "analyzeduration", params_.timeoutMs * 1000, 0);
av_dict_set_int(&options, "stimeout", params_.timeoutMs * 1000, 0);
if (params_.listen) {
av_dict_set_int(&options, "listen", 1, 0);
}
int result = 0;
if (fmt) {
result = avformat_open_input(&tmpCtx, nullptr, fmt, &options);
} else {
result =
avformat_open_input(&tmpCtx, params_.uri.c_str(), nullptr, &options);
}
av_dict_free(&options);
p.set_value(true);
guard.join();
inputCtx_ = tmpCtx;
if (result < 0 || interrupted_) {
LOG(ERROR) << "avformat_open_input failed, error: "
<< Util::generateErrorDesc(result);
cleanUp();
return false;
}
result = avformat_find_stream_info(inputCtx_, nullptr);
if (result < 0) {
LOG(ERROR) << "avformat_find_stream_info failed, error: "
<< Util::generateErrorDesc(result);
cleanUp();
return false;
}
if (!activateStreams()) {
LOG(ERROR) << "Cannot activate streams";
cleanUp();
return false;
}
onInit();
if (params.startOffsetMs != 0) {
av_seek_frame(
inputCtx_,
-1,
params.startOffsetMs * AV_TIME_BASE / 1000,
AVSEEK_FLAG_FRAME | AVSEEK_FLAG_ANY);
}
LOG(INFO) << "Decoder initialized, log level: " << params_.logLevel;
outOfRange_ = false;
return true;
}
bool Decoder::activateStreams() {
for (int i = 0; i < inputCtx_->nb_streams; i++) {
// - find the corresponding format in the params_.formats set
MediaFormat format;
const auto media = inputCtx_->streams[i]->codec->codec_type;
if (!mapFfmpegType(media, &format.type)) {
VLOG(1) << "Stream media: " << media << " at index " << i
<< " gets ignored, unknown type";
continue; // unsupported type
}
// check format
auto it = params_.formats.find(format);
if (it == params_.formats.end()) {
VLOG(1) << "Stream type: " << format.type << " at index: " << i
<< " gets ignored, caller is not interested";
continue; // clients don't care about this media format
}
// do we have stream of this type?
auto stream = findByType(format);
// should we process this stream?
if (it->stream == -2 || // all streams of this type are welcome
(!stream && (it->stream == -1 || it->stream == i))) { // new stream
VLOG(1) << "Stream type: " << format.type << " found, at index: " << i;
auto stream = createStream(
format.type,
inputCtx_,
i,
params_.convertPtsToWallTime,
it->format,
params_.loggingUuid);
CHECK(stream);
if (stream->openCodec() < 0) {
LOG(ERROR) << "Cannot open codec " << i;
return false;
}
streams_.emplace(i, std::move(stream));
}
}
return true;
}
void Decoder::shutdown() {
cleanUp();
}
void Decoder::interrupt() {
interrupted_ = true;
}
void Decoder::cleanUp() {
if (!interrupted_) {
interrupted_ = true;
}
if (inputCtx_) {
for (auto& stream : streams_) {
// Drain stream buffers.
DecoderOutputMessage msg;
while (msg.payload = createByteStorage(0),
stream.second->flush(&msg, params_.headerOnly) > 0) {
}
stream.second.reset();
}
streams_.clear();
avformat_close_input(&inputCtx_);
}
if (avioCtx_) {
av_freep(&avioCtx_->buffer);
av_freep(&avioCtx_);
}
// reset callback
seekableBuffer_.shutdown();
}
int Decoder::getBytes(size_t workingTimeInMs) {
if (outOfRange_) {
return ENODATA;
}
// decode frames until the cache is full, then leave the thread;
// once the decode() method gets called and grabs some bytes,
// run this method again
// init package
AVPacket avPacket;
av_init_packet(&avPacket);
avPacket.data = nullptr;
avPacket.size = 0;
auto end = std::chrono::steady_clock::now() +
std::chrono::milliseconds(workingTimeInMs);
// returns true while the elapsed time is less than the timeout
auto watcher = [end]() -> bool {
return std::chrono::steady_clock::now() <= end;
};
int result = ETIMEDOUT;
size_t decodingErrors = 0;
while (!interrupted_ && watcher()) {
result = av_read_frame(inputCtx_, &avPacket);
if (result == AVERROR(EAGAIN)) {
VLOG(4) << "Decoder is busy...";
result = 0; // reset error, EAGAIN is not an error at all
break;
} else if (result == AVERROR_EOF) {
flushStreams();
VLOG(1) << "End of stream";
result = ENODATA;
break;
} else if (result < 0) {
flushStreams();
LOG(ERROR) << "Error detected: " << Util::generateErrorDesc(result);
break;
}
// get stream
auto stream = findByIndex(avPacket.stream_index);
if (stream == nullptr) {
av_packet_unref(&avPacket);
continue;
}
stream->rescalePackage(&avPacket);
AVPacket copyPacket = avPacket;
size_t numConsecutiveNoBytes = 0;
// the packet bytes may be only partially decoded
do {
// decode package
if ((result = processPacket(stream, &copyPacket)) < 0) {
break;
}
if (result == 0 && params_.maxProcessNoBytes != 0 &&
++numConsecutiveNoBytes > params_.maxProcessNoBytes) {
LOG(ERROR) << "Exceeding max amount of consecutive no bytes";
break;
}
if (result > 0) {
numConsecutiveNoBytes = 0;
}
copyPacket.size -= result;
copyPacket.data += result;
} while (copyPacket.size > 0);
// post loop check
if (result < 0) {
if (params_.maxPackageErrors != 0 && // check errors
++decodingErrors >= params_.maxPackageErrors) { // reached the limit
break;
}
} else {
decodingErrors = 0; // reset on success
}
result = 0;
av_packet_unref(&avPacket);
}
av_packet_unref(&avPacket);
return result;
}
Stream* Decoder::findByIndex(int streamIndex) const {
auto it = streams_.find(streamIndex);
return it != streams_.end() ? it->second.get() : nullptr;
}
Stream* Decoder::findByType(const MediaFormat& format) const {
for (auto& stream : streams_) {
if (stream.second->getMediaFormat().type == format.type) {
return stream.second.get();
}
}
return nullptr;
}
int Decoder::processPacket(Stream* stream, AVPacket* packet) {
// decode package
int gotFrame = 0;
int result;
DecoderOutputMessage msg;
msg.payload = createByteStorage(0);
if ((result = stream->decodeFrame(packet, &gotFrame)) >= 0 && gotFrame &&
stream->getFrameBytes(&msg, params_.headerOnly) > 0) {
// check end offset
if (params_.endOffsetMs <= 0 ||
!(outOfRange_ = msg.header.pts > params_.endOffsetMs * 1000)) {
push(std::move(msg));
}
}
return result;
}
void Decoder::flushStreams() {
VLOG(1) << "Flushing streams...";
for (auto& stream : streams_) {
DecoderOutputMessage msg;
while (msg.payload = createByteStorage(0),
stream.second->flush(&msg, params_.headerOnly) > 0) {
// check end offset
if (params_.endOffsetMs <= 0 ||
!(outOfRange_ = msg.header.pts > params_.endOffsetMs * 1000)) {
push(std::move(msg));
}
}
}
}
int Decoder::decode_all(const DecoderOutCallback& callback) {
int result;
do {
DecoderOutputMessage out;
if (0 == (result = decode(&out, params_.timeoutMs))) {
callback(std::move(out));
}
} while (result == 0);
return result;
}
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#pragma once
#include "seekable_buffer.h"
#include "stream.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode media streams.
* Media bytes can be explicitly provided through read-callback
* or fetched internally by FFMPEG library
*/
class Decoder : public MediaDecoder {
public:
Decoder();
~Decoder() override;
// MediaDecoder overrides
bool init(const DecoderParameters& params, DecoderInCallback&& in) override;
int decode_all(const DecoderOutCallback& callback) override;
void shutdown() override;
void interrupt() override;
protected:
// function does the actual work; a derived class calls it periodically in a
// working thread. On success the method returns 0, ENODATA on EOF, and an
// error code on unrecoverable errors.
int getBytes(size_t workingTimeInMs = 100);
// Derived class must override method and consume the provided message
virtual void push(DecoderOutputMessage&& buffer) = 0;
// Fires on init call
virtual void onInit() {}
public:
// C-style FFMPEG API requires C/static methods for callbacks
static void logFunction(void* avcl, int level, const char* cfmt, va_list vl);
static int shutdownFunction(void* ctx);
static int readFunction(void* opaque, uint8_t* buf, int size);
static int64_t seekFunction(void* opaque, int64_t offset, int whence);
// can be called by any classes or API
static void initOnce();
int* getPrintPrefix() {
return &printPrefix;
}
private:
// the functions below are virtual so that the proper override gets invoked
virtual bool enableLogLevel(int level) const;
virtual void logCallback(int level, const std::string& message);
virtual int readCallback(uint8_t* buf, int size);
virtual int64_t seekCallback(int64_t offset, int whence);
virtual int shutdownCallback();
bool activateStreams();
Stream* findByIndex(int streamIndex) const;
Stream* findByType(const MediaFormat& format) const;
int processPacket(Stream* stream, AVPacket* packet);
void flushStreams();
void cleanUp();
private:
DecoderParameters params_;
SeekableBuffer seekableBuffer_;
int printPrefix{1};
std::atomic<bool> interrupted_{false};
AVFormatContext* inputCtx_{nullptr};
AVIOContext* avioCtx_{nullptr};
std::unordered_map<ssize_t, std::unique_ptr<Stream>> streams_;
bool outOfRange_{false};
};
} // namespace ffmpeg
#pragma once
#include <functional>
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <vector>
namespace ffmpeg {
// bit mask of formats, keep values in the form 2^n
enum MediaType : size_t {
TYPE_AUDIO = 1,
TYPE_VIDEO = 2,
TYPE_SUBTITLE = 4,
TYPE_CC = 8, // closed captions from transport streams
};
// audio
struct AudioFormat {
// fields are initialized for auto detection;
// the caller can specify some or all field values if a specific output is desired
bool operator==(const AudioFormat& x) const {
return x.format == format && x.samples == samples && x.channels == channels;
}
size_t samples{0}; // number samples per second (frequency)
size_t channels{0}; // number of channels
ssize_t format{-1}; // AVSampleFormat, auto AV_SAMPLE_FMT_NONE
size_t padding[2];
// -- alignment 40 bytes
};
// video
struct VideoFormat {
// fields are initialized for auto detection;
// the caller can specify some or all field values if a specific output is desired
bool operator==(const VideoFormat& x) const {
return x.format == format && x.width == width && x.height == height;
}
size_t width{0}; // width in pixels
size_t height{0}; // height in pixels
ssize_t format{-1}; // AVPixelFormat, auto AV_PIX_FMT_NONE
size_t minDimension{0}; // choose min dimension and rescale accordingly
size_t cropImage{0}; // request image crop
// -- alignment 40 bytes
};
// subtitle/cc
struct SubtitleFormat {
ssize_t type{0}; // AVSubtitleType, auto SUBTITLE_NONE
size_t padding[4];
// -- alignment 40 bytes
};
union FormatUnion {
FormatUnion() : audio() {}
explicit FormatUnion(int) : video() {}
explicit FormatUnion(char) : subtitle() {}
explicit FormatUnion(double) : subtitle() {}
AudioFormat audio;
VideoFormat video;
SubtitleFormat subtitle;
// -- alignment 40 bytes
};
/*
  The MediaFormat data structure serves as an input/output parameter.
  The caller assigns values for input formats,
  or leaves default values for auto detection.
  For output formats, all fields will be set to specific values.
*/
struct MediaFormat {
// for using map/set data structures
bool operator<(const MediaFormat& x) const {
return type < x.type;
}
bool operator==(const MediaFormat& x) const {
if (type != x.type) {
return false;
}
switch (type) {
case TYPE_AUDIO:
return format.audio == x.format.audio;
case TYPE_VIDEO:
return format.video == x.format.video;
case TYPE_SUBTITLE:
case TYPE_CC:
return true;
default:
return false;
}
}
explicit MediaFormat(ssize_t s = -1)
: type(TYPE_AUDIO), stream(s), format() {}
explicit MediaFormat(int x, ssize_t s = -1)
: type(TYPE_VIDEO), stream(s), format(x) {}
explicit MediaFormat(char x, ssize_t s = -1)
: type(TYPE_SUBTITLE), stream(s), format(x) {}
explicit MediaFormat(double x, ssize_t s = -1)
: type(TYPE_CC), stream(s), format(x) {}
static MediaFormat makeMediaFormat(AudioFormat format, ssize_t stream) {
MediaFormat result(stream);
result.format.audio = format;
return result;
}
static MediaFormat makeMediaFormat(VideoFormat format, ssize_t stream) {
MediaFormat result(0, stream);
result.format.video = format;
return result;
}
static MediaFormat makeMediaFormat(SubtitleFormat format, ssize_t stream) {
MediaFormat result('0', stream);
result.format.subtitle = format;
return result;
}
// format type
MediaType type;
// stream index:
// set -1 for one stream auto detection, -2 for all streams auto detection,
// >= 0, specified stream, if caller knows the stream index (unlikely)
ssize_t stream;
// union keeps one of the possible formats, defined by MediaType
FormatUnion format;
// output parameters, ignored during initialization
// time base numerator
ssize_t num{0};
// time base denominator
ssize_t den{1};
// duration of the stream, in stream time base, if available
ssize_t duration{-1};
};
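A short illustration of how these formats might be constructed (the helper name and the values are examples, not defaults from this commit). The constructors dispatch on a tag type (default to audio, int to video, char to subtitle, double to closed captions), so integer literals can silently pick the video overload; the makeMediaFormat helpers avoid that.

// Illustrative only (not part of this commit): request one auto-detected
// audio stream at 16 kHz and all video streams rescaled to a 224 px side.
inline void exampleFormats(std::set<MediaFormat>* formats) {
  AudioFormat audioFmt;
  audioFmt.samples = 16000; // resample to 16 kHz; other fields auto-detect
  // stream == -1: auto-detect a single audio stream
  formats->insert(MediaFormat::makeMediaFormat(audioFmt, -1));
  VideoFormat videoFmt;
  videoFmt.minDimension = 224; // rescale so the shorter side is 224 px
  // stream == -2: accept every video stream
  formats->insert(MediaFormat::makeMediaFormat(videoFmt, -2));
}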
struct DecoderParameters {
// local file, remote file, http url, rtmp stream uri, etc. anything that
// ffmpeg can recognize
std::string uri;
// timeout on getting bytes for decoding
size_t timeoutMs{1000};
// logging level, default AV_LOG_PANIC
ssize_t logLevel{0};
// when decoder would give up, 0 means never
size_t maxPackageErrors{0};
// max allowed consecutive times no bytes are processed. 0 means infinite.
size_t maxProcessNoBytes{0};
// start offset
ssize_t startOffsetMs{0};
// end offset
ssize_t endOffsetMs{-1};
// logging id
int64_t loggingUuid{0};
// adjust header pts to the epoch time
bool convertPtsToWallTime{false};
// indicate if input stream is an encoded image
bool isImage{false};
// what media types should be processed, default none
std::set<MediaFormat> formats;
// listen and wait for new rtmp stream
bool listen{false};
// don't copy frame body, only header
bool headerOnly{false};
// seek tolerated accuracy
double seekAccuracySec{1.0};
};
struct DecoderHeader {
// message id, from 0 till ...
size_t seqno{0};
// decoded timestamp in microseconds from either beginning of the stream or
// from epoch time, see DecoderParameters::convertPtsToWallTime
ssize_t pts{0};
// decoded key frame
size_t keyFrame{0};
// frames per second, valid only for video streams
double fps{0};
// format specifies what kind of frame is in the payload
MediaFormat format;
};
// Abstract interface ByteStorage class
class ByteStorage {
public:
virtual ~ByteStorage() = default;
// makes sure that the buffer has at least n bytes available for writing;
// if not, the storage must reallocate memory.
virtual void ensure(size_t n) = 0;
// caller must not write more than the available bytes
virtual uint8_t* writableTail() = 0;
// caller confirms that n bytes were written to the writable tail
virtual void append(size_t n) = 0;
// caller confirms that n bytes were read from the read buffer
virtual void trim(size_t n) = 0;
// gives an access to the beginning of the read buffer
virtual const uint8_t* data() const = 0;
// returns the stored size in bytes
virtual size_t length() const = 0;
// returns available capacity for writable tail
virtual size_t tail() const = 0;
// clears content, keeps capacity
virtual void clear() = 0;
};
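The write protocol of this interface is ensure, then writableTail, then append; reads go through data/length and are consumed with trim. A minimal sketch of the write side (the helper is hypothetical, not part of this commit, and assumes <cstring> is included at the top of the file):

// Hypothetical helper: append a raw blob to any ByteStorage implementation.
inline void appendBytes(ByteStorage* storage, const uint8_t* src, size_t n) {
  storage->ensure(n);                           // guarantee n writable bytes
  std::memcpy(storage->writableTail(), src, n); // write into the writable tail
  storage->append(n);                           // commit the n written bytes
}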
struct DecoderOutputMessage {
DecoderHeader header;
std::unique_ptr<ByteStorage> payload;
};
/*
 * External provider of the encoded bytes; specific implementations are left
 * for different use cases, like file, memory, external network end-points, etc.
 * Normally the input/output parameter @out is set to a valid, non-null buffer
 * pointer, which indicates a "read" call; however, there are "seek" modes as
 * well.
 * @out != nullptr, @size != 0, @timeoutMs != 0 => read @size bytes from the
 * current offset; return the number of bytes read, 0 if no more bytes are
 * available, < 0 on error.
 * @out == nullptr, @size == 0, @timeoutMs == 0 => does the provider support
 * the "seek" capability in the first place? return 0 on success, < 0 if
 * "seek" mode is not supported.
 * @out == nullptr, @size > 0 => seek to the absolute offset == @size; return
 * 0 on success and < 0 on error.
 * @out == nullptr, @size < 0 => seek to the end of the media; return 0 on
 * success and < 0 on failure (a provider might support seek without knowing
 * the media size).
 */
using DecoderInCallback =
std::function<int(uint8_t* out, int size, uint64_t timeoutMs)>;
using DecoderOutCallback = std::function<void(DecoderOutputMessage&&)>;
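To make the contract above concrete, here is a sketch of an in-memory provider. It is hypothetical, not part of this commit; the name makeMemoryCallback is an assumption, and it relies on <algorithm> and <cstring> being included in addition to the headers this file already pulls in.

// Hypothetical in-memory provider implementing the DecoderInCallback contract.
inline DecoderInCallback makeMemoryCallback(
    std::shared_ptr<std::vector<uint8_t>> data) {
  auto pos = std::make_shared<size_t>(0);
  return [data, pos](uint8_t* out, int size, uint64_t timeoutMs) -> int {
    if (out) { // read mode: copy up to @size bytes from the current offset
      auto n = std::min<size_t>(size, data->size() - *pos);
      std::memcpy(out, data->data() + *pos, n);
      *pos += n;
      return static_cast<int>(n); // 0 signals EOF
    }
    if (size == 0 && timeoutMs == 0) {
      return 0; // capability probe: seek is supported
    }
    if (size < 0) { // seek to the end of the media
      *pos = data->size();
      return 0;
    }
    if (static_cast<size_t>(size) > data->size()) {
      return -1; // absolute seek beyond the end
    }
    *pos = static_cast<size_t>(size); // absolute seek to offset == @size
    return 0;
  };
}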
/**
* Abstract class for decoding media bytes
* It has two different modes: internal media bytes retrieval for a given URI,
* or an external media bytes provider in the case of memory streams.
*/
class MediaDecoder {
public:
virtual ~MediaDecoder() = default;
/**
* Initializes media decoder with parameters,
* calls callback when media bytes are available.
* Media bytes get fetched internally from provided URI
* or invokes provided input callback to get media bytes.
* Input callback must be empty for the internal media provider
*/
virtual bool init(
const DecoderParameters& params,
DecoderInCallback&& in) = 0;
/**
* Polls one available decoded frame from the decoder.
* Returns an error code; 0 for success.
*/
virtual int decode(DecoderOutputMessage* out, uint64_t timeoutMs) = 0;
/**
* Polls available decoded bytes from the decoder, until EOF or error
*/
virtual int decode_all(const DecoderOutCallback& callback) = 0;
/**
* Stops calling callback, releases resources
*/
virtual void shutdown() = 0;
/**
* Interrupts whatever decoder is doing at any time
*/
virtual void interrupt() = 0;
/**
* Factory to create ByteStorage class instances, particular implementation is
* left to the derived class. Caller provides the initially allocated size
*/
virtual std::unique_ptr<ByteStorage> createByteStorage(size_t n) = 0;
};
struct SamplerParameters {
MediaType type{TYPE_AUDIO};
FormatUnion in;
FormatUnion out;
int64_t loggingUuid{0};
};
/**
* Abstract class for sampling media bytes
*/
class MediaSampler {
public:
virtual ~MediaSampler() = default;
/**
* Initializes media sampler with parameters
*/
virtual bool init(const SamplerParameters& params) = 0;
/**
* Samples media bytes
* Returns an error code (< 0) or, on success, the number of bytes
* processed (>= 0).
* Set @in to null to flush data.
*/
virtual int sample(const ByteStorage* in, ByteStorage* out) = 0;
/**
* Releases resources
*/
virtual void shutdown() = 0;
/*
* Returns media type
*/
MediaType getMediaType() const {
return params_.type;
}
/*
* Returns formats
*/
FormatUnion getInputFormat() const {
return params_.in;
}
FormatUnion getOutFormat() const {
return params_.out;
}
protected:
SamplerParameters params_;
};
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#include "seekable_buffer.h"
#include <c10/util/Logging.h>
#include <chrono>
extern "C" {
#include <libavformat/avio.h>
}
namespace ffmpeg {
bool SeekableBuffer::init(
DecoderInCallback&& in,
ssize_t minSize,
ssize_t maxSize,
uint64_t timeoutMs) {
inCallback_ = std::forward<DecoderInCallback>(in);
len_ = minSize;
buffer_.resize(len_);
pos_ = 0;
end_ = 0;
eof_ = 0;
auto end =
std::chrono::steady_clock::now() + std::chrono::milliseconds(timeoutMs);
auto watcher = [end]() -> bool {
return std::chrono::steady_clock::now() <= end;
};
bool hasTime = false;
while (!eof_ && end_ < maxSize && (hasTime = watcher())) {
// let's read all the bytes into the available buffer
auto res = inCallback_(buffer_.data() + end_, len_ - end_, timeoutMs);
if (res > 0) {
end_ += res;
if (end_ == len_) {
len_ = std::min(len_ * 4, maxSize);
buffer_.resize(len_);
}
} else if (res == 0) {
eof_ = 1;
} else {
// error
return false;
}
}
if (!hasTime) {
return false;
}
if (buffer_.size() > 2 && buffer_[0] == 0xFF && buffer_[1] == 0xD8 &&
buffer_[2] == 0xFF) {
imageType_ = ImageType::JPEG;
} else if (
buffer_.size() > 3 && buffer_[1] == 'P' && buffer_[2] == 'N' &&
buffer_[3] == 'G') {
imageType_ = ImageType::PNG;
} else if (
buffer_.size() > 1 &&
((buffer_[0] == 0x49 && buffer_[1] == 0x49) ||
(buffer_[0] == 0x4D && buffer_[1] == 0x4D))) {
imageType_ = ImageType::TIFF;
}
return true;
}
int SeekableBuffer::read(uint8_t* buf, int size, uint64_t timeoutMs) {
// 1. pos_ < end_
if (pos_ < end_) {
auto available = std::min(int(end_ - pos_), size);
memcpy(buf, buffer_.data() + pos_, available);
pos_ += available;
return available;
} else if (!eof_) {
auto res = inCallback_(buf, size, timeoutMs); // read through
if (res > 0) {
pos_ += res;
if (pos_ > end_ && !buffer_.empty()) {
std::vector<uint8_t>().swap(buffer_);
}
} else if (res == 0) {
eof_ = 1;
}
return res;
} else {
return 0;
}
}
int64_t SeekableBuffer::seek(int64_t offset, int whence, uint64_t timeoutMs) {
// remove force flag
whence &= ~AVSEEK_FORCE;
// get size request
int size = whence & AVSEEK_SIZE;
// remove size flag
whence &= ~AVSEEK_SIZE;
if (size) {
return eof_ ? end_ : AVERROR(EINVAL);
} else {
switch (whence) {
case SEEK_SET:
if (offset < 0) {
return AVERROR(EINVAL);
}
if (offset <= end_) {
pos_ = offset;
return pos_;
}
if (!inCallback_(0, offset, timeoutMs)) {
pos_ = offset;
return 0;
}
break;
case SEEK_END:
if (eof_ && pos_ <= end_ && offset < 0 && end_ + offset >= 0) {
pos_ = end_ + offset;
return 0;
}
break;
case SEEK_CUR:
if (pos_ + offset < 0) {
return AVERROR(EINVAL);
}
if (pos_ + offset <= end_) {
pos_ += offset;
return 0;
}
if (!inCallback_(0, pos_ + offset, timeoutMs)) {
pos_ += offset;
return 0;
}
break;
default:
LOG(ERROR) << "Unknown whence flag gets provided: " << whence;
}
}
return AVERROR(EINVAL); // we have no idea what the media size is
}
void SeekableBuffer::shutdown() {
inCallback_ = nullptr;
}
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#pragma once
#include "defs.h"
namespace ffmpeg {
/**
* Class uses an internal buffer to store the initial bytes as a seekable cache
* from the media provider, and lets ffmpeg seek and read bytes from the cache,
* and beyond it, reading bytes directly from the media provider
*/
enum class ImageType {
UNKNOWN = 0,
JPEG = 1,
PNG = 2,
TIFF = 3,
};
class SeekableBuffer {
public:
// tries to fill the buffer; if EOF is detected, seek will be supported
bool init(
DecoderInCallback&& in,
ssize_t minSize,
ssize_t maxSize,
uint64_t timeoutMs);
int read(uint8_t* buf, int size, uint64_t timeoutMs);
int64_t seek(int64_t offset, int whence, uint64_t timeoutMs);
void shutdown();
ImageType getImageType() const {
return imageType_;
}
private:
DecoderInCallback inCallback_;
std::vector<uint8_t> buffer_; // resized at init time
ssize_t len_{0}; // current buffer size
ssize_t pos_{0}; // current position (SEEK_CUR iff pos_ < end_)
ssize_t end_{0}; // bytes in buffer [0, buffer_.size()]
ssize_t eof_{0}; // indicates the EOF
ImageType imageType_{ImageType::UNKNOWN};
};
} // namespace ffmpeg
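As a usage illustration, a sketch that buffers the head of an in-memory stream and sniffs the image type. It is hypothetical, not part of this commit, and reuses the makeMemoryCallback sketch added after the callback typedefs in defs.h; the size and timeout values are arbitrary.

// Hypothetical probe: cache 1-4 KB of the head, then report the image type.
inline ffmpeg::ImageType probeImage(
    std::shared_ptr<std::vector<uint8_t>> bytes) {
  ffmpeg::SeekableBuffer buffer;
  // min 1 KB, max 4 KB of cached head, 1 s timeout
  if (!buffer.init(ffmpeg::makeMemoryCallback(bytes), 1024, 4 * 1024, 1000)) {
    return ffmpeg::ImageType::UNKNOWN;
  }
  return buffer.getImageType();
}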
// Copyright 2004-present Facebook. All Rights Reserved.
#include "stream.h"
#include <c10/util/Logging.h>
#include "util.h"
namespace ffmpeg {
namespace {
const size_t kDecoderHeaderSize = sizeof(DecoderHeader);
}
Stream::Stream(
AVFormatContext* inputCtx,
MediaFormat format,
bool convertPtsToWallTime)
: inputCtx_(inputCtx),
format_(format),
convertPtsToWallTime_(convertPtsToWallTime) {}
Stream::~Stream() {
if (frame_) {
av_free(frame_);
}
if (codecCtx_) {
avcodec_free_context(&codecCtx_);
}
}
AVCodec* Stream::findCodec(AVCodecContext* ctx) {
return avcodec_find_decoder(ctx->codec_id);
}
int Stream::openCodec() {
AVStream* stream = inputCtx_->streams[format_.stream];
auto codec_id = stream->codecpar->codec_id;
AVCodec* codec = avcodec_find_decoder(codec_id);
if (!codec) {
LOG(ERROR) << "avcodec_find_decoder failed for codec_id: " << int(codec_id);
return AVERROR(EINVAL);
}
if (!(codecCtx_ = avcodec_alloc_context3(codec))) {
LOG(ERROR) << "avcodec_alloc_context3 fails";
return AVERROR(ENOMEM);
}
int ret;
// Copy codec parameters from input stream to output codec context
if ((ret = avcodec_parameters_to_context(codecCtx_, stream->codecpar)) < 0) {
LOG(ERROR) << "Failed to copy codec parameters to decoder context";
return ret;
}
// after avcodec_open2, value of codecCtx_->time_base is NOT meaningful
if ((ret = avcodec_open2(codecCtx_, codec, nullptr)) < 0) {
LOG(ERROR) << "avcodec_open2 failed. " << Util::generateErrorDesc(ret);
avcodec_free_context(&codecCtx_);
codecCtx_ = nullptr;
return ret;
}
frame_ = av_frame_alloc();
format_.num = inputCtx_->streams[format_.stream]->time_base.num;
format_.den = inputCtx_->streams[format_.stream]->time_base.den;
format_.duration = inputCtx_->streams[format_.stream]->duration;
return initFormat();
}
// rescale package
void Stream::rescalePackage(AVPacket* packet) {
if (codecCtx_->time_base.num != 0) {
av_packet_rescale_ts(
packet,
inputCtx_->streams[format_.stream]->time_base,
codecCtx_->time_base);
}
}
int Stream::analyzePacket(const AVPacket* packet, int* gotFramePtr) {
int consumed = 0;
int result = avcodec_send_packet(codecCtx_, packet);
if (result == AVERROR(EAGAIN)) {
*gotFramePtr = 0; // no bytes get consumed, fetch frame
} else if (result == AVERROR_EOF) {
*gotFramePtr = 0; // more than one flush packet
if (packet) {
// got packet after flush, this is an error
return result;
}
} else if (result < 0) {
LOG(ERROR) << "avcodec_send_packet failed, err: "
<< Util::generateErrorDesc(result);
return result; // error
} else {
consumed = packet ? packet->size : 0; // all bytes get consumed
}
result = avcodec_receive_frame(codecCtx_, frame_);
if (result >= 0) {
*gotFramePtr = 1; // frame is available
} else if (result == AVERROR(EAGAIN)) {
*gotFramePtr = 0; // no frames at this time, needs more packets
if (!consumed) {
// precaution: if no packet bytes got consumed and no frames are available
return result;
}
} else if (result == AVERROR_EOF) {
*gotFramePtr = 0; // the last frame has been flushed
// precaution: if no more frames are available, assume we consumed all bytes
consumed = packet ? packet->size : 0;
} else { // error
LOG(ERROR) << "avcodec_receive_frame failed, err: "
<< Util::generateErrorDesc(result);
return result;
}
return consumed;
}
int Stream::decodeFrame(const AVPacket* packet, int* gotFramePtr) {
return analyzePacket(packet, gotFramePtr);
}
int Stream::getFrameBytes(DecoderOutputMessage* out, bool headerOnly) {
return fillBuffer(out, false, headerOnly);
}
int Stream::flush(DecoderOutputMessage* out, bool headerOnly) {
int gotFramePtr = 0;
int result;
if (analyzePacket(nullptr, &gotFramePtr) >= 0 && gotFramePtr &&
(result = fillBuffer(out, false, headerOnly)) > 0) {
return result;
} else if ((result = fillBuffer(out, true, headerOnly)) > 0) {
return result;
}
return result;
}
int Stream::fillBuffer(DecoderOutputMessage* out, bool flush, bool headerOnly) {
int result = -1;
if (!codecCtx_) {
LOG(INFO) << "Codec is not initialized";
return result;
}
// assign message
setHeader(&out->header);
if (headerOnly) {
return sizeof(out->header);
}
// init sampler, if any and return required bytes
if ((result = estimateBytes(flush)) < 0) {
return result;
}
out->payload->ensure(result);
return copyFrameBytes(out->payload.get(), flush);
}
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#pragma once
#include <atomic>
#include "defs.h"
extern "C" {
#include <libavformat/avformat.h>
#include <libavformat/avio.h>
#include <libavutil/imgutils.h>
}
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode one media stream (audio or video).
*/
class Stream {
public:
Stream(
AVFormatContext* inputCtx,
MediaFormat format,
bool convertPtsToWallTime);
virtual ~Stream();
// returns 0 - on success or negative error
int openCodec();
// returns number processed bytes from packet, or negative error
int decodeFrame(const AVPacket* packet, int* gotFramePtr);
// returns stream index
int getIndex() const {
return format_.stream;
}
// returns number decoded/sampled bytes
int getFrameBytes(DecoderOutputMessage* out, bool headerOnly);
// returns number decoded/sampled bytes
int flush(DecoderOutputMessage* out, bool headerOnly);
// rescale package
void rescalePackage(AVPacket* packet);
// return media format
MediaFormat getMediaFormat() const {
return format_;
}
protected:
virtual int initFormat() = 0;
// returns number processed bytes from packet, or negative error
virtual int analyzePacket(const AVPacket* packet, int* gotFramePtr);
// returns number decoded/sampled bytes, or negative error
virtual int copyFrameBytes(ByteStorage* out, bool flush) = 0;
// initialize codec, returns output buffer size, or negative error
virtual int estimateBytes(bool flush) = 0;
// sets output format
virtual void setHeader(DecoderHeader* header) = 0;
// finds codec
virtual AVCodec* findCodec(AVCodecContext* ctx);
private:
int fillBuffer(DecoderOutputMessage* out, bool flush, bool headerOnly);
protected:
AVFormatContext* const inputCtx_;
MediaFormat format_;
const bool convertPtsToWallTime_;
AVCodecContext* codecCtx_{nullptr};
AVFrame* frame_{nullptr};
std::atomic<size_t> numGenerator_{0};
};
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#include "subtitle_sampler.h"
#include "util.h"
namespace ffmpeg {
SubtitleSampler::~SubtitleSampler() {
cleanUp();
}
void SubtitleSampler::shutdown() {
cleanUp();
}
bool SubtitleSampler::init(const SamplerParameters& params) {
cleanUp();
// set formats
params_ = params;
return true;
}
int SubtitleSampler::getSamplesBytes(AVSubtitle* sub) const {
return Util::size(*sub);
}
int SubtitleSampler::sample(AVSubtitle* sub, ByteStorage* out) {
if (!sub) {
return 0; // flush
}
return Util::serialize(*sub, out);
}
int SubtitleSampler::sample(const ByteStorage* in, ByteStorage* out) {
if (in) {
// Get a writable copy
*out = *in;
return out->length();
}
return 0;
}
void SubtitleSampler::cleanUp() {}
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#pragma once
#include "defs.h"
extern "C" {
#include <libavcodec/avcodec.h>
}
namespace ffmpeg {
/**
* Class serializes subtitle frames into byte storage
*/
class SubtitleSampler : public MediaSampler {
public:
SubtitleSampler() = default;
~SubtitleSampler() override;
bool init(const SamplerParameters& params) override;
int sample(const ByteStorage* in, ByteStorage* out) override;
void shutdown() override;
// returns the number of processed/serialized bytes
int sample(AVSubtitle* sub, ByteStorage* out);
int getSamplesBytes(AVSubtitle* sub) const;
// helper serialization/deserialization methods
static void serialize(const AVSubtitle& sub, ByteStorage* out);
static bool deserialize(const ByteStorage& buf, AVSubtitle* sub);
private:
// close resources
void cleanUp();
};
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#include "subtitle_stream.h"
#include <c10/util/Logging.h>
#include <limits>
#include "util.h"
namespace ffmpeg {
namespace {
bool operator==(const SubtitleFormat&, const AVCodecContext&) {
return true;
}
SubtitleFormat& toSubtitleFormat(SubtitleFormat& x, const AVCodecContext&) {
return x;
}
} // namespace
SubtitleStream::SubtitleStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const SubtitleFormat& format)
: Stream(
inputCtx,
MediaFormat::makeMediaFormat(format, index),
convertPtsToWallTime) {
memset(&sub_, 0, sizeof(sub_));
}
void SubtitleStream::releaseSubtitle() {
if (sub_.release) {
avsubtitle_free(&sub_);
memset(&sub_, 0, sizeof(sub_));
}
}
SubtitleStream::~SubtitleStream() {
releaseSubtitle();
sampler_.shutdown();
}
int SubtitleStream::initFormat() {
if (!codecCtx_->subtitle_header) {
LOG(ERROR) << "No subtitle header found";
} else {
LOG(INFO) << "Subtitle header found!";
}
return 0;
}
int SubtitleStream::analyzePacket(const AVPacket* packet, int* gotFramePtr) {
// clean-up
releaseSubtitle();
// check flush packet
AVPacket avPacket;
av_init_packet(&avPacket);
avPacket.data = nullptr;
auto pkt = packet ? *packet : avPacket;
int result = avcodec_decode_subtitle2(codecCtx_, &sub_, gotFramePtr, &pkt);
if (result < 0) {
VLOG(1) << "avcodec_decode_subtitle2 failed, err: "
<< Util::generateErrorDesc(result);
} else if (result == 0) {
result = packet ? packet->size : 0; // discard the rest of the package
}
sub_.release = *gotFramePtr;
return result;
}
int SubtitleStream::estimateBytes(bool flush) {
if (!(sampler_.getInputFormat().subtitle == *codecCtx_)) {
// - reinit sampler
SamplerParameters params;
params.type = MediaType::TYPE_SUBTITLE;
toSubtitleFormat(params.in.subtitle, *codecCtx_);
if (flush || !sampler_.init(params)) {
return -1;
}
VLOG(1) << "Set input subtitle sampler format";
}
return sampler_.getSamplesBytes(&sub_);
}
int SubtitleStream::copyFrameBytes(ByteStorage* out, bool flush) {
return sampler_.sample(flush ? nullptr : &sub_, out);
}
void SubtitleStream::setHeader(DecoderHeader* header) {
header->seqno = numGenerator_++;
header->pts = sub_.pts; // already in us
if (convertPtsToWallTime_) {
keeper_.adjust(header->pts);
}
header->keyFrame = 0;
header->fps = std::numeric_limits<double>::quiet_NaN();
header->format = format_;
}
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#pragma once
#include "stream.h"
#include "subtitle_sampler.h"
#include "time_keeper.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode one subtitle stream.
*/
struct AVSubtitleKeeper : AVSubtitle {
int64_t release{0};
};
class SubtitleStream : public Stream {
public:
SubtitleStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const SubtitleFormat& format);
~SubtitleStream() override;
protected:
void setHeader(DecoderHeader* header) override;
private:
int initFormat() override;
int analyzePacket(const AVPacket* packet, int* gotFramePtr) override;
int estimateBytes(bool flush) override;
int copyFrameBytes(ByteStorage* out, bool flush) override;
void releaseSubtitle();
private:
SubtitleSampler sampler_;
TimeKeeper keeper_;
AVSubtitleKeeper sub_;
};
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#include "sync_decoder.h"
#include <c10/util/Logging.h>
namespace ffmpeg {
SyncDecoder::VectorByteStorage::VectorByteStorage(size_t n) {
buffer_.resize(n);
}
void SyncDecoder::VectorByteStorage::ensure(size_t n) {
if (tail() < n) {
buffer_.resize(offset_ + length_ + n);
}
}
uint8_t* SyncDecoder::VectorByteStorage::writableTail() {
CHECK_LE(offset_ + length_, buffer_.size());
return buffer_.data() + offset_ + length_;
}
void SyncDecoder::VectorByteStorage::append(size_t n) {
CHECK_LE(n, tail());
length_ += n;
}
void SyncDecoder::VectorByteStorage::trim(size_t n) {
CHECK_LE(n, length_);
offset_ += n;
length_ -= n;
}
const uint8_t* SyncDecoder::VectorByteStorage::data() const {
return buffer_.data() + offset_;
}
size_t SyncDecoder::VectorByteStorage::length() const {
return length_;
}
size_t SyncDecoder::VectorByteStorage::tail() const {
auto size = buffer_.size();
CHECK_LE(offset_ + length_, buffer_.size());
return size - offset_ - length_;
}
void SyncDecoder::VectorByteStorage::clear() {
buffer_.clear();
offset_ = 0;
length_ = 0;
}
std::unique_ptr<ByteStorage> SyncDecoder::createByteStorage(size_t n) {
return std::make_unique<VectorByteStorage>(n);
}
void SyncDecoder::onInit() {
eof_ = false;
queue_.clear();
}
int SyncDecoder::decode(DecoderOutputMessage* out, uint64_t timeoutMs) {
if (eof_ && queue_.empty()) {
return ENODATA;
}
if (queue_.empty()) {
int result = getBytes(timeoutMs);
eof_ = result == ENODATA;
if (result && result != ENODATA) {
return result;
}
// still empty
if (queue_.empty()) {
return ETIMEDOUT;
}
}
*out = std::move(queue_.front());
queue_.pop_front();
return 0;
}
void SyncDecoder::push(DecoderOutputMessage&& buffer) {
queue_.push_back(std::move(buffer));
}
} // namespace ffmpeg
// Copyright 2004-present Facebook. All Rights Reserved.
#pragma once
#include <list>
#include "decoder.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode media streams.
* Media bytes can be explicitly provided through read-callback
* or fetched internally by FFMPEG library
*/
class SyncDecoder : public Decoder {
class VectorByteStorage : public ByteStorage {
public:
VectorByteStorage(size_t n);
void ensure(size_t n) override;
uint8_t* writableTail() override;
void append(size_t n) override;
void trim(size_t n) override;
const uint8_t* data() const override;
size_t length() const override;
size_t tail() const override;
void clear() override;
private:
size_t offset_{0};
size_t length_{0};
std::vector<uint8_t> buffer_;
};
public:
int decode(DecoderOutputMessage* out, uint64_t timeoutMs) override;
private:
void push(DecoderOutputMessage&& buffer) override;
void onInit() override;
std::unique_ptr<ByteStorage> createByteStorage(size_t n) override;
private:
std::list<DecoderOutputMessage> queue_;
bool eof_{false};
};
} // namespace ffmpeg
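Finally, an end-to-end usage sketch for the synchronous decoder. It is hypothetical, not part of this commit: it wires up DecoderParameters, selects the internal URI-based provider by passing an empty input callback, and drains frames with decode_all; the timeout value is an arbitrary example.

#include "sync_decoder.h"

// Hypothetical driver: decode all audio from a local file or URL.
int main(int argc, char** argv) {
  if (argc < 2) {
    return 1;
  }
  ffmpeg::DecoderParameters params;
  params.uri = argv[1];      // anything ffmpeg can open
  params.timeoutMs = 10000;  // allow 10 s for the demuxing/decoding steps
  // request one auto-detected audio stream with auto-detected output format
  params.formats.insert(
      ffmpeg::MediaFormat::makeMediaFormat(ffmpeg::AudioFormat(), -1));
  ffmpeg::SyncDecoder decoder;
  // an empty input callback selects the internal, URI-based media provider
  if (!decoder.init(params, nullptr)) {
    return 1;
  }
  decoder.decode_all([](ffmpeg::DecoderOutputMessage&& msg) {
    (void)msg; // msg.header carries pts and format; msg.payload the bytes
  });
  decoder.shutdown();
  return 0;
}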