Unverified commit 626a450a authored by Bruno Korbar, committed by GitHub

[VideoReader] FFMPEG Multithreading, attempt 3 (#4474)



* initial commit

* initial commit

* flake8

* last nits
Co-authored-by: Prabhat Roy <prabhatroy@fb.com>
parent c88423b0
@@ -447,7 +447,7 @@ bool Decoder::openStreams(std::vector<DecoderMetadata>* metadata) {
         it->format,
         params_.loggingUuid);
     CHECK(stream);
-    if (stream->openCodec(metadata) < 0) {
+    if (stream->openCodec(metadata, params_.numThreads) < 0) {
       LOG(ERROR) << "uuid=" << params_.loggingUuid
                  << " open codec failed, stream_idx=" << i;
       return false;

@@ -194,6 +194,9 @@ struct DecoderParameters {
   bool preventStaleness{true};
   // seek tolerated accuracy (us)
   double seekAccuracy{1000000.0};
+  // Allow multithreaded decoding for numThreads > 1;
+  // numThreads = 0 sets up sensible defaults
+  int numThreads{1};
   // what media types should be processed, default none
   std::set<MediaFormat> formats;

#include "stream.h" #include "stream.h"
#include <c10/util/Logging.h> #include <c10/util/Logging.h>
#include <stdio.h>
#include <string.h>
#include "util.h" #include "util.h"
namespace ffmpeg { namespace ffmpeg {
...@@ -33,7 +35,7 @@ AVCodec* Stream::findCodec(AVCodecParameters* params) { ...@@ -33,7 +35,7 @@ AVCodec* Stream::findCodec(AVCodecParameters* params) {
// decode/encode process. Then fill this codec context with CODEC parameters // decode/encode process. Then fill this codec context with CODEC parameters
// defined in stream parameters. Open the codec, and allocate the global frame // defined in stream parameters. Open the codec, and allocate the global frame
// defined in the header file // defined in the header file
int Stream::openCodec(std::vector<DecoderMetadata>* metadata) { int Stream::openCodec(std::vector<DecoderMetadata>* metadata, int num_threads) {
AVStream* steam = inputCtx_->streams[format_.stream]; AVStream* steam = inputCtx_->streams[format_.stream];
AVCodec* codec = findCodec(steam->codecpar); AVCodec* codec = findCodec(steam->codecpar);
...@@ -49,6 +51,37 @@ int Stream::openCodec(std::vector<DecoderMetadata>* metadata) { ...@@ -49,6 +51,37 @@ int Stream::openCodec(std::vector<DecoderMetadata>* metadata) {
<< ", avcodec_alloc_context3 failed"; << ", avcodec_alloc_context3 failed";
return AVERROR(ENOMEM); return AVERROR(ENOMEM);
} }
// multithreading heuristics
// if user defined,
if (num_threads > max_threads) {
num_threads = max_threads;
}
if (num_threads > 0) {
// if user defined, respect that
// note that default thread_type will be used
codecCtx_->thread_count = num_threads;
} else {
// otherwise set sensible defaults
// with the special case for the different MPEG4 codecs
// that don't have threading context functions
if (codecCtx_->codec->capabilities & AV_CODEC_CAP_INTRA_ONLY) {
codecCtx_->thread_type = FF_THREAD_FRAME;
codecCtx_->thread_count = 2;
} else {
codecCtx_->thread_count = 8;
codecCtx_->thread_type = FF_THREAD_SLICE;
}
}
// print codec type and number of threads
LOG(INFO) << "Codec " << codecCtx_->codec->long_name
<< " Codec id: " << codecCtx_->codec_id
<< " Codec tag: " << codecCtx_->codec_tag
<< " Codec type: " << codecCtx_->codec_type
<< " Codec extradata: " << codecCtx_->extradata
<< " Number of threads: " << codecCtx_->thread_count
<< " Thread type: " << codecCtx_->thread_type;
int ret; int ret;
// Copy codec parameters from input stream to output codec context // Copy codec parameters from input stream to output codec context
......
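For readers less familiar with libavcodec, the heuristic above only touches two fields on the codec context, thread_count and thread_type. Below is a minimal standalone sketch of the same idea against the FFmpeg 4.x API, independent of the decoder classes in this patch; open_threaded_decoder and its error handling are illustrative, not torchvision code.

extern "C" {
#include <libavcodec/avcodec.h>
}

// Hypothetical helper, not part of this patch: open a decoder for `codec_id`
// with an explicit thread configuration, mirroring the heuristic above.
AVCodecContext* open_threaded_decoder(enum AVCodecID codec_id, int num_threads) {
  AVCodec* codec = avcodec_find_decoder(codec_id);
  if (!codec) {
    return nullptr;
  }
  AVCodecContext* ctx = avcodec_alloc_context3(codec);
  if (!ctx) {
    return nullptr;
  }
  if (num_threads > 0) {
    // explicit request: set only the count and let FFmpeg pick the thread type
    ctx->thread_count = num_threads;
  } else if (codec->capabilities & AV_CODEC_CAP_INTRA_ONLY) {
    // intra-only codecs decode frames independently of each other
    ctx->thread_type = FF_THREAD_FRAME;
    ctx->thread_count = 2;
  } else {
    ctx->thread_type = FF_THREAD_SLICE;
    ctx->thread_count = 8;
  }
  if (avcodec_open2(ctx, codec, nullptr) < 0) {
    avcodec_free_context(&ctx);
    return nullptr;
  }
  return ctx;
}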
@@ -20,7 +20,9 @@ class Stream {
   virtual ~Stream();

   // returns 0 - on success or negative error
-  int openCodec(std::vector<DecoderMetadata>* metadata);
+  // num_threads sets up the codec context for multithreading if needed;
+  // the default is a single thread so as not to break backward compatibility
+  int openCodec(std::vector<DecoderMetadata>* metadata, int num_threads = 1);
   // returns 1 - if packet got consumed, 0 - if it's not, and < 0 on error
   int decodePacket(
       const AVPacket* packet,

@@ -69,6 +71,10 @@ class Stream {
   // estimated next frame pts for flushing the last frame
   int64_t nextPts_{0};
   double fps_{30.};
+  // this is a conservative limit; ideally we would use
+  // int max_threads = at::get_num_threads(), but that would add a dependency
+  // on ATen to the decoder API and cause the fb sync to fail
+  const int max_threads = 12;
 };

 } // namespace ffmpeg

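The fixed max_threads cap exists precisely to keep ATen out of the decoder API. If that constraint were ever relaxed, one portable alternative would be to derive the cap from the hardware at construction time. The helper below is only a sketch under that assumption; default_max_threads and the clamp value of 16 are illustrative, not part of this patch.

#include <algorithm>
#include <thread>

// Hypothetical alternative to the fixed `max_threads = 12` cap: query the
// hardware and clamp to a sane range. hardware_concurrency() may return 0
// when the value cannot be determined, in which case we fall back to 1.
inline int default_max_threads() {
  unsigned hw = std::thread::hardware_concurrency();
  return std::max(1, std::min<int>(static_cast<int>(hw), 16));
}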
@@ -99,6 +99,7 @@ void Video::_getDecoderParams(
     std::string stream,
     long stream_id = -1,
     bool all_streams = false,
+    int64_t num_threads = 1,
     double seekFrameMarginUs = 10) {
   int64_t videoStartUs = int64_t(videoStartS * 1e6);

@@ -106,6 +107,7 @@ void Video::_getDecoderParams(
   params.startOffset = videoStartUs;
   params.seekAccuracy = seekFrameMarginUs;
   params.headerOnly = false;
+  params.numThreads = num_threads;
   params.preventStaleness = false; // not sure what this is about

@@ -152,7 +154,9 @@ void Video::_getDecoderParams(
 } // _get decoder params

-Video::Video(std::string videoPath, std::string stream) {
+Video::Video(std::string videoPath, std::string stream, int64_t numThreads) {
+  // set the global number of decoding threads
+  numThreads_ = numThreads;
   // parse stream information
   current_stream = _parseStream(stream);
   // note that in the initial call we want to get all streams

@@ -161,7 +165,8 @@ Video::Video(std::string videoPath, std::string stream) {
       0, // headerOnly
       std::get<0>(current_stream), // stream info - remove that
       long(-1), // stream_id parsed from info above change to -2
-      true // read all streams
+      true, // read all streams
+      numThreads_ // global number of threads for decoding
   );
   std::string logMessage, logType;

@@ -241,7 +246,8 @@ bool Video::setCurrentStream(std::string stream = "video") {
       std::get<0>(current_stream), // stream
       long(std::get<1>(
           current_stream)), // stream_id parsed from info above change to -2
-      false // read all streams
+      false, // read all streams
+      numThreads_ // global number of threads
   );
   // callback and metadata defined in Video.h

@@ -265,7 +271,8 @@ void Video::Seek(double ts) {
       std::get<0>(current_stream), // stream
       long(std::get<1>(
           current_stream)), // stream_id parsed from info above change to -2
-      false // read all streams
+      false, // read all streams
+      numThreads_ // global number of threads
   );
   // callback and metadata defined in Video.h

@@ -331,7 +338,7 @@ std::tuple<torch::Tensor, double> Video::Next() {
 static auto registerVideo =
     torch::class_<Video>("torchvision", "Video")
-        .def(torch::init<std::string, std::string>())
+        .def(torch::init<std::string, std::string, int64_t>())
         .def("get_current_stream", &Video::getCurrentStream)
         .def("set_current_stream", &Video::setCurrentStream)
        .def("get_metadata", &Video::getStreamMetadata)

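The registration change above is also why the new constructor argument is an int64_t rather than a plain int: TorchScript custom-class bindings expose Python integers as int64_t. A minimal sketch of registering a custom class with such an argument follows; MyDecoder and my_namespace are illustrative names, not torchvision code.

#include <torch/custom_class.h>
#include <string>

// Toy custom class holding a path and a thread count, mirroring how
// torchvision::Video carries numThreads through its constructor.
struct MyDecoder : torch::CustomClassHolder {
  MyDecoder(std::string path, int64_t num_threads)
      : path_(std::move(path)), num_threads_(num_threads) {}
  int64_t numThreads() const { return num_threads_; }
  std::string path_;
  int64_t num_threads_;
};

// Registration: the torch::init argument list must use TorchScript-compatible
// types (int64_t, double, bool, std::string, ...).
static auto registerMyDecoder =
    torch::class_<MyDecoder>("my_namespace", "MyDecoder")
        .def(torch::init<std::string, int64_t>())
        .def("num_threads", &MyDecoder::numThreads);

Once registered, the class is reachable from Python as torch.classes.my_namespace.MyDecoder(path, num_threads), which mirrors how the VideoReader wrapper below constructs torch.classes.torchvision.Video(path, stream, num_threads).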
@@ -16,9 +16,10 @@ struct Video : torch::CustomClassHolder {
   // global video metadata
   c10::Dict<std::string, c10::Dict<std::string, std::vector<double>>>
       streamsMetadata;
+  int64_t numThreads_{0};

  public:
-  Video(std::string videoPath, std::string stream);
+  Video(std::string videoPath, std::string stream, int64_t numThreads);
   std::tuple<std::string, int64_t> getCurrentStream() const;
   c10::Dict<std::string, c10::Dict<std::string, std::vector<double>>>
       getStreamMetadata() const;

@@ -39,6 +40,7 @@ struct Video : torch::CustomClassHolder {
       std::string stream,
       long stream_id,
       bool all_streams,
+      int64_t num_threads,
       double seekFrameMarginUs); // this needs to be improved
   std::map<std::string, std::vector<double>> streamTimeBase; // not used

@@ -99,9 +99,13 @@ class VideoReader:
         stream (string, optional): descriptor of the required stream, followed by the stream id,
             in the format ``{stream_type}:{stream_id}``. Defaults to ``"video:0"``.
             Currently available options include ``['video', 'audio']``
+        num_threads (int, optional): number of threads used by the codec to decode the video.
+            The default value (0) enables multithreading with a codec-dependent heuristic.
+            Performance will depend on the version of the FFMPEG codecs supported.
     """

-    def __init__(self, path: str, stream: str = "video") -> None:
+    def __init__(self, path: str, stream: str = "video", num_threads: int = 0) -> None:
         if not _has_video_opt():
             raise RuntimeError(
                 "Not compiled with video_reader support, "

@@ -109,7 +113,7 @@ class VideoReader:
                 + "ffmpeg (version 4.2 is currently supported) and"
                 + "build torchvision from source."
             )
-        self._c = torch.classes.torchvision.Video(path, stream)
+        self._c = torch.classes.torchvision.Video(path, stream, num_threads)

     def __next__(self) -> Dict[str, Any]:
         """Decodes and returns the next frame of the current stream.