Unverified Commit 3bfdb427 authored by Bruno Korbar's avatar Bruno Korbar Committed by GitHub
Browse files

Implementing multithreaded video decoding (#3389)



* multithreading allowed in stream codec context

* numThreads is passed as a decoder parameter. At this stage code should be unchanged

* enabling multithreading in videoReader API

* moving defaults to header files

* replace long with int64_t because torchscript

* docstring for Num threads

* Enable codec-related heuristics as defaults

* Update torchvision/csrc/io/decoder/stream.cpp
Co-authored-by: default avatarVasilis Vryniotis <datumbox@users.noreply.github.com>

* Fixing build errors

* minor docs

* Linting

* updating defaults for the C++ function calls to be single threaded

* adding special case for single threaded stuff
Co-authored-by: default avatarVasilis Vryniotis <datumbox@users.noreply.github.com>
Co-authored-by: default avatarFrancisco Massa <fvsmassa@gmail.com>
parent 38bb2704
......@@ -432,7 +432,7 @@ bool Decoder::openStreams(std::vector<DecoderMetadata>* metadata) {
it->format,
params_.loggingUuid);
CHECK(stream);
if (stream->openCodec(metadata) < 0) {
if (stream->openCodec(metadata, params_.numThreads) < 0) {
LOG(ERROR) << "uuid=" << params_.loggingUuid
<< " open codec failed, stream_idx=" << i;
return false;
......
......@@ -194,6 +194,9 @@ struct DecoderParameters {
bool preventStaleness{true};
// seek tolerated accuracy (us)
double seekAccuracy{1000000.0};
// Allow multithreaded decoding for numThreads > 1;
// 0 numThreads=0 sets up sensible defaults
int numThreads{1};
// what media types should be processed, default none
std::set<MediaFormat> formats;
......
#include "stream.h"
#include <ATen/Parallel.h>
#include <c10/util/Logging.h>
#include <stdio.h>
#include <string.h>
#include "util.h"
namespace ffmpeg {
......@@ -28,7 +31,7 @@ AVCodec* Stream::findCodec(AVCodecParameters* params) {
return avcodec_find_decoder(params->codec_id);
}
int Stream::openCodec(std::vector<DecoderMetadata>* metadata) {
int Stream::openCodec(std::vector<DecoderMetadata>* metadata, int num_threads) {
AVStream* steam = inputCtx_->streams[format_.stream];
AVCodec* codec = findCodec(steam->codecpar);
......@@ -53,6 +56,49 @@ int Stream::openCodec(std::vector<DecoderMetadata>* metadata) {
return ret;
}
// multithreading heuristics
int max_threads = at::get_num_threads();
// first a safety check
if (num_threads > max_threads) {
num_threads = max_threads;
}
if (num_threads > 0) {
if (num_threads > 1) {
codecCtx_->active_thread_type = 1;
}
// if user defined, respect that
codecCtx_->thread_count = num_threads;
} else {
// otherwise set sensible defaults
// with the special case for the different MPEG4 codecs
// that don't have threading context functions
// TODO: potentially automate this using native c++ function lookups
if (strcmp(codecCtx_->codec->name, "mpeg4") == 0 &&
codecCtx_->codec_type == 0) {
if (codecCtx_->codec_tag == 1684633208) {
codecCtx_->thread_count = (8 <= max_threads) ? 8 : max_threads;
codecCtx_->thread_type = 1;
} else {
codecCtx_->thread_count = (2 <= max_threads) ? 2 : max_threads;
codecCtx_->thread_type = 2;
}
} else {
// otherwise default to multithreading
codecCtx_->thread_count = (8 <= max_threads) ? 8 : max_threads;
codecCtx_->active_thread_type = 1;
}
}
// print codec type and number of threads
LOG(INFO) << "Codec " << codecCtx_->codec->long_name
<< " Codec id: " << codecCtx_->codec_id
<< " Codec tag: " << codecCtx_->codec_tag
<< " Codec type: " << codecCtx_->codec_type
<< " Codec extradata: " << codecCtx_->extradata
<< " Number of threads: " << at::get_num_threads();
// after avcodec_open2, value of codecCtx_->time_base is NOT meaningful
if ((ret = avcodec_open2(codecCtx_, codec, nullptr)) < 0) {
LOG(ERROR) << "LoggingUuid #" << loggingUuid_
......
......@@ -20,7 +20,8 @@ class Stream {
virtual ~Stream();
// returns 0 - on success or negative error
int openCodec(std::vector<DecoderMetadata>* metadata);
// num_threads sets up the codec context for multithreading if needed
int openCodec(std::vector<DecoderMetadata>* metadata, int num_threads = 1);
// returns 1 - if packet got consumed, 0 - if it's not, and < 0 on error
int decodePacket(
const AVPacket* packet,
......
......@@ -97,15 +97,17 @@ void Video::_getDecoderParams(
double videoStartS,
int64_t getPtsOnly,
std::string stream,
long stream_id = -1,
bool all_streams = false,
double seekFrameMarginUs = 10) {
long stream_id,
bool all_streams,
int64_t num_threads,
double seekFrameMarginUs) {
int64_t videoStartUs = int64_t(videoStartS * 1e6);
params.timeoutMs = decoderTimeoutMs;
params.startOffset = videoStartUs;
params.seekAccuracy = seekFrameMarginUs;
params.headerOnly = false;
params.numThreads = num_threads;
params.preventStaleness = false; // not sure what this is about
......@@ -152,7 +154,9 @@ void Video::_getDecoderParams(
} // _get decoder params
Video::Video(std::string videoPath, std::string stream) {
Video::Video(std::string videoPath, std::string stream, int64_t numThreads) {
// set number of threads global
numThreads_ = numThreads;
// parse stream information
current_stream = _parseStream(stream);
// note that in the initial call we want to get all streams
......@@ -161,7 +165,8 @@ Video::Video(std::string videoPath, std::string stream) {
0, // headerOnly
std::get<0>(current_stream), // stream info - remove that
long(-1), // stream_id parsed from info above change to -2
true // read all streams
true, // read all streams
numThreads_ // global number of Threads for decoding
);
std::string logMessage, logType;
......@@ -225,7 +230,7 @@ Video::Video(std::string videoPath, std::string stream) {
}
} // video
bool Video::setCurrentStream(std::string stream = "video") {
bool Video::setCurrentStream(std::string stream) {
if ((!stream.empty()) && (_parseStream(stream) != current_stream)) {
current_stream = _parseStream(stream);
}
......@@ -241,7 +246,8 @@ bool Video::setCurrentStream(std::string stream = "video") {
std::get<0>(current_stream), // stream
long(std::get<1>(
current_stream)), // stream_id parsed from info above change to -2
false // read all streams
false, // read all streams
numThreads_ // global number of threads
);
// calback and metadata defined in Video.h
......@@ -265,7 +271,8 @@ void Video::Seek(double ts) {
std::get<0>(current_stream), // stream
long(std::get<1>(
current_stream)), // stream_id parsed from info above change to -2
false // read all streams
false, // read all streams
numThreads_ // global num threads
);
// calback and metadata defined in Video.h
......@@ -331,7 +338,7 @@ std::tuple<torch::Tensor, double> Video::Next() {
static auto registerVideo =
torch::class_<Video>("torchvision", "Video")
.def(torch::init<std::string, std::string>())
.def(torch::init<std::string, std::string, int64_t>())
.def("get_current_stream", &Video::getCurrentStream)
.def("set_current_stream", &Video::setCurrentStream)
.def("get_metadata", &Video::getStreamMetadata)
......
......@@ -16,14 +16,15 @@ struct Video : torch::CustomClassHolder {
// global video metadata
c10::Dict<std::string, c10::Dict<std::string, std::vector<double>>>
streamsMetadata;
int64_t numThreads_{0};
public:
Video(std::string videoPath, std::string stream);
Video(std::string videoPath, std::string stream, int64_t numThreads);
std::tuple<std::string, int64_t> getCurrentStream() const;
c10::Dict<std::string, c10::Dict<std::string, std::vector<double>>>
getStreamMetadata() const;
void Seek(double ts);
bool setCurrentStream(std::string stream);
bool setCurrentStream(std::string stream = "video");
std::tuple<torch::Tensor, double> Next();
private:
......@@ -37,9 +38,10 @@ struct Video : torch::CustomClassHolder {
double videoStartS,
int64_t getPtsOnly,
std::string stream,
long stream_id,
bool all_streams,
double seekFrameMarginUs); // this needs to be improved
long stream_id = -1,
bool all_streams = false,
int64_t num_threads = 0,
double seekFrameMarginUs = 10); // this needs to be improved
std::map<std::string, std::vector<double>> streamTimeBase; // not used
......
......@@ -96,9 +96,13 @@ class VideoReader:
stream (string, optional): descriptor of the required stream, followed by the stream id,
in the format ``{stream_type}:{stream_id}``. Defaults to ``"video:0"``.
Currently available options include ``['video', 'audio']``
num_threads (int, optional): number of threads used by the codec to decode video.
Default value (0) enables multithreading with codec-dependent heuristic. The performance
will depend on the version of FFMPEG codecs supported.
"""
def __init__(self, path, stream="video"):
def __init__(self, path, stream="video", num_threads=0):
if not _has_video_opt():
raise RuntimeError(
"Not compiled with video_reader support, "
......@@ -106,7 +110,7 @@ class VideoReader:
+ "ffmpeg (version 4.2 is currently supported) and"
+ "build torchvision from source."
)
self._c = torch.classes.torchvision.Video(path, stream)
self._c = torch.classes.torchvision.Video(path, stream, num_threads)
def __next__(self):
"""Decodes and returns the next frame of the current stream.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment