"vscode:/vscode.git/clone" did not exist on "cdadb5ce575afcd54953878f42ddcbdb6b2830b0"
Commit 0fc002df authored by huchen's avatar huchen
Browse files

init the dlexamples new

parent 0e04b692
#include "audio_sampler.h"
#include <c10/util/Logging.h>
#include "util.h"
#define AVRESAMPLE_MAX_CHANNELS 32
// www.ffmpeg.org/doxygen/1.1/doc_2examples_2resampling_audio_8c-example.html#a24
namespace ffmpeg {
namespace {
int preparePlanes(
const AudioFormat& fmt,
const uint8_t* buffer,
int numSamples,
uint8_t** planes) {
int result;
if ((result = av_samples_fill_arrays(
planes,
nullptr, // linesize is not needed
buffer,
fmt.channels,
numSamples,
(AVSampleFormat)fmt.format,
1)) < 0) {
LOG(ERROR) << "av_samples_fill_arrays failed, err: "
<< Util::generateErrorDesc(result)
<< ", numSamples: " << numSamples << ", fmt: " << fmt.format;
}
return result;
}
} // namespace
AudioSampler::AudioSampler(void* logCtx) : logCtx_(logCtx) {}
AudioSampler::~AudioSampler() {
cleanUp();
}
void AudioSampler::shutdown() {
cleanUp();
}
bool AudioSampler::init(const SamplerParameters& params) {
cleanUp();
if (params.type != MediaType::TYPE_AUDIO) {
LOG(ERROR) << "Invalid media type, expected MediaType::TYPE_AUDIO";
return false;
}
swrContext_ = swr_alloc_set_opts(
nullptr,
av_get_default_channel_layout(params.out.audio.channels),
(AVSampleFormat)params.out.audio.format,
params.out.audio.samples,
av_get_default_channel_layout(params.in.audio.channels),
(AVSampleFormat)params.in.audio.format,
params.in.audio.samples,
0,
logCtx_);
if (swrContext_ == nullptr) {
LOG(ERROR) << "Cannot allocate SwrContext";
return false;
}
int result;
if ((result = swr_init(swrContext_)) < 0) {
LOG(ERROR) << "swr_init faield, err: " << Util::generateErrorDesc(result)
<< ", in -> format: " << params.in.audio.format
<< ", channels: " << params.in.audio.channels
<< ", samples: " << params.in.audio.samples
<< ", out -> format: " << params.out.audio.format
<< ", channels: " << params.out.audio.channels
<< ", samples: " << params.out.audio.samples;
return false;
}
// set formats
params_ = params;
return true;
}
int AudioSampler::numOutputSamples(int inSamples) const {
return swr_get_out_samples(swrContext_, inSamples);
}
int AudioSampler::sample(
const uint8_t* inPlanes[],
int inNumSamples,
ByteStorage* out,
int outNumSamples) {
int result;
int outBufferBytes = av_samples_get_buffer_size(
nullptr,
params_.out.audio.channels,
outNumSamples,
(AVSampleFormat)params_.out.audio.format,
1);
if (out) {
out->ensure(outBufferBytes);
uint8_t* outPlanes[AVRESAMPLE_MAX_CHANNELS] = {nullptr};
if ((result = preparePlanes(
params_.out.audio,
out->writableTail(),
outNumSamples,
outPlanes)) < 0) {
return result;
}
if ((result = swr_convert(
swrContext_,
&outPlanes[0],
outNumSamples,
inPlanes,
inNumSamples)) < 0) {
LOG(ERROR) << "swr_convert faield, err: "
<< Util::generateErrorDesc(result);
return result;
}
CHECK_LE(result, outNumSamples);
if (result) {
if ((result = av_samples_get_buffer_size(
nullptr,
params_.out.audio.channels,
result,
(AVSampleFormat)params_.out.audio.format,
1)) >= 0) {
out->append(result);
} else {
LOG(ERROR) << "av_samples_get_buffer_size faield, err: "
<< Util::generateErrorDesc(result);
}
}
} else {
// allocate a temporary buffer
auto* tmpBuffer = static_cast<uint8_t*>(av_malloc(outBufferBytes));
if (!tmpBuffer) {
LOG(ERROR) << "av_alloc faield, for size: " << outBufferBytes;
return -1;
}
uint8_t* outPlanes[AVRESAMPLE_MAX_CHANNELS] = {nullptr};
if ((result = preparePlanes(
params_.out.audio, tmpBuffer, outNumSamples, outPlanes)) < 0) {
av_free(tmpBuffer);
return result;
}
if ((result = swr_convert(
swrContext_,
&outPlanes[0],
outNumSamples,
inPlanes,
inNumSamples)) < 0) {
LOG(ERROR) << "swr_convert faield, err: "
<< Util::generateErrorDesc(result);
av_free(tmpBuffer);
return result;
}
av_free(tmpBuffer);
CHECK_LE(result, outNumSamples);
if (result) {
result = av_samples_get_buffer_size(
nullptr,
params_.out.audio.channels,
result,
(AVSampleFormat)params_.out.audio.format,
1);
}
}
return result;
}
int AudioSampler::sample(AVFrame* frame, ByteStorage* out) {
const auto outNumSamples = numOutputSamples(frame ? frame->nb_samples : 0);
if (!outNumSamples) {
return 0;
}
return sample(
frame ? (const uint8_t**)&frame->data[0] : nullptr,
frame ? frame->nb_samples : 0,
out,
outNumSamples);
}
int AudioSampler::sample(const ByteStorage* in, ByteStorage* out) {
const auto inSampleSize =
av_get_bytes_per_sample((AVSampleFormat)params_.in.audio.format);
const auto inNumSamples =
!in ? 0 : in->length() / inSampleSize / params_.in.audio.channels;
const auto outNumSamples = numOutputSamples(inNumSamples);
if (!outNumSamples) {
return 0;
}
uint8_t* inPlanes[AVRESAMPLE_MAX_CHANNELS] = {nullptr};
int result;
if (in &&
(result = preparePlanes(
params_.in.audio, in->data(), inNumSamples, inPlanes)) < 0) {
return result;
}
return sample(
in ? (const uint8_t**)inPlanes : nullptr,
inNumSamples,
out,
outNumSamples);
}
void AudioSampler::cleanUp() {
if (swrContext_) {
swr_free(&swrContext_);
swrContext_ = nullptr;
}
}
} // namespace ffmpeg
#pragma once
#include "defs.h"
namespace ffmpeg {
/**
* Class transcodes audio frames from one format into another
*/
class AudioSampler : public MediaSampler {
public:
explicit AudioSampler(void* logCtx);
~AudioSampler() override;
// MediaSampler overrides
bool init(const SamplerParameters& params) override;
int sample(const ByteStorage* in, ByteStorage* out) override;
void shutdown() override;
int sample(AVFrame* frame, ByteStorage* out);
private:
// close resources
void cleanUp();
// helper functions for resampling
int numOutputSamples(int inSamples) const;
int sample(
const uint8_t* inPlanes[],
int inNumSamples,
ByteStorage* out,
int outNumSamples);
private:
SwrContext* swrContext_{nullptr};
void* logCtx_{nullptr};
};
} // namespace ffmpeg
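// A minimal usage sketch for AudioSampler (illustrative only; the concrete
// field values are assumptions, and inStorage/outStorage are assumed
// ByteStorage pointers, not part of this commit):
//
//   SamplerParameters params;
//   params.type = MediaType::TYPE_AUDIO;
//   params.in.audio.samples = 44100; // source sample rate
//   params.in.audio.channels = 2;
//   params.in.audio.format = AV_SAMPLE_FMT_FLTP;
//   params.out.audio.samples = 16000; // target sample rate
//   params.out.audio.channels = 1;
//   params.out.audio.format = AV_SAMPLE_FMT_S16;
//   AudioSampler sampler(nullptr);
//   if (sampler.init(params)) {
//     sampler.sample(inStorage, outStorage); // resample one chunk
//     sampler.sample(nullptr, outStorage); // flush the remaining samples
//   }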
#include "audio_stream.h"
#include <c10/util/Logging.h>
#include <limits>
#include "util.h"
namespace ffmpeg {
namespace {
bool operator==(const AudioFormat& x, const AVFrame& y) {
return x.samples == y.sample_rate && x.channels == y.channels &&
x.format == y.format;
}
bool operator==(const AudioFormat& x, const AVCodecContext& y) {
return x.samples == y.sample_rate && x.channels == y.channels &&
x.format == y.sample_fmt;
}
AudioFormat& toAudioFormat(AudioFormat& x, const AVFrame& y) {
x.samples = y.sample_rate;
x.channels = y.channels;
x.format = y.format;
return x;
}
AudioFormat& toAudioFormat(AudioFormat& x, const AVCodecContext& y) {
x.samples = y.sample_rate;
x.channels = y.channels;
x.format = y.sample_fmt;
return x;
}
} // namespace
AudioStream::AudioStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const AudioFormat& format)
: Stream(
inputCtx,
MediaFormat::makeMediaFormat(format, index),
convertPtsToWallTime,
0) {}
AudioStream::~AudioStream() {
if (sampler_) {
sampler_->shutdown();
sampler_.reset();
}
}
int AudioStream::initFormat() {
// set output format
if (format_.format.audio.samples == 0) {
format_.format.audio.samples = codecCtx_->sample_rate;
}
if (format_.format.audio.channels == 0) {
format_.format.audio.channels = codecCtx_->channels;
}
if (format_.format.audio.format == AV_SAMPLE_FMT_NONE) {
format_.format.audio.format = codecCtx_->sample_fmt;
}
return format_.format.audio.samples != 0 &&
format_.format.audio.channels != 0 &&
format_.format.audio.format != AV_SAMPLE_FMT_NONE
? 0
: -1;
}
int AudioStream::copyFrameBytes(ByteStorage* out, bool flush) {
if (!sampler_) {
sampler_ = std::make_unique<AudioSampler>(codecCtx_);
}
// check if input format gets changed
if (flush ? !(sampler_->getInputFormat().audio == *codecCtx_)
: !(sampler_->getInputFormat().audio == *frame_)) {
// - reinit sampler
SamplerParameters params;
params.type = format_.type;
params.out = format_.format;
params.in = FormatUnion();
flush ? toAudioFormat(params.in.audio, *codecCtx_)
: toAudioFormat(params.in.audio, *frame_);
if (!sampler_->init(params)) {
return -1;
}
VLOG(1) << "Set input audio sampler format"
<< ", samples: " << params.in.audio.samples
<< ", channels: " << params.in.audio.channels
<< ", format: " << params.in.audio.format
<< " : output audio sampler format"
<< ", samples: " << format_.format.audio.samples
<< ", channels: " << format_.format.audio.channels
<< ", format: " << format_.format.audio.format;
}
return sampler_->sample(flush ? nullptr : frame_, out);
}
} // namespace ffmpeg
#pragma once
#include "audio_sampler.h"
#include "stream.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode one audio stream.
*/
class AudioStream : public Stream {
public:
AudioStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const AudioFormat& format);
~AudioStream() override;
private:
int initFormat() override;
int copyFrameBytes(ByteStorage* out, bool flush) override;
private:
std::unique_ptr<AudioSampler> sampler_;
};
} // namespace ffmpeg
#include "cc_stream.h"
namespace ffmpeg {
CCStream::CCStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const SubtitleFormat& format)
: SubtitleStream(inputCtx, index, convertPtsToWallTime, format) {
format_.type = TYPE_CC;
}
AVCodec* CCStream::findCodec(AVCodecParameters* params) {
if (params->codec_id == AV_CODEC_ID_BIN_DATA &&
params->codec_type == AVMEDIA_TYPE_DATA) {
// obtain subtitles codec
params->codec_id = AV_CODEC_ID_MOV_TEXT;
params->codec_type = AVMEDIA_TYPE_SUBTITLE;
}
return Stream::findCodec(params);
}
} // namespace ffmpeg
#pragma once
#include "subtitle_stream.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode one closed captions stream.
*/
class CCStream : public SubtitleStream {
public:
CCStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const SubtitleFormat& format);
private:
AVCodec* findCodec(AVCodecParameters* params) override;
};
} // namespace ffmpeg
#include "decoder.h"
#include <c10/util/Logging.h>
#include <future>
#include <iostream>
#include <mutex>
#include "audio_stream.h"
#include "cc_stream.h"
#include "subtitle_stream.h"
#include "util.h"
#include "video_stream.h"
namespace ffmpeg {
namespace {
constexpr size_t kIoBufferSize = 96 * 1024;
constexpr size_t kIoPaddingSize = AV_INPUT_BUFFER_PADDING_SIZE;
constexpr size_t kLogBufferSize = 1024;
int ffmpeg_lock(void** mutex, enum AVLockOp op) {
std::mutex** handle = (std::mutex**)mutex;
switch (op) {
case AV_LOCK_CREATE:
*handle = new std::mutex();
break;
case AV_LOCK_OBTAIN:
(*handle)->lock();
break;
case AV_LOCK_RELEASE:
(*handle)->unlock();
break;
case AV_LOCK_DESTROY:
delete *handle;
break;
}
return 0;
}
bool mapFfmpegType(AVMediaType media, MediaType* type) {
switch (media) {
case AVMEDIA_TYPE_AUDIO:
*type = TYPE_AUDIO;
return true;
case AVMEDIA_TYPE_VIDEO:
*type = TYPE_VIDEO;
return true;
case AVMEDIA_TYPE_SUBTITLE:
*type = TYPE_SUBTITLE;
return true;
case AVMEDIA_TYPE_DATA:
*type = TYPE_CC;
return true;
default:
return false;
}
}
std::unique_ptr<Stream> createStream(
MediaType type,
AVFormatContext* ctx,
int idx,
bool convertPtsToWallTime,
const FormatUnion& format,
int64_t loggingUuid) {
switch (type) {
case TYPE_AUDIO:
return std::make_unique<AudioStream>(
ctx, idx, convertPtsToWallTime, format.audio);
case TYPE_VIDEO:
return std::make_unique<VideoStream>(
// negative loggingUuid indicates video streams.
ctx,
idx,
convertPtsToWallTime,
format.video,
-loggingUuid);
case TYPE_SUBTITLE:
return std::make_unique<SubtitleStream>(
ctx, idx, convertPtsToWallTime, format.subtitle);
case TYPE_CC:
return std::make_unique<CCStream>(
ctx, idx, convertPtsToWallTime, format.subtitle);
default:
return nullptr;
}
}
} // namespace
/* static */
void Decoder::logFunction(void* avcl, int level, const char* cfmt, va_list vl) {
if (!avcl) {
// Nothing can be done here
return;
}
AVClass* avclass = *reinterpret_cast<AVClass**>(avcl);
if (!avclass) {
// Nothing can be done here
return;
}
Decoder* decoder = nullptr;
if (strcmp(avclass->class_name, "AVFormatContext") == 0) {
AVFormatContext* context = reinterpret_cast<AVFormatContext*>(avcl);
if (context) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
} else if (strcmp(avclass->class_name, "AVCodecContext") == 0) {
AVCodecContext* context = reinterpret_cast<AVCodecContext*>(avcl);
if (context) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
} else if (strcmp(avclass->class_name, "AVIOContext") == 0) {
AVIOContext* context = reinterpret_cast<AVIOContext*>(avcl);
// only if opaque was assigned to Decoder pointer
if (context && context->read_packet == Decoder::readFunction) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
} else if (strcmp(avclass->class_name, "SWResampler") == 0) {
// expect AVCodecContext as parent
if (avclass->parent_log_context_offset) {
AVClass** parent =
*(AVClass***)(((uint8_t*)avcl) + avclass->parent_log_context_offset);
AVCodecContext* context = reinterpret_cast<AVCodecContext*>(parent);
if (context) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
}
} else if (strcmp(avclass->class_name, "SWScaler") == 0) {
// cannot find a way to pass context pointer through SwsContext struct
} else {
VLOG(2) << "Unknown context class: " << avclass->class_name;
}
if (decoder != nullptr && decoder->enableLogLevel(level)) {
char buf[kLogBufferSize] = {0};
// Format the line
int* prefix = decoder->getPrintPrefix();
*prefix = 1;
av_log_format_line(avcl, level, cfmt, vl, buf, sizeof(buf) - 1, prefix);
// pass message to the decoder instance
std::string msg(buf);
decoder->logCallback(level, msg);
}
}
bool Decoder::enableLogLevel(int level) const {
return ssize_t(level) <= params_.logLevel;
}
void Decoder::logCallback(int level, const std::string& message) {
LOG(INFO) << "Msg, level: " << level << ", msg: " << message;
}
/* static */
int Decoder::shutdownFunction(void* ctx) {
Decoder* decoder = (Decoder*)ctx;
if (decoder == nullptr) {
return 1;
}
return decoder->shutdownCallback();
}
int Decoder::shutdownCallback() {
return interrupted_ ? 1 : 0;
}
/* static */
int Decoder::readFunction(void* opaque, uint8_t* buf, int size) {
Decoder* decoder = reinterpret_cast<Decoder*>(opaque);
if (decoder == nullptr) {
return 0;
}
return decoder->readCallback(buf, size);
}
/* static */
int64_t Decoder::seekFunction(void* opaque, int64_t offset, int whence) {
Decoder* decoder = reinterpret_cast<Decoder*>(opaque);
if (decoder == nullptr) {
return -1;
}
return decoder->seekCallback(offset, whence);
}
int Decoder::readCallback(uint8_t* buf, int size) {
return seekableBuffer_.read(buf, size, params_.timeoutMs);
}
int64_t Decoder::seekCallback(int64_t offset, int whence) {
return seekableBuffer_.seek(offset, whence, params_.timeoutMs);
}
/* static */
void Decoder::initOnce() {
static std::once_flag flagInit;
std::call_once(flagInit, []() {
av_register_all();
avcodec_register_all();
avformat_network_init();
// register ffmpeg lock manager
av_lockmgr_register(&ffmpeg_lock);
av_log_set_callback(Decoder::logFunction);
av_log_set_level(AV_LOG_ERROR);
VLOG(1) << "Registered ffmpeg libs";
});
}
Decoder::Decoder() {
initOnce();
}
Decoder::~Decoder() {
cleanUp();
}
bool Decoder::init(
const DecoderParameters& params,
DecoderInCallback&& in,
std::vector<DecoderMetadata>* metadata) {
cleanUp();
if ((params.uri.empty() || in) && (!params.uri.empty() || !in)) {
LOG(ERROR) << "Either external URI gets provided"
<< " or explicit input callback";
return false;
}
// set callback and params
params_ = params;
if (!(inputCtx_ = avformat_alloc_context())) {
LOG(ERROR) << "Cannot allocate format context";
return false;
}
AVInputFormat* fmt = nullptr;
int result = 0;
if (in) {
ImageType type = ImageType::UNKNOWN;
if ((result = seekableBuffer_.init(
std::forward<DecoderInCallback>(in),
params_.timeoutMs,
params_.maxSeekableBytes,
params_.isImage ? &type : nullptr)) < 0) {
LOG(ERROR) << "can't initiate seekable buffer";
cleanUp();
return false;
}
if (params_.isImage) {
const char* fmtName = "image2";
switch (type) {
case ImageType::JPEG:
fmtName = "jpeg_pipe";
break;
case ImageType::PNG:
fmtName = "png_pipe";
break;
case ImageType::TIFF:
fmtName = "tiff_pipe";
break;
default:
break;
}
fmt = av_find_input_format(fmtName);
}
const size_t avioCtxBufferSize = kIoBufferSize;
uint8_t* avioCtxBuffer =
(uint8_t*)av_malloc(avioCtxBufferSize + kIoPaddingSize);
if (!avioCtxBuffer) {
LOG(ERROR) << "av_malloc cannot allocate " << avioCtxBufferSize
<< " bytes";
cleanUp();
return false;
}
if (!(avioCtx_ = avio_alloc_context(
avioCtxBuffer,
avioCtxBufferSize,
0,
reinterpret_cast<void*>(this),
&Decoder::readFunction,
nullptr,
result == 1 ? &Decoder::seekFunction : nullptr))) {
LOG(ERROR) << "avio_alloc_context failed";
av_free(avioCtxBuffer);
cleanUp();
return false;
}
inputCtx_->pb = avioCtx_;
inputCtx_->flags |= AVFMT_FLAG_CUSTOM_IO;
}
inputCtx_->opaque = reinterpret_cast<void*>(this);
inputCtx_->interrupt_callback.callback = Decoder::shutdownFunction;
inputCtx_->interrupt_callback.opaque = reinterpret_cast<void*>(this);
// set non-blocking reads and network timeouts
inputCtx_->flags |= AVFMT_FLAG_NONBLOCK;
AVDictionary* options = nullptr;
av_dict_set_int(&options, "analyzeduration", params_.timeoutMs * 1000, 0);
av_dict_set_int(&options, "stimeout", params_.timeoutMs * 1000, 0);
if (params_.listen) {
av_dict_set_int(&options, "listen", 1, 0);
}
interrupted_ = false;
// the ffmpeg avformat_open_input call can hang if the media source doesn't
// respond; set a guard to handle such situations, if requested
std::promise<bool> p;
std::future<bool> f = p.get_future();
std::unique_ptr<std::thread> guard;
if (params_.preventStaleness) {
guard = std::make_unique<std::thread>([&f, this]() {
auto timeout = std::chrono::milliseconds(params_.timeoutMs);
if (std::future_status::timeout == f.wait_for(timeout)) {
LOG(ERROR) << "Cannot open stream within " << params_.timeoutMs
<< " ms";
interrupted_ = true;
}
});
}
if (fmt) {
result = avformat_open_input(&inputCtx_, nullptr, fmt, &options);
} else {
result =
avformat_open_input(&inputCtx_, params_.uri.c_str(), nullptr, &options);
}
av_dict_free(&options);
if (guard) {
p.set_value(true);
guard->join();
guard.reset();
}
if (result < 0 || interrupted_) {
LOG(ERROR) << "avformat_open_input failed, error: "
<< Util::generateErrorDesc(result);
cleanUp();
return false;
}
result = avformat_find_stream_info(inputCtx_, nullptr);
if (result < 0) {
LOG(ERROR) << "avformat_find_stream_info failed, error: "
<< Util::generateErrorDesc(result);
cleanUp();
return false;
}
if (!openStreams(metadata)) {
LOG(ERROR) << "Cannot activate streams";
cleanUp();
return false;
}
onInit();
if (params.startOffset != 0) {
auto offset = params.startOffset <= params.seekAccuracy
? 0
: params.startOffset - params.seekAccuracy;
av_seek_frame(inputCtx_, -1, offset, AVSEEK_FLAG_BACKWARD);
}
VLOG(1) << "Decoder initialized, log level: " << params_.logLevel;
return true;
}
bool Decoder::openStreams(std::vector<DecoderMetadata>* metadata) {
for (int i = 0; i < static_cast<int>(inputCtx_->nb_streams); i++) {
// - find the corresponding format in the params_.formats set
MediaFormat format;
const auto media = inputCtx_->streams[i]->codec->codec_type;
if (!mapFfmpegType(media, &format.type)) {
VLOG(1) << "Stream media: " << media << " at index " << i
<< " gets ignored, unknown type";
continue; // unsupported type
}
// check format
auto it = params_.formats.find(format);
if (it == params_.formats.end()) {
VLOG(1) << "Stream type: " << format.type << " at index: " << i
<< " gets ignored, caller is not interested";
continue; // clients don't care about this media format
}
// do we have stream of this type?
auto stream = findByType(format);
// should we process this stream?
if (it->stream == -2 || // all streams of this type are welcome
(!stream && (it->stream == -1 || it->stream == i))) { // new stream
VLOG(1) << "Stream type: " << format.type << " found, at index: " << i;
auto stream = createStream(
format.type,
inputCtx_,
i,
params_.convertPtsToWallTime,
it->format,
params_.loggingUuid);
CHECK(stream);
if (stream->openCodec(metadata) < 0) {
LOG(ERROR) << "Cannot open codec " << i;
return false;
}
streams_.emplace(i, std::move(stream));
inRange_.set(i, true);
}
}
return true;
}
void Decoder::shutdown() {
cleanUp();
}
void Decoder::interrupt() {
interrupted_ = true;
}
void Decoder::cleanUp() {
if (!interrupted_) {
interrupted_ = true;
}
if (inputCtx_) {
for (auto& stream : streams_) {
// Drain stream buffers.
DecoderOutputMessage msg;
while (msg.payload = nullptr, stream.second->flush(&msg, true) > 0) {
}
stream.second.reset();
}
streams_.clear();
avformat_close_input(&inputCtx_);
}
if (avioCtx_) {
av_freep(&avioCtx_->buffer);
av_freep(&avioCtx_);
}
// reset callback
seekableBuffer_.shutdown();
}
int Decoder::getFrame(size_t workingTimeInMs) {
if (inRange_.none()) {
return ENODATA;
}
// decode frames until the cache is full, then leave the thread;
// once the decode() method gets called and grabs some bytes,
// run this method again
// initialize the packet
AVPacket avPacket;
av_init_packet(&avPacket);
avPacket.data = nullptr;
avPacket.size = 0;
auto end = std::chrono::steady_clock::now() +
std::chrono::milliseconds(workingTimeInMs);
// returns true if the elapsed time is less than the timeout
auto watcher = [end]() -> bool {
return std::chrono::steady_clock::now() <= end;
};
int result = 0;
size_t decodingErrors = 0;
bool decodedFrame = false;
while (!interrupted_ && inRange_.any() && !decodedFrame && watcher()) {
result = av_read_frame(inputCtx_, &avPacket);
if (result == AVERROR(EAGAIN)) {
VLOG(4) << "Decoder is busy...";
std::this_thread::yield();
result = 0; // reset error, EAGAIN is not an error at all
continue;
} else if (result == AVERROR_EOF) {
flushStreams();
VLOG(1) << "End of stream";
result = ENODATA;
break;
} else if (result < 0) {
flushStreams();
LOG(ERROR) << "Error detected: " << Util::generateErrorDesc(result);
break;
}
// get stream
auto stream = findByIndex(avPacket.stream_index);
if (stream == nullptr || !inRange_.test(stream->getIndex())) {
av_packet_unref(&avPacket);
continue;
}
size_t numConsecutiveNoBytes = 0;
// the packet bytes may be only partially decoded
do {
// decode the packet
bool gotFrame = false;
bool hasMsg = false;
// packet either got consumed completely or not at all
if ((result = processPacket(stream, &avPacket, &gotFrame, &hasMsg)) < 0) {
LOG(ERROR) << "processPacket failed with code: " << result;
break;
}
if (!gotFrame && params_.maxProcessNoBytes != 0 &&
++numConsecutiveNoBytes > params_.maxProcessNoBytes) {
LOG(ERROR) << "Exceeding max amount of consecutive no bytes";
break;
}
if (result > 0) {
numConsecutiveNoBytes = 0;
}
decodedFrame |= hasMsg;
} while (result == 0);
// post loop check
if (result < 0) {
if (params_.maxPackageErrors != 0 && // check errors
++decodingErrors >= params_.maxPackageErrors) { // reached the limit
LOG(ERROR) << "Exceeding max amount of consecutive package errors";
break;
}
} else {
decodingErrors = 0; // reset on success
}
result = 0;
av_packet_unref(&avPacket);
}
av_packet_unref(&avPacket);
VLOG(2) << "Interrupted loop"
<< ", interrupted_ " << interrupted_ << ", inRange_.any() "
<< inRange_.any() << ", decodedFrame " << decodedFrame << ", result "
<< result;
// the loop can be terminated by:
// 1. an explicit interrupt
// 2. the working-time budget expiring
// 3. an unrecoverable error or ENODATA (end of stream)
// 4. decoded frame pts values falling out of the specified range
// 5. a successfully decoded frame
if (interrupted_) {
return EINTR;
}
if (result != 0) {
return result;
}
if (inRange_.none()) {
return ENODATA;
}
return 0;
}
Stream* Decoder::findByIndex(int streamIndex) const {
auto it = streams_.find(streamIndex);
return it != streams_.end() ? it->second.get() : nullptr;
}
Stream* Decoder::findByType(const MediaFormat& format) const {
for (auto& stream : streams_) {
if (stream.second->getMediaFormat().type == format.type) {
return stream.second.get();
}
}
return nullptr;
}
int Decoder::processPacket(
Stream* stream,
AVPacket* packet,
bool* gotFrame,
bool* hasMsg) {
// decode the packet
int result;
DecoderOutputMessage msg;
msg.payload = params_.headerOnly ? nullptr : createByteStorage(0);
*hasMsg = false;
if ((result = stream->decodePacket(
packet, &msg, params_.headerOnly, gotFrame)) >= 0 &&
*gotFrame) {
// check end offset
bool endInRange =
params_.endOffset <= 0 || msg.header.pts <= params_.endOffset;
inRange_.set(stream->getIndex(), endInRange);
if (endInRange && msg.header.pts >= params_.startOffset) {
*hasMsg = true;
push(std::move(msg));
}
}
return result;
}
void Decoder::flushStreams() {
VLOG(1) << "Flushing streams...";
for (auto& stream : streams_) {
DecoderOutputMessage msg;
while (msg.payload = (params_.headerOnly ? nullptr : createByteStorage(0)),
stream.second->flush(&msg, params_.headerOnly) > 0) {
// check end offset
bool endInRange =
params_.endOffset <= 0 || msg.header.pts <= params_.endOffset;
inRange_.set(stream.second->getIndex(), endInRange);
if (endInRange && msg.header.pts >= params_.startOffset) {
push(std::move(msg));
} else {
msg.payload.reset();
}
}
}
}
int Decoder::decode_all(const DecoderOutCallback& callback) {
int result;
do {
DecoderOutputMessage out;
if (0 == (result = decode(&out, params_.timeoutMs))) {
callback(std::move(out));
}
} while (result == 0);
return result;
}
} // namespace ffmpeg
#pragma once
#include <bitset>
#include <unordered_map>
#include "seekable_buffer.h"
#include "stream.h"
#if defined(_MSC_VER)
#include <BaseTsd.h>
typedef SSIZE_T ssize_t;
#endif
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode media streams.
* Media bytes can be explicitly provided through read-callback
* or fetched internally by FFMPEG library
*/
class Decoder : public MediaDecoder {
public:
Decoder();
~Decoder() override;
// MediaDecoder overrides
bool init(
const DecoderParameters& params,
DecoderInCallback&& in,
std::vector<DecoderMetadata>* metadata) override;
int decode_all(const DecoderOutCallback& callback) override;
void shutdown() override;
void interrupt() override;
protected:
// function does the actual work; a derived class calls it periodically in a
// working thread. On success the method returns 0, ENODATA on EOF, ETIMEDOUT
// if no frames got decoded within the specified timeout, and an error code on
// an unrecoverable error.
int getFrame(size_t workingTimeInMs = 100);
// Derived class must override method and consume the provided message
virtual void push(DecoderOutputMessage&& buffer) = 0;
// Fires on init call
virtual void onInit() {}
public:
// C-style FFMPEG API requires C/static methods for callbacks
static void logFunction(void* avcl, int level, const char* cfmt, va_list vl);
static int shutdownFunction(void* ctx);
static int readFunction(void* opaque, uint8_t* buf, int size);
static int64_t seekFunction(void* opaque, int64_t offset, int whence);
// can be called by any classes or API
static void initOnce();
int* getPrintPrefix() {
return &printPrefix;
}
private:
// the functions below are virtual so a derived class can customize them
virtual bool enableLogLevel(int level) const;
virtual void logCallback(int level, const std::string& message);
virtual int readCallback(uint8_t* buf, int size);
virtual int64_t seekCallback(int64_t offset, int whence);
virtual int shutdownCallback();
bool openStreams(std::vector<DecoderMetadata>* metadata);
Stream* findByIndex(int streamIndex) const;
Stream* findByType(const MediaFormat& format) const;
int processPacket(
Stream* stream,
AVPacket* packet,
bool* gotFrame,
bool* hasMsg);
void flushStreams();
void cleanUp();
protected:
DecoderParameters params_;
private:
SeekableBuffer seekableBuffer_;
int printPrefix{1};
std::atomic<bool> interrupted_{false};
AVFormatContext* inputCtx_{nullptr};
AVIOContext* avioCtx_{nullptr};
std::unordered_map<ssize_t, std::unique_ptr<Stream>> streams_;
std::bitset<64> inRange_;
};
} // namespace ffmpeg
#pragma once
#include <array>
#include <functional>
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <vector>
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavformat/avio.h>
#include <libavutil/avutil.h>
#include <libavutil/imgutils.h>
#include <libswresample/swresample.h>
#include "libswscale/swscale.h"
}
namespace ffmpeg {
// bit mask of formats; keep values as powers of two (2^n)
enum MediaType : size_t {
TYPE_AUDIO = 1,
TYPE_VIDEO = 2,
TYPE_SUBTITLE = 4,
TYPE_CC = 8, // closed captions from transport streams
};
// audio
struct AudioFormat {
// fields are initialized for auto detection
// caller can specify some/all field values if a specific output is desired
bool operator==(const AudioFormat& x) const {
return x.format == format && x.samples == samples && x.channels == channels;
}
size_t samples{0}; // number of samples per second (sample rate)
size_t channels{0}; // number of channels
long format{-1}; // AVSampleFormat, auto AV_SAMPLE_FMT_NONE
size_t padding[2];
// -- alignment 40 bytes
};
// video
struct VideoFormat {
// fields are initialized for auto detection
// caller can specify some/all field values if a specific output is desired
bool operator==(const VideoFormat& x) const {
return x.format == format && x.width == width && x.height == height;
}
/*
When width = 0, height = 0, minDimension = 0, and maxDimension = 0,
keep the original frame resolution
When width = 0, height = 0, minDimension != 0, and maxDimension = 0,
keep the aspect ratio and resize the frame so that shorter edge size is
minDimension
When width = 0, height = 0, minDimension = 0, and maxDimension != 0,
keep the aspect ratio and resize the frame so that longer edge size is
maxDimension
When width = 0, height = 0, minDimension != 0, and maxDimension != 0,
resize the frame so that shorter edge size is minDimension, and
longer edge size is maxDimension. The aspect ratio may not be preserved
When width = 0, height != 0, minDimension = 0, and maxDimension = 0,
keep the aspect ratio and resize the frame so that frame height is $height
When width != 0, height = 0, minDimension = 0, and maxDimension = 0,
keep the aspect ratio and resize the frame so that frame width is $width
When width != 0, height != 0, minDimension = 0, and maxDimension = 0,
resize the frame so that frame width and height are set to $width and
$height,
respectively
*/
size_t width{0}; // width in pixels
size_t height{0}; // height in pixels
long format{-1}; // AVPixelFormat, auto AV_PIX_FMT_NONE
size_t minDimension{0}; // choose min dimension and rescale accordingly
size_t maxDimension{0}; // choose max dimension and rescale accordingly
size_t cropImage{0}; // request image crop
// -- alignment 40 bytes
};
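// Illustrative VideoFormat configurations for the resize semantics described
// above (a sketch; the values are assumptions):
//
//   VideoFormat keep; // all of width/height/minDimension/maxDimension are 0:
//                     // keep the original resolution
//   VideoFormat shortEdge;
//   shortEdge.minDimension = 256; // shorter edge resized to 256, keep aspect
//   VideoFormat longEdge;
//   longEdge.maxDimension = 512; // longer edge resized to 512, keep aspect
//   VideoFormat exact;
//   exact.width = 224;
//   exact.height = 224; // exact 224x224 resize, aspect ratio may change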
// subtitle/cc
struct SubtitleFormat {
long type{0}; // AVSubtitleType, auto SUBTITLE_NONE
size_t padding[4];
// -- alignment 40 bytes
};
union FormatUnion {
FormatUnion() : audio() {}
explicit FormatUnion(int) : video() {}
explicit FormatUnion(char) : subtitle() {}
explicit FormatUnion(double) : subtitle() {}
AudioFormat audio;
VideoFormat video;
SubtitleFormat subtitle;
// -- alignment 40 bytes
};
/*
The MediaFormat data structure serves as an input/output parameter.
The caller assigns values for input formats,
or leaves the default values for auto detection.
For output formats, all fields will be set to specific values.
*/
struct MediaFormat {
// for using map/set data structures
bool operator<(const MediaFormat& x) const {
return type < x.type;
}
bool operator==(const MediaFormat& x) const {
if (type != x.type) {
return false;
}
switch (type) {
case TYPE_AUDIO:
return format.audio == x.format.audio;
case TYPE_VIDEO:
return format.video == x.format.video;
case TYPE_SUBTITLE:
case TYPE_CC:
return true;
default:
return false;
}
}
explicit MediaFormat(long s = -1) : type(TYPE_AUDIO), stream(s), format() {}
explicit MediaFormat(int x, long s = -1)
: type(TYPE_VIDEO), stream(s), format(x) {}
explicit MediaFormat(char x, long s = -1)
: type(TYPE_SUBTITLE), stream(s), format(x) {}
explicit MediaFormat(double x, long s = -1)
: type(TYPE_CC), stream(s), format(x) {}
static MediaFormat makeMediaFormat(AudioFormat format, long stream) {
MediaFormat result(stream);
result.format.audio = format;
return result;
}
static MediaFormat makeMediaFormat(VideoFormat format, long stream) {
MediaFormat result(0, stream);
result.format.video = format;
return result;
}
static MediaFormat makeMediaFormat(SubtitleFormat format, long stream) {
MediaFormat result('0', stream);
result.format.subtitle = format;
return result;
}
// format type
MediaType type;
// stream index:
// set -1 for single-stream auto detection, -2 for all-streams auto detection,
// >= 0 for a specific stream, if the caller knows the stream index (unlikely)
long stream;
// union keeps one of the possible formats, defined by MediaType
FormatUnion format;
};
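// Sketch: requesting streams via DecoderParameters::formats (the constructor
// tags and stream indices follow the constructors above; the combination of
// requests is an assumption for illustration):
//
//   DecoderParameters params;
//   params.formats.insert(MediaFormat(-1L)); // audio, auto-detect one stream
//   params.formats.insert(MediaFormat(0, -1L)); // video, auto-detect one stream
//   params.formats.insert(MediaFormat('0', -2L)); // subtitles, all streams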
struct DecoderParameters {
// local file, remote file, http url, rtmp stream uri, etc. anything that
// ffmpeg can recognize
std::string uri;
// timeout on getting bytes for decoding
size_t timeoutMs{1000};
// logging level, default AV_LOG_PANIC
long logLevel{0};
// number of packet errors before the decoder gives up; 0 means never
size_t maxPackageErrors{0};
// max allowed consecutive iterations with no bytes processed; 0 means infinite
size_t maxProcessNoBytes{0};
// start offset (us)
long startOffset{0};
// end offset (us)
long endOffset{-1};
// logging id
int64_t loggingUuid{0};
// internal max seekable buffer size
size_t maxSeekableBytes{0};
// adjust header pts to the epoch time
bool convertPtsToWallTime{false};
// indicate if input stream is an encoded image
bool isImage{false};
// listen and wait for new rtmp stream
bool listen{false};
// don't copy frame body, only header
bool headerOnly{false};
// interrupt init method on timeout
bool preventStaleness{true};
// seek tolerated accuracy (us)
double seekAccuracy{1000000.0};
// what media types should be processed, default none
std::set<MediaFormat> formats;
// can be used for asynchronous decoders
size_t cacheSize{8192}; // how many bytes to cache before reading stops
size_t cacheTimeoutMs{1000}; // timeout on bytes writing
bool enforceCacheSize{false}; // drop output frames if cache is full
bool mergeAudioMessages{false}; // combine collocated audio messages together
};
struct DecoderHeader {
// message id, from 0 till ...
size_t seqno{0};
// decoded timestamp in microseconds from either beginning of the stream or
// from epoch time, see DecoderParameters::convertPtsToWallTime
long pts{0};
// decoded key frame
size_t keyFrame{0};
// frames per second, valid only for video streams
double fps{0};
// format specifies what kind of frame is in the payload
MediaFormat format;
};
// Abstract interface ByteStorage class
class ByteStorage {
public:
virtual ~ByteStorage() = default;
// makes sure that buffer has at least n bytes available for writing, if not
// storage must reallocate memory.
virtual void ensure(size_t n) = 0;
// caller must not write more than the available bytes
virtual uint8_t* writableTail() = 0;
// caller confirms that n bytes were written to the writable tail
virtual void append(size_t n) = 0;
// caller confirms that n bytes were read from the read buffer
virtual void trim(size_t n) = 0;
// gives an access to the beginning of the read buffer
virtual const uint8_t* data() const = 0;
// returns the stored size in bytes
virtual size_t length() const = 0;
// returns available capacity for writable tail
virtual size_t tail() const = 0;
// clears content, keeps capacity
virtual void clear() = 0;
};
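// A typical write/read cycle against the ByteStorage interface (a sketch;
// `src`, `n`, `consume` and `consumed` are assumed by this example):
//
//   storage->ensure(n); // reserve space for n bytes
//   memcpy(storage->writableTail(), src, n); // write the bytes
//   storage->append(n); // commit the write
//   consume(storage->data(), storage->length()); // read them back
//   storage->trim(consumed); // drop the consumed prefix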
struct DecoderOutputMessage {
DecoderHeader header;
std::unique_ptr<ByteStorage> payload;
};
/*
* External provider of the encoded bytes; the specific implementation is left
* for different use cases, like file, memory, external network end-points, etc.
* Normally the input/output parameter @out is set to a valid, non-null buffer
* pointer, which indicates a "read" call; however there are "seek" modes too.
* @out != nullptr => read from the current offset, @whence gets ignored,
* @size bytes to read => returns the number of bytes read, 0 if no more bytes
* are available, < 0 on error.
* @out == nullptr, @timeoutMs == 0 => does the provider support the "seek"
* capability in the first place? @size & @whence get ignored, returns 0 on
* success, < 0 if the "seek" mode is not supported.
* @out == nullptr, @timeoutMs != 0 => normal seek call:
* offset == @size, @whence in {SEEK_SET, SEEK_CUR, SEEK_END, AVSEEK_SIZE};
* returns < 0 on error, the position for SEEK_SET/SEEK_CUR/SEEK_END,
* or the buffer length for AVSEEK_SIZE.
*/
using DecoderInCallback =
std::function<int(uint8_t* out, int size, int whence, uint64_t timeoutMs)>;
using DecoderOutCallback = std::function<void(DecoderOutputMessage&&)>;
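// A minimal DecoderInCallback sketch over an in-memory vector that follows
// the contract above (illustrative only; see MemoryBuffer::getCallback in
// this commit for the full implementation):
//
//   auto data = std::make_shared<std::vector<uint8_t>>(/* encoded bytes */);
//   auto pos = std::make_shared<int64_t>(0);
//   DecoderInCallback in =
//       [data, pos](uint8_t* out, int size, int whence, uint64_t timeoutMs)
//       -> int {
//     if (out) { // read mode
//       auto n = std::min<int64_t>(size, int64_t(data->size()) - *pos);
//       memcpy(out, data->data() + *pos, n);
//       *pos += n;
//       return int(n);
//     }
//     if (timeoutMs == 0) {
//       return 0; // capability probe: seeking is supported
//     }
//     if (whence == AVSEEK_SIZE) { // seek mode: the offset arrives in @size
//       return int(data->size());
//     }
//     // handle SEEK_SET/SEEK_CUR/SEEK_END here, as MemoryBuffer::seek does
//     return int(*pos);
//   };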
struct DecoderMetadata {
// time base numerator
long num{0};
// time base denominator
long den{1};
// duration of the stream, in microseconds, if available
long duration{-1};
// frames per second, valid only for video streams
double fps{0};
// format specifies what kind of frame is in the payload
MediaFormat format;
};
/**
* Abstract class for decoding media bytes
* It has two different modes: internal media bytes retrieval for a given uri,
* and an external media bytes provider in the case of memory streams
*/
class MediaDecoder {
public:
virtual ~MediaDecoder() = default;
/**
* Initializes the media decoder with parameters.
* Media bytes get fetched internally from the provided URI,
* or by invoking the provided input callback.
* The input callback must be empty for the internal media provider.
* The caller can provide a non-null pointer for the metadata container
* to obtain the streams' metadata (optional).
*/
virtual bool init(
const DecoderParameters& params,
DecoderInCallback&& in,
std::vector<DecoderMetadata>* metadata) = 0;
/**
* Polls one decoded frame from the decoder
* Returns an error code; 0 for success
*/
virtual int decode(DecoderOutputMessage* out, uint64_t timeoutMs) = 0;
/**
* Polls decoded frames from the decoder, till EOF or error
*/
virtual int decode_all(const DecoderOutCallback& callback) = 0;
/**
* Stops calling callback, releases resources
*/
virtual void shutdown() = 0;
/**
* Interrupts whatever decoder is doing at any time
*/
virtual void interrupt() = 0;
/**
* Factory to create ByteStorage class instances, particular implementation is
* left to the derived class. Caller provides the initially allocated size
*/
virtual std::unique_ptr<ByteStorage> createByteStorage(size_t n) = 0;
};
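// End-to-end usage sketch (illustrative; SyncDecoder, which appears later in
// this commit, is one concrete MediaDecoder, and the uri value is an
// assumption):
//
//   SyncDecoder decoder;
//   DecoderParameters params;
//   params.uri = "/path/to/media.mp4";
//   params.formats.insert(MediaFormat(-1L)); // decode the audio stream
//   std::vector<DecoderMetadata> metadata;
//   if (decoder.init(params, nullptr, &metadata)) {
//     decoder.decode_all([](DecoderOutputMessage&& msg) {
//       // consume msg.header and msg.payload
//     });
//     decoder.shutdown();
//   }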
struct SamplerParameters {
MediaType type{TYPE_AUDIO};
FormatUnion in;
FormatUnion out;
int64_t loggingUuid{0};
};
/**
* Abstract class for sampling media bytes
*/
class MediaSampler {
public:
virtual ~MediaSampler() = default;
/**
* Initializes media sampler with parameters
*/
virtual bool init(const SamplerParameters& params) = 0;
/**
* Samples media bytes
* Returns an error code < 0, or >= 0 for success, indicating the number of
* bytes processed.
* Set @in to null to flush data
*/
virtual int sample(const ByteStorage* in, ByteStorage* out) = 0;
/**
* Releases resources
*/
virtual void shutdown() = 0;
/*
* Returns media type
*/
MediaType getMediaType() const {
return params_.type;
}
/*
* Returns formats
*/
FormatUnion getInputFormat() const {
return params_.in;
}
FormatUnion getOutFormat() const {
return params_.out;
}
protected:
SamplerParameters params_;
};
} // namespace ffmpeg
#include "memory_buffer.h"
#include <c10/util/Logging.h>
namespace ffmpeg {
MemoryBuffer::MemoryBuffer(const uint8_t* buffer, size_t size)
: buffer_(buffer), len_(size) {}
int MemoryBuffer::read(uint8_t* buf, int size) {
if (pos_ < len_) {
auto available = std::min(int(len_ - pos_), size);
memcpy(buf, buffer_ + pos_, available);
pos_ += available;
return available;
}
return 0;
}
int64_t MemoryBuffer::seek(int64_t offset, int whence) {
if (whence & AVSEEK_SIZE) {
return len_;
}
// remove force flag
whence &= ~AVSEEK_FORCE;
switch (whence) {
case SEEK_SET:
if (offset >= 0 && offset <= len_) {
pos_ = offset;
}
break;
case SEEK_END:
if (len_ + offset >= 0 && len_ + offset <= len_) {
pos_ = len_ + offset;
}
break;
case SEEK_CUR:
if (pos_ + offset >= 0 && pos_ + offset <= len_) {
pos_ += offset;
}
break;
default:
LOG(ERROR) << "Unknown whence flag gets provided: " << whence;
}
return pos_;
}
/* static */
DecoderInCallback MemoryBuffer::getCallback(
const uint8_t* buffer,
size_t size) {
MemoryBuffer object(buffer, size);
return
[object](uint8_t* out, int size, int whence, uint64_t timeoutMs) mutable
-> int {
if (out) { // see defs.h file
// read mode
return object.read(out, size);
}
// seek mode
if (!timeoutMs) {
// seek capability: yes, supported
return 0;
}
return object.seek(size, whence);
};
}
} // namespace ffmpeg
#pragma once
#include "defs.h"
namespace ffmpeg {
/**
* Class uses external memory buffer and implements a seekable interface.
*/
class MemoryBuffer {
public:
explicit MemoryBuffer(const uint8_t* buffer, size_t size);
int64_t seek(int64_t offset, int whence);
int read(uint8_t* buf, int size);
// static constructor for decoder callback.
static DecoderInCallback getCallback(const uint8_t* buffer, size_t size);
private:
const uint8_t* buffer_; // set at construction time
long pos_{0}; // current position
long len_{0}; // bytes in buffer
};
} // namespace ffmpeg
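// Usage sketch (illustrative; `decoder`, `params` and the loadEncodedMedia
// helper are assumed to exist):
//
//   std::vector<uint8_t> bytes = loadEncodedMedia();
//   auto in = MemoryBuffer::getCallback(bytes.data(), bytes.size());
//   std::vector<DecoderMetadata> metadata;
//   decoder.init(params, std::move(in), &metadata);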
#include "seekable_buffer.h"
#include <c10/util/Logging.h>
#include <chrono>
#include "memory_buffer.h"
namespace ffmpeg {
int SeekableBuffer::init(
DecoderInCallback&& in,
uint64_t timeoutMs,
size_t maxSeekableBytes,
ImageType* type) {
shutdown();
isSeekable_ = in(nullptr, 0, 0, 0) == 0;
if (isSeekable_) { // seekable
if (type) {
if (!readBytes(in, 8, timeoutMs)) {
return -1;
}
setImageType(type);
end_ = 0;
eof_ = false;
std::vector<uint8_t>().swap(buffer_);
// reset callback
if (in(nullptr, 0, SEEK_SET, timeoutMs)) {
return -1;
}
}
inCallback_ = std::forward<DecoderInCallback>(in);
return 1;
}
if (!readBytes(in, maxSeekableBytes + (type ? 8 : 0), timeoutMs)) {
return -1;
}
if (type) {
setImageType(type);
}
if (eof_) {
end_ = 0;
eof_ = false;
// reuse MemoryBuffer functionality
inCallback_ = MemoryBuffer::getCallback(buffer_.data(), buffer_.size());
isSeekable_ = true;
return 1;
}
inCallback_ = std::forward<DecoderInCallback>(in);
return 0;
}
bool SeekableBuffer::readBytes(
DecoderInCallback& in,
size_t maxBytes,
uint64_t timeoutMs) {
// Resize to the minimum of maxBytes and a 4K page
buffer_.resize(std::min(maxBytes, size_t(4 * 1024UL)));
end_ = 0;
eof_ = false;
auto end =
std::chrono::steady_clock::now() + std::chrono::milliseconds(timeoutMs);
auto watcher = [end]() -> bool {
return std::chrono::steady_clock::now() <= end;
};
bool hasTime = true;
while (!eof_ && end_ < maxBytes && (hasTime = watcher())) {
// read as many bytes as fit into the available buffer
auto res = in(buffer_.data() + end_, buffer_.size() - end_, 0, timeoutMs);
if (res > 0) {
end_ += res;
if (end_ == buffer_.size()) {
buffer_.resize(std::min(size_t(end_ * 4UL), maxBytes));
}
} else if (res == 0) {
eof_ = true;
} else {
// error
return false;
}
}
buffer_.resize(end_);
return hasTime;
}
void SeekableBuffer::setImageType(ImageType* type) {
if (buffer_.size() > 2 && buffer_[0] == 0xFF && buffer_[1] == 0xD8 &&
buffer_[2] == 0xFF) {
*type = ImageType::JPEG;
} else if (
buffer_.size() > 3 && buffer_[0] == 0x89 && buffer_[1] == 'P' &&
buffer_[2] == 'N' && buffer_[3] == 'G') {
*type = ImageType::PNG;
} else if (
buffer_.size() > 1 &&
((buffer_[0] == 0x49 && buffer_[1] == 0x49) ||
(buffer_[0] == 0x4D && buffer_[1] == 0x4D))) {
*type = ImageType::TIFF;
} else {
*type = ImageType::UNKNOWN;
}
}
int SeekableBuffer::read(uint8_t* buf, int size, uint64_t timeoutMs) {
if (isSeekable_) {
return inCallback_(buf, size, 0, timeoutMs);
}
if (pos_ < end_) {
// read cached bytes for non-seekable callback
auto available = std::min(int(end_ - pos_), size);
memcpy(buf, buffer_.data() + pos_, available);
pos_ += available;
return available;
} else if (!eof_) {
// normal sequential read (see defs.h file), i.e. @buf != null
auto res = inCallback_(buf, size, 0, timeoutMs); // read through
eof_ = res == 0;
return res;
} else {
return 0;
}
}
int64_t SeekableBuffer::seek(int64_t offset, int whence, uint64_t timeoutMs) {
return inCallback_(nullptr, offset, whence, timeoutMs);
}
void SeekableBuffer::shutdown() {
pos_ = end_ = 0;
eof_ = false;
std::vector<uint8_t>().swap(buffer_);
inCallback_ = nullptr;
}
} // namespace ffmpeg
#pragma once
#include "defs.h"
namespace ffmpeg {
/**
* Class uses an internal buffer to store the initial bytes as a seekable cache
* from the media provider and lets ffmpeg seek and read bytes from the cache
* and beyond - reading bytes directly from the media provider
*/
enum class ImageType {
UNKNOWN = 0,
JPEG = 1,
PNG = 2,
TIFF = 3,
};
class SeekableBuffer {
public:
// @type is optional, non-null only if image detection is required
// \returns 1 if the buffer is seekable, 0 if not seekable, < 0 on error
int init(
DecoderInCallback&& in,
uint64_t timeoutMs,
size_t maxSeekableBytes,
ImageType* type);
int read(uint8_t* buf, int size, uint64_t timeoutMs);
int64_t seek(int64_t offset, int whence, uint64_t timeoutMs);
void shutdown();
private:
bool readBytes(DecoderInCallback& in, size_t maxBytes, uint64_t timeoutMs);
void setImageType(ImageType* type);
private:
DecoderInCallback inCallback_;
std::vector<uint8_t> buffer_; // resized at init time
long pos_{0}; // current position (SEEK_CUR iff pos_ < end_)
long end_{0}; // current buffer size
bool eof_{false}; // indicates the EOF
bool isSeekable_{false}; // is callback seekable
};
} // namespace ffmpeg
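// Sketch of interpreting the init() return code (illustrative; the timeout,
// cache size, and `in` callback are assumptions):
//
//   SeekableBuffer buffer;
//   ImageType type = ImageType::UNKNOWN;
//   int res = buffer.init(std::move(in), 1000, 64 * 1024, &type);
//   if (res < 0) {
//     // error
//   } else if (res == 1) {
//     // seekable: ffmpeg may call both read() and seek()
//   } else {
//     // not seekable: sequential read() calls only
//   }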
#include "stream.h"
#include <c10/util/Logging.h>
#include "util.h"
namespace ffmpeg {
const AVRational timeBaseQ = AVRational{1, AV_TIME_BASE};
Stream::Stream(
AVFormatContext* inputCtx,
MediaFormat format,
bool convertPtsToWallTime,
int64_t loggingUuid)
: inputCtx_(inputCtx),
format_(format),
convertPtsToWallTime_(convertPtsToWallTime),
loggingUuid_(loggingUuid) {}
Stream::~Stream() {
if (frame_) {
av_free(frame_);
}
if (codecCtx_) {
avcodec_free_context(&codecCtx_);
}
}
AVCodec* Stream::findCodec(AVCodecParameters* params) {
return avcodec_find_decoder(params->codec_id);
}
int Stream::openCodec(std::vector<DecoderMetadata>* metadata) {
AVStream* stream = inputCtx_->streams[format_.stream];
AVCodec* codec = findCodec(stream->codecpar);
if (!codec) {
LOG(ERROR) << "LoggingUuid #" << loggingUuid_
<< ", avcodec_find_decoder failed for codec_id: "
<< int(stream->codecpar->codec_id);
return AVERROR(EINVAL);
}
if (!(codecCtx_ = avcodec_alloc_context3(codec))) {
LOG(ERROR) << "LoggingUuid #" << loggingUuid_
<< ", avcodec_alloc_context3 failed";
return AVERROR(ENOMEM);
}
int ret;
// Copy codec parameters from input stream to output codec context
if ((ret = avcodec_parameters_to_context(codecCtx_, stream->codecpar)) < 0) {
LOG(ERROR) << "LoggingUuid #" << loggingUuid_
<< ", avcodec_parameters_to_context failed";
return ret;
}
// after avcodec_open2, value of codecCtx_->time_base is NOT meaningful
if ((ret = avcodec_open2(codecCtx_, codec, nullptr)) < 0) {
LOG(ERROR) << "LoggingUuid #" << loggingUuid_
<< ", avcodec_open2 failed: " << Util::generateErrorDesc(ret);
avcodec_free_context(&codecCtx_);
codecCtx_ = nullptr;
return ret;
}
frame_ = av_frame_alloc();
switch (format_.type) {
case TYPE_VIDEO:
fps_ = av_q2d(av_guess_frame_rate(inputCtx_, stream, nullptr));
break;
case TYPE_AUDIO:
fps_ = codecCtx_->sample_rate;
break;
default:
fps_ = 30.0;
}
if ((ret = initFormat())) {
LOG(ERROR) << "initFormat failed, type: " << format_.type;
}
if (metadata) {
DecoderMetadata header;
header.format = format_;
header.fps = fps_;
header.num = stream->time_base.num;
header.den = stream->time_base.den;
header.duration =
av_rescale_q(stream->duration, stream->time_base, timeBaseQ);
metadata->push_back(header);
}
return ret;
}
int Stream::analyzePacket(const AVPacket* packet, bool* gotFrame) {
int consumed = 0;
int result = avcodec_send_packet(codecCtx_, packet);
if (result == AVERROR(EAGAIN)) {
*gotFrame = false; // no bytes get consumed, fetch frame
} else if (result == AVERROR_EOF) {
*gotFrame = false; // more than one flush packet
if (packet) {
// got packet after flush, this is an error
return result;
}
} else if (result < 0) {
LOG(ERROR) << "avcodec_send_packet failed, err: "
<< Util::generateErrorDesc(result);
return result; // error
} else {
consumed = packet ? packet->size : 0; // all bytes get consumed
}
result = avcodec_receive_frame(codecCtx_, frame_);
if (result >= 0) {
*gotFrame = true; // frame is available
} else if (result == AVERROR(EAGAIN)) {
*gotFrame = false; // no frames at this time, needs more packets
if (!consumed) {
// precaution, if no packets got consumed and no frames are available
return result;
}
} else if (result == AVERROR_EOF) {
*gotFrame = false; // the last frame has been flushed
// precaution, if no more frames are available assume we consume all bytes
consumed = 0;
} else { // error
LOG(ERROR) << "avcodec_receive_frame failed, err: "
<< Util::generateErrorDesc(result);
return result;
}
return consumed;
}
int Stream::decodePacket(
const AVPacket* packet,
DecoderOutputMessage* out,
bool headerOnly,
bool* hasMsg) {
int consumed;
bool gotFrame = false;
*hasMsg = false;
if ((consumed = analyzePacket(packet, &gotFrame)) >= 0 &&
(packet == nullptr || gotFrame)) {
int result;
if ((result = getMessage(out, !gotFrame, headerOnly)) < 0) {
return result; // report error
}
*hasMsg = result > 0;
}
return consumed;
}
int Stream::flush(DecoderOutputMessage* out, bool headerOnly) {
bool hasMsg = false;
int result = decodePacket(nullptr, out, headerOnly, &hasMsg);
if (result < 0) {
avcodec_flush_buffers(codecCtx_);
return result;
}
if (!hasMsg) {
avcodec_flush_buffers(codecCtx_);
return 0;
}
return 1;
}
int Stream::getMessage(DecoderOutputMessage* out, bool flush, bool headerOnly) {
if (flush) {
// only flush of audio frames makes sense
if (format_.type == TYPE_AUDIO) {
int processed = 0;
size_t total = 0;
// grab all audio bytes by chunks
do {
if ((processed = copyFrameBytes(out->payload.get(), flush)) < 0) {
return processed;
}
total += processed;
} while (processed);
if (total) {
// set header if message bytes are available
setHeader(&out->header, flush);
return 1;
}
}
return 0;
} else {
if (format_.type == TYPE_AUDIO) {
int processed = 0;
if ((processed = copyFrameBytes(out->payload.get(), flush)) < 0) {
return processed;
}
if (processed) {
// set header if message bytes are available
setHeader(&out->header, flush);
return 1;
}
return 0;
} else {
// set header
setHeader(&out->header, flush);
if (headerOnly) {
// Only the header is requested
return 1;
}
return copyFrameBytes(out->payload.get(), flush);
}
}
}
void Stream::setHeader(DecoderHeader* header, bool flush) {
header->seqno = numGenerator_++;
setFramePts(header, flush);
if (convertPtsToWallTime_) {
keeper_.adjust(header->pts);
}
header->format = format_;
header->keyFrame = 0;
header->fps = std::numeric_limits<double>::quiet_NaN();
}
void Stream::setFramePts(DecoderHeader* header, bool flush) {
if (flush) {
header->pts = nextPts_; // already in us
} else {
header->pts = av_frame_get_best_effort_timestamp(frame_);
if (header->pts == AV_NOPTS_VALUE) {
header->pts = nextPts_;
} else {
header->pts = av_rescale_q(
header->pts,
inputCtx_->streams[format_.stream]->time_base,
timeBaseQ);
}
switch (format_.type) {
case TYPE_AUDIO:
nextPts_ = header->pts + frame_->nb_samples * AV_TIME_BASE / fps_;
break;
case TYPE_VIDEO:
nextPts_ = header->pts + AV_TIME_BASE / fps_;
break;
default:
nextPts_ = header->pts;
}
}
}
} // namespace ffmpeg
#pragma once
#include <atomic>
#include "defs.h"
#include "time_keeper.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode one media stream (audio or video).
*/
class Stream {
public:
Stream(
AVFormatContext* inputCtx,
MediaFormat format,
bool convertPtsToWallTime,
int64_t loggingUuid);
virtual ~Stream();
// returns 0 - on success or negative error
int openCodec(std::vector<DecoderMetadata>* metadata);
// returns 1 - if packet got consumed, 0 - if it's not, and < 0 on error
int decodePacket(
const AVPacket* packet,
DecoderOutputMessage* out,
bool headerOnly,
bool* hasMsg);
// returns stream index
int getIndex() const {
return format_.stream;
}
// returns 1 - if message got a payload, 0 - if it's not, and < 0 on error
int flush(DecoderOutputMessage* out, bool headerOnly);
// return media format
MediaFormat getMediaFormat() const {
return format_;
}
protected:
virtual int initFormat() = 0;
// returns the number of processed bytes from the packet, or a negative error
virtual int analyzePacket(const AVPacket* packet, bool* gotFrame);
// returns the number of processed bytes from the packet, or a negative error
virtual int copyFrameBytes(ByteStorage* out, bool flush) = 0;
// sets output format
virtual void setHeader(DecoderHeader* header, bool flush);
// set frame pts
virtual void setFramePts(DecoderHeader* header, bool flush);
// finds codec
virtual AVCodec* findCodec(AVCodecParameters* params);
private:
// returns 1 - if message got a payload, 0 - if it's not, and < 0 on error
int getMessage(DecoderOutputMessage* out, bool flush, bool headerOnly);
protected:
AVFormatContext* const inputCtx_;
MediaFormat format_;
const bool convertPtsToWallTime_;
int64_t loggingUuid_;
AVCodecContext* codecCtx_{nullptr};
AVFrame* frame_{nullptr};
std::atomic<size_t> numGenerator_{0};
TimeKeeper keeper_;
// estimated next frame pts for flushing the last frame
int64_t nextPts_{0};
double fps_{30.};
};
} // namespace ffmpeg
#include "subtitle_sampler.h"
#include <c10/util/Logging.h>
#include "util.h"
namespace ffmpeg {
SubtitleSampler::~SubtitleSampler() {
cleanUp();
}
void SubtitleSampler::shutdown() {
cleanUp();
}
bool SubtitleSampler::init(const SamplerParameters& params) {
cleanUp();
// set formats
params_ = params;
return true;
}
int SubtitleSampler::sample(AVSubtitle* sub, ByteStorage* out) {
if (!sub || !out) {
return 0; // flush
}
out->ensure(Util::size(*sub));
return Util::serialize(*sub, out);
}
int SubtitleSampler::sample(const ByteStorage* in, ByteStorage* out) {
if (in && out) {
// Get a writable copy
if (size_t len = in->length()) {
out->ensure(len);
memcpy(out->writableTail(), in->data(), len);
}
return out->length();
}
return 0;
}
void SubtitleSampler::cleanUp() {}
} // namespace ffmpeg
#pragma once
#include "defs.h"
namespace ffmpeg {
/**
* Class serializes subtitle frames from AVSubtitle into byte storage
*/
class SubtitleSampler : public MediaSampler {
public:
SubtitleSampler() = default;
~SubtitleSampler() override;
bool init(const SamplerParameters& params) override;
int sample(const ByteStorage* in, ByteStorage* out) override;
void shutdown() override;
// returns the number of processed bytes
int sample(AVSubtitle* sub, ByteStorage* out);
// helper serialization/deserialization methods
static void serialize(const AVSubtitle& sub, ByteStorage* out);
static bool deserialize(const ByteStorage& buf, AVSubtitle* sub);
private:
// close resources
void cleanUp();
};
} // namespace ffmpeg
#include "subtitle_stream.h"
#include <c10/util/Logging.h>
#include <limits>
#include "util.h"
namespace ffmpeg {
const AVRational timeBaseQ = AVRational{1, AV_TIME_BASE};
SubtitleStream::SubtitleStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const SubtitleFormat& format)
: Stream(
inputCtx,
MediaFormat::makeMediaFormat(format, index),
convertPtsToWallTime,
0) {
memset(&sub_, 0, sizeof(sub_));
}
void SubtitleStream::releaseSubtitle() {
if (sub_.release) {
avsubtitle_free(&sub_);
memset(&sub_, 0, sizeof(sub_));
}
}
SubtitleStream::~SubtitleStream() {
releaseSubtitle();
sampler_.shutdown();
}
int SubtitleStream::initFormat() {
if (!codecCtx_->subtitle_header) {
LOG(ERROR) << "No subtitle header found";
} else {
VLOG(1) << "Subtitle header found!";
}
return 0;
}
int SubtitleStream::analyzePacket(const AVPacket* packet, bool* gotFrame) {
// clean-up
releaseSubtitle();
// check flush packet
AVPacket avPacket;
av_init_packet(&avPacket);
avPacket.data = nullptr;
avPacket.size = 0;
auto pkt = packet ? *packet : avPacket;
int gotFramePtr = 0;
int result = avcodec_decode_subtitle2(codecCtx_, &sub_, &gotFramePtr, &pkt);
if (result < 0) {
LOG(ERROR) << "avcodec_decode_subtitle2 failed, err: "
<< Util::generateErrorDesc(result);
return result;
} else if (result == 0) {
result = pkt.size; // discard the rest of the packet
}
sub_.release = gotFramePtr;
*gotFrame = gotFramePtr > 0;
// set proper pts in us
if (gotFramePtr) {
sub_.pts = av_rescale_q(
pkt.pts, inputCtx_->streams[format_.stream]->time_base, timeBaseQ);
}
return result;
}
int SubtitleStream::copyFrameBytes(ByteStorage* out, bool flush) {
return sampler_.sample(flush ? nullptr : &sub_, out);
}
void SubtitleStream::setFramePts(DecoderHeader* header, bool) {
header->pts = sub_.pts; // already in us
}
} // namespace ffmpeg
#pragma once
#include "stream.h"
#include "subtitle_sampler.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode one subtitle stream.
*/
struct AVSubtitleKeeper : AVSubtitle {
int64_t release{0};
};
class SubtitleStream : public Stream {
public:
SubtitleStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const SubtitleFormat& format);
~SubtitleStream() override;
protected:
void setFramePts(DecoderHeader* header, bool flush) override;
private:
int initFormat() override;
int analyzePacket(const AVPacket* packet, bool* gotFrame) override;
int copyFrameBytes(ByteStorage* out, bool flush) override;
void releaseSubtitle();
private:
SubtitleSampler sampler_;
AVSubtitleKeeper sub_;
};
} // namespace ffmpeg
#include "sync_decoder.h"
#include <c10/util/Logging.h>
namespace ffmpeg {
SyncDecoder::AVByteStorage::AVByteStorage(size_t n) {
ensure(n);
}
SyncDecoder::AVByteStorage::~AVByteStorage() {
av_free(buffer_);
}
void SyncDecoder::AVByteStorage::ensure(size_t n) {
if (tail() < n) {
capacity_ = offset_ + length_ + n;
buffer_ = static_cast<uint8_t*>(av_realloc(buffer_, capacity_));
}
}
uint8_t* SyncDecoder::AVByteStorage::writableTail() {
CHECK_LE(offset_ + length_, capacity_);
return buffer_ + offset_ + length_;
}
void SyncDecoder::AVByteStorage::append(size_t n) {
CHECK_LE(n, tail());
length_ += n;
}
void SyncDecoder::AVByteStorage::trim(size_t n) {
CHECK_LE(n, length_);
offset_ += n;
length_ -= n;
}
const uint8_t* SyncDecoder::AVByteStorage::data() const {
return buffer_ + offset_;
}
size_t SyncDecoder::AVByteStorage::length() const {
return length_;
}
size_t SyncDecoder::AVByteStorage::tail() const {
CHECK_LE(offset_ + length_, capacity_);
return capacity_ - offset_ - length_;
}
void SyncDecoder::AVByteStorage::clear() {
offset_ = 0;
length_ = 0;
}
std::unique_ptr<ByteStorage> SyncDecoder::createByteStorage(size_t n) {
return std::make_unique<AVByteStorage>(n);
}
void SyncDecoder::onInit() {
eof_ = false;
queue_.clear();
}
int SyncDecoder::decode(DecoderOutputMessage* out, uint64_t timeoutMs) {
if (eof_ && queue_.empty()) {
return ENODATA;
}
if (queue_.empty()) {
int result = getFrame(timeoutMs);
// assign EOF
eof_ = result == ENODATA;
// check unrecoverable error, any error but ENODATA
if (result && result != ENODATA) {
return result;
}
// still empty
if (queue_.empty()) {
if (eof_) {
return ENODATA;
} else {
LOG(INFO) << "Queue is empty";
return ETIMEDOUT;
}
}
}
*out = std::move(queue_.front());
queue_.pop_front();
return 0;
}
void SyncDecoder::push(DecoderOutputMessage&& buffer) {
queue_.push_back(std::move(buffer));
}
} // namespace ffmpeg
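// Polling sketch for SyncDecoder::decode (illustrative; per the code above,
// ETIMEDOUT means "try again" and ENODATA means end of stream):
//
//   DecoderOutputMessage out;
//   int res;
//   while ((res = decoder.decode(&out, /*timeoutMs=*/1000)) != ENODATA) {
//     if (res == 0) {
//       // consume out
//     } else if (res != ETIMEDOUT) {
//       break; // unrecoverable error
//     }
//   }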