Unverified commit 32e16805, authored by Francisco Massa, committed by GitHub

Update video reader to use new decoder (#1978)

* Base decoder for video. (#1747)

Summary:
Pull Request resolved: https://github.com/pytorch/vision/pull/1747

Pull Request resolved: https://github.com/pytorch/vision/pull/1746

Added the implementation of an ffmpeg-based decoder with functionality that can be used in VUE and TorchVision.

Reviewed By: fmassa

Differential Revision: D19358914

fbshipit-source-id: abb672f89bfaca6351dda2354f0d35cf8e47fa0f

* Integrated base decoder into VideoReader class and video_utils.py (#1766)

Summary:
Pull Request resolved: https://github.com/pytorch/vision/pull/1766

Replaced FfmpegDecoder (incompatible with VUE) with the base decoder (compatible with VUE).
Modified the Python utilities in video_utils.py for internal simplification; the public interface is preserved.

Reviewed By: fmassa

Differential Revision: D19415903

fbshipit-source-id: 4d7a0158bd77bac0a18732fe4183fdd9a57f6402

* Optimizing base decoder performance. (#1852)

Summary:
Pull Request resolved: https://github.com/pytorch/vision/pull/1852

Changed base decoder internals for faster clip processing.

Reviewed By: stephenyan1231

Differential Revision: D19748379

fbshipit-source-id: 58a435f0a0b25545e7bd1a3edb0b1d558176a806

* Minor fix and decoder class member access.

Summary:
Found and fixed a bug in the cropping algorithm (a simple typo).
Derived classes also need access to some decoder class members, such as initialization parameters, so those members are now protected.

Reviewed By: stephenyan1231, fmassa

Differential Revision: D19895076

fbshipit-source-id: 691336c8e18526b085ae5792ac3546bc387a6db9

* Added missing header for less dependencies. (#1898)

Summary:
Pull Request resolved: https://github.com/pytorch/vision/pull/1898

Stream/sampler includes shouldn't depend on decoder headers. Add the dependencies directly where they are required.

Reviewed By: stephenyan1231

Differential Revision: D19911404

fbshipit-source-id: ef322a053708405c02cee4562b456b1602fb12fc

* Implemented VUE Asynchronous Decoder

Summary: For Mothership, we found that an asynchronous decoder provides better performance.

Differential Revision: D20026194

fbshipit-source-id: 627b91844b4e3f917002031dd32cb19c239f4ba8

* fix a bug in API read_video_from_memory (#1942)

Summary:
Pull Request resolved: https://github.com/pytorch/vision/pull/1942

D18720474 introduced a bug in the `read_video_from_memory` API. Thanks to weiyaowang for reporting it.

Reviewed By: weiyaowang

Differential Revision: D20270179

fbshipit-source-id: 66348c99a5ad1f9129b90e934524ddfaad59de03

* extend decoder to support new video_max_dimension argument (#1924)

Summary:
Pull Request resolved: https://github.com/pytorch/vision/pull/1924

Extend the `video reader` decoder Python API in TorchVision to support a new argument, `video_max_dimension`. This enables new video decoding use cases. When setting `video_width=0`, `video_height=0`, `video_min_dimension != 0`, and `video_max_dimension != 0`, we can rescale the video clips so that their spatial resolution (height, width) becomes
 - (video_min_dimension, video_max_dimension) if original height < original width
 - (video_max_dimension, video_min_dimension) if original height >= original width

This is useful at the video model testing stage, where we perform fully convolutional evaluation and take entire video frames as input without cropping. Previously, we could only set, for instance, `video_width=0`, `video_height=0`, `video_min_dimension = 128`, which preserves the aspect ratio. In production datasets there is a small number of videos whose aspect ratio is either extremely large or small, and when the shorter edge is rescaled to 128, the longer edge is still large. This easily causes GPU memory OOM when we sample multiple video clips and put them in a single minibatch.

Now we can set, for instance, `video_width=0`, `video_height=0`, `video_min_dimension = 128`, and `video_max_dimension = 171` so that the rescaled resolution is either (128, 171) or (171, 128), depending on whether the original height is larger than the original width. Thus we are less likely to hit GPU OOM because the spatial size of the video clips is fixed.
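For illustration, the selection rule above can be sketched as follows (a minimal sketch of the `video_min_dimension != 0, video_max_dimension != 0` case, not code from this commit):

#include <cstdio>
#include <utility>

// Returns (out_height, out_width) for the case width=0, height=0,
// min_dimension != 0, max_dimension != 0, as described above.
std::pair<int, int> rescaledResolution(
    int origHeight, int origWidth, int minDimension, int maxDimension) {
  return origHeight < origWidth
      ? std::make_pair(minDimension, maxDimension)
      : std::make_pair(maxDimension, minDimension);
}

int main() {
  // e.g. a 1080 x 1920 landscape video with min=128 and max=171 -> (128, 171)
  auto hw = rescaledResolution(1080, 1920, 128, 171);
  std::printf("%d x %d\n", hw.first, hw.second);
  return 0;
}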

Reviewed By: putivsky

Differential Revision: D20182529

fbshipit-source-id: f9c40afb7590e7c45e6908946597141efa35f57c

* Fixing samplers initialization (#1967)

Summary:
Pull Request resolved: https://github.com/pytorch/vision/pull/1967



No-op for the torchvision diff; fixes sampler initialization.

Differential Revision: D20397218

fbshipit-source-id: 6dc4d04364f305fbda7ca4f67a25ceecd73d0f20

* Exclude C++ test files
Co-authored-by: Yuri Putivsky <yuri@fb.com>
Co-authored-by: Zhicheng Yan <zyan3@fb.com>
parent 8b9859d3
@@ -152,17 +152,6 @@ def get_extensions():
include_dirs = [extensions_dir]
ffmpeg_exe = distutils.spawn.find_executable('ffmpeg')
has_ffmpeg = ffmpeg_exe is not None
if has_ffmpeg:
ffmpeg_bin = os.path.dirname(ffmpeg_exe)
ffmpeg_root = os.path.dirname(ffmpeg_bin)
ffmpeg_include_dir = os.path.join(ffmpeg_root, 'include')
# TorchVision video reader
video_reader_src_dir = os.path.join(this_dir, 'torchvision', 'csrc', 'cpu', 'video_reader')
video_reader_src = glob.glob(os.path.join(video_reader_src_dir, "*.cpp"))
ext_modules = [
extension(
'torchvision._C',
@@ -182,12 +171,30 @@ def get_extensions():
extra_compile_args=extra_compile_args,
)
)
ffmpeg_exe = distutils.spawn.find_executable('ffmpeg')
has_ffmpeg = ffmpeg_exe is not None
if has_ffmpeg:
ffmpeg_bin = os.path.dirname(ffmpeg_exe)
ffmpeg_root = os.path.dirname(ffmpeg_bin)
ffmpeg_include_dir = os.path.join(ffmpeg_root, 'include')
# TorchVision base decoder + video reader
video_reader_src_dir = os.path.join(this_dir, 'torchvision', 'csrc', 'cpu', 'video_reader')
video_reader_src = glob.glob(os.path.join(video_reader_src_dir, "*.cpp"))
base_decoder_src_dir = os.path.join(this_dir, 'torchvision', 'csrc', 'cpu', 'decoder')
base_decoder_src = glob.glob(
os.path.join(base_decoder_src_dir, "[!sync_decoder_test,!utils_test]*.cpp"))
combined_src = video_reader_src + base_decoder_src
ext_modules.append(
CppExtension(
'torchvision.video_reader',
video_reader_src,
combined_src,
include_dirs=[
base_decoder_src_dir,
video_reader_src_dir,
ffmpeg_include_dir,
extensions_dir,
@@ -395,7 +395,7 @@ class TestVideoReader(unittest.TestCase):
def test_stress_test_read_video_from_file(self):
num_iter = 10000
# video related
width, height, min_dimension = 0, 0, 0
width, height, min_dimension, max_dimension = 0, 0, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -416,6 +416,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -434,7 +435,7 @@ class TestVideoReader(unittest.TestCase):
Test the case when decoder starts with a video file to decode frames.
"""
# video related
width, height, min_dimension = 0, 0, 0
width, height, min_dimension, max_dimension = 0, 0, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -454,6 +455,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -479,7 +481,7 @@ class TestVideoReader(unittest.TestCase):
only reads video stream and ignores audio stream
"""
# video related
width, height, min_dimension = 0, 0, 0
width, height, min_dimension, max_dimension = 0, 0, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -499,6 +501,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -536,7 +539,7 @@ class TestVideoReader(unittest.TestCase):
video min dimension between height and width is set.
"""
# video related
width, height, min_dimension = 0, 0, 128
width, height, min_dimension, max_dimension = 0, 0, 128, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -555,6 +558,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -571,13 +575,100 @@ class TestVideoReader(unittest.TestCase):
min_dimension, min(tv_result[0].size(1), tv_result[0].size(2))
)
def test_read_video_from_file_rescale_max_dimension(self):
"""
Test the case when decoder starts with a video file to decode frames, and
video max dimension between height and width is set.
"""
# video related
width, height, min_dimension, max_dimension = 0, 0, 0, 85
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
samples, channels = 0, 0
audio_start_pts, audio_end_pts = 0, -1
audio_timebase_num, audio_timebase_den = 0, 1
for test_video, _config in test_videos.items():
full_path = os.path.join(VIDEO_DIR, test_video)
tv_result = torch.ops.video_reader.read_video_from_file(
full_path,
seek_frame_margin,
0, # getPtsOnly
1, # readVideoStream
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
video_timebase_den,
1, # readAudioStream
samples,
channels,
audio_start_pts,
audio_end_pts,
audio_timebase_num,
audio_timebase_den,
)
self.assertEqual(
max_dimension, max(tv_result[0].size(1), tv_result[0].size(2))
)
def test_read_video_from_file_rescale_both_min_max_dimension(self):
"""
Test the case when decoder starts with a video file to decode frames, and
both video min and max dimensions between height and width are set.
"""
# video related
width, height, min_dimension, max_dimension = 0, 0, 64, 85
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
samples, channels = 0, 0
audio_start_pts, audio_end_pts = 0, -1
audio_timebase_num, audio_timebase_den = 0, 1
for test_video, _config in test_videos.items():
full_path = os.path.join(VIDEO_DIR, test_video)
tv_result = torch.ops.video_reader.read_video_from_file(
full_path,
seek_frame_margin,
0, # getPtsOnly
1, # readVideoStream
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
video_timebase_den,
1, # readAudioStream
samples,
channels,
audio_start_pts,
audio_end_pts,
audio_timebase_num,
audio_timebase_den,
)
self.assertEqual(
min_dimension, min(tv_result[0].size(1), tv_result[0].size(2))
)
self.assertEqual(
max_dimension, max(tv_result[0].size(1), tv_result[0].size(2))
)
def test_read_video_from_file_rescale_width(self):
"""
Test the case when decoder starts with a video file to decode frames, and
video width is set.
"""
# video related
width, height, min_dimension = 256, 0, 0
width, height, min_dimension, max_dimension = 256, 0, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -596,6 +687,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -616,7 +708,7 @@ class TestVideoReader(unittest.TestCase):
video height is set.
"""
# video related
width, height, min_dimension = 0, 224, 0
width, height, min_dimension, max_dimension = 0, 224, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -635,6 +727,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -655,7 +748,7 @@ class TestVideoReader(unittest.TestCase):
both video height and width are set.
"""
# video related
width, height, min_dimension = 320, 240, 0
width, height, min_dimension, max_dimension = 320, 240, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -674,6 +767,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -697,7 +791,7 @@ class TestVideoReader(unittest.TestCase):
for samples in [9600, 96000]: # downsampling # upsampling
# video related
width, height, min_dimension = 0, 0, 0
width, height, min_dimension, max_dimension = 0, 0, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -716,6 +810,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -752,7 +847,7 @@ class TestVideoReader(unittest.TestCase):
Test the case when video is already in memory, and decoder reads data in memory
"""
# video related
width, height, min_dimension = 0, 0, 0
width, height, min_dimension, max_dimension = 0, 0, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -772,6 +867,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -794,6 +890,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -816,7 +913,7 @@ class TestVideoReader(unittest.TestCase):
Test the case when video is already in memory, and decoder reads data in memory
"""
# video related
width, height, min_dimension = 0, 0, 0
width, height, min_dimension, max_dimension = 0, 0, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -836,6 +933,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -861,7 +959,7 @@ class TestVideoReader(unittest.TestCase):
for both pts and frame data
"""
# video related
width, height, min_dimension = 0, 0, 0
width, height, min_dimension, max_dimension = 0, 0, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -881,6 +979,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -904,6 +1003,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -930,7 +1030,7 @@ class TestVideoReader(unittest.TestCase):
for test_video, config in test_videos.items():
full_path, video_tensor = _get_video_tensor(VIDEO_DIR, test_video)
# video related
width, height, min_dimension = 0, 0, 0
width, height, min_dimension, max_dimension = 0, 0, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -946,6 +1046,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -1000,6 +1101,7 @@ class TestVideoReader(unittest.TestCase):
width,
height,
min_dimension,
max_dimension,
video_start_pts,
video_end_pts,
video_timebase_num,
@@ -1099,7 +1201,7 @@ class TestVideoReader(unittest.TestCase):
Test the case when video is already in memory, and decoder reads data in memory
"""
# video related
width, height, min_dimension = 0, 0, 0
width, height, min_dimension, max_dimension = 0, 0, 0, 0
video_start_pts, video_end_pts = 0, -1
video_timebase_num, video_timebase_den = 0, 1
# audio related
@@ -1130,6 +1232,7 @@ class TestVideoReader(unittest.TestCase):
[audio_start_pts, audio_end_pts],
audio_timebase_num,
audio_timebase_den,
max_dimension,
)
# FUTURE: check value of video / audio frames
#include "audio_sampler.h"
#include <c10/util/Logging.h>
#include "util.h"
#define AVRESAMPLE_MAX_CHANNELS 32
// www.ffmpeg.org/doxygen/1.1/doc_2examples_2resampling_audio_8c-example.html#a24
namespace ffmpeg {
namespace {
int preparePlanes(
const AudioFormat& fmt,
const uint8_t* buffer,
int numSamples,
uint8_t** planes) {
int result;
if ((result = av_samples_fill_arrays(
planes,
nullptr, // linesize is not needed
buffer,
fmt.channels,
numSamples,
(AVSampleFormat)fmt.format,
1)) < 0) {
LOG(ERROR) << "av_samples_fill_arrays failed, err: "
<< Util::generateErrorDesc(result)
<< ", numSamples: " << numSamples << ", fmt: " << fmt.format;
}
return result;
}
} // namespace
AudioSampler::AudioSampler(void* logCtx) : logCtx_(logCtx) {}
AudioSampler::~AudioSampler() {
cleanUp();
}
void AudioSampler::shutdown() {
cleanUp();
}
bool AudioSampler::init(const SamplerParameters& params) {
cleanUp();
if (params.type != MediaType::TYPE_AUDIO) {
LOG(ERROR) << "Invalid media type, expected MediaType::TYPE_AUDIO";
return false;
}
swrContext_ = swr_alloc_set_opts(
nullptr,
av_get_default_channel_layout(params.out.audio.channels),
(AVSampleFormat)params.out.audio.format,
params.out.audio.samples,
av_get_default_channel_layout(params.in.audio.channels),
(AVSampleFormat)params.in.audio.format,
params.in.audio.samples,
0,
logCtx_);
if (swrContext_ == nullptr) {
LOG(ERROR) << "Cannot allocate SwrContext";
return false;
}
int result;
if ((result = swr_init(swrContext_)) < 0) {
LOG(ERROR) << "swr_init faield, err: " << Util::generateErrorDesc(result)
<< ", in -> format: " << params.in.audio.format
<< ", channels: " << params.in.audio.channels
<< ", samples: " << params.in.audio.samples
<< ", out -> format: " << params.out.audio.format
<< ", channels: " << params.out.audio.channels
<< ", samples: " << params.out.audio.samples;
return false;
}
// set formats
params_ = params;
return true;
}
int AudioSampler::numOutputSamples(int inSamples) const {
return swr_get_out_samples(swrContext_, inSamples);
}
int AudioSampler::sample(
const uint8_t* inPlanes[],
int inNumSamples,
ByteStorage* out,
int outNumSamples) {
int result;
int outBufferBytes = av_samples_get_buffer_size(
nullptr,
params_.out.audio.channels,
outNumSamples,
(AVSampleFormat)params_.out.audio.format,
1);
if (out) {
out->ensure(outBufferBytes);
uint8_t* outPlanes[AVRESAMPLE_MAX_CHANNELS] = {nullptr};
if ((result = preparePlanes(
params_.out.audio,
out->writableTail(),
outNumSamples,
outPlanes)) < 0) {
return result;
}
if ((result = swr_convert(
swrContext_,
&outPlanes[0],
outNumSamples,
inPlanes,
inNumSamples)) < 0) {
LOG(ERROR) << "swr_convert faield, err: "
<< Util::generateErrorDesc(result);
return result;
}
CHECK_LE(result, outNumSamples);
if (result) {
if ((result = av_samples_get_buffer_size(
nullptr,
params_.out.audio.channels,
result,
(AVSampleFormat)params_.out.audio.format,
1)) >= 0) {
out->append(result);
} else {
LOG(ERROR) << "av_samples_get_buffer_size faield, err: "
<< Util::generateErrorDesc(result);
}
}
} else {
// allocate a temporary buffer
auto* tmpBuffer = static_cast<uint8_t*>(av_malloc(outBufferBytes));
if (!tmpBuffer) {
LOG(ERROR) << "av_alloc faield, for size: " << outBufferBytes;
return -1;
}
uint8_t* outPlanes[AVRESAMPLE_MAX_CHANNELS] = {nullptr};
if ((result = preparePlanes(
params_.out.audio, tmpBuffer, outNumSamples, outPlanes)) < 0) {
av_free(tmpBuffer);
return result;
}
if ((result = swr_convert(
swrContext_,
&outPlanes[0],
outNumSamples,
inPlanes,
inNumSamples)) < 0) {
LOG(ERROR) << "swr_convert faield, err: "
<< Util::generateErrorDesc(result);
av_free(tmpBuffer);
return result;
}
av_free(tmpBuffer);
CHECK_LE(result, outNumSamples);
if (result) {
result = av_samples_get_buffer_size(
nullptr,
params_.out.audio.channels,
result,
(AVSampleFormat)params_.out.audio.format,
1);
}
}
return result;
}
int AudioSampler::sample(AVFrame* frame, ByteStorage* out) {
const auto outNumSamples = numOutputSamples(frame ? frame->nb_samples : 0);
if (!outNumSamples) {
return 0;
}
return sample(
frame ? (const uint8_t**)&frame->data[0] : nullptr,
frame ? frame->nb_samples : 0,
out,
outNumSamples);
}
int AudioSampler::sample(const ByteStorage* in, ByteStorage* out) {
const auto inSampleSize =
av_get_bytes_per_sample((AVSampleFormat)params_.in.audio.format);
const auto inNumSamples =
!in ? 0 : in->length() / inSampleSize / params_.in.audio.channels;
const auto outNumSamples = numOutputSamples(inNumSamples);
if (!outNumSamples) {
return 0;
}
uint8_t* inPlanes[AVRESAMPLE_MAX_CHANNELS] = {nullptr};
int result;
if (in &&
(result = preparePlanes(
params_.in.audio, in->data(), inNumSamples, inPlanes)) < 0) {
return result;
}
return sample(
in ? (const uint8_t**)inPlanes : nullptr,
inNumSamples,
out,
outNumSamples);
}
void AudioSampler::cleanUp() {
if (swrContext_) {
swr_free(&swrContext_);
swrContext_ = nullptr;
}
}
} // namespace ffmpeg
#pragma once
#include "defs.h"
namespace ffmpeg {
/**
* Class transcodes audio frames from one format into another
*/
class AudioSampler : public MediaSampler {
public:
explicit AudioSampler(void* logCtx);
~AudioSampler() override;
// MediaSampler overrides
bool init(const SamplerParameters& params) override;
int sample(const ByteStorage* in, ByteStorage* out) override;
void shutdown() override;
int sample(AVFrame* frame, ByteStorage* out);
private:
// close resources
void cleanUp();
// helper functions for rescaling, cropping, etc.
int numOutputSamples(int inSamples) const;
int sample(
const uint8_t* inPlanes[],
int inNumSamples,
ByteStorage* out,
int outNumSamples);
private:
SwrContext* swrContext_{nullptr};
void* logCtx_{nullptr};
};
} // namespace ffmpeg
#include "audio_stream.h"
#include <c10/util/Logging.h>
#include <limits>
#include "util.h"
namespace ffmpeg {
namespace {
bool operator==(const AudioFormat& x, const AVFrame& y) {
return x.samples == y.sample_rate && x.channels == y.channels &&
x.format == y.format;
}
bool operator==(const AudioFormat& x, const AVCodecContext& y) {
return x.samples == y.sample_rate && x.channels == y.channels &&
x.format == y.sample_fmt;
}
AudioFormat& toAudioFormat(AudioFormat& x, const AVFrame& y) {
x.samples = y.sample_rate;
x.channels = y.channels;
x.format = y.format;
return x;
}
AudioFormat& toAudioFormat(AudioFormat& x, const AVCodecContext& y) {
x.samples = y.sample_rate;
x.channels = y.channels;
x.format = y.sample_fmt;
return x;
}
} // namespace
AudioStream::AudioStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const AudioFormat& format)
: Stream(
inputCtx,
MediaFormat::makeMediaFormat(format, index),
convertPtsToWallTime,
0) {}
AudioStream::~AudioStream() {
if (sampler_) {
sampler_->shutdown();
sampler_.reset();
}
}
int AudioStream::initFormat() {
// set output format
if (format_.format.audio.samples == 0) {
format_.format.audio.samples = codecCtx_->sample_rate;
}
if (format_.format.audio.channels == 0) {
format_.format.audio.channels = codecCtx_->channels;
}
if (format_.format.audio.format == AV_SAMPLE_FMT_NONE) {
format_.format.audio.format = codecCtx_->sample_fmt;
}
return format_.format.audio.samples != 0 &&
format_.format.audio.channels != 0 &&
format_.format.audio.format != AV_SAMPLE_FMT_NONE
? 0
: -1;
}
int AudioStream::copyFrameBytes(ByteStorage* out, bool flush) {
if (!sampler_) {
sampler_ = std::make_unique<AudioSampler>(codecCtx_);
}
// check if input format gets changed
if (flush ? !(sampler_->getInputFormat().audio == *codecCtx_)
: !(sampler_->getInputFormat().audio == *frame_)) {
// - reinit sampler
SamplerParameters params;
params.type = format_.type;
params.out = format_.format;
params.in = FormatUnion();
flush ? toAudioFormat(params.in.audio, *codecCtx_)
: toAudioFormat(params.in.audio, *frame_);
if (!sampler_->init(params)) {
return -1;
}
VLOG(1) << "Set input audio sampler format"
<< ", samples: " << params.in.audio.samples
<< ", channels: " << params.in.audio.channels
<< ", format: " << params.in.audio.format
<< " : output audio sampler format"
<< ", samples: " << format_.format.audio.samples
<< ", channels: " << format_.format.audio.channels
<< ", format: " << format_.format.audio.format;
}
return sampler_->sample(flush ? nullptr : frame_, out);
}
} // namespace ffmpeg
#pragma once
#include "audio_sampler.h"
#include "stream.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode one audio stream.
*/
class AudioStream : public Stream {
public:
AudioStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const AudioFormat& format);
~AudioStream() override;
private:
int initFormat() override;
int copyFrameBytes(ByteStorage* out, bool flush) override;
private:
std::unique_ptr<AudioSampler> sampler_;
};
} // namespace ffmpeg
#include "cc_stream.h"
namespace ffmpeg {
CCStream::CCStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const SubtitleFormat& format)
: SubtitleStream(inputCtx, index, convertPtsToWallTime, format) {
format_.type = TYPE_CC;
}
AVCodec* CCStream::findCodec(AVCodecParameters* params) {
if (params->codec_id == AV_CODEC_ID_BIN_DATA &&
params->codec_type == AVMEDIA_TYPE_DATA) {
// obtain subtitles codec
params->codec_id = AV_CODEC_ID_MOV_TEXT;
params->codec_type = AVMEDIA_TYPE_SUBTITLE;
}
return Stream::findCodec(params);
}
} // namespace ffmpeg
#pragma once
#include "subtitle_stream.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode one closed captions stream.
*/
class CCStream : public SubtitleStream {
public:
CCStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const SubtitleFormat& format);
private:
AVCodec* findCodec(AVCodecParameters* params) override;
};
} // namespace ffmpeg
#include "decoder.h"
#include <c10/util/Logging.h>
#include <future>
#include <iostream>
#include <mutex>
#include "audio_stream.h"
#include "cc_stream.h"
#include "subtitle_stream.h"
#include "util.h"
#include "video_stream.h"
namespace ffmpeg {
namespace {
constexpr size_t kIoBufferSize = 96 * 1024;
constexpr size_t kIoPaddingSize = AV_INPUT_BUFFER_PADDING_SIZE;
constexpr size_t kLogBufferSize = 1024;
int ffmpeg_lock(void** mutex, enum AVLockOp op) {
std::mutex** handle = (std::mutex**)mutex;
switch (op) {
case AV_LOCK_CREATE:
*handle = new std::mutex();
break;
case AV_LOCK_OBTAIN:
(*handle)->lock();
break;
case AV_LOCK_RELEASE:
(*handle)->unlock();
break;
case AV_LOCK_DESTROY:
delete *handle;
break;
}
return 0;
}
bool mapFfmpegType(AVMediaType media, MediaType* type) {
switch (media) {
case AVMEDIA_TYPE_AUDIO:
*type = TYPE_AUDIO;
return true;
case AVMEDIA_TYPE_VIDEO:
*type = TYPE_VIDEO;
return true;
case AVMEDIA_TYPE_SUBTITLE:
*type = TYPE_SUBTITLE;
return true;
case AVMEDIA_TYPE_DATA:
*type = TYPE_CC;
return true;
default:
return false;
}
}
std::unique_ptr<Stream> createStream(
MediaType type,
AVFormatContext* ctx,
int idx,
bool convertPtsToWallTime,
const FormatUnion& format,
int64_t loggingUuid) {
switch (type) {
case TYPE_AUDIO:
return std::make_unique<AudioStream>(
ctx, idx, convertPtsToWallTime, format.audio);
case TYPE_VIDEO:
return std::make_unique<VideoStream>(
// negative loggingUuid indicates video streams.
ctx,
idx,
convertPtsToWallTime,
format.video,
-loggingUuid);
case TYPE_SUBTITLE:
return std::make_unique<SubtitleStream>(
ctx, idx, convertPtsToWallTime, format.subtitle);
case TYPE_CC:
return std::make_unique<CCStream>(
ctx, idx, convertPtsToWallTime, format.subtitle);
default:
return nullptr;
}
}
} // namespace
/* static */
void Decoder::logFunction(void* avcl, int level, const char* cfmt, va_list vl) {
if (!avcl) {
// Nothing can be done here
return;
}
AVClass* avclass = *reinterpret_cast<AVClass**>(avcl);
if (!avclass) {
// Nothing can be done here
return;
}
Decoder* decoder = nullptr;
if (strcmp(avclass->class_name, "AVFormatContext") == 0) {
AVFormatContext* context = reinterpret_cast<AVFormatContext*>(avcl);
if (context) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
} else if (strcmp(avclass->class_name, "AVCodecContext") == 0) {
AVCodecContext* context = reinterpret_cast<AVCodecContext*>(avcl);
if (context) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
} else if (strcmp(avclass->class_name, "AVIOContext") == 0) {
AVIOContext* context = reinterpret_cast<AVIOContext*>(avcl);
// only if opaque was assigned to Decoder pointer
if (context && context->read_packet == Decoder::readFunction) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
} else if (strcmp(avclass->class_name, "SWResampler") == 0) {
// expect AVCodecContext as parent
if (avclass->parent_log_context_offset) {
AVClass** parent =
*(AVClass***)(((uint8_t*)avcl) + avclass->parent_log_context_offset);
AVCodecContext* context = reinterpret_cast<AVCodecContext*>(parent);
if (context) {
decoder = reinterpret_cast<Decoder*>(context->opaque);
}
}
} else if (strcmp(avclass->class_name, "SWScaler") == 0) {
// cannot find a way to pass context pointer through SwsContext struct
} else {
VLOG(2) << "Unknown context class: " << avclass->class_name;
}
if (decoder != nullptr && decoder->enableLogLevel(level)) {
char buf[kLogBufferSize] = {0};
// Format the line
int* prefix = decoder->getPrintPrefix();
*prefix = 1;
av_log_format_line(avcl, level, cfmt, vl, buf, sizeof(buf) - 1, prefix);
// pass message to the decoder instance
std::string msg(buf);
decoder->logCallback(level, msg);
}
}
bool Decoder::enableLogLevel(int level) const {
return ssize_t(level) <= params_.logLevel;
}
void Decoder::logCallback(int level, const std::string& message) {
LOG(INFO) << "Msg, level: " << level << ", msg: " << message;
}
/* static */
int Decoder::shutdownFunction(void* ctx) {
Decoder* decoder = (Decoder*)ctx;
if (decoder == nullptr) {
return 1;
}
return decoder->shutdownCallback();
}
int Decoder::shutdownCallback() {
return interrupted_ ? 1 : 0;
}
/* static */
int Decoder::readFunction(void* opaque, uint8_t* buf, int size) {
Decoder* decoder = reinterpret_cast<Decoder*>(opaque);
if (decoder == nullptr) {
return 0;
}
return decoder->readCallback(buf, size);
}
/* static */
int64_t Decoder::seekFunction(void* opaque, int64_t offset, int whence) {
Decoder* decoder = reinterpret_cast<Decoder*>(opaque);
if (decoder == nullptr) {
return -1;
}
return decoder->seekCallback(offset, whence);
}
int Decoder::readCallback(uint8_t* buf, int size) {
return seekableBuffer_.read(buf, size, params_.timeoutMs);
}
int64_t Decoder::seekCallback(int64_t offset, int whence) {
return seekableBuffer_.seek(offset, whence, params_.timeoutMs);
}
/* static */
void Decoder::initOnce() {
static std::once_flag flagInit;
std::call_once(flagInit, []() {
av_register_all();
avcodec_register_all();
avformat_network_init();
// register ffmpeg lock manager
av_lockmgr_register(&ffmpeg_lock);
av_log_set_callback(Decoder::logFunction);
av_log_set_level(AV_LOG_ERROR);
VLOG(1) << "Registered ffmpeg libs";
});
}
Decoder::Decoder() {
initOnce();
}
Decoder::~Decoder() {
cleanUp();
}
bool Decoder::init(
const DecoderParameters& params,
DecoderInCallback&& in,
std::vector<DecoderMetadata>* metadata) {
cleanUp();
if ((params.uri.empty() || in) && (!params.uri.empty() || !in)) {
LOG(ERROR) << "Either external URI gets provided"
<< " or explicit input callback";
return false;
}
// set callback and params
params_ = params;
if (!(inputCtx_ = avformat_alloc_context())) {
LOG(ERROR) << "Cannot allocate format context";
return false;
}
AVInputFormat* fmt = nullptr;
int result = 0;
if (in) {
ImageType type = ImageType::UNKNOWN;
if ((result = seekableBuffer_.init(
std::forward<DecoderInCallback>(in),
params_.timeoutMs,
params_.maxSeekableBytes,
params_.isImage ? &type : nullptr)) < 0) {
LOG(ERROR) << "can't initiate seekable buffer";
cleanUp();
return false;
}
if (params_.isImage) {
const char* fmtName = "image2";
switch (type) {
case ImageType::JPEG:
fmtName = "jpeg_pipe";
break;
case ImageType::PNG:
fmtName = "png_pipe";
break;
case ImageType::TIFF:
fmtName = "tiff_pipe";
break;
default:
break;
}
fmt = av_find_input_format(fmtName);
}
const size_t avioCtxBufferSize = kIoBufferSize;
uint8_t* avioCtxBuffer =
(uint8_t*)av_malloc(avioCtxBufferSize + kIoPaddingSize);
if (!avioCtxBuffer) {
LOG(ERROR) << "av_malloc cannot allocate " << avioCtxBufferSize
<< " bytes";
cleanUp();
return false;
}
if (!(avioCtx_ = avio_alloc_context(
avioCtxBuffer,
avioCtxBufferSize,
0,
reinterpret_cast<void*>(this),
&Decoder::readFunction,
nullptr,
result == 1 ? &Decoder::seekFunction : nullptr))) {
LOG(ERROR) << "avio_alloc_context failed";
av_free(avioCtxBuffer);
cleanUp();
return false;
}
inputCtx_->pb = avioCtx_;
inputCtx_->flags |= AVFMT_FLAG_CUSTOM_IO;
}
inputCtx_->opaque = reinterpret_cast<void*>(this);
inputCtx_->interrupt_callback.callback = Decoder::shutdownFunction;
inputCtx_->interrupt_callback.opaque = reinterpret_cast<void*>(this);
// add network timeout
inputCtx_->flags |= AVFMT_FLAG_NONBLOCK;
AVDictionary* options = nullptr;
av_dict_set_int(&options, "analyzeduration", params_.timeoutMs * 1000, 0);
av_dict_set_int(&options, "stimeout", params_.timeoutMs * 1000, 0);
if (params_.listen) {
av_dict_set_int(&options, "listen", 1, 0);
}
interrupted_ = false;
// ffmpeg avformat_open_input call can hang if media source doesn't respond
// set a guard to handle such situations, if requested
std::promise<bool> p;
std::future<bool> f = p.get_future();
std::unique_ptr<std::thread> guard;
if (params_.preventStaleness) {
guard = std::make_unique<std::thread>([&f, this]() {
auto timeout = std::chrono::milliseconds(params_.timeoutMs);
if (std::future_status::timeout == f.wait_for(timeout)) {
LOG(ERROR) << "Cannot open stream within " << params_.timeoutMs
<< " ms";
interrupted_ = true;
}
});
}
if (fmt) {
result = avformat_open_input(&inputCtx_, nullptr, fmt, &options);
} else {
result =
avformat_open_input(&inputCtx_, params_.uri.c_str(), nullptr, &options);
}
av_dict_free(&options);
if (guard) {
p.set_value(true);
guard->join();
guard.reset();
}
if (result < 0 || interrupted_) {
LOG(ERROR) << "avformat_open_input failed, error: "
<< Util::generateErrorDesc(result);
cleanUp();
return false;
}
result = avformat_find_stream_info(inputCtx_, nullptr);
if (result < 0) {
LOG(ERROR) << "avformat_find_stream_info failed, error: "
<< Util::generateErrorDesc(result);
cleanUp();
return false;
}
if (!openStreams(metadata)) {
LOG(ERROR) << "Cannot activate streams";
cleanUp();
return false;
}
onInit();
if (params.startOffset != 0) {
auto offset = params.startOffset <= params.seekAccuracy
? 0
: params.startOffset - params.seekAccuracy;
av_seek_frame(inputCtx_, -1, offset, AVSEEK_FLAG_BACKWARD);
}
VLOG(1) << "Decoder initialized, log level: " << params_.logLevel;
return true;
}
bool Decoder::openStreams(std::vector<DecoderMetadata>* metadata) {
for (int i = 0; i < inputCtx_->nb_streams; i++) {
// - find the corresponding format in the params_.formats set
MediaFormat format;
const auto media = inputCtx_->streams[i]->codec->codec_type;
if (!mapFfmpegType(media, &format.type)) {
VLOG(1) << "Stream media: " << media << " at index " << i
<< " gets ignored, unknown type";
continue; // unsupported type
}
// check format
auto it = params_.formats.find(format);
if (it == params_.formats.end()) {
VLOG(1) << "Stream type: " << format.type << " at index: " << i
<< " gets ignored, caller is not interested";
continue; // clients don't care about this media format
}
// do we have stream of this type?
auto stream = findByType(format);
// should we process this stream?
if (it->stream == -2 || // all streams of this type are welcome
(!stream && (it->stream == -1 || it->stream == i))) { // new stream
VLOG(1) << "Stream type: " << format.type << " found, at index: " << i;
auto stream = createStream(
format.type,
inputCtx_,
i,
params_.convertPtsToWallTime,
it->format,
params_.loggingUuid);
CHECK(stream);
if (stream->openCodec(metadata) < 0) {
LOG(ERROR) << "Cannot open codec " << i;
return false;
}
streams_.emplace(i, std::move(stream));
inRange_.set(i, true);
}
}
return true;
}
void Decoder::shutdown() {
cleanUp();
}
void Decoder::interrupt() {
interrupted_ = true;
}
void Decoder::cleanUp() {
if (!interrupted_) {
interrupted_ = true;
}
if (inputCtx_) {
for (auto& stream : streams_) {
// Drain stream buffers.
DecoderOutputMessage msg;
while (msg.payload = nullptr, stream.second->flush(&msg, true) > 0) {
}
stream.second.reset();
}
streams_.clear();
avformat_close_input(&inputCtx_);
}
if (avioCtx_) {
av_freep(&avioCtx_->buffer);
av_freep(&avioCtx_);
}
// reset callback
seekableBuffer_.shutdown();
}
int Decoder::getFrame(size_t workingTimeInMs) {
if (inRange_.none()) {
return ENODATA;
}
// decode frames until the cache is full, then leave the thread;
// once the decode() method gets called and grabs some bytes,
// run this method again
// init package
AVPacket avPacket;
av_init_packet(&avPacket);
avPacket.data = nullptr;
avPacket.size = 0;
auto end = std::chrono::steady_clock::now() +
std::chrono::milliseconds(workingTimeInMs);
// return true if elapsed time less than timeout
auto watcher = [end]() -> bool {
return std::chrono::steady_clock::now() <= end;
};
int result = 0;
size_t decodingErrors = 0;
bool decodedFrame = false;
while (!interrupted_ && inRange_.any() && !decodedFrame && watcher()) {
result = av_read_frame(inputCtx_, &avPacket);
if (result == AVERROR(EAGAIN)) {
VLOG(4) << "Decoder is busy...";
std::this_thread::yield();
result = 0; // reset error, EAGAIN is not an error at all
continue;
} else if (result == AVERROR_EOF) {
flushStreams();
VLOG(1) << "End of stream";
result = ENODATA;
break;
} else if (result < 0) {
flushStreams();
LOG(ERROR) << "Error detected: " << Util::generateErrorDesc(result);
break;
}
// get stream
auto stream = findByIndex(avPacket.stream_index);
if (stream == nullptr || !inRange_.test(stream->getIndex())) {
av_packet_unref(&avPacket);
continue;
}
size_t numConsecutiveNoBytes = 0;
// the package bytes may be only partially decoded
do {
// decode package
bool gotFrame = false;
bool hasMsg = false;
// packet either got consumed completely or not at all
if ((result = processPacket(stream, &avPacket, &gotFrame, &hasMsg)) < 0) {
LOG(ERROR) << "processPacket failed with code: " << result;
break;
}
if (!gotFrame && params_.maxProcessNoBytes != 0 &&
++numConsecutiveNoBytes > params_.maxProcessNoBytes) {
LOG(ERROR) << "Exceeding max amount of consecutive no bytes";
break;
}
if (result > 0) {
numConsecutiveNoBytes = 0;
}
decodedFrame |= hasMsg;
} while (result == 0);
// post loop check
if (result < 0) {
if (params_.maxPackageErrors != 0 && // check errors
++decodingErrors >= params_.maxPackageErrors) { // reached the limit
LOG(ERROR) << "Exceeding max amount of consecutive package errors";
break;
}
} else {
decodingErrors = 0; // reset on success
}
result = 0;
av_packet_unref(&avPacket);
}
av_packet_unref(&avPacket);
VLOG(2) << "Interrupted loop"
<< ", interrupted_ " << interrupted_ << ", inRange_.any() "
<< inRange_.any() << ", decodedFrame " << decodedFrame << ", result "
<< result;
// the loop can terminate for one of the following reasons:
// 1. explicitly interrupted
// 2. terminated by the working timeout
// 3. unrecoverable error or ENODATA (end of stream)
// 4. decoded frame pts is out of the specified range
// 5. a frame got successfully decoded
if (interrupted_) {
return EINTR;
}
if (result != 0) {
return result;
}
if (inRange_.none()) {
return ENODATA;
}
return 0;
}
Stream* Decoder::findByIndex(int streamIndex) const {
auto it = streams_.find(streamIndex);
return it != streams_.end() ? it->second.get() : nullptr;
}
Stream* Decoder::findByType(const MediaFormat& format) const {
for (auto& stream : streams_) {
if (stream.second->getMediaFormat().type == format.type) {
return stream.second.get();
}
}
return nullptr;
}
int Decoder::processPacket(
Stream* stream,
AVPacket* packet,
bool* gotFrame,
bool* hasMsg) {
// decode package
int result;
DecoderOutputMessage msg;
msg.payload = params_.headerOnly ? nullptr : createByteStorage(0);
*hasMsg = false;
if ((result = stream->decodePacket(
packet, &msg, params_.headerOnly, gotFrame)) >= 0 &&
*gotFrame) {
// check end offset
bool endInRange =
params_.endOffset <= 0 || msg.header.pts <= params_.endOffset;
inRange_.set(stream->getIndex(), endInRange);
if (endInRange && msg.header.pts >= params_.startOffset) {
*hasMsg = true;
push(std::move(msg));
}
}
return result;
}
void Decoder::flushStreams() {
VLOG(1) << "Flushing streams...";
for (auto& stream : streams_) {
DecoderOutputMessage msg;
while (msg.payload = (params_.headerOnly ? nullptr : createByteStorage(0)),
stream.second->flush(&msg, params_.headerOnly) > 0) {
// check end offset
bool endInRange =
params_.endOffset <= 0 || msg.header.pts <= params_.endOffset;
inRange_.set(stream.second->getIndex(), endInRange);
if (endInRange && msg.header.pts >= params_.startOffset) {
push(std::move(msg));
} else {
msg.payload.reset();
}
}
}
}
int Decoder::decode_all(const DecoderOutCallback& callback) {
int result;
do {
DecoderOutputMessage out;
if (0 == (result = decode(&out, params_.timeoutMs))) {
callback(std::move(out));
}
} while (result == 0);
return result;
}
} // namespace ffmpeg
#pragma once
#include <bitset>
#include <unordered_map>
#include "seekable_buffer.h"
#include "stream.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode media streams.
* Media bytes can be explicitly provided through read-callback
* or fetched internally by FFMPEG library
*/
class Decoder : public MediaDecoder {
public:
Decoder();
~Decoder() override;
// MediaDecoder overrides
bool init(
const DecoderParameters& params,
DecoderInCallback&& in,
std::vector<DecoderMetadata>* metadata) override;
int decode_all(const DecoderOutCallback& callback) override;
void shutdown() override;
void interrupt() override;
protected:
// function does actual work, derived class calls it in working thread
// periodically. On success the method returns 0, ENODATA on EOF, ETIMEDOUT if
// no frames got decoded in the specified timeout time, and error on
// unrecoverable error.
int getFrame(size_t workingTimeInMs = 100);
// Derived class must override method and consume the provided message
virtual void push(DecoderOutputMessage&& buffer) = 0;
// Fires on init call
virtual void onInit() {}
public:
// C-style FFMPEG API requires C/static methods for callbacks
static void logFunction(void* avcl, int level, const char* cfmt, va_list vl);
static int shutdownFunction(void* ctx);
static int readFunction(void* opaque, uint8_t* buf, int size);
static int64_t seekFunction(void* opaque, int64_t offset, int whence);
// can be called by any classes or API
static void initOnce();
int* getPrintPrefix() {
return &printPrefix;
}
private:
// mark below function for a proper invocation
virtual bool enableLogLevel(int level) const;
virtual void logCallback(int level, const std::string& message);
virtual int readCallback(uint8_t* buf, int size);
virtual int64_t seekCallback(int64_t offset, int whence);
virtual int shutdownCallback();
bool openStreams(std::vector<DecoderMetadata>* metadata);
Stream* findByIndex(int streamIndex) const;
Stream* findByType(const MediaFormat& format) const;
int processPacket(
Stream* stream,
AVPacket* packet,
bool* gotFrame,
bool* hasMsg);
void flushStreams();
void cleanUp();
protected:
DecoderParameters params_;
private:
SeekableBuffer seekableBuffer_;
int printPrefix{1};
std::atomic<bool> interrupted_{false};
AVFormatContext* inputCtx_{nullptr};
AVIOContext* avioCtx_{nullptr};
std::unordered_map<ssize_t, std::unique_ptr<Stream>> streams_;
std::bitset<64> inRange_;
};
} // namespace ffmpeg
#pragma once
#include <functional>
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <vector>
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavformat/avio.h>
#include <libavutil/avutil.h>
#include <libavutil/imgutils.h>
#include <libswresample/swresample.h>
#include "libswscale/swscale.h"
}
namespace ffmpeg {
// bit mask of formats, keep them in form 2^n
enum MediaType : size_t {
TYPE_AUDIO = 1,
TYPE_VIDEO = 2,
TYPE_SUBTITLE = 4,
TYPE_CC = 8, // closed captions from transport streams
};
// audio
struct AudioFormat {
// fields are initialized for auto detection
// caller can specify some/all field values if a specific output is desired
bool operator==(const AudioFormat& x) const {
return x.format == format && x.samples == samples && x.channels == channels;
}
size_t samples{0}; // number samples per second (frequency)
size_t channels{0}; // number of channels
long format{-1}; // AVSampleFormat, auto AV_SAMPLE_FMT_NONE
size_t padding[2];
// -- alignment 40 bytes
};
// video
struct VideoFormat {
// fields are initialized for auto detection
// caller can specify some/all field values if a specific output is desired
bool operator==(const VideoFormat& x) const {
return x.format == format && x.width == width && x.height == height;
}
/*
When width = 0, height = 0, minDimension = 0, and maxDimension = 0,
keep the original frame resolution
When width = 0, height = 0, minDimension != 0, and maxDimension = 0,
keep the aspect ratio and resize the frame so that shorter edge size is minDimension
When width = 0, height = 0, minDimension = 0, and maxDimension != 0,
keep the aspect ratio and resize the frame so that longer edge size is maxDimension
When width = 0, height = 0, minDimension != 0, and maxDimension != 0,
resize the frame so that shorter edge size is minDimension, and
longer edge size is maxDimension. The aspect ratio may not be preserved
When width = 0, height != 0, minDimension = 0, and maxDimension = 0,
keep the aspect ratio and resize the frame so that frame height is $height
When width != 0, height = 0, minDimension = 0, and maxDimension = 0,
keep the aspect ratio and resize the frame so that frame width is $width
When width != 0, height != 0, minDimension = 0, and maxDimension = 0,
resize the frame so that frame width and height are set to $width and $height,
respectively
*/
size_t width{0}; // width in pixels
size_t height{0}; // height in pixels
long format{-1}; // AVPixelFormat, auto AV_PIX_FMT_NONE
size_t minDimension{0}; // choose min dimension and rescale accordingly
size_t maxDimension{0}; // choose max dimension and rescale accordingly
size_t cropImage{0}; // request image crop
// -- alignment 40 bytes
};
// subtitle/cc
struct SubtitleFormat {
long type{0}; // AVSubtitleType, auto SUBTITLE_NONE
size_t padding[4];
// -- alignment 40 bytes
};
union FormatUnion {
FormatUnion() : audio() {}
explicit FormatUnion(int) : video() {}
explicit FormatUnion(char) : subtitle() {}
explicit FormatUnion(double) : subtitle() {}
AudioFormat audio;
VideoFormat video;
SubtitleFormat subtitle;
// -- alignment 40 bytes
};
/*
MediaFormat data structure serves as input/output parameter.
Caller assigns values for input formats
or leaves default values for auto detection.
For output formats, all fields will be set to specific values
*/
struct MediaFormat {
// for using map/set data structures
bool operator<(const MediaFormat& x) const {
return type < x.type;
}
bool operator==(const MediaFormat& x) const {
if (type != x.type) {
return false;
}
switch (type) {
case TYPE_AUDIO:
return format.audio == x.format.audio;
case TYPE_VIDEO:
return format.video == x.format.video;
case TYPE_SUBTITLE:
case TYPE_CC:
return true;
default:
return false;
}
}
explicit MediaFormat(long s = -1) : type(TYPE_AUDIO), stream(s), format() {}
explicit MediaFormat(int x, long s = -1)
: type(TYPE_VIDEO), stream(s), format(x) {}
explicit MediaFormat(char x, long s = -1)
: type(TYPE_SUBTITLE), stream(s), format(x) {}
explicit MediaFormat(double x, long s = -1)
: type(TYPE_CC), stream(s), format(x) {}
static MediaFormat makeMediaFormat(AudioFormat format, long stream) {
MediaFormat result(stream);
result.format.audio = format;
return result;
}
static MediaFormat makeMediaFormat(VideoFormat format, long stream) {
MediaFormat result(0, stream);
result.format.video = format;
return result;
}
static MediaFormat makeMediaFormat(SubtitleFormat format, long stream) {
MediaFormat result('0', stream);
result.format.subtitle = format;
return result;
}
// format type
MediaType type;
// stream index:
// set -1 for one stream auto detection, -2 for all streams auto detection,
// >= 0, specified stream, if caller knows the stream index (unlikely)
long stream;
// union keeps one of the possible formats, defined by MediaType
FormatUnion format;
};
struct DecoderParameters {
// local file, remote file, http url, rtmp stream uri, etc. anything that
// ffmpeg can recognize
std::string uri;
// timeout on getting bytes for decoding
size_t timeoutMs{1000};
// logging level, default AV_LOG_PANIC
long logLevel{0};
// when decoder would give up, 0 means never
size_t maxPackageErrors{0};
// max allowed consecutive times no bytes are processed. 0 means infinite.
size_t maxProcessNoBytes{0};
// start offset (us)
long startOffset{0};
// end offset (us)
long endOffset{-1};
// logging id
int64_t loggingUuid{0};
// internal max seekable buffer size
size_t maxSeekableBytes{0};
// adjust header pts to the epoch time
bool convertPtsToWallTime{false};
// indicate if input stream is an encoded image
bool isImage{false};
// listen and wait for new rtmp stream
bool listen{false};
// don't copy frame body, only header
bool headerOnly{false};
// interrupt init method on timeout
bool preventStaleness{true};
// seek tolerated accuracy (us)
double seekAccuracy{1000000.0};
// what media types should be processed, default none
std::set<MediaFormat> formats;
// can be used for asynchronous decoders
size_t cacheSize{8192}; // how many bytes to cache before we stop reading bytes
size_t cacheTimeoutMs{1000}; // timeout on bytes writing
bool enforceCacheSize{false}; // drop output frames if cache is full
bool mergeAudioMessages{false}; // combine collocated audio messages together
};
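// Illustrative sketch, not part of this commit: typical parameters for
// decoding one auto-detected video stream of a local file (the path below is
// a hypothetical placeholder), rescaled so that the shorter edge is 128 px.
inline DecoderParameters makeExampleVideoParams() {
  VideoFormat video;
  video.minDimension = 128; // keep aspect ratio, shorter edge -> 128
  DecoderParameters params;
  params.uri = "/tmp/example.mp4";
  params.formats.insert(MediaFormat::makeMediaFormat(video, -1));
  return params;
}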
struct DecoderHeader {
// message id, from 0 till ...
size_t seqno{0};
// decoded timestamp in microseconds from either beginning of the stream or
// from epoch time, see DecoderParameters::convertPtsToWallTime
long pts{0};
// decoded key frame
size_t keyFrame{0};
// frames per second, valid only for video streams
double fps{0};
// format specifies what kind of frame is in the payload
MediaFormat format;
};
// Abstract interface ByteStorage class
class ByteStorage {
public:
virtual ~ByteStorage() = default;
// makes sure that buffer has at least n bytes available for writing, if not
// storage must reallocate memory.
virtual void ensure(size_t n) = 0;
// caller must not write more than the available bytes
virtual uint8_t* writableTail() = 0;
// caller confirms that n bytes were written to the writable tail
virtual void append(size_t n) = 0;
// caller confirms that n bytes were read from the read buffer
virtual void trim(size_t n) = 0;
// gives an access to the beginning of the read buffer
virtual const uint8_t* data() const = 0;
// returns the stored size in bytes
virtual size_t length() const = 0;
// returns available capacity for writable tail
virtual size_t tail() const = 0;
// clears content, keeps capacity
virtual void clear() = 0;
};
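// A minimal sketch, not part of this commit: a std::vector-backed ByteStorage
// illustrating the ensure/writableTail/append/trim contract described above.
class VectorByteStorage : public ByteStorage {
 public:
  void ensure(size_t n) override {
    if (buffer_.size() < offset_ + length_ + n) {
      buffer_.resize(offset_ + length_ + n);
    }
  }
  uint8_t* writableTail() override {
    return buffer_.data() + offset_ + length_;
  }
  void append(size_t n) override { length_ += n; }
  void trim(size_t n) override { offset_ += n; length_ -= n; }
  const uint8_t* data() const override { return buffer_.data() + offset_; }
  size_t length() const override { return length_; }
  size_t tail() const override { return buffer_.size() - offset_ - length_; }
  void clear() override { offset_ = 0; length_ = 0; }
 private:
  std::vector<uint8_t> buffer_; // [0, offset_) consumed, then length_ readable bytes
  size_t offset_{0};
  size_t length_{0};
};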
struct DecoderOutputMessage {
DecoderHeader header;
std::unique_ptr<ByteStorage> payload;
};
/*
* External provider of the encoded bytes; the specific implementation is left for
* different use cases, like file, memory, external network end-points, etc.
* Normally input/output parameter @out set to valid, not null buffer pointer,
* which indicates "read" call, however there are "seek" modes as well.
* @out != nullptr => read from the current offset, @whence got ignored,
* @size bytes to read => return number bytes got read, 0 if no more bytes
* available, < 0 on error.
* @out == nullptr, @timeoutMs == 0 => does provider support "seek"
* capability in a first place? @size & @whence got ignored, return 0 on
* success, < 0 if "seek" mode is not supported.
* @out == nullptr, @timeoutMs != 0 => normal seek call
* offset == @size, i.e. @whence = [SEEK_SET, SEEK_CUR, SEEK_END, AVSEEK_SIZE)
* return < 0 on error, position if @whence = [SEEK_SET, SEEK_CUR, SEEK_END],
* length of buffer if @whence = [AVSEEK_SIZE].
*/
using DecoderInCallback =
std::function<int(uint8_t* out, int size, int whence, uint64_t timeoutMs)>;
using DecoderOutCallback = std::function<void(DecoderOutputMessage&&)>;
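// A minimal sketch, not part of this commit: a conforming DecoderInCallback
// over a caller-owned buffer that supports reads only (no seeking); see
// MemoryBuffer::getCallback further below for a seekable equivalent.
inline DecoderInCallback makeReadOnlyCallback(const uint8_t* data, size_t len) {
  size_t pos = 0;
  return [data, len, pos](
             uint8_t* out, int size, int whence, uint64_t timeoutMs) mutable -> int {
    (void)whence;
    (void)timeoutMs;
    if (out == nullptr) {
      return -1; // both the seek-capability probe and seek calls: unsupported
    }
    // read mode: copy up to @size bytes from the current offset
    int available = static_cast<int>(len - pos);
    if (available > size) {
      available = size;
    }
    for (int i = 0; i < available; ++i) {
      out[i] = data[pos + i];
    }
    pos += available;
    return available; // becomes 0 once drained, i.e. no more bytes
  };
}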
struct DecoderMetadata {
// time base numerator
long num{0};
// time base denominator
long den{1};
// duration of the stream, in microseconds, if available
long duration{-1};
// frames per second, valid only for video streams
double fps{0};
// format specifies what kind of frame is in the payload
MediaFormat format;
};
/**
* Abstract class for decoding media bytes
* It has two different modes: internal media bytes retrieval for a given URI, and
* external media bytes provider in case of memory streams
*/
class MediaDecoder {
public:
virtual ~MediaDecoder() = default;
/**
* Initializes media decoder with parameters,
* calls callback when media bytes are available.
* Media bytes get fetched internally from provided URI
* or invokes provided input callback to get media bytes.
* Input callback must be empty for the internal media provider
* Caller can provide a non-null pointer for the metadata container
* to obtain the streams' metadata (optional)
*/
virtual bool init(
const DecoderParameters& params,
DecoderInCallback&& in,
std::vector<DecoderMetadata>* metadata) = 0;
/**
* Polls one available decoded frame from the decoder
* Returns an error code; 0 means success
*/
virtual int decode(DecoderOutputMessage* out, uint64_t timeoutMs) = 0;
/**
* Polls available decoded bytes from decoder, till EOF or error
*/
virtual int decode_all(const DecoderOutCallback& callback) = 0;
/**
* Stops calling callback, releases resources
*/
virtual void shutdown() = 0;
/**
* Interrupts whatever decoder is doing at any time
*/
virtual void interrupt() = 0;
/**
* Factory to create ByteStorage class instances, particular implementation is
* left to the derived class. Caller provides the initially allocated size
*/
virtual std::unique_ptr<ByteStorage> createByteStorage(size_t n) = 0;
};
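// Illustrative sketch, not part of this commit: driving a concrete
// MediaDecoder end to end with the internal (URI-based) media provider;
// params.uri must be set and the input callback left empty, per init() above.
inline int drainAllFrames(MediaDecoder& decoder, const DecoderParameters& params) {
  std::vector<DecoderMetadata> metadata;
  if (!decoder.init(params, nullptr, &metadata)) {
    return -1;
  }
  int result = decoder.decode_all([](DecoderOutputMessage&& msg) {
    // consume msg.header.pts and msg.payload here
  });
  decoder.shutdown();
  return result;
}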
struct SamplerParameters {
MediaType type{TYPE_AUDIO};
FormatUnion in;
FormatUnion out;
int64_t loggingUuid{0};
};
/**
* Abstract class for sampling media bytes
*/
class MediaSampler {
public:
virtual ~MediaSampler() = default;
/**
* Initializes media sampler with parameters
*/
virtual bool init(const SamplerParameters& params) = 0;
/**
* Samples media bytes
* Returns an error code (< 0), or >= 0 for success, indicating the number of bytes
* processed.
* set @in to null for flushing data
*/
virtual int sample(const ByteStorage* in, ByteStorage* out) = 0;
/**
* Releases resources
*/
virtual void shutdown() = 0;
/*
* Returns media type
*/
MediaType getMediaType() const {
return params_.type;
}
/*
* Returns formats
*/
FormatUnion getInputFormat() const {
return params_.in;
}
FormatUnion getOutFormat() const {
return params_.out;
}
protected:
SamplerParameters params_;
};
} // namespace ffmpeg
#include "memory_buffer.h"
#include <c10/util/Logging.h>
namespace ffmpeg {
MemoryBuffer::MemoryBuffer(const uint8_t* buffer, size_t size)
: buffer_(buffer), len_(size) {}
int MemoryBuffer::read(uint8_t* buf, int size) {
if (pos_ < len_) {
auto available = std::min(int(len_ - pos_), size);
memcpy(buf, buffer_ + pos_, available);
pos_ += available;
return available;
}
return 0;
}
int64_t MemoryBuffer::seek(int64_t offset, int whence) {
if (whence & AVSEEK_SIZE) {
return len_;
}
// remove force flag
whence &= ~AVSEEK_FORCE;
switch (whence) {
case SEEK_SET:
if (offset >= 0 && offset <= len_) {
pos_ = offset;
}
break;
case SEEK_END:
if (len_ + offset >= 0 && len_ + offset <= len_) {
pos_ = len_ + offset;
}
break;
case SEEK_CUR:
if (pos_ + offset > 0 && pos_ + offset <= len_) {
pos_ += offset;
}
break;
default:
LOG(ERROR) << "Unknown whence flag gets provided: " << whence;
}
return pos_;
}
/* static */
DecoderInCallback MemoryBuffer::getCallback(
const uint8_t* buffer,
size_t size) {
MemoryBuffer object(buffer, size);
return
[object](uint8_t* out, int size, int whence, uint64_t timeoutMs) mutable
-> int {
if (out) { // see defs.h file
// read mode
return object.read(out, size);
}
// seek mode
if (!timeoutMs) {
// seek capability probe: yes, supported
return 0;
}
return object.seek(size, whence);
};
}
} // namespace ffmpeg
#pragma once
#include "defs.h"
namespace ffmpeg {
/**
* Class uses external memory buffer and implements a seekable interface.
*/
class MemoryBuffer {
public:
explicit MemoryBuffer(const uint8_t* buffer, size_t size);
int64_t seek(int64_t offset, int whence);
int read(uint8_t* buf, int size);
// static constructor for decoder callback.
static DecoderInCallback getCallback(const uint8_t* buffer, size_t size);
private:
const uint8_t* buffer_; // set at construction time
long pos_{0}; // current position
long len_{0}; // bytes in buffer
};
} // namespace ffmpeg
#include "seekable_buffer.h"
#include <c10/util/Logging.h>
#include <chrono>
#include "memory_buffer.h"
namespace ffmpeg {
int SeekableBuffer::init(
DecoderInCallback&& in,
uint64_t timeoutMs,
size_t maxSeekableBytes,
ImageType* type) {
shutdown();
isSeekable_ = in(nullptr, 0, 0, 0) == 0;
if (isSeekable_) { // seekable
if (type) {
if (!readBytes(in, 8, timeoutMs)) {
return -1;
}
setImageType(type);
end_ = 0;
eof_ = false;
std::vector<uint8_t>().swap(buffer_);
// rewind the callback to the start of the stream
if (in(nullptr, 0, SEEK_SET, timeoutMs)) {
return -1;
}
}
inCallback_ = std::forward<DecoderInCallback>(in);
return 1;
}
if (!readBytes(in, maxSeekableBytes + (type ? 8 : 0), timeoutMs)) {
return -1;
}
if (type) {
setImageType(type);
}
if (eof_) {
end_ = 0;
eof_ = false;
// reuse MemoryBuffer functionality
inCallback_ = MemoryBuffer::getCallback(buffer_.data(), buffer_.size());
isSeekable_ = true;
return 1;
}
inCallback_ = std::forward<DecoderInCallback>(in);
return 0;
}
bool SeekableBuffer::readBytes(
DecoderInCallback& in,
size_t maxBytes,
uint64_t timeoutMs) {
// Start with at most one 4K page; the buffer grows on demand below
buffer_.resize(std::min(maxBytes, 4 * 1024UL));
end_ = 0;
eof_ = false;
auto end =
std::chrono::steady_clock::now() + std::chrono::milliseconds(timeoutMs);
auto watcher = [end]() -> bool {
return std::chrono::steady_clock::now() <= end;
};
bool hasTime = true;
while (!eof_ && end_ < maxBytes && (hasTime = watcher())) {
// read as many bytes as fit into the available buffer space
auto res = in(buffer_.data() + end_, buffer_.size() - end_, 0, timeoutMs);
if (res > 0) {
end_ += res;
if (end_ == buffer_.size()) {
buffer_.resize(std::min(end_ * 4UL, maxBytes));
}
} else if (res == 0) {
eof_ = true;
} else {
// error
return false;
}
}
buffer_.resize(end_);
return hasTime;
}
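// Image containers are detected below by their leading magic bytes:
// JPEG starts with FF D8 FF; PNG with 89 'P' 'N' 'G' (bytes 1..3 are
// checked); TIFF with "II" (little-endian) or "MM" (big-endian).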
void SeekableBuffer::setImageType(ImageType* type) {
if (buffer_.size() > 2 && buffer_[0] == 0xFF && buffer_[1] == 0xD8 &&
buffer_[2] == 0xFF) {
*type = ImageType::JPEG;
} else if (
buffer_.size() > 3 && buffer_[1] == 'P' && buffer_[2] == 'N' &&
buffer_[3] == 'G') {
*type = ImageType::PNG;
} else if (
buffer_.size() > 1 &&
((buffer_[0] == 0x49 && buffer_[1] == 0x49) ||
(buffer_[0] == 0x4D && buffer_[1] == 0x4D))) {
*type = ImageType::TIFF;
} else {
*type = ImageType::UNKNOWN;
}
}
int SeekableBuffer::read(uint8_t* buf, int size, uint64_t timeoutMs) {
if (isSeekable_) {
return inCallback_(buf, size, 0, timeoutMs);
}
if (pos_ < end_) {
// read cached bytes for non-seekable callback
auto available = std::min(int(end_ - pos_), size);
memcpy(buf, buffer_.data() + pos_, available);
pos_ += available;
return available;
} else if (!eof_) {
// normal sequential read (see defs.h file), i.e. @buf != null
auto res = inCallback_(buf, size, 0, timeoutMs); // read through
eof_ = res == 0;
return res;
} else {
return 0;
}
}
int64_t SeekableBuffer::seek(int64_t offset, int whence, uint64_t timeoutMs) {
return inCallback_(nullptr, offset, whence, timeoutMs);
}
void SeekableBuffer::shutdown() {
pos_ = end_ = 0;
eof_ = false;
std::vector<uint8_t>().swap(buffer_);
inCallback_ = nullptr;
}
} // namespace ffmpeg
#pragma once
#include "defs.h"
namespace ffmpeg {
/**
* Class uses an internal buffer to cache the initial bytes from the media
* provider as a seekable region, letting FFMPEG seek and read within the
* cache; bytes beyond the cache are read directly from the media provider.
*/
enum class ImageType {
UNKNOWN = 0,
JPEG = 1,
PNG = 2,
TIFF = 3,
};
class SeekableBuffer {
public:
// @type is optional; pass a non-null pointer only if image detection is
// required
// returns 1 if the buffer is seekable, 0 if it is not, and < 0 on error
int init(
DecoderInCallback&& in,
uint64_t timeoutMs,
size_t maxSeekableBytes,
ImageType* type);
int read(uint8_t* buf, int size, uint64_t timeoutMs);
int64_t seek(int64_t offset, int whence, uint64_t timeoutMs);
void shutdown();
private:
bool readBytes(DecoderInCallback& in, size_t maxBytes, uint64_t timeoutMs);
void setImageType(ImageType* type);
private:
DecoderInCallback inCallback_;
std::vector<uint8_t> buffer_; // resized at init time
long pos_{0}; // current position (SEEK_CUR iff pos_ < end_)
long end_{0}; // current buffer size
bool eof_{false}; // indicates the EOF
bool isSeekable_{false}; // is callback seekable
};
} // namespace ffmpeg
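// Usage sketch (hypothetical caller, not part of this file; assumes
// memory_buffer.h is also included): probing an in-memory input with
// SeekableBuffer. init() returns 1 when the callback is seekable (or the
// whole input fit into the cache), 0 for sequential-only input, < 0 on error.
inline int probeInput(const uint8_t* data, size_t size) {
  ffmpeg::SeekableBuffer buffer;
  ffmpeg::ImageType type = ffmpeg::ImageType::UNKNOWN;
  int rc = buffer.init(
      ffmpeg::MemoryBuffer::getCallback(data, size),
      /*timeoutMs=*/5000,
      /*maxSeekableBytes=*/0,
      &type); // type receives JPEG/PNG/TIFF/UNKNOWN from the magic bytes
  buffer.shutdown();
  return rc;
}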
#include "stream.h"
#include <c10/util/Logging.h>
#include "util.h"
namespace ffmpeg {
Stream::Stream(
AVFormatContext* inputCtx,
MediaFormat format,
bool convertPtsToWallTime,
int64_t loggingUuid)
: inputCtx_(inputCtx),
format_(format),
convertPtsToWallTime_(convertPtsToWallTime),
loggingUuid_(loggingUuid) {}
Stream::~Stream() {
if (frame_) {
av_free(frame_);
}
if (codecCtx_) {
avcodec_free_context(&codecCtx_);
}
}
AVCodec* Stream::findCodec(AVCodecParameters* params) {
return avcodec_find_decoder(params->codec_id);
}
int Stream::openCodec(std::vector<DecoderMetadata>* metadata) {
AVStream* stream = inputCtx_->streams[format_.stream];
AVCodec* codec = findCodec(stream->codecpar);
if (!codec) {
LOG(ERROR) << "LoggingUuid #" << loggingUuid_
<< ", avcodec_find_decoder failed for codec_id: "
<< int(stream->codecpar->codec_id);
return AVERROR(EINVAL);
}
if (!(codecCtx_ = avcodec_alloc_context3(codec))) {
LOG(ERROR) << "LoggingUuid #" << loggingUuid_
<< ", avcodec_alloc_context3 failed";
return AVERROR(ENOMEM);
}
int ret;
// Copy codec parameters from input stream to output codec context
if ((ret = avcodec_parameters_to_context(codecCtx_, stream->codecpar)) < 0) {
LOG(ERROR) << "LoggingUuid #" << loggingUuid_
<< ", avcodec_parameters_to_context failed";
return ret;
}
// after avcodec_open2, value of codecCtx_->time_base is NOT meaningful
if ((ret = avcodec_open2(codecCtx_, codec, nullptr)) < 0) {
LOG(ERROR) << "LoggingUuid #" << loggingUuid_
<< ", avcodec_open2 failed: " << Util::generateErrorDesc(ret);
avcodec_free_context(&codecCtx_);
codecCtx_ = nullptr;
return ret;
}
frame_ = av_frame_alloc();
switch (format_.type) {
case TYPE_VIDEO:
fps_ = av_q2d(av_guess_frame_rate(inputCtx_, stream, nullptr));
break;
case TYPE_AUDIO:
fps_ = codecCtx_->sample_rate;
break;
default:
fps_ = 30.0;
}
if ((ret = initFormat())) {
LOG(ERROR) << "initFormat failed, type: " << format_.type;
}
if (metadata) {
DecoderMetadata header;
header.format = format_;
header.fps = fps_;
header.num = stream->time_base.num;
header.den = stream->time_base.den;
header.duration =
av_rescale_q(stream->duration, stream->time_base, AV_TIME_BASE_Q);
metadata->push_back(header);
}
return ret;
}
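// analyzePacket() below wraps FFMPEG's send/receive decoding API:
// avcodec_send_packet() may return AVERROR(EAGAIN) (decoder is full, frames
// must be fetched first) or AVERROR_EOF (decoder already flushed), and
// avcodec_receive_frame() may return AVERROR(EAGAIN) (more input is needed)
// or AVERROR_EOF (decoder fully drained). Both results are folded into the
// returned number of consumed bytes plus the *gotFrame flag.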
int Stream::analyzePacket(const AVPacket* packet, bool* gotFrame) {
int consumed = 0;
int result = avcodec_send_packet(codecCtx_, packet);
if (result == AVERROR(EAGAIN)) {
*gotFrame = false; // no bytes were consumed, fetch frames first
} else if (result == AVERROR_EOF) {
*gotFrame = false; // more than one flush packet
if (packet) {
// got packet after flush, this is an error
return result;
}
} else if (result < 0) {
LOG(ERROR) << "avcodec_send_packet failed, err: "
<< Util::generateErrorDesc(result);
return result; // error
} else {
consumed = packet ? packet->size : 0; // all bytes were consumed
}
result = avcodec_receive_frame(codecCtx_, frame_);
if (result >= 0) {
*gotFrame = true; // frame is available
} else if (result == AVERROR(EAGAIN)) {
*gotFrame = false; // no frames at this time, needs more packets
if (!consumed) {
// precaution: if no packets were consumed and no frames are available
return result;
}
} else if (result == AVERROR_EOF) {
*gotFrame = false; // the last frame has been flushed
// precaution: if no more frames are available, assume all bytes were consumed
consumed = 0;
} else { // error
LOG(ERROR) << "avcodec_receive_frame failed, err: "
<< Util::generateErrorDesc(result);
return result;
}
return consumed;
}
int Stream::decodePacket(
const AVPacket* packet,
DecoderOutputMessage* out,
bool headerOnly,
bool* hasMsg) {
int consumed;
bool gotFrame = false;
*hasMsg = false;
if ((consumed = analyzePacket(packet, &gotFrame)) >= 0 &&
(packet == nullptr || gotFrame)) {
int result;
if ((result = getMessage(out, !gotFrame, headerOnly)) < 0) {
return result; // report error
}
*hasMsg = result > 0;
}
return consumed;
}
int Stream::flush(DecoderOutputMessage* out, bool headerOnly) {
bool hasMsg = false;
int result = decodePacket(nullptr, out, headerOnly, &hasMsg);
if (result < 0) {
avcodec_flush_buffers(codecCtx_);
return result;
}
if (!hasMsg) {
avcodec_flush_buffers(codecCtx_);
return 0;
}
return 1;
}
int Stream::getMessage(DecoderOutputMessage* out, bool flush, bool headerOnly) {
if (flush) {
// only audio frames need explicit flushing (samples accumulate across frames)
if (format_.type == TYPE_AUDIO) {
int processed = 0;
size_t total = 0;
// grab all audio bytes by chunks
do {
if ((processed = copyFrameBytes(out->payload.get(), flush)) < 0) {
return processed;
}
total += processed;
} while (processed);
if (total) {
// set header if message bytes are available
setHeader(&out->header, flush);
return 1;
}
}
return 0;
} else {
if (format_.type == TYPE_AUDIO) {
int processed = 0;
if ((processed = copyFrameBytes(out->payload.get(), flush)) < 0) {
return processed;
}
if (processed) {
// set header if message bytes are available
setHeader(&out->header, flush);
return 1;
}
return 0;
} else {
// set header
setHeader(&out->header, flush);
if (headerOnly) {
// Only the header is requested
return 1;
}
return copyFrameBytes(out->payload.get(), flush);
}
}
}
void Stream::setHeader(DecoderHeader* header, bool flush) {
header->seqno = numGenerator_++;
setFramePts(header, flush);
if (convertPtsToWallTime_) {
keeper_.adjust(header->pts);
}
header->format = format_;
header->keyFrame = 0;
header->fps = std::numeric_limits<double>::quiet_NaN();
}
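// Frame pts below is normalized to AV_TIME_BASE units (microseconds); when
// FFMPEG reports AV_NOPTS_VALUE, the previously estimated nextPts_ is used,
// and nextPts_ is advanced by one frame (video) or nb_samples (audio).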
void Stream::setFramePts(DecoderHeader* header, bool flush) {
if (flush) {
header->pts = nextPts_; // already in us
} else {
header->pts = av_frame_get_best_effort_timestamp(frame_);
if (header->pts == AV_NOPTS_VALUE) {
header->pts = nextPts_;
} else {
header->pts = av_rescale_q(
header->pts,
inputCtx_->streams[format_.stream]->time_base,
AV_TIME_BASE_Q);
}
switch (format_.type) {
case TYPE_AUDIO:
nextPts_ = header->pts + frame_->nb_samples * AV_TIME_BASE / fps_;
break;
case TYPE_VIDEO:
nextPts_ = header->pts + AV_TIME_BASE / fps_;
break;
default:
nextPts_ = header->pts;
}
}
}
} // namespace ffmpeg
#pragma once
#include <atomic>
#include "defs.h"
#include "time_keeper.h"
namespace ffmpeg {
/**
* Class uses FFMPEG library to decode one media stream (audio or video).
*/
class Stream {
public:
Stream(
AVFormatContext* inputCtx,
MediaFormat format,
bool convertPtsToWallTime,
int64_t loggingUuid);
virtual ~Stream();
// returns 0 on success, or a negative error
int openCodec(std::vector<DecoderMetadata>* metadata);
// returns the number of bytes consumed from the packet, or a negative error
int decodePacket(
const AVPacket* packet,
DecoderOutputMessage* out,
bool headerOnly,
bool* hasMsg);
// returns stream index
int getIndex() const {
return format_.stream;
}
// returns 1 if the message got a payload, 0 if it did not, and < 0 on error
int flush(DecoderOutputMessage* out, bool headerOnly);
// return media format
MediaFormat getMediaFormat() const {
return format_;
}
protected:
virtual int initFormat() = 0;
// returns the number of bytes processed from the packet, or a negative error
virtual int analyzePacket(const AVPacket* packet, bool* gotFrame);
// returns the number of bytes copied out of the decoded frame, or a negative
// error
virtual int copyFrameBytes(ByteStorage* out, bool flush) = 0;
// fills the output message header
virtual void setHeader(DecoderHeader* header, bool flush);
// set frame pts
virtual void setFramePts(DecoderHeader* header, bool flush);
// finds codec
virtual AVCodec* findCodec(AVCodecParameters* params);
private:
// returns 1 if the message got a payload, 0 if it did not, and < 0 on error
int getMessage(DecoderOutputMessage* out, bool flush, bool headerOnly);
protected:
AVFormatContext* const inputCtx_;
MediaFormat format_;
const bool convertPtsToWallTime_;
int64_t loggingUuid_;
AVCodecContext* codecCtx_{nullptr};
AVFrame* frame_{nullptr};
std::atomic<size_t> numGenerator_{0};
TimeKeeper keeper_;
// estimated next frame pts for flushing the last frame
int64_t nextPts_{0};
double fps_{30.};
};
} // namespace ffmpeg
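// Usage sketch (hypothetical driver, not part of this file): how a demuxing
// loop might feed Stream::decodePacket and drain buffered frames with
// Stream::flush at end of input. Allocation of out.payload (via the
// decoder's createByteStorage factory) and error handling are elided.
inline void driveStream(ffmpeg::Stream* stream, const AVPacket* packet) {
  ffmpeg::DecoderOutputMessage out;
  bool hasMsg = false;
  // feed one demuxed packet; a decoded message may or may not be produced
  if (stream->decodePacket(packet, &out, /*headerOnly=*/false, &hasMsg) >= 0 &&
      hasMsg) {
    // consume out.header / out.payload here
  }
  // at end of input, flush until no buffered frames remain
  while (stream->flush(&out, /*headerOnly=*/false) > 0) {
    // consume the flushed message here
  }
}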
#include "subtitle_sampler.h"
#include <c10/util/Logging.h>
#include "util.h"
namespace ffmpeg {
SubtitleSampler::~SubtitleSampler() {
cleanUp();
}
void SubtitleSampler::shutdown() {
cleanUp();
}
bool SubtitleSampler::init(const SamplerParameters& params) {
cleanUp();
// set formats
params_ = params;
return true;
}
int SubtitleSampler::sample(AVSubtitle* sub, ByteStorage* out) {
if (!sub || !out) {
return 0; // flush
}
out->ensure(Util::size(*sub));
return Util::serialize(*sub, out);
}
int SubtitleSampler::sample(const ByteStorage* in, ByteStorage* out) {
if (in && out) {
// Get a writable copy
if (size_t len = in->length()) {
out->ensure(len);
memcpy(out->writableTail(), in->data(), len);
}
return out->length();
}
return 0;
}
void SubtitleSampler::cleanUp() {}
} // namespace ffmpeg
#pragma once
#include "defs.h"
namespace ffmpeg {
/**
* Class passes subtitle frames through, serializing AVSubtitle data into
* byte storage
*/
class SubtitleSampler : public MediaSampler {
public:
SubtitleSampler() = default;
~SubtitleSampler() override;
bool init(const SamplerParameters& params) override;
int sample(const ByteStorage* in, ByteStorage* out) override;
void shutdown() override;
// returns the number of serialized bytes
int sample(AVSubtitle* sub, ByteStorage* out);
// helper serialization/deserialization methods
static void serialize(const AVSubtitle& sub, ByteStorage* out);
static bool deserialize(const ByteStorage& buf, AVSubtitle* sub);
private:
// close resources
void cleanUp();
};
} // namespace ffmpeg
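// Usage sketch (hypothetical caller, not part of this file): pass-through
// copy of already-serialized subtitle bytes. The ByteStorage factory below
// is a stand-in for the decoder's createByteStorage(); init() only records
// the sampler parameters.
inline int copySubtitleBytes(
    std::unique_ptr<ffmpeg::ByteStorage> (*makeStorage)(size_t),
    const ffmpeg::ByteStorage* in) {
  ffmpeg::SubtitleSampler sampler;
  sampler.init(ffmpeg::SamplerParameters{}); // defaults suffice for copying
  auto out = makeStorage(in ? in->length() : 0);
  int copied = sampler.sample(in, out.get()); // memcpy-based pass-through
  sampler.shutdown();
  return copied;
}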
#include "subtitle_stream.h"
#include <c10/util/Logging.h>
#include <limits>
#include "util.h"
namespace ffmpeg {
SubtitleStream::SubtitleStream(
AVFormatContext* inputCtx,
int index,
bool convertPtsToWallTime,
const SubtitleFormat& format)
: Stream(
inputCtx,
MediaFormat::makeMediaFormat(format, index),
convertPtsToWallTime,
0) {
memset(&sub_, 0, sizeof(sub_));
}
void SubtitleStream::releaseSubtitle() {
if (sub_.release) {
avsubtitle_free(&sub_);
memset(&sub_, 0, sizeof(sub_));
}
}
SubtitleStream::~SubtitleStream() {
releaseSubtitle();
sampler_.shutdown();
}
int SubtitleStream::initFormat() {
if (!codecCtx_->subtitle_header) {
LOG(ERROR) << "No subtitle header found";
} else {
VLOG(1) << "Subtitle header found!";
}
return 0;
}
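// Subtitles are not covered by the send/receive API used in Stream, so the
// override below decodes through the legacy avcodec_decode_subtitle2() entry
// point and tracks ownership via the sub_.release flag.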
int SubtitleStream::analyzePacket(const AVPacket* packet, bool* gotFrame) {
// clean-up
releaseSubtitle();
// check flush packet
AVPacket avPacket;
av_init_packet(&avPacket);
avPacket.data = nullptr;
avPacket.size = 0;
auto pkt = packet ? *packet : avPacket;
int gotFramePtr = 0;
int result = avcodec_decode_subtitle2(codecCtx_, &sub_, &gotFramePtr, &pkt);
if (result < 0) {
LOG(ERROR) << "avcodec_decode_subtitle2 failed, err: "
<< Util::generateErrorDesc(result);
return result;
} else if (result == 0) {
result = pkt.size; // discard the rest of the packet
}
sub_.release = gotFramePtr;
*gotFrame = gotFramePtr > 0;
// set proper pts in us
if (gotFramePtr) {
sub_.pts = av_rescale_q(
pkt.pts, inputCtx_->streams[format_.stream]->time_base, AV_TIME_BASE_Q);
}
return result;
}
int SubtitleStream::copyFrameBytes(ByteStorage* out, bool flush) {
return sampler_.sample(flush ? nullptr : &sub_, out);
}
void SubtitleStream::setFramePts(DecoderHeader* header, bool) {
header->pts = sub_.pts; // already in us
}
} // namespace ffmpeg