"""Tests for reading and writing videos through ``torchvision.io``."""
import contextlib
import os
import sys
import tempfile
import unittest
import warnings

import torch
import torchvision.datasets.utils as utils
import torchvision.io as io
from torchvision import get_video_backend

from common_utils import get_tmp_dir

# URLError lives in a different module on Python 2.
if sys.version_info < (3,):
    from urllib2 import URLError
else:
    from urllib.error import URLError

# PyAV is optional: when it cannot be imported ``av`` is set to None and the
# test class in this module is skipped.
try:
    import av

    # Do a version test too
    io.video._check_av_available()
except ImportError:
    av = None

# Video backend reported by torchvision when this module is imported; the
# helpers and tests in this file branch on whether it equals "pyav".
_video_backend = get_video_backend()


def _read_video(filename, start_pts=0, end_pts=None):
    """Read ``filename`` between two presentation timestamps with the active backend.

    With the "pyav" backend the call is forwarded to ``io.read_video``
    unchanged.  Any other backend goes through ``io._read_video_from_file``,
    where an open end (``end_pts=None``) is passed as ``-1``.

    Returns whatever the selected reader returns.
    """
    if _video_backend != "pyav":
        # the file-based reader takes the range as a tuple and uses -1 for "no end"
        last_pts = -1 if end_pts is None else end_pts
        return io._read_video_from_file(
            filename,
            video_pts_range=(start_pts, last_pts),
        )
    return io.read_video(filename, start_pts, end_pts)


# Helpers that build synthetic videos for the tests.
def _create_video_frames(num_frames, height, width):
    """Build a synthetic clip of a Gaussian blob whose center shifts every frame.

    Pixel coordinates span [-2, 2] along both axes.  For frame ``i`` the blob
    is centered at ``x = i / num_frames`` and ``y = 1 - i / (2 * num_frames)``.

    Returns:
        uint8 tensor of shape ``(num_frames, height, width, 3)`` whose three
        channels are identical.
    """
    rows = torch.linspace(-2, 2, height)
    cols = torch.linspace(-2, 2, width)
    grid_y, grid_x = torch.meshgrid(rows, cols)

    def _render(index):
        # blob center for this frame
        center_x = float(index) / num_frames
        center_y = 1 - float(index) / (2 * num_frames)
        sq_dist = (grid_x - center_x) ** 2 + (grid_y - center_y) ** 2
        intensity = torch.exp(-sq_dist / 2) * 255
        # replicate the single plane into three channels and quantize to bytes
        return intensity.unsqueeze(2).repeat(1, 1, 3).byte()

    return torch.stack([_render(index) for index in range(num_frames)], 0)


@contextlib.contextmanager
def temp_video(num_frames, height, width, fps, lossless=False, video_codec=None, options=None):
    """Write a synthetic video to a temporary ``.mp4`` file and yield it.

    Args:
        num_frames, height, width: forwarded to ``_create_video_frames``.
        fps: frame rate handed to ``io.write_video``.
        lossless: if True, encode with ``libx264rgb`` and ``crf=0``.  Cannot be
            combined with ``video_codec`` or ``options``.
        video_codec: codec name.  When omitted it defaults to ``libx264`` for
            the "pyav" backend and ``libx264rgb`` otherwise.
        options: codec options dict passed to ``io.write_video``; defaults to
            an empty dict.

    Yields:
        ``(file_name, frames)``: the path of the written file and the tensor
        that was encoded.  The temporary file is removed when the context exits.

    Raises:
        AssertionError: if ``lossless`` is combined with ``video_codec`` or
            ``options``.
    """
    if lossless:
        assert video_codec is None, "video_codec can't be specified together with lossless"
        assert options is None, "options can't be specified together with lossless"
        video_codec = 'libx264rgb'
        options = {'crf': '0'}

    if video_codec is None:
        if _video_backend == "pyav":
            video_codec = 'libx264'
        else:
            # when video_codec is not set, we assume it is libx264rgb which accepts
            # RGB pixel formats as input instead of YUV
            video_codec = 'libx264rgb'

    if options is None:
        options = {}

    data = _create_video_frames(num_frames, height, width)
    with tempfile.NamedTemporaryFile(suffix='.mp4') as f:
        # the file is written by name while the handle stays open, so that it
        # is cleaned up automatically once the caller leaves the context
        io.write_video(f.name, data, fps=fps, video_codec=video_codec, options=options)
        yield f.name, data


# Video read/write tests; the class below is skipped when PyAV is missing.
@unittest.skipIf(av is None, "PyAV unavailable")
# Compare against explicit platform names: a substring test for 'win' would
# also match 'darwin' and silently skip the whole suite on macOS.
@unittest.skipIf(sys.platform in ('win32', 'cygwin'), 'temporarily disabled on Windows')
class Tester(unittest.TestCase):
    """Round-trip tests for video writing/reading with the active video backend."""

    # compression adds artifacts, thus we add a tolerance of
    # 6 in 0-255 range
    TOLERANCE = 6

    @staticmethod
    def _read_pts(f_name):
        """Return the video frame timestamps of ``f_name`` using the active backend."""
        if _video_backend == "pyav":
            pts, _ = io.read_video_timestamps(f_name)
        else:
            pts, _, _ = io._read_video_timestamps_from_file(f_name)
        return pts

    @staticmethod
    def _expected_pts(stream):
        """Compute the timestamps expected for an ``av`` video ``stream``.

        The step between frames and the frame count are derived from the
        stream's ``average_rate``, ``time_base`` and ``duration``.
        """
        pts_step = int(round(float(1 / (stream.average_rate * stream.time_base))))
        num_frames = int(round(float(stream.average_rate * stream.time_base * stream.duration)))
        return [i * pts_step for i in range(num_frames)]

    def test_write_read_video(self):
        # lossless encoding must give back exactly the frames that were written
        with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data):
            lv, _, info = _read_video(f_name)
            self.assertTrue(data.equal(lv))
            self.assertEqual(info["video_fps"], 5)

    def test_read_timestamps(self):
        with temp_video(10, 300, 300, 5) as (f_name, data):
            pts = self._read_pts(f_name)
            # note: not all formats/codecs provide accurate information for computing the
            # timestamps. For the format that we use here, this information is available,
            # so we use it as a baseline
            container = av.open(f_name)
            try:
                expected_pts = self._expected_pts(container.streams[0])
            finally:
                # release the container even if the computation above fails
                container.close()

            self.assertEqual(pts, expected_pts)

    def test_read_partial_video(self):
        with temp_video(10, 300, 300, 5, lossless=True) as (f_name, data):
            pts = self._read_pts(f_name)
            for start in range(5):
                for length in range(1, 4):
                    lv, _, _ = _read_video(f_name, pts[start], pts[start + length - 1])
                    s_data = data[start:(start + length)]
                    self.assertEqual(len(lv), length)
                    self.assertTrue(s_data.equal(lv))

            if _video_backend == "pyav":
                # for "video_reader" backend, we don't decode the closest early frame
                # when the given start pts is not matching any frame pts
                lv, _, _ = _read_video(f_name, pts[4] + 1, pts[7])
                self.assertEqual(len(lv), 4)
                self.assertTrue(data[4:8].equal(lv))

    def test_read_partial_video_bframes(self):
        # do not use lossless encoding, to test the presence of B-frames
        options = {'bframes': '16', 'keyint': '10', 'min-keyint': '4'}
        with temp_video(100, 300, 300, 5, options=options) as (f_name, data):
            pts = self._read_pts(f_name)
            for start in range(0, 80, 20):
                for length in range(1, 4):
                    lv, _, _ = _read_video(f_name, pts[start], pts[start + length - 1])
                    s_data = data[start:(start + length)]
                    self.assertEqual(len(lv), length)
                    self.assertTrue((s_data.float() - lv.float()).abs().max() < self.TOLERANCE)

            # NOTE(review): this read calls io.read_video directly, whatever the
            # active backend is - confirm that this is intended
            lv, _, _ = io.read_video(f_name, pts[4] + 1, pts[7])
            self.assertEqual(len(lv), 4)
            self.assertTrue((data[4:8].float() - lv.float()).abs().max() < self.TOLERANCE)

    def test_read_packed_b_frames_divx_file(self):
        with get_tmp_dir() as temp_dir:
            name = "hmdb51_Turnk_r_Pippi_Michel_cartwheel_f_cm_np2_le_med_6.avi"
            f_name = os.path.join(temp_dir, name)
            url = "https://download.pytorch.org/vision_tests/io/" + name
            # only the download is guarded: a missing network skips the test,
            # while failures in the checks below are reported as failures
            try:
                utils.download_url(url, temp_dir)
            except URLError:
                msg = "could not download test file '{}'".format(url)
                warnings.warn(msg, RuntimeWarning)
                raise unittest.SkipTest(msg)

            if _video_backend == "pyav":
                pts, fps = io.read_video_timestamps(f_name)
            else:
                pts, _, info = io._read_video_timestamps_from_file(f_name)
                fps = info["video_fps"]

            self.assertEqual(pts, sorted(pts))
            self.assertEqual(fps, 30)

    def test_read_timestamps_from_packet(self):
        with temp_video(10, 300, 300, 5, video_codec='mpeg4') as (f_name, data):
            pts = self._read_pts(f_name)
            # note: not all formats/codecs provide accurate information for computing the
            # timestamps. For the format that we use here, this information is available,
            # so we use it as a baseline
            container = av.open(f_name)
            try:
                stream = container.streams[0]
                # make sure we went through the optimized codepath
                self.assertIn(b'Lavc', stream.codec_context.extradata)
                expected_pts = self._expected_pts(stream)
            finally:
                # release the container even if a check above fails
                container.close()

            self.assertEqual(pts, expected_pts)

    # TODO add tests for audio


if __name__ == '__main__':
    unittest.main()