_stream_reader.py 33.4 KB
Newer Older
moto's avatar
moto committed
1
2
3
from __future__ import annotations

from dataclasses import dataclass
4
from pathlib import Path
5
from typing import BinaryIO, Dict, Iterator, Optional, Tuple, TypeVar, Union
moto's avatar
moto committed
6
7
8

import torch
import torchaudio
moto's avatar
moto committed
9
10
from torch.utils._pytree import tree_map

11
12
13
14
15
# Bind the backend classes only when ``_FFMPEG_EXT`` is set; when it is
# ``None`` the two names below are left undefined in this module.
if torchaudio._extension._FFMPEG_EXT is not None:
    _StreamReader = torchaudio._extension._FFMPEG_EXT.StreamReader
    _StreamReaderFileObj = torchaudio._extension._FFMPEG_EXT.StreamReaderFileObj


moto's avatar
moto committed
16
17
18
# Names exported by ``from <this module> import *``.
__all__ = [
    "StreamReader",
]
moto's avatar
moto committed
19
20
21


@dataclass
class SourceStream:
    """The metadata of a source stream, returned by :meth:`~torchaudio.io.StreamReader.get_src_stream_info`.

    This class is used when representing streams of media type other than `audio` or `video`.

    When source stream is `audio` or `video` type, :class:`SourceAudioStream` and
    :class:`SourceVideoStream`, which report additional media-specific attributes,
    are used respectively.
    """

    media_type: str
    """The type of the stream.
    One of ``"audio"``, ``"video"``, ``"data"``, ``"subtitle"``, ``"attachment"`` and empty string.

    .. note::
       Only audio and video streams are supported for output.
    .. note::
       Still images, such as PNG and JPEG formats are reported as video.
    """
    codec: str
    """Short name of the codec. Such as ``"pcm_s16le"`` and ``"h264"``."""
    codec_long_name: str
    """Detailed name of the codec.

    Such as "`PCM signed 16-bit little-endian`" and "`H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10`".
    """
    format: Optional[str]
    """Media format. Such as ``"s16"`` and ``"yuv420p"``.

    Commonly found audio values are;

    - ``"u8"``, ``"u8p"``: 8-bit unsigned integer.
    - ``"s16"``, ``"s16p"``: 16-bit signed integer.
    - ``"s32"``, ``"s32p"``: 32-bit signed integer.
    - ``"flt"``, ``"fltp"``: 32-bit floating-point.

    .. note::

       `p` at the end indicates the format is `planar`.
       Channels are grouped together instead of interspersed in memory.
    """
    bit_rate: Optional[int]
    """Bit rate of the stream in bits-per-second.
    This is an estimated value based on the initial few frames of the stream.
    For container formats and variable bit rate, it can be 0.
    """
    num_frames: Optional[int]
    """The number of frames in the stream"""
    bits_per_sample: Optional[int]
    """This is the number of valid bits in each output sample.
    For compressed format, it can be 0.
    """
    metadata: Dict[str, str]
    """Metadata attached to the source stream."""
moto's avatar
moto committed
76
77
78


@dataclass
class SourceAudioStream(SourceStream):
    """The metadata of an audio source stream, returned by :meth:`~torchaudio.io.StreamReader.get_src_stream_info`.

    This class is used when representing audio stream.

    In addition to the attributes reported by :class:`SourceStream`,
    the following attributes are reported.
    """

    sample_rate: float
    """Sample rate of the audio."""
    num_channels: int
    """Number of channels."""


@dataclass
class SourceVideoStream(SourceStream):
    """The metadata of a video source stream, returned by :meth:`~torchaudio.io.StreamReader.get_src_stream_info`.

    This class is used when representing video stream.

    In addition to the attributes reported by :class:`SourceStream`,
    the following attributes are reported.
    """

    width: int
    """Width of the video frame in pixel."""
    height: int
    """Height of the video frame in pixel."""
    frame_rate: float
    """Frame rate."""


def _parse_si(i):
    """Convert a backend source-stream info object into a ``Source*Stream`` dataclass.

    Args:
        i: Object exposing ``media_type``, ``codec_name``, ``codec_long_name``
            and ``metadata``; audio and video entries additionally expose the
            media-specific attributes read below.

    Returns:
        :class:`SourceAudioStream` when ``media_type`` is ``"audio"``,
        :class:`SourceVideoStream` when it is ``"video"``, and a plain
        :class:`SourceStream` otherwise.
    """
    media_type = i.media_type
    if media_type not in ("audio", "video"):
        # Other stream types only carry codec information; the remaining
        # fields are reported as ``None``.
        return SourceStream(
            media_type=i.media_type,
            codec=i.codec_name,
            codec_long_name=i.codec_long_name,
            format=None,
            bit_rate=None,
            num_frames=None,
            bits_per_sample=None,
            metadata=i.metadata,
        )

    # Fields shared by the audio and video variants.
    common = dict(
        media_type=i.media_type,
        codec=i.codec_name,
        codec_long_name=i.codec_long_name,
        format=i.format,
        bit_rate=i.bit_rate,
        num_frames=i.num_frames,
        bits_per_sample=i.bits_per_sample,
        metadata=i.metadata,
    )
    if media_type == "audio":
        return SourceAudioStream(
            **common,
            sample_rate=i.sample_rate,
            num_channels=i.num_channels,
        )
    return SourceVideoStream(
        **common,
        width=i.width,
        height=i.height,
        frame_rate=i.frame_rate,
    )
moto's avatar
moto committed
151
152
153


@dataclass
class OutputStream:
    """Output stream configured on :class:`StreamReader`,
    returned by :meth:`~torchaudio.io.StreamReader.get_out_stream_info`.
    """

    source_index: int
    """Index of the source stream that this output stream is connected."""
    filter_description: str
    """Description of filter graph applied to the source stream."""
    media_type: str
    """The type of the stream. ``"audio"`` or ``"video"``."""
    format: str
    """Media format. Such as ``"s16"`` and ``"yuv420p"``.

    Commonly found audio values are;

    - ``"u8"``, ``"u8p"``: 8-bit unsigned integer.
    - ``"s16"``, ``"s16p"``: 16-bit signed integer.
    - ``"s32"``, ``"s32p"``: 32-bit signed integer.
    - ``"flt"``, ``"fltp"``: 32-bit floating-point.

    .. note::

       `p` at the end indicates the format is `planar`.
       Channels are grouped together instead of interspersed in memory."""


@dataclass
class OutputAudioStream(OutputStream):
    """Information about an audio output stream configured with
    :meth:`~torchaudio.io.StreamReader.add_audio_stream` or
    :meth:`~torchaudio.io.StreamReader.add_basic_audio_stream`.

    In addition to the attributes reported by :class:`OutputStream`,
    the following attributes are reported.
    """

    # Audio-specific fields, declared after the inherited ``OutputStream`` fields.
    sample_rate: float
    """Sample rate of the audio."""
    num_channels: int
    """Number of channels."""


@dataclass
class OutputVideoStream(OutputStream):
    """Information about a video output stream configured with
    :meth:`~torchaudio.io.StreamReader.add_video_stream` or
    :meth:`~torchaudio.io.StreamReader.add_basic_video_stream`.

    In addition to the attributes reported by :class:`OutputStream`,
    the following attributes are reported.
    """

    # Video-specific fields, declared after the inherited ``OutputStream`` fields.
    width: int
    """Width of the video frame in pixel."""
    height: int
    """Height of the video frame in pixel."""
    frame_rate: float
    """Frame rate."""


def _parse_oi(i):
    media_type = i.media_type
    if media_type == "audio":
        return OutputAudioStream(
            source_index=i.source_index,
            filter_description=i.filter_description,
            media_type=i.media_type,
            format=i.format,
            sample_rate=i.sample_rate,
            num_channels=i.num_channels,
        )
    if media_type == "video":
        return OutputVideoStream(
            source_index=i.source_index,
            filter_description=i.filter_description,
            media_type=i.media_type,
            format=i.format,
            width=i.width,
            height=i.height,
            frame_rate=i.frame_rate,
        )
    raise ValueError(f"Unexpected media_type: {i.media_type}({i})")
moto's avatar
moto committed
237
238


239
def _get_afilter_desc(sample_rate: Optional[int], fmt: Optional[str], num_channels: Optional[int]):
240
241
242
    descs = []
    if sample_rate is not None:
        descs.append(f"aresample={sample_rate}")
243
244
245
246
247
248
249
    if fmt is not None or num_channels is not None:
        parts = []
        if fmt is not None:
            parts.append(f"sample_fmts={fmt}")
        if num_channels is not None:
            parts.append(f"channel_layouts={num_channels}c")
        descs.append(f"aformat={':'.join(parts)}")
250
251
252
    return ",".join(descs) if descs else None


253
def _get_vfilter_desc(frame_rate: Optional[float], width: Optional[int], height: Optional[int], fmt: Optional[str]):
254
255
256
257
258
259
260
261
262
263
    descs = []
    if frame_rate is not None:
        descs.append(f"fps={frame_rate}")
    scales = []
    if width is not None:
        scales.append(f"width={width}")
    if height is not None:
        scales.append(f"height={height}")
    if scales:
        descs.append(f"scale={':'.join(scales)}")
264
    if fmt is not None:
265
266
267
268
        descs.append(f"format=pix_fmts={fmt}")
    return ",".join(descs) if descs else None


moto's avatar
moto committed
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# Base class for ChunkTensor
# Based off of TrivialTensorViaComposition
# https://github.com/albanD/subclass_zoo/blob/0eeb1d68fb59879029c610bc407f2997ae43ba0a/trivial_tensors.py#L83
class ChunkTensorBase(torch.Tensor):
    """Tensor subclass whose dispatched ops run on the wrapped ``_elem`` tensor.

    ``__torch_dispatch__`` replaces every instance of this class found in the
    arguments with its ``_elem`` attribute before calling the op. The class
    itself does not assign ``_elem``; that is left to subclasses.
    """

    __torch_function__ = torch._C._disabled_torch_function_impl

    @staticmethod
    def __new__(cls, _elem, *_):
        # Extra positional arguments are accepted and ignored here.
        return super().__new__(cls, _elem)

    @classmethod
    def __torch_dispatch__(cls, func, _, args=(), kwargs=None):
        def unwrap(t):
            return t._elem if isinstance(t, cls) else t

        # ``kwargs`` defaults to ``None``, which cannot be expanded with ``**``.
        if kwargs is None:
            kwargs = {}
        return func(*tree_map(unwrap, args), **tree_map(unwrap, kwargs))


@dataclass
class ChunkTensor(ChunkTensorBase):
    """Decoded media frames with metadata.

    The instance of this class represents the decoded video/audio frames with
    metadata, and the instance itself behaves like :py:class:`~torch.Tensor`.

    Client code can pass an instance of this class as if it were a
    :py:class:`~torch.Tensor`, or call the methods defined on
    :py:class:`~torch.Tensor` class.

    Example:
        >>> # Define input streams
        >>> reader = StreamReader(...)
        >>> reader.add_audio_stream(frames_per_chunk=4000, sample_rate=8000)
        >>> reader.add_video_stream(frames_per_chunk=7, frame_rate=28)
        >>> # Decode the streams and fetch frames
        >>> reader.fill_buffer()
        >>> audio_chunk, video_chunk = reader.pop_chunks()

        >>> # Access metadata
        >>> (audio_chunk.pts, video_chunk.pts)
        (0.0, 0.0)
        >>>
        >>> # The second time the PTS is different
        >>> reader.fill_buffer()
        >>> audio_chunk, video_chunk = reader.pop_chunks()
        >>> (audio_chunk.pts, video_chunk.pts)
        (0.5, 0.25)

        >>> # Call PyTorch ops on chunk
        >>> audio_chunk.shape
        torch.Size([4000, 2])
        >>> power = torch.pow(video_chunk, 2)
        >>>
        >>> # the result is a plain torch.Tensor class
        >>> type(power)
        <class 'torch.Tensor'>
        >>>
        >>> # Metadata is not available on the result
        >>> power.pts
        AttributeError: 'Tensor' object has no attribute 'pts'
    """

    # Wrapped tensor that ``ChunkTensorBase.__torch_dispatch__`` unwraps.
    # Keep it private for now
    _elem: torch.Tensor

    pts: float
    """Presentation time stamp of the first frame in the chunk.

    Unit: second.
    """


341
342
343
344
345
346
347
348
349
350
def _format_doc(**kwargs):
    def decorator(obj):
        obj.__doc__ = obj.__doc__.format(**kwargs)
        return obj

    return decorator


_frames_per_chunk = """Number of frames returned as one chunk.
                If the source stream is exhausted before enough frames are buffered,
351
352
353
354
                then the chunk is returned as-is.

                Providing ``-1`` disables chunking and :py:func:`pop_chunks` method
                will concatenate all the buffered frames and return it."""
355
356
357

_buffer_chunk_size = """Internal buffer size.
                When the number of chunks buffered exceeds this number, old frames are
moto's avatar
moto committed
358
359
                dropped. For example, if ``frames_per_chunk`` is 5 and ``buffer_chunk_size`` is
                3, then frames older than ``15`` are dropped.
360
                Providing ``-1`` disables this behavior.
361
362
363
364
365
366
367
368
369
370
371
372
373

                Default: ``3``."""

_audio_stream_index = """The source audio stream index.
                If omitted, :py:attr:`default_audio_stream` is used."""


_video_stream_index = """The source video stream index.
                If omitted, :py:attr:`default_video_stream` is used."""

_decoder = """The name of the decoder to be used.
                When provided, use the specified decoder instead of the default one.

374
375
376
                To list the available decoders, please use
                :py:func:`~torchaudio.utils.ffmpeg_utils.get_audio_decoders` for audio, and
                :py:func:`~torchaudio.utils.ffmpeg_utils.get_video_decoders` for video.
377
378
379
380

                Default: ``None``."""

_decoder_option = """Options passed to decoder.
381
                Mapping from str to str. (Default: ``None``)
382
383

                To list decoder options for a decoder, you can use
moto's avatar
moto committed
384
385
386
                ``ffmpeg -h decoder=<DECODER>`` command.

                |
387

388
389
390
391
                In addition to decoder-specific options, you can also pass options related
                to multithreading. They are effective only if the decoder support them.
                If neither of them are provided, StreamReader defaults to single thread.

moto's avatar
moto committed
392
393
394
395
396
397
398
399
400
401
402
403
404
405
                ``"threads"``: The number of threads (in str).
                Providing the value ``"0"`` will let FFmpeg decides based on its heuristics.

                ``"thread_type"``: Which multithreading method to use.
                The valid values are ``"frame"`` or ``"slice"``.
                Note that each decoder supports different set of methods.
                If not provided, a default value is used.

                - ``"frame"``: Decode more than one frame at once.
                  Each thread handles one frame.
                  This will increase decoding delay by one frame per thread
                - ``"slice"``: Decode more than one part of a single frame at once.

                |
406
                """
407
408
409
410
411


_hw_accel = """Enable hardware acceleration.

                When video is decoded on CUDA hardware, for example
moto's avatar
moto committed
412
413
414
                `decoder="h264_cuvid"`, passing CUDA device indicator to `hw_accel`
                (i.e. `hw_accel="cuda:0"`) will make StreamReader place the resulting
                frames directly on the specified CUDA device as CUDA tensor.
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438

                If `None`, the frame will be moved to CPU memory.
                Default: ``None``."""


# Decorator filling the ``{frames_per_chunk}``, ``{buffer_chunk_size}``,
# ``{stream_index}``, ``{decoder}`` and ``{decoder_option}`` placeholders of a
# docstring with the audio variants of the shared descriptions.
_format_audio_args = _format_doc(
    frames_per_chunk=_frames_per_chunk,
    buffer_chunk_size=_buffer_chunk_size,
    stream_index=_audio_stream_index,
    decoder=_decoder,
    decoder_option=_decoder_option,
)


# Video counterpart; it uses the video stream-index text and additionally
# fills the ``{hw_accel}`` placeholder.
_format_video_args = _format_doc(
    frames_per_chunk=_frames_per_chunk,
    buffer_chunk_size=_buffer_chunk_size,
    stream_index=_video_stream_index,
    decoder=_decoder,
    decoder_option=_decoder_option,
    hw_accel=_hw_accel,
)


439
440
441
442
# Type variables bounded by the source/output stream base classes.
# NOTE(review): the name passed to each ``TypeVar`` ("InputStream" /
# "OutputStream") differs from the variable it is assigned to; static type
# checkers generally expect the two to match — confirm whether this is intended.
InputStreamTypes = TypeVar("InputStream", bound=SourceStream)
OutputStreamTypes = TypeVar("OutputStream", bound=OutputStream)


443
@torchaudio._extension.fail_if_no_ffmpeg
444
class StreamReader:
moto's avatar
moto committed
445
446
447
448
449
    """Fetch and decode audio/video streams chunk by chunk.

    For the detailed usage of this class, please refer to the tutorial.

    Args:
450
        src (str, path-like or file-like object): The media source.
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
            If string-type, it must be a resource indicator that FFmpeg can
            handle. This includes a file path, URL, device identifier or
            filter expression. The supported value depends on the FFmpeg found
            in the system.

            If file-like object, it must support `read` method with the signature
            `read(size: int) -> bytes`.
            Additionally, if the file-like object has `seek` method, it uses
            the method when parsing media metadata. This improves the reliability
            of codec detection. The signagure of `seek` method must be
            `seek(offset: int, whence: int) -> int`.

            Please refer to the following for the expected signature and behavior
            of `read` and `seek` method.

            - https://docs.python.org/3/library/io.html#io.BufferedIOBase.read
            - https://docs.python.org/3/library/io.html#io.IOBase.seek

moto's avatar
moto committed
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
        format (str or None, optional):
            Override the input format, or specify the source sound device.
            Default: ``None`` (no override nor device input).

            This argument serves two different usecases.

            1) Override the source format.
               This is useful when the input data do not contain a header.

            2) Specify the input source device.
               This allows to load media stream from hardware devices,
               such as microphone, camera and screen, or a virtual device.


            .. note::

               This option roughly corresponds to ``-f`` option of ``ffmpeg`` command.
               Please refer to the ffmpeg documentations for the possible values.

moto's avatar
moto committed
488
489
               https://ffmpeg.org/ffmpeg-formats.html#Demuxers

490
491
               Please use :py:func:`~torchaudio.utils.ffmpeg_utils.get_demuxers` to list the
               demultiplexers available in the current environment.
moto's avatar
moto committed
492
493
494
495

               For device access, the available values vary based on hardware (AV device) and
               software configuration (ffmpeg build).

moto's avatar
moto committed
496
497
               https://ffmpeg.org/ffmpeg-devices.html#Input-Devices

498
499
               Please use :py:func:`~torchaudio.utils.ffmpeg_utils.get_input_devices` to list
               the input devices available in the current environment.
moto's avatar
moto committed
500
501
502
503
504
505
506

        option (dict of str to str, optional):
            Custom option passed when initializing format context (opening source).

            You can use this argument to change the input source before it is passed to decoder.

            Default: ``None``.
507
508
509
510
511

        buffer_size (int):
            The internal buffer size in byte. Used only when `src` is file-like object.

            Default: `4096`.
moto's avatar
moto committed
512
513
514
515
    """

    def __init__(
        self,
        src: Union[str, Path, BinaryIO],
        format: Optional[str] = None,
        option: Optional[Dict[str, str]] = None,
        buffer_size: int = 4096,
    ):
        # Anything exposing ``read`` is treated as a file-like object and goes
        # through the file-object backend, the only one taking ``buffer_size``.
        # Everything else is converted to ``str`` for the regular backend.
        if hasattr(src, "read"):
            self._be = _StreamReaderFileObj(src, format, option, buffer_size)
        else:
            self._be = _StreamReader(str(src), format, option)

        # A negative index from the backend is stored as ``None``.
        i = self._be.find_best_audio_stream()
        self._default_audio_stream = None if i < 0 else i
        i = self._be.find_best_video_stream()
        self._default_video_stream = None if i < 0 else i
moto's avatar
moto committed
530
531
532
533
534
535
536

    @property
    def num_src_streams(self):
        """Number of streams found in the provided media source.

        :type: int
        """
        return self._be.num_src_streams()
moto's avatar
moto committed
538
539
540
541
542
543
544

    @property
    def num_out_streams(self):
        """Number of output streams configured by client code.

        :type: int
        """
        return self._be.num_out_streams()
moto's avatar
moto committed
546
547
548
549
550
551
552

    @property
    def default_audio_stream(self):
        """The index of default audio stream. ``None`` if there is no audio stream

        :type: Optional[int]
        """
        return self._default_audio_stream
moto's avatar
moto committed
554
555
556
557
558
559
560

    @property
    def default_video_stream(self):
        """The index of default video stream. ``None`` if there is no video stream

        :type: Optional[int]
        """
        return self._default_video_stream
moto's avatar
moto committed
562

moto's avatar
moto committed
563
564
565
566
567
568
569
570
    def get_metadata(self) -> Dict[str, str]:
        """Return the metadata of the source media as reported by the backend.

        Returns:
            dict
        """
        backend = self._be
        metadata = backend.get_metadata()
        return metadata

571
    def get_src_stream_info(self, i: int) -> InputStreamTypes:
        """Get the metadata of source stream

        Args:
            i (int): Stream index.
        Returns:
            InputStreamTypes:
                Information about the source stream.
                If the source stream is audio type, then
                :class:`~torchaudio.io._stream_reader.SourceAudioStream` is returned.
                If it is video type, then
                :class:`~torchaudio.io._stream_reader.SourceVideoStream` is returned.
                Otherwise :class:`~torchaudio.io._stream_reader.SourceStream` class is returned.
        """
        return _parse_si(self._be.get_src_stream_info(i))
moto's avatar
moto committed
586

587
    def get_out_stream_info(self, i: int) -> OutputStreamTypes:
        """Get the metadata of output stream

        Args:
            i (int): Stream index.
        Returns:
            OutputStreamTypes
                Information about the output stream.
                If the output stream is audio type, then
                :class:`~torchaudio.io._stream_reader.OutputAudioStream` is returned.
                If it is video type, then
                :class:`~torchaudio.io._stream_reader.OutputVideoStream` is returned.
        """
        info = self._be.get_out_stream_info(i)
        return _parse_oi(info)
moto's avatar
moto committed
602

Joao Gomes's avatar
Joao Gomes committed
603
    def seek(self, timestamp: float, mode: str = "precise"):
        """Seek the stream to the given timestamp [second]

        Args:
            timestamp (float): Target time in second.
            mode (str): Controls how seek is done.
                Valid choices are;

                * "key": Seek into the nearest key frame before the given timestamp.
                * "any": Seek into any frame (including non-key frames) before the given timestamp.
                * "precise": First seek into the nearest key frame before the given timestamp, then
                  decode frames until it reaches the closest frame to the given timestamp.

                Note:
                   All the modes invalidate and reset the internal state of decoder.
                   When using "any" mode and if it ends up seeking into non-key frame,
                   the image decoded may be invalid due to lack of key frame.
                   Using "precise" will workaround this issue by decoding frames from previous
                   key frame, but will be slower.

        Raises:
            ValueError: If ``mode`` is not one of the valid choices.
        """
        # Integer codes passed to the backend for each mode name.
        modes = {
            "key": 0,
            "any": 1,
            "precise": 2,
        }
        if mode not in modes:
            raise ValueError(f"The value of mode must be one of {list(modes)}. Found: {mode}")
        self._be.seek(timestamp, modes[mode])
moto's avatar
moto committed
631

632
    @_format_audio_args
    def add_basic_audio_stream(
        self,
        frames_per_chunk: int,
        buffer_chunk_size: int = 3,
        *,
        stream_index: Optional[int] = None,
        decoder: Optional[str] = None,
        decoder_option: Optional[Dict[str, str]] = None,
        format: Optional[str] = "fltp",
        sample_rate: Optional[int] = None,
        num_channels: Optional[int] = None,
    ):
        """Add output audio stream

        Args:
            frames_per_chunk (int): {frames_per_chunk}

            buffer_chunk_size (int, optional): {buffer_chunk_size}

            stream_index (int or None, optional): {stream_index}

            decoder (str or None, optional): {decoder}

            decoder_option (dict or None, optional): {decoder_option}

            format (str, optional): Output sample format (precision).

                If ``None``, the output chunk has dtype corresponding to
                the precision of the source audio.

                Otherwise, the sample is converted and the output dtype is changed
                as following.

                - ``"u8p"``: The output is ``torch.uint8`` type.
                - ``"s16p"``: The output is ``torch.int16`` type.
                - ``"s32p"``: The output is ``torch.int32`` type.
                - ``"s64p"``: The output is ``torch.int64`` type.
                - ``"fltp"``: The output is ``torch.float32`` type.
                - ``"dblp"``: The output is ``torch.float64`` type.

                Default: ``"fltp"``.

            sample_rate (int or None, optional): If provided, resample the audio.

            num_channels (int, or None, optional): If provided, change the number of channels.
        """
        # The convenience arguments are folded into one filter description
        # and handed to the general ``add_audio_stream``.
        self.add_audio_stream(
            frames_per_chunk,
            buffer_chunk_size,
            stream_index=stream_index,
            decoder=decoder,
            decoder_option=decoder_option,
            filter_desc=_get_afilter_desc(sample_rate, format, num_channels),
        )

688
    @_format_video_args
    def add_basic_video_stream(
        self,
        frames_per_chunk: int,
        buffer_chunk_size: int = 3,
        *,
        stream_index: Optional[int] = None,
        decoder: Optional[str] = None,
        decoder_option: Optional[Dict[str, str]] = None,
        format: Optional[str] = "rgb24",
        frame_rate: Optional[int] = None,
        width: Optional[int] = None,
        height: Optional[int] = None,
        hw_accel: Optional[str] = None,
    ):
        """Add output video stream

        Args:
            frames_per_chunk (int): {frames_per_chunk}

            buffer_chunk_size (int, optional): {buffer_chunk_size}

            stream_index (int or None, optional): {stream_index}

            decoder (str or None, optional): {decoder}

            decoder_option (dict or None, optional): {decoder_option}

            format (str, optional): Change the format of image channels. Valid values are,

                - ``"rgb24"``: 8 bits * 3 channels (R, G, B)
                - ``"bgr24"``: 8 bits * 3 channels (B, G, R)
                - ``"yuv420p"``: 8 bits * 3 channels (Y, U, V)
                - ``"gray"``: 8 bits * 1 channels

                Default: ``"rgb24"``.

            frame_rate (int or None, optional): If provided, change the frame rate.

            width (int or None, optional): If provided, change the image width. Unit: Pixel.

            height (int or None, optional): If provided, change the image height. Unit: Pixel.

            hw_accel (str or None, optional): {hw_accel}
        """
        # The convenience arguments are folded into one filter description
        # and handed to the general ``add_video_stream``.
        self.add_video_stream(
            frames_per_chunk,
            buffer_chunk_size,
            stream_index=stream_index,
            decoder=decoder,
            decoder_option=decoder_option,
            filter_desc=_get_vfilter_desc(frame_rate, width, height, format),
            hw_accel=hw_accel,
        )

743
    @_format_audio_args
    def add_audio_stream(
        self,
        frames_per_chunk: int,
        buffer_chunk_size: int = 3,
        *,
        stream_index: Optional[int] = None,
        decoder: Optional[str] = None,
        decoder_option: Optional[Dict[str, str]] = None,
        filter_desc: Optional[str] = None,
    ):
        """Add output audio stream

        Args:
            frames_per_chunk (int): {frames_per_chunk}

            buffer_chunk_size (int, optional): {buffer_chunk_size}

            stream_index (int or None, optional): {stream_index}

            decoder (str or None, optional): {decoder}

            decoder_option (dict or None, optional): {decoder_option}

            filter_desc (str or None, optional): Filter description.
                The list of available filters can be found at
                https://ffmpeg.org/ffmpeg-filters.html
                Note that complex filters are not supported.

        """
        # Fall back to the default audio stream when no index is given.
        i = self.default_audio_stream if stream_index is None else stream_index
        if i is None:
            raise RuntimeError("There is no audio stream.")
        self._be.add_audio_stream(
            i,
            frames_per_chunk,
            buffer_chunk_size,
            filter_desc,
            decoder,
            # The backend is always given a dict, never ``None``.
            decoder_option or {},
        )

785
    @_format_video_args
moto's avatar
moto committed
786
787
788
789
    def add_video_stream(
        self,
        frames_per_chunk: int,
        buffer_chunk_size: int = 3,
        *,
        stream_index: Optional[int] = None,
        decoder: Optional[str] = None,
        decoder_option: Optional[Dict[str, str]] = None,
        filter_desc: Optional[str] = None,
        hw_accel: Optional[str] = None,
    ):
        """Add output video stream

        Args:
            frames_per_chunk (int): {frames_per_chunk}

            buffer_chunk_size (int, optional): {buffer_chunk_size}

            stream_index (int or None, optional): {stream_index}

            decoder (str or None, optional): {decoder}

            decoder_option (dict or None, optional): {decoder_option}

            filter_desc (str or None, optional): Filter description.
                The list of available filters can be found at
                https://ffmpeg.org/ffmpeg-filters.html
                Note that complex filters are not supported.

            hw_accel (str or None, optional): {hw_accel}

        Raises:
            RuntimeError: If ``stream_index`` is ``None`` and there is no default video stream.
        """
        # NOTE(review): the brace placeholders in the docstring are presumably filled in
        # by the decorator applied to this method; do not add other braces to the docstring.
        src_index = stream_index
        if src_index is None:
            # Fall back to the default video stream only when the caller did not pick one.
            src_index = self.default_video_stream
        if src_index is None:
            raise RuntimeError("There is no video stream.")

        # The backend is always handed a dict, even when no decoder options were given.
        options = decoder_option or {}
        self._be.add_video_stream(
            src_index,
            frames_per_chunk,
            buffer_chunk_size,
            filter_desc,
            decoder,
            options,
            hw_accel,
        )

    def remove_stream(self, i: int):
        """Remove an output stream.

        The request is forwarded unchanged to the backend object held in ``self._be``.

        Args:
            i (int): Index of the output stream to be removed.
        """
        backend = self._be
        backend.remove_stream(i)
moto's avatar
moto committed
837

moto's avatar
moto committed
838
    def process_packet(self, timeout: Optional[float] = None, backoff: float = 10.0) -> int:
        """Read the source media and process one packet.

        If a packet is read successfully, then the data in the packet will
        be decoded and passed to corresponding output stream processors.

        If the packet belongs to a source stream that is not connected to
        an output stream, then the data are discarded.

        When the source reaches EOF, then it triggers all the output stream
        processors to enter drain mode. All the output stream processors
        flush the pending frames.

        Args:
            timeout (float or None, optional): Timeout in milliseconds.

                This argument changes the retry behavior when it failed to
                process a packet due to the underlying media resource being
                temporarily unavailable.

                When using a media device such as a microphone, there are cases
                where the underlying buffer is not ready.
                Calling this function in such case would cause the system to report
                `EAGAIN (resource temporarily unavailable)`.

                * ``>=0``: Keep retrying until the given time passes.

                * ``<0``: Keep retrying forever.

                * ``None`` : No retrying and raise an exception immediately.

                Default: ``None``.

                Note:

                    The retry behavior is applicable only when the reason is the
                    unavailable resource. It is not invoked if the reason of failure is
                    other.

            backoff (float, optional): Time to wait before retrying in milliseconds.

                This option is effective only when `timeout` is effective. (not ``None``)

                When `timeout` is effective, this `backoff` controls how long the function
                should wait before retrying. Default: ``10.0``.

        Returns:
            int:
                ``0``
                A packet was processed properly. The caller can keep
                calling this function to buffer more frames.

                ``1``
                The streamer reached EOF. All the output stream processors
                flushed the pending frames. The caller should stop calling
                this method.
        """
        # Retry/timeout handling is performed by the backend; this is a plain delegation.
        return self._be.process_packet(timeout, backoff)
moto's avatar
moto committed
896
897
898

    def process_all_packets(self):
        """Process packets until it reaches EOF.

        The work is delegated to the backend object held in ``self._be``;
        this method itself returns nothing.
        """
        backend = self._be
        backend.process_all_packets()
moto's avatar
moto committed
900
901
902

    def is_buffer_ready(self) -> bool:
        """Returns true if all the output streams have at least one chunk filled.

        The answer is whatever the backend object held in ``self._be`` reports.
        """
        ready = self._be.is_buffer_ready()
        return ready
moto's avatar
moto committed
904

moto's avatar
moto committed
905
    def pop_chunks(self) -> Tuple[Optional[ChunkTensor], ...]:
        """Pop one chunk from all the output stream buffers.

        Each non-empty chunk handed back by the backend is re-wrapped as a
        ``ChunkTensor`` built from its ``frames`` and ``pts`` attributes.

        Returns:
            Tuple[Optional[ChunkTensor], ...]:
                Buffer contents, in the order the backend returns them.
                If a buffer does not contain any frame, then `None` is returned instead.
        """
        # Build a tuple (not a list) so the runtime value matches the declared
        # and documented return type.
        return tuple(
            None if chunk is None else ChunkTensor(chunk.frames, chunk.pts)
            for chunk in self._be.pop_chunks()
        )
moto's avatar
moto committed
920

moto's avatar
moto committed
921
    def fill_buffer(self, timeout: Optional[float] = None, backoff: float = 10.0) -> int:
        """Keep processing packets until all buffers have at least one chunk.

        Both arguments are forwarded unchanged to the backend object held in ``self._be``.

        Args:
            timeout (float or None, optional): See
                :py:func:`~StreamReader.process_packet`. (Default: ``None``)

            backoff (float, optional): See
                :py:func:`~StreamReader.process_packet`. (Default: ``10.0``)

        Returns:
            int:
                ``0``
                Packets are processed properly and buffers are
                ready to be popped once.

                ``1``
                The streamer reached EOF. All the output stream processors
                flushed the pending frames. The caller should stop calling
                this method.
        """
        status = self._be.fill_buffer(timeout, backoff)
        return status
moto's avatar
moto committed
943

moto's avatar
moto committed
944
945
    def stream(
        self, timeout: Optional[float] = None, backoff: float = 10.0
    ) -> Iterator[Tuple[Optional[ChunkTensor], ...]]:
        """Return an iterator that generates output tensors

        Args:
            timeout (float or None, optional): See
                :py:func:`~StreamReader.process_packet`. (Default: ``None``)

            backoff (float, optional): See
                :py:func:`~StreamReader.process_packet`. (Default: ``10.0``)

        Returns:
            Iterator[Tuple[Optional[ChunkTensor], ...]]:
                Iterator that yields a tuple of chunks that correspond to the output
                streams defined by client code.
                If an output stream is exhausted, then the chunk Tensor is substituted
                with ``None``.
                The iterator stops if all the output streams are exhausted.

        Raises:
            RuntimeError: If no output stream is configured. Because this method is a
                generator, the error surfaces when the iterator is first advanced,
                not when the method is called.
        """
        if self.num_out_streams == 0:
            raise RuntimeError("No output stream is configured.")

        # Main phase: as long as fill_buffer reports a falsy status, hand out
        # one set of chunks per successful fill.
        while not self.fill_buffer(timeout, backoff):
            yield self.pop_chunks()

        # Drain phase: after fill_buffer returned a truthy status, keep popping
        # whatever is still buffered until every entry comes back as None.
        while True:
            leftovers = self.pop_chunks()
            if not any(item is not None for item in leftovers):
                return
            yield leftovers