Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
OpenDAS
vision
Commits
e3f1a822
Unverified
Commit
e3f1a822
authored
Mar 01, 2022
by
Prabhat Roy
Committed by
GitHub
Mar 01, 2022
Browse files
Improve test_videoapi (#5497)
parent
c29a20ab
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
144 additions
and
152 deletions
+144
-152
test/test_videoapi.py
test/test_videoapi.py
+144
-152
No files found.
test/test_videoapi.py
View file @
e3f1a822
...
...
@@ -52,139 +52,131 @@ test_videos = {
@
pytest
.
mark
.
skipif
(
_HAS_VIDEO_OPT
is
False
,
reason
=
"Didn't compile with ffmpeg"
)
class
TestVideoApi
:
@
pytest
.
mark
.
skipif
(
av
is
None
,
reason
=
"PyAV unavailable"
)
def
test_frame_reading
(
self
):
for
test_video
,
config
in
test_videos
.
items
():
full_path
=
os
.
path
.
join
(
VIDEO_DIR
,
test_video
)
with
av
.
open
(
full_path
)
as
av_reader
:
is_video
=
True
if
av_reader
.
streams
.
video
else
False
if
is_video
:
av_frames
,
vr_frames
=
[],
[]
av_pts
,
vr_pts
=
[],
[]
# get av frames
for
av_frame
in
av_reader
.
decode
(
av_reader
.
streams
.
video
[
0
]):
av_frames
.
append
(
torch
.
tensor
(
av_frame
.
to_rgb
().
to_ndarray
()).
permute
(
2
,
0
,
1
))
av_pts
.
append
(
av_frame
.
pts
*
av_frame
.
time_base
)
# get vr frames
video_reader
=
VideoReader
(
full_path
,
"video"
)
for
vr_frame
in
video_reader
:
vr_frames
.
append
(
vr_frame
[
"data"
])
vr_pts
.
append
(
vr_frame
[
"pts"
])
# same number of frames
assert
len
(
vr_frames
)
==
len
(
av_frames
)
assert
len
(
vr_pts
)
==
len
(
av_pts
)
# compare the frames and ptss
for
i
in
range
(
len
(
vr_frames
)):
assert
float
(
av_pts
[
i
])
==
approx
(
vr_pts
[
i
],
abs
=
0.1
)
mean_delta
=
torch
.
mean
(
torch
.
abs
(
av_frames
[
i
].
float
()
-
vr_frames
[
i
].
float
()))
# on average the difference is very small and caused
# by decoding (around 1%)
# TODO: asses empirically how to set this? atm it's 1%
# averaged over all frames
assert
mean_delta
.
item
()
<
2.55
del
vr_frames
,
av_frames
,
vr_pts
,
av_pts
# test audio reading compared to PYAV
with
av
.
open
(
full_path
)
as
av_reader
:
is_audio
=
True
if
av_reader
.
streams
.
audio
else
False
if
is_audio
:
av_frames
,
vr_frames
=
[],
[]
av_pts
,
vr_pts
=
[],
[]
# get av frames
for
av_frame
in
av_reader
.
decode
(
av_reader
.
streams
.
audio
[
0
]):
av_frames
.
append
(
torch
.
tensor
(
av_frame
.
to_ndarray
()).
permute
(
1
,
0
))
av_pts
.
append
(
av_frame
.
pts
*
av_frame
.
time_base
)
av_reader
.
close
()
# get vr frames
video_reader
=
VideoReader
(
full_path
,
"audio"
)
for
vr_frame
in
video_reader
:
vr_frames
.
append
(
vr_frame
[
"data"
])
vr_pts
.
append
(
vr_frame
[
"pts"
])
# same number of frames
assert
len
(
vr_frames
)
==
len
(
av_frames
)
assert
len
(
vr_pts
)
==
len
(
av_pts
)
# compare the frames and ptss
for
i
in
range
(
len
(
vr_frames
)):
assert
float
(
av_pts
[
i
])
==
approx
(
vr_pts
[
i
],
abs
=
0.1
)
max_delta
=
torch
.
max
(
torch
.
abs
(
av_frames
[
i
].
float
()
-
vr_frames
[
i
].
float
()))
# we assure that there is never more than 1% difference in signal
assert
max_delta
.
item
()
<
0.001
def
test_metadata
(
self
):
@
pytest
.
mark
.
parametrize
(
"test_video"
,
test_videos
.
keys
())
def
test_frame_reading
(
self
,
test_video
):
full_path
=
os
.
path
.
join
(
VIDEO_DIR
,
test_video
)
with
av
.
open
(
full_path
)
as
av_reader
:
if
av_reader
.
streams
.
video
:
av_frames
,
vr_frames
=
[],
[]
av_pts
,
vr_pts
=
[],
[]
# get av frames
for
av_frame
in
av_reader
.
decode
(
av_reader
.
streams
.
video
[
0
]):
av_frames
.
append
(
torch
.
tensor
(
av_frame
.
to_rgb
().
to_ndarray
()).
permute
(
2
,
0
,
1
))
av_pts
.
append
(
av_frame
.
pts
*
av_frame
.
time_base
)
# get vr frames
video_reader
=
VideoReader
(
full_path
,
"video"
)
for
vr_frame
in
video_reader
:
vr_frames
.
append
(
vr_frame
[
"data"
])
vr_pts
.
append
(
vr_frame
[
"pts"
])
# same number of frames
assert
len
(
vr_frames
)
==
len
(
av_frames
)
assert
len
(
vr_pts
)
==
len
(
av_pts
)
# compare the frames and ptss
for
i
in
range
(
len
(
vr_frames
)):
assert
float
(
av_pts
[
i
])
==
approx
(
vr_pts
[
i
],
abs
=
0.1
)
mean_delta
=
torch
.
mean
(
torch
.
abs
(
av_frames
[
i
].
float
()
-
vr_frames
[
i
].
float
()))
# on average the difference is very small and caused
# by decoding (around 1%)
# TODO: asses empirically how to set this? atm it's 1%
# averaged over all frames
assert
mean_delta
.
item
()
<
2.55
del
vr_frames
,
av_frames
,
vr_pts
,
av_pts
# test audio reading compared to PYAV
with
av
.
open
(
full_path
)
as
av_reader
:
if
av_reader
.
streams
.
audio
:
av_frames
,
vr_frames
=
[],
[]
av_pts
,
vr_pts
=
[],
[]
# get av frames
for
av_frame
in
av_reader
.
decode
(
av_reader
.
streams
.
audio
[
0
]):
av_frames
.
append
(
torch
.
tensor
(
av_frame
.
to_ndarray
()).
permute
(
1
,
0
))
av_pts
.
append
(
av_frame
.
pts
*
av_frame
.
time_base
)
av_reader
.
close
()
# get vr frames
video_reader
=
VideoReader
(
full_path
,
"audio"
)
for
vr_frame
in
video_reader
:
vr_frames
.
append
(
vr_frame
[
"data"
])
vr_pts
.
append
(
vr_frame
[
"pts"
])
# same number of frames
assert
len
(
vr_frames
)
==
len
(
av_frames
)
assert
len
(
vr_pts
)
==
len
(
av_pts
)
# compare the frames and ptss
for
i
in
range
(
len
(
vr_frames
)):
assert
float
(
av_pts
[
i
])
==
approx
(
vr_pts
[
i
],
abs
=
0.1
)
max_delta
=
torch
.
max
(
torch
.
abs
(
av_frames
[
i
].
float
()
-
vr_frames
[
i
].
float
()))
# we assure that there is never more than 1% difference in signal
assert
max_delta
.
item
()
<
0.001
@
pytest
.
mark
.
parametrize
(
"test_video,config"
,
test_videos
.
items
())
def
test_metadata
(
self
,
test_video
,
config
):
"""
Test that the metadata returned via pyav corresponds to the one returned
by the new video decoder API
"""
for
test_video
,
config
in
test_videos
.
items
():
full_path
=
os
.
path
.
join
(
VIDEO_DIR
,
test_video
)
reader
=
VideoReader
(
full_path
,
"video"
)
reader_md
=
reader
.
get_metadata
()
assert
config
.
video_fps
==
approx
(
reader_md
[
"video"
][
"fps"
][
0
],
abs
=
0.0001
)
assert
config
.
duration
==
approx
(
reader_md
[
"video"
][
"duration"
][
0
],
abs
=
0.5
)
def
test_seek_start
(
self
):
for
test_video
,
config
in
test_videos
.
items
():
full_path
=
os
.
path
.
join
(
VIDEO_DIR
,
test_video
)
video_reader
=
VideoReader
(
full_path
,
"video"
)
full_path
=
os
.
path
.
join
(
VIDEO_DIR
,
test_video
)
reader
=
VideoReader
(
full_path
,
"video"
)
reader_md
=
reader
.
get_metadata
()
assert
config
.
video_fps
==
approx
(
reader_md
[
"video"
][
"fps"
][
0
],
abs
=
0.0001
)
assert
config
.
duration
==
approx
(
reader_md
[
"video"
][
"duration"
][
0
],
abs
=
0.5
)
@
pytest
.
mark
.
parametrize
(
"test_video"
,
test_videos
.
keys
())
def
test_seek_start
(
self
,
test_video
):
full_path
=
os
.
path
.
join
(
VIDEO_DIR
,
test_video
)
video_reader
=
VideoReader
(
full_path
,
"video"
)
num_frames
=
0
for
_
in
video_reader
:
num_frames
+=
1
# now seek the container to 0 and do it again
# It's often that starting seek can be inprecise
# this way and it doesn't start at 0
video_reader
.
seek
(
0
)
start_num_frames
=
0
for
_
in
video_reader
:
start_num_frames
+=
1
assert
start_num_frames
==
num_frames
# now seek the container to < 0 to check for unexpected behaviour
video_reader
.
seek
(
-
1
)
start_num_frames
=
0
for
_
in
video_reader
:
start_num_frames
+=
1
assert
start_num_frames
==
num_frames
@
pytest
.
mark
.
parametrize
(
"test_video"
,
test_videos
.
keys
())
def
test_accurateseek_middle
(
self
,
test_video
):
full_path
=
os
.
path
.
join
(
VIDEO_DIR
,
test_video
)
stream
=
"video"
video_reader
=
VideoReader
(
full_path
,
stream
)
md
=
video_reader
.
get_metadata
()
duration
=
md
[
stream
][
"duration"
][
0
]
if
duration
is
not
None
:
num_frames
=
0
for
frame
in
video_reader
:
for
_
in
video_reader
:
num_frames
+=
1
# now seek the container to 0 and do it again
# It's often that starting seek can be inprecise
# this way and it doesn't start at 0
video_reader
.
seek
(
0
)
start_num_frames
=
0
for
frame
in
video_reader
:
start_num_frames
+=
1
assert
start_num_frames
==
num_frames
# now seek the container to < 0 to check for unexpected behaviour
video_reader
.
seek
(
-
1
)
start_num_frames
=
0
for
frame
in
video_reader
:
start_num_frames
+=
1
assert
start_num_frames
==
num_frames
def
test_accurateseek_middle
(
self
):
for
test_video
,
config
in
test_videos
.
items
():
full_path
=
os
.
path
.
join
(
VIDEO_DIR
,
test_video
)
video_reader
.
seek
(
duration
/
2
)
middle_num_frames
=
0
for
_
in
video_reader
:
middle_num_frames
+=
1
stream
=
"video"
video_reader
=
VideoReader
(
full_path
,
stream
)
md
=
video_reader
.
get_metadata
()
duration
=
md
[
stream
][
"duration"
][
0
]
if
duration
is
not
None
:
assert
middle_num_frames
<
num_frames
assert
middle_num_frames
==
approx
(
num_frames
//
2
,
abs
=
1
)
num_frames
=
0
for
frame
in
video_reader
:
num_frames
+=
1
video_reader
.
seek
(
duration
/
2
)
middle_num_frames
=
0
for
frame
in
video_reader
:
middle_num_frames
+=
1
assert
middle_num_frames
<
num_frames
assert
middle_num_frames
==
approx
(
num_frames
//
2
,
abs
=
1
)
video_reader
.
seek
(
duration
/
2
)
frame
=
next
(
video_reader
)
lb
=
duration
/
2
-
1
/
md
[
stream
][
"fps"
][
0
]
ub
=
duration
/
2
+
1
/
md
[
stream
][
"fps"
][
0
]
assert
(
lb
<=
frame
[
"pts"
])
and
(
ub
>=
frame
[
"pts"
])
video_reader
.
seek
(
duration
/
2
)
frame
=
next
(
video_reader
)
lb
=
duration
/
2
-
1
/
md
[
stream
][
"fps"
][
0
]
ub
=
duration
/
2
+
1
/
md
[
stream
][
"fps"
][
0
]
assert
(
lb
<=
frame
[
"pts"
])
and
(
ub
>=
frame
[
"pts"
])
def
test_fate_suite
(
self
):
# TODO: remove the try-except statement once the connectivity issues are resolved
...
...
@@ -199,41 +191,41 @@ class TestVideoApi:
os
.
remove
(
video_path
)
@
pytest
.
mark
.
skipif
(
av
is
None
,
reason
=
"PyAV unavailable"
)
def
test_keyframe_reading
(
self
):
for
test_video
,
config
in
test_videos
.
items
():
full_path
=
os
.
path
.
join
(
VIDEO_DIR
,
test_video
)
av_reader
=
av
.
open
(
full_path
)
# reduce streams to only keyframes
av_stream
=
av_reader
.
streams
.
video
[
0
]
av_stream
.
codec_context
.
skip_frame
=
"NONKEY"
@
pytest
.
mark
.
parametrize
(
"test_video,config"
,
test_videos
.
items
())
def
test_keyframe_reading
(
self
,
test_video
,
config
):
full_path
=
os
.
path
.
join
(
VIDEO_DIR
,
test_video
)
av_keyframes
=
[]
vr_keyframes
=
[]
if
av_reader
.
streams
.
video
:
av_reader
=
av
.
open
(
full_path
)
# reduce streams to only keyframes
av_stream
=
av_reader
.
streams
.
video
[
0
]
av_stream
.
codec_context
.
skip_frame
=
"NONKEY"
# get all keyframes using pyav. Then, seek randomly into video reader
# and assert that all the returned values are in AV_KEYFRAMES
av_keyframes
=
[]
vr_keyframes
=
[]
if
av_reader
.
streams
.
video
:
for
av_frame
in
av_reader
.
decode
(
av_stream
):
av_keyframes
.
append
(
float
(
av_frame
.
pts
*
av_frame
.
time_base
))
# get all keyframes using pyav. Then, seek randomly into video reader
# and assert that all the returned values are in AV_KEYFRAMES
if
len
(
av_keyframes
)
>
1
:
video_reader
=
VideoReader
(
full_path
,
"video"
)
for
i
in
range
(
1
,
len
(
av_keyframes
)):
seek_val
=
(
av_keyframes
[
i
]
+
av_keyframes
[
i
-
1
])
/
2
data
=
next
(
video_reader
.
seek
(
seek_val
,
True
))
vr_keyframes
.
append
(
data
[
"pts"
])
for
av_frame
in
av_reader
.
decode
(
av_stream
):
av_keyframes
.
append
(
float
(
av_frame
.
pts
*
av_frame
.
time_base
))
data
=
next
(
video_reader
.
seek
(
config
.
duration
,
True
))
if
len
(
av_keyframes
)
>
1
:
video_reader
=
VideoReader
(
full_path
,
"video"
)
for
i
in
range
(
1
,
len
(
av_keyframes
)):
seek_val
=
(
av_keyframes
[
i
]
+
av_keyframes
[
i
-
1
])
/
2
data
=
next
(
video_reader
.
seek
(
seek_val
,
True
))
vr_keyframes
.
append
(
data
[
"pts"
])
assert
len
(
av_keyframes
)
==
len
(
vr_keyframes
)
# NOTE: this video gets different keyframe with different
# loaders (0.333 pyav, 0.666 for us)
if
test_video
!=
"TrumanShow_wave_f_nm_np1_fr_med_26.avi"
:
for
i
in
range
(
len
(
av_keyframes
)):
assert
av_keyframes
[
i
]
==
approx
(
vr_keyframes
[
i
],
rel
=
0.001
)
data
=
next
(
video_reader
.
seek
(
config
.
duration
,
True
))
vr_keyframes
.
append
(
data
[
"pts"
])
assert
len
(
av_keyframes
)
==
len
(
vr_keyframes
)
# NOTE: this video gets different keyframe with different
# loaders (0.333 pyav, 0.666 for us)
if
test_video
!=
"TrumanShow_wave_f_nm_np1_fr_med_26.avi"
:
for
i
in
range
(
len
(
av_keyframes
)):
assert
av_keyframes
[
i
]
==
approx
(
vr_keyframes
[
i
],
rel
=
0.001
)
if
__name__
==
"__main__"
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment