audio_classification.py 5.72 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2021 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess
15
from typing import Union
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

import numpy as np

from ..file_utils import add_end_docstrings, is_torch_available
from ..utils import logging
from .base import PIPELINE_INIT_ARGS, Pipeline


# Only import the audio-classification model mapping when PyTorch is available; the name is left undefined
# otherwise.
if is_torch_available():
    from ..models.auto.modeling_auto import MODEL_FOR_AUDIO_CLASSIFICATION_MAPPING

# Module-level logger, named after this module.
logger = logging.get_logger(__name__)


def ffmpeg_read(bpayload: bytes, sampling_rate: int) -> np.ndarray:
    """
    Helper function to read an audio file through ffmpeg.

    The payload is piped to an `ffmpeg` subprocess, which is asked for a single channel (`-ac 1`) at
    `sampling_rate` (`-ar`) in the raw `f32le` format; the bytes it writes to stdout are then interpreted as
    `np.float32` samples.

    Args:
        bpayload (`bytes`):
            The content of an audio file, fed to ffmpeg on its standard input.
        sampling_rate (`int`):
            The sampling rate requested from ffmpeg for the decoded audio.

    Returns:
        `np.ndarray`: A one-dimensional `np.float32` array built from ffmpeg's output.

    Raises:
        ValueError: If the `ffmpeg` executable cannot be found, or if ffmpeg produced no samples (for instance
            because the payload could not be decoded).
    """
    ffmpeg_command = [
        "ffmpeg",
        "-i",
        "pipe:0",  # read the input file from stdin
        "-ac",
        "1",
        "-ar",
        f"{sampling_rate}",
        "-f",
        "f32le",
        "-hide_banner",
        "-loglevel",
        "quiet",
        "pipe:1",  # write the decoded samples to stdout
    ]

    try:
        # The context manager closes the pipes and waits for the child even if `communicate` raises.
        with subprocess.Popen(ffmpeg_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE) as ffmpeg_process:
            out_bytes, _ = ffmpeg_process.communicate(bpayload)
    except FileNotFoundError as error:
        # Keep the original exception as the cause so the missing executable is visible in the traceback.
        raise ValueError("ffmpeg was not found but is required to load audio files from filename") from error

    audio = np.frombuffer(out_bytes, np.float32)
    # ffmpeg runs with `-loglevel quiet`, so an empty output is the only failure signal available here.
    if audio.shape[0] == 0:
        raise ValueError("Malformed soundfile")
    return audio


@add_end_docstrings(PIPELINE_INIT_ARGS)
class AudioClassificationPipeline(Pipeline):
    """
    Audio classification pipeline using any `AutoModelForAudioClassification`. This pipeline predicts the class of
    a raw waveform or an audio file. In case of an audio file, ffmpeg should be installed to support multiple audio
    formats.

    This pipeline can currently be loaded from [`pipeline`] using the following task identifier:
    `"audio-classification"`.

    See the list of available models on [huggingface.co/models](https://huggingface.co/models?filter=audio-classification).
    """

    def __init__(self, *args, **kwargs):
        # Only supply the default when the caller did not pass `top_k`; assigning unconditionally would discard
        # an explicitly requested value.
        kwargs.setdefault("top_k", 5)
        super().__init__(*args, **kwargs)

        if self.framework != "pt":
            raise ValueError(f"The {self.__class__} is only available in PyTorch.")

        self.check_model_type(MODEL_FOR_AUDIO_CLASSIFICATION_MAPPING)

    def __call__(
        self,
        inputs: Union[np.ndarray, bytes, str],
        **kwargs,
    ):
        """
        Classify the sequence(s) given as inputs. See the [`AutomaticSpeechRecognitionPipeline`]
        documentation for more information.

        Args:
            inputs (`np.ndarray` or `bytes` or `str`):
                The inputs is either a raw waveform (`np.ndarray` of shape (n, ) of type `np.float32` or
                `np.float64`) at the correct sampling rate (no further check will be done) or a `str` that is
                the filename of the audio file, the file will be read at the correct sampling rate to get the waveform
                using *ffmpeg*. This requires *ffmpeg* to be installed on the system. If *inputs* is `bytes` it is
                supposed to be the content of an audio file and is interpreted by *ffmpeg* in the same way.
            top_k (`int`, *optional*, defaults to 5):
                The number of top labels that will be returned by the pipeline. If the provided number is higher
                than the number of labels available in the model configuration, it will default to the number of
                labels.

        Return:
            A list of `dict` with the following keys:

            - **label** (`str`) -- The label predicted.
            - **score** (`float`) -- The corresponding probability.
        """
        return super().__call__(inputs, **kwargs)

    def _sanitize_parameters(self, top_k=None, **kwargs):
        """
        Build the three parameter dicts returned to the base class; only the last one (used for `postprocess`) is
        ever populated, with `top_k` capped at `self.model.config.num_labels`.
        """
        postprocess_params = {}
        if top_k is not None:
            postprocess_params["top_k"] = min(top_k, self.model.config.num_labels)
        return {}, {}, postprocess_params

    def preprocess(self, inputs):
        """
        Turn a filename, the bytes of an audio file, or a raw waveform into feature-extractor outputs.

        Raises:
            ValueError: If the input does not end up as a one-dimensional `np.ndarray`.
        """
        # A string is treated as a path: load the file content, then decode it like any other bytes payload.
        if isinstance(inputs, str):
            with open(inputs, "rb") as f:
                inputs = f.read()

        if isinstance(inputs, bytes):
            inputs = ffmpeg_read(inputs, self.feature_extractor.sampling_rate)

        if not isinstance(inputs, np.ndarray):
            raise ValueError("We expect a numpy ndarray as input")
        if len(inputs.shape) != 1:
            raise ValueError("We expect a single channel audio input for AudioClassificationPipeline")

        processed = self.feature_extractor(
            inputs, sampling_rate=self.feature_extractor.sampling_rate, return_tensors="pt"
        )
        return processed

    def _forward(self, model_inputs):
        """Run the model on the preprocessed inputs and return its raw outputs."""
        model_outputs = self.model(**model_inputs)
        return model_outputs

    def postprocess(self, model_outputs, top_k=5):
        """
        Convert the logits of the first item in `model_outputs` into a list of `{"score", "label"}` dicts for the
        `top_k` highest-probability classes.
        """
        probs = model_outputs.logits[0].softmax(-1)
        # `topk` fails when asked for more entries than exist, so never request more than the available scores.
        top_k = min(top_k, probs.shape[-1])
        scores, ids = probs.topk(top_k)

        scores = scores.tolist()
        ids = ids.tolist()

        labels = [{"score": score, "label": self.model.config.id2label[_id]} for score, _id in zip(scores, ids)]

        return labels