voxtral_realtime.py 16.9 KB
Newer Older
Patrick von Platen's avatar
Patrick von Platen committed
1
2
3
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

4
import asyncio
Patrick von Platen's avatar
Patrick von Platen committed
5
import math
6
from collections.abc import AsyncGenerator, Iterable, Iterator, Mapping
7
from typing import Literal
Patrick von Platen's avatar
Patrick von Platen committed
8

9
import numpy as np
Patrick von Platen's avatar
Patrick von Platen committed
10
import torch
11
12
13
14
15
from mistral_common.protocol.instruct.chunk import RawAudio
from mistral_common.protocol.transcription.request import (
    StreamingMode,
    TranscriptionRequest,
)
16
from mistral_common.tokens.tokenizers.audio import Audio, AudioConfig
Patrick von Platen's avatar
Patrick von Platen committed
17

18
from vllm.compilation.decorators import support_torch_compile
19
from vllm.config import ModelConfig, SpeechToTextConfig, VllmConfig
20
from vllm.envs import VLLM_ENGINE_ITERATION_TIMEOUT_S
21
from vllm.inputs.data import PromptType, StreamingInput, TokensPrompt
Patrick von Platen's avatar
Patrick von Platen committed
22
from vllm.logger import init_logger
23
from vllm.model_executor.models.interfaces import MultiModalEmbeddings, SupportsRealtime
Patrick von Platen's avatar
Patrick von Platen committed
24
25
26
27
28
29
30
31
32
33
34
35
from vllm.model_executor.models.voxtral import (
    VoxtralDummyInputsBuilder,
    VoxtralForConditionalGeneration,
    VoxtralMultiModalProcessor,
    VoxtralProcessingInfo,
)
from vllm.multimodal import MULTIMODAL_REGISTRY
from vllm.multimodal.cache import _I, BaseMultiModalProcessorCache
from vllm.multimodal.inputs import (
    MultiModalKwargsOptionalItems,
)
from vllm.multimodal.parse import MultiModalDataItems
36
37
from vllm.multimodal.processing import BaseDummyInputsBuilder
from vllm.multimodal.processing.processor import (
Patrick von Platen's avatar
Patrick von Platen committed
38
39
40
41
    MultiModalPromptUpdates,
    PlaceholderFeaturesInfo,
)
from vllm.sequence import IntermediateTensors
42
from vllm.tokenizers import cached_tokenizer_from_config
Patrick von Platen's avatar
Patrick von Platen committed
43
44
45
46
47
48
49
50

from .utils import (
    _flatten_embeddings,
)

logger = init_logger(__name__)


51
class VoxtralRealtimeMultiModalProcessor(VoxtralMultiModalProcessor):
Patrick von Platen's avatar
Patrick von Platen committed
52
53
54
55
56
57
58
    def __init__(
        self,
        info: _I,
        dummy_inputs: BaseDummyInputsBuilder[_I],
        *,
        cache: BaseMultiModalProcessorCache | None = None,
    ) -> None:
59
        # realtime can't make use of a cache yet
Patrick von Platen's avatar
Patrick von Platen committed
60
61
62
63
64
65
66
67
68
69
70
71
72
        super().__init__(info, dummy_inputs, cache=None)

    def _maybe_apply_prompt_updates(
        self,
        mm_items: MultiModalDataItems,
        prompt_ids: list[int],
        mm_kwargs: MultiModalKwargsOptionalItems,
        mm_prompt_updates: MultiModalPromptUpdates,
        is_update_applied: bool,
    ) -> tuple[list[int], Mapping[str, list[PlaceholderFeaturesInfo]]]:
        # there are no placeholder audio tokens for streaming
        # so we need to build the place placeholder positions manually

73
        # in realtime there is always only one audio input
Patrick von Platen's avatar
Patrick von Platen committed
74
75
        audios = mm_kwargs.get("audio", [])
        assert len(audios) == 1, (
76
            f"Expected only one audio input for realtime, got {mm_kwargs=}"
Patrick von Platen's avatar
Patrick von Platen committed
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
        )
        tokenizer = self.info.get_tokenizer()
        audio_config = tokenizer.instruct.audio_encoder.audio_config

        num_audio_samples = audios[0]["audio_arrays"].data.shape[0]
        length = audio_config.num_audio_tokens(num_audio_samples)

        features_info = PlaceholderFeaturesInfo(
            modality="audio",
            item_idx=0,
            start_idx=0,
            tokens=length
            * [0],  # only used for length computation, so we can take dummy inputs
            is_embed=None,
        )
        return prompt_ids, {"audio": [features_info]}


class TimeEmbedding(torch.nn.Module):
    """Sinusoidal Embedding for encoding time"""

    def __init__(self, dim: int, theta: float = 10000.0) -> None:
        super().__init__()
        self.dim = dim
        self.theta = theta
        inv_freq = torch.exp(
            -math.log(self.theta)
            * torch.arange(self.dim // 2).float()
            / (self.dim // 2)
        )
        self.register_buffer("inv_freq", inv_freq, persistent=False)

    def forward(self, t: torch.Tensor) -> torch.Tensor:
        t = t[..., None]  # (B,) -> (B, 1) or (B, T) -> (B, T, 1)
        inv_freq = self.inv_freq.to(device=t.device, dtype=t.dtype)
        emb = (
            t * inv_freq
        )  # (B, 1) x (D/2,) -> (B, D/2) or (B, T, 1) x (D/2,) -> (B, T, D/2)
        return torch.cat((emb.cos(), emb.sin()), dim=-1)  # (B, D) or (B, T, D)


118
119
120
121
122
123
124
125
126
127
128
129
def _expand_tensor(input_tensor: torch.Tensor, scaling: int) -> torch.Tensor:
    # 1. Multiply by the scaling factor (e.g. 4)
    base = input_tensor * scaling

    # 2. Create the offsets, e.g. [0, 1, 2, 3]
    offsets = torch.arange(scaling, device=input_tensor.device)

    # 3. Use broadcasting, e.g. (N, 1) + (4,) results in (N, 4)
    # Then flatten back to 1D
    return (base.unsqueeze(1) + offsets).view(-1)


130
class VoxtralRealtimeBuffer:
131
    def __init__(self, config: AudioConfig, prompt_tokens: list[int]) -> None:
132
133
        self._config = config

134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
        _look_ahead_in_ms = self._config.streaming_look_ahead_ms
        _look_back_in_ms = self._config.streaming_look_back_ms
        self._look_ahead_in_samples = self._ms_to_samples(_look_ahead_in_ms)
        self._look_back_in_samples = self._ms_to_samples(_look_back_in_ms)

        # None signals the end
        self._audio_queue: asyncio.Queue[np.ndarray | None] = asyncio.Queue()
        self._leftover: np.ndarray | None = None
        self._token_queue: asyncio.Queue[int] = asyncio.Queue()

        self._initial_end = len(prompt_tokens) * self._config.raw_audio_length_per_tok
        for token in prompt_tokens:
            self._token_queue.put_nowait(token)

    def _generate_frame_size_and_num_tokens(self) -> Iterator[tuple[int, int]]:
        streaming_step_size = self._ms_to_samples(1000 / self._config.frame_rate)
        start = 0
        end = self._initial_end
        while True:
            frame_start = max(start - self._look_back_in_samples, 0)
            frame_end = end + self._look_ahead_in_samples
            frame_size = frame_end - frame_start
            num_tokens = (end - start) / self._config.raw_audio_length_per_tok
            assert num_tokens.is_integer()
            yield frame_size, int(num_tokens)
            start = end
            end += streaming_step_size

    def _ms_to_samples(self, ms: float) -> int:
        len_ = self._config.sampling_rate * ms / 1000
        assert len_.is_integer(), len_
        return int(len_)

    async def append_audio(self, audio_array: np.ndarray | None) -> None:
        await self._audio_queue.put(audio_array)

    async def append_tokens(self, tokens: Iterable[int]) -> None:
        for token in tokens:
            await self._token_queue.put(token)

    async def get_input_stream(self) -> AsyncGenerator[StreamingInput]:
        for frame_size, num_tokens in self._generate_frame_size_and_num_tokens():
            next_tokens = [await self._token_queue.get() for _ in range(num_tokens)]

            audio_arrays: list[np.ndarray] = (
                [self._leftover] if self._leftover is not None else []
            )
            while sum(len(arr) for arr in audio_arrays) < frame_size:
                arr = await self._audio_queue.get()
                if arr is None:
                    return
                audio_arrays.append(arr)

            audio_array = np.concatenate(audio_arrays)
            frame = audio_array[:frame_size]

            # The current stride took look_ahead_in_samples audio of the next sample
            # In addition the next sample will take look_back_in_samples audio of
            # the current sample => So let's put both of this into the leftover
            stride = (
                frame_size - self._look_ahead_in_samples - self._look_back_in_samples
            )
            assert stride > 0, f"{stride=} must be positive"

            self._leftover = audio_array[stride:]

            yield StreamingInput(
                TokensPrompt(
                    prompt_token_ids=next_tokens,
                    multi_modal_data={"audio": (frame, None)},
                )
            )
206
207


Patrick von Platen's avatar
Patrick von Platen committed
208
@MULTIMODAL_REGISTRY.register_processor(
209
    VoxtralRealtimeMultiModalProcessor,
Patrick von Platen's avatar
Patrick von Platen committed
210
211
212
    info=VoxtralProcessingInfo,
    dummy_inputs=VoxtralDummyInputsBuilder,
)
213
@support_torch_compile
214
class VoxtralRealtimeGeneration(VoxtralForConditionalGeneration, SupportsRealtime):
Patrick von Platen's avatar
Patrick von Platen committed
215
    requires_raw_input_tokens = True
216
217
218
    # transformers' currently has limited support for MistralCommon backend
    # and cached_get_processor. Let's skip until fixed
    skip_warmup_audio_preprocessing = True
Patrick von Platen's avatar
Patrick von Platen committed
219
220
221

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__(vllm_config=vllm_config, prefix=prefix)
222
223
224

        assert (
            not vllm_config.compilation_config.cudagraph_mode.has_full_cudagraphs()
225
        ), "Voxtral realtime doesn't support full cudagraphs yet. Please use PIECEWISE."
226

Patrick von Platen's avatar
Patrick von Platen committed
227
228
229
230
231
        self.time_embedding: TimeEmbedding = TimeEmbedding(
            dim=self.config.text_config.hidden_size
        )

        audio_config = self.tokenizer.instruct.audio_encoder.audio_config
232
        self.n_delay_tokens = audio_config.get_num_delay_tokens()
233
234
235
236
237
238
239
240
241
242
243
244
245

    # for realtime transcription
    @classmethod
    async def buffer_realtime_audio(
        cls,
        audio_stream: AsyncGenerator[np.ndarray, None],
        input_stream: asyncio.Queue[list[int]],
        model_config: ModelConfig,
    ) -> AsyncGenerator[PromptType, None]:
        tokenizer = cached_tokenizer_from_config(model_config)
        audio_encoder = tokenizer.instruct.audio_encoder
        config = audio_encoder.audio_config

246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
        # Get prompt tokens (streaming prefix tokens) without encoding audio
        prompt_tokens = (
            tokenizer.instruct.start() + audio_encoder.encode_streaming_tokens()
        )

        # Get left/right padding audio
        left_pad, right_pad = audio_encoder.get_padding_audio()

        buffer = VoxtralRealtimeBuffer(config, prompt_tokens)

        # Feed audio with padding into buffer in background
        async def feed_audio():
            yielded_first_chunk = False
            async for audio_chunk in audio_stream:
                if not yielded_first_chunk:
                    yielded_first_chunk = True
                    # Prepend left padding before first real audio
                    await buffer.append_audio(left_pad.audio_array)
                await buffer.append_audio(audio_chunk)
            # Append right padding at the end
            await buffer.append_audio(right_pad.audio_array)
            await buffer.append_audio(None)  # signal end

        # Feed output tokens back into buffer in background
        async def feed_tokens():
            while True:
                all_outputs = await asyncio.wait_for(
                    input_stream.get(),
                    timeout=VLLM_ENGINE_ITERATION_TIMEOUT_S,
275
                )
276
277
278
279
280
281
282
283
284
285
286
                await buffer.append_tokens(all_outputs[-1:])

        audio_task = asyncio.create_task(feed_audio())
        token_task = asyncio.create_task(feed_tokens())

        try:
            async for streaming_input in buffer.get_input_stream():
                yield streaming_input.prompt
        finally:
            audio_task.cancel()
            token_task.cancel()
Patrick von Platen's avatar
Patrick von Platen committed
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301

    @property
    def audio_config(self):
        return self.tokenizer.instruct.audio_encoder.audio_config

    def embed_input_ids(
        self,
        input_ids: torch.Tensor,
        multimodal_embeddings: MultiModalEmbeddings | None = None,
        *,
        is_multimodal: torch.Tensor | None = None,
        # Multi-modal token ID may exceed vocab size
        handle_oov_mm_token: bool = True,
    ) -> torch.Tensor:
        """Pass post-conv embeddings directly as input"""
302
        # for realtime we simply flatten the multimodal embeddings
Patrick von Platen's avatar
Patrick von Platen committed
303
304
305
        # to be in tensor format, we treat the input ids later
        assert multimodal_embeddings is not None
        assert len(multimodal_embeddings) > 0, (
306
            "For realtime you must provide a multimodal_embedding at every step."
Patrick von Platen's avatar
Patrick von Platen committed
307
308
309
310
311
312
        )
        mm_embeds_flat = _flatten_embeddings(multimodal_embeddings)
        return mm_embeds_flat

    def forward(
        self,
313
        input_ids: torch.Tensor | None,
Patrick von Platen's avatar
Patrick von Platen committed
314
315
316
317
318
319
320
321
322
323
324
325
326
        positions: torch.Tensor,
        intermediate_tensors: IntermediateTensors | None = None,
        inputs_embeds: torch.Tensor | None = None,
        **kwargs: object,
    ) -> torch.Tensor | IntermediateTensors:
        assert inputs_embeds is not None
        assert input_ids is not None

        pool_size = self.config.audio_config.block_pool_size
        inputs_embeds = inputs_embeds.view(
            inputs_embeds.shape[0] * pool_size, inputs_embeds.shape[1] // pool_size
        )

327
328
329
        whisper_positions = _expand_tensor(positions, pool_size)
        audio_hidden_states = self.whisper_encoder.whisper_encoder(
            inputs_embeds, whisper_positions
Patrick von Platen's avatar
Patrick von Platen committed
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
        )

        num_tokens, audio_hidden_size = audio_hidden_states.shape
        assert num_tokens % self.downsample_factor == 0
        audio_hidden_states = audio_hidden_states.reshape(
            num_tokens // self.downsample_factor,
            audio_hidden_size * self.downsample_factor,
        )
        audio_text_embeds = self.audio_language_adapter(audio_hidden_states)

        text_embeds = self.language_model.embed_input_ids(input_ids)

        # sum pool text and audio embeddings
        inputs_embeds = audio_text_embeds + text_embeds

345
346
347
        time_tensor = torch.full(
            (1,),
            fill_value=self.n_delay_tokens,
Patrick von Platen's avatar
Patrick von Platen committed
348
349
350
            device=inputs_embeds.device,
            dtype=inputs_embeds.dtype,
        )
351
        t_cond = self.time_embedding(time_tensor)
Patrick von Platen's avatar
Patrick von Platen committed
352
353

        hidden_states = self.language_model.model(
354
355
356
357
358
            input_ids,
            positions,
            intermediate_tensors,
            inputs_embeds=inputs_embeds,
            t_cond=t_cond,
Patrick von Platen's avatar
Patrick von Platen committed
359
360
361
362
363
364
365
366
367
368
369
        )

        return hidden_states

    def embed_multimodal(
        self, **kwargs
    ) -> list[torch.Tensor] | torch.Tensor | tuple[torch.Tensor, ...] | None:
        """Transform audio waveforms -> initial whisper post-conv embeddings"""
        audio_inputs = self._parse_and_validate_audio_arrays(**kwargs)

        assert audio_inputs is not None, (
370
            "For realtime you must provide an audio input at every step."
Patrick von Platen's avatar
Patrick von Platen committed
371
372
        )

373
374
375
376
377
378
379
380
381
382
383
        def _truncate_left(
            sample: torch.Tensor, mult_of: int, pos: int
        ) -> torch.Tensor:
            assert pos in [0, 1], pos
            if (ctx := sample.shape[pos] % mult_of) != 0:
                sample = sample[ctx:] if pos == 0 else sample[:, ctx:]
                assert sample.shape[pos] > 0, (
                    f"Sample is empty after truncation with ctx {ctx}"
                )

            return sample
Patrick von Platen's avatar
Patrick von Platen committed
384
385
386
387
388
389
390

        mel_features = [
            self.whisper_encoder.compute_whisper_melspec(audio).to(
                self.whisper_encoder.dtype
            )
            for audio in audio_inputs
        ]
391
392
393
394
395

        # we truncate the left most mel feature
        # if the sequence length in impair
        mel_features = [_truncate_left(mel, 2, 1) for mel in mel_features]

Patrick von Platen's avatar
Patrick von Platen committed
396
397
398
399
        seq_lens = [mel.shape[1] for mel in mel_features]
        # [total_num_20ms_frames, hidden_size]
        audio_embeddings = self.whisper_encoder.whisper_encoder.forward_conv(
            mel_features
400
        )
Patrick von Platen's avatar
Patrick von Platen committed
401
402
403
404
405
406
407
        conv_stride = self.whisper_encoder.whisper_encoder.total_stride
        audio_embeddings_per_sample = audio_embeddings.split(
            [s // conv_stride for s in seq_lens], dim=0
        )

        # audio_embeddings per sample need to be divisible by 4
        pool_size = self.config.audio_config.block_pool_size
408
409
410

        audio_embeddings_per_sample = [
            _truncate_left(sample, pool_size, 0)
Patrick von Platen's avatar
Patrick von Platen committed
411
            for sample in audio_embeddings_per_sample
412
        ]
Patrick von Platen's avatar
Patrick von Platen committed
413
414
415
416
417
418

        audio_embeddings_per_sample = [
            e.view(e.shape[0] // pool_size, e.shape[1] * pool_size)
            for e in audio_embeddings_per_sample
        ]
        return audio_embeddings_per_sample
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455

    @classmethod
    def get_speech_to_text_config(
        cls, model_config: ModelConfig, task_type: str
    ) -> SpeechToTextConfig:
        tokenizer = cached_tokenizer_from_config(model_config)
        audio_config = tokenizer.instruct.audio_encoder.audio_config
        sample_rate = audio_config.sampling_rate
        return SpeechToTextConfig(
            max_audio_clip_s=None,  # only limited by memory
            sample_rate=sample_rate,
            min_energy_split_window_size=None,
        )

    @classmethod
    # for speech-to-text transcription
    def get_generation_prompt(
        cls,
        audio: np.ndarray,
        model_config: ModelConfig,
        stt_config: SpeechToTextConfig,
        language: str | None,
        task_type: Literal["transcribe", "translate"],
        request_prompt: str,
        to_language: str | None,
    ) -> PromptType:
        tokenizer = cached_tokenizer_from_config(model_config)
        audio = Audio(audio, int(stt_config.sample_rate), format="wav")  # lossless

        req = TranscriptionRequest(
            model=model_config.model,
            audio=RawAudio.from_audio(audio),
            language=language,
            streaming=StreamingMode.OFFLINE,
        )

        tokenized = tokenizer.instruct.encode_transcription(req)
456
457
458
459
460
461
462

        return TokensPrompt(
            prompt_token_ids=tokenized.tokens,
            multi_modal_data={
                "audio": (tokenized.audios[0].audio_array, stt_config.sample_rate)
            },
        )