voxtral_realtime.py 18.5 KB
Newer Older
Patrick von Platen's avatar
Patrick von Platen committed
1
2
3
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

4
import asyncio
Patrick von Platen's avatar
Patrick von Platen committed
5
import math
6
from collections.abc import AsyncGenerator, Iterable, Iterator, Mapping
Patrick von Platen's avatar
Patrick von Platen committed
7

8
import numpy as np
Patrick von Platen's avatar
Patrick von Platen committed
9
import torch
10
from mistral_common.audio import Audio
11
12
13
14
15
from mistral_common.protocol.instruct.chunk import RawAudio
from mistral_common.protocol.transcription.request import (
    StreamingMode,
    TranscriptionRequest,
)
16
from mistral_common.tokens.tokenizers.audio import AudioConfig
Patrick von Platen's avatar
Patrick von Platen committed
17

18
from vllm.compilation.decorators import support_torch_compile
19
from vllm.config import ModelConfig, SpeechToTextConfig, VllmConfig
20
from vllm.config.speech_to_text import SpeechToTextParams
21
from vllm.engine.protocol import StreamingInput
22
from vllm.envs import VLLM_ENGINE_ITERATION_TIMEOUT_S
23
from vllm.inputs import PromptType, TokensPrompt
Patrick von Platen's avatar
Patrick von Platen committed
24
from vllm.logger import init_logger
25
from vllm.model_executor.models.interfaces import MultiModalEmbeddings, SupportsRealtime
Patrick von Platen's avatar
Patrick von Platen committed
26
27
28
29
30
31
32
33
from vllm.model_executor.models.voxtral import (
    VoxtralDummyInputsBuilder,
    VoxtralForConditionalGeneration,
    VoxtralMultiModalProcessor,
    VoxtralProcessingInfo,
)
from vllm.multimodal import MULTIMODAL_REGISTRY
from vllm.multimodal.cache import _I, BaseMultiModalProcessorCache
34
from vllm.multimodal.inputs import MultiModalKwargsOptionalItems
Patrick von Platen's avatar
Patrick von Platen committed
35
from vllm.multimodal.parse import MultiModalDataItems
36
37
from vllm.multimodal.processing import BaseDummyInputsBuilder
from vllm.multimodal.processing.processor import (
Patrick von Platen's avatar
Patrick von Platen committed
38
39
40
41
    MultiModalPromptUpdates,
    PlaceholderFeaturesInfo,
)
from vllm.sequence import IntermediateTensors
42
from vllm.tokenizers import cached_tokenizer_from_config
43
from vllm.utils.torch_utils import is_torch_equal_or_newer
Patrick von Platen's avatar
Patrick von Platen committed
44
45
46
47
48
49
50
51

from .utils import (
    _flatten_embeddings,
)

logger = init_logger(__name__)


52
class VoxtralRealtimeMultiModalProcessor(VoxtralMultiModalProcessor):
Patrick von Platen's avatar
Patrick von Platen committed
53
54
55
56
57
58
59
    def __init__(
        self,
        info: _I,
        dummy_inputs: BaseDummyInputsBuilder[_I],
        *,
        cache: BaseMultiModalProcessorCache | None = None,
    ) -> None:
60
        # realtime can't make use of a cache yet
Patrick von Platen's avatar
Patrick von Platen committed
61
62
63
64
65
66
67
68
69
70
71
72
73
        super().__init__(info, dummy_inputs, cache=None)

    def _maybe_apply_prompt_updates(
        self,
        mm_items: MultiModalDataItems,
        prompt_ids: list[int],
        mm_kwargs: MultiModalKwargsOptionalItems,
        mm_prompt_updates: MultiModalPromptUpdates,
        is_update_applied: bool,
    ) -> tuple[list[int], Mapping[str, list[PlaceholderFeaturesInfo]]]:
        # there are no placeholder audio tokens for streaming
        # so we need to build the place placeholder positions manually

74
        # in realtime there is always only one audio input
Patrick von Platen's avatar
Patrick von Platen committed
75
76
        audios = mm_kwargs.get("audio", [])
        assert len(audios) == 1, (
77
            f"Expected only one audio input for realtime, got {mm_kwargs=}"
Patrick von Platen's avatar
Patrick von Platen committed
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
        )
        tokenizer = self.info.get_tokenizer()
        audio_config = tokenizer.instruct.audio_encoder.audio_config

        num_audio_samples = audios[0]["audio_arrays"].data.shape[0]
        length = audio_config.num_audio_tokens(num_audio_samples)

        features_info = PlaceholderFeaturesInfo(
            modality="audio",
            item_idx=0,
            start_idx=0,
            tokens=length
            * [0],  # only used for length computation, so we can take dummy inputs
            is_embed=None,
        )
        return prompt_ids, {"audio": [features_info]}


class TimeEmbedding(torch.nn.Module):
    """Sinusoidal Embedding for encoding time"""

    def __init__(self, dim: int, theta: float = 10000.0) -> None:
        super().__init__()
        self.dim = dim
        self.theta = theta
        inv_freq = torch.exp(
            -math.log(self.theta)
            * torch.arange(self.dim // 2).float()
            / (self.dim // 2)
        )
        self.register_buffer("inv_freq", inv_freq, persistent=False)

    def forward(self, t: torch.Tensor) -> torch.Tensor:
        t = t[..., None]  # (B,) -> (B, 1) or (B, T) -> (B, T, 1)
        inv_freq = self.inv_freq.to(device=t.device, dtype=t.dtype)
        emb = (
            t * inv_freq
        )  # (B, 1) x (D/2,) -> (B, D/2) or (B, T, 1) x (D/2,) -> (B, T, D/2)
        return torch.cat((emb.cos(), emb.sin()), dim=-1)  # (B, D) or (B, T, D)


119
120
121
122
123
124
125
126
127
128
129
130
def _expand_tensor(input_tensor: torch.Tensor, scaling: int) -> torch.Tensor:
    # 1. Multiply by the scaling factor (e.g. 4)
    base = input_tensor * scaling

    # 2. Create the offsets, e.g. [0, 1, 2, 3]
    offsets = torch.arange(scaling, device=input_tensor.device)

    # 3. Use broadcasting, e.g. (N, 1) + (4,) results in (N, 4)
    # Then flatten back to 1D
    return (base.unsqueeze(1) + offsets).view(-1)


131
class VoxtralRealtimeBuffer:
132
    def __init__(self, config: AudioConfig, prompt_tokens: list[int]) -> None:
133
134
        self._config = config

135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
        _look_ahead_in_ms = self._config.streaming_look_ahead_ms
        _look_back_in_ms = self._config.streaming_look_back_ms
        self._look_ahead_in_samples = self._ms_to_samples(_look_ahead_in_ms)
        self._look_back_in_samples = self._ms_to_samples(_look_back_in_ms)

        # None signals the end
        self._audio_queue: asyncio.Queue[np.ndarray | None] = asyncio.Queue()
        self._leftover: np.ndarray | None = None
        self._token_queue: asyncio.Queue[int] = asyncio.Queue()

        self._initial_end = len(prompt_tokens) * self._config.raw_audio_length_per_tok
        for token in prompt_tokens:
            self._token_queue.put_nowait(token)

    def _generate_frame_size_and_num_tokens(self) -> Iterator[tuple[int, int]]:
        streaming_step_size = self._ms_to_samples(1000 / self._config.frame_rate)
        start = 0
        end = self._initial_end
        while True:
            frame_start = max(start - self._look_back_in_samples, 0)
            frame_end = end + self._look_ahead_in_samples
            frame_size = frame_end - frame_start
            num_tokens = (end - start) / self._config.raw_audio_length_per_tok
            assert num_tokens.is_integer()
            yield frame_size, int(num_tokens)
            start = end
            end += streaming_step_size

    def _ms_to_samples(self, ms: float) -> int:
        len_ = self._config.sampling_rate * ms / 1000
        assert len_.is_integer(), len_
        return int(len_)

    async def append_audio(self, audio_array: np.ndarray | None) -> None:
        await self._audio_queue.put(audio_array)

    async def append_tokens(self, tokens: Iterable[int]) -> None:
        for token in tokens:
            await self._token_queue.put(token)

    async def get_input_stream(self) -> AsyncGenerator[StreamingInput]:
        for frame_size, num_tokens in self._generate_frame_size_and_num_tokens():
            next_tokens = [await self._token_queue.get() for _ in range(num_tokens)]

            audio_arrays: list[np.ndarray] = (
                [self._leftover] if self._leftover is not None else []
            )
            while sum(len(arr) for arr in audio_arrays) < frame_size:
                arr = await self._audio_queue.get()
                if arr is None:
                    return
                audio_arrays.append(arr)

            audio_array = np.concatenate(audio_arrays)
            frame = audio_array[:frame_size]

            # The current stride took look_ahead_in_samples audio of the next sample
            # In addition the next sample will take look_back_in_samples audio of
            # the current sample => So let's put both of this into the leftover
            stride = (
                frame_size - self._look_ahead_in_samples - self._look_back_in_samples
            )
            assert stride > 0, f"{stride=} must be positive"

            self._leftover = audio_array[stride:]

            yield StreamingInput(
                TokensPrompt(
                    prompt_token_ids=next_tokens,
                    multi_modal_data={"audio": (frame, None)},
                )
            )
207
208


Patrick von Platen's avatar
Patrick von Platen committed
209
@MULTIMODAL_REGISTRY.register_processor(
210
    VoxtralRealtimeMultiModalProcessor,
Patrick von Platen's avatar
Patrick von Platen committed
211
212
213
    info=VoxtralProcessingInfo,
    dummy_inputs=VoxtralDummyInputsBuilder,
)
214
@support_torch_compile
215
class VoxtralRealtimeGeneration(VoxtralForConditionalGeneration, SupportsRealtime):
Patrick von Platen's avatar
Patrick von Platen committed
216
    requires_raw_input_tokens = True
217
218
219
    # transformers' currently has limited support for MistralCommon backend
    # and cached_get_processor. Let's skip until fixed
    skip_warmup_audio_preprocessing = True
Patrick von Platen's avatar
Patrick von Platen committed
220
221
222

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__(vllm_config=vllm_config, prefix=prefix)
223
224
225

        assert (
            not vllm_config.compilation_config.cudagraph_mode.has_full_cudagraphs()
226
        ), "Voxtral realtime doesn't support full cudagraphs yet. Please use PIECEWISE."
227

Patrick von Platen's avatar
Patrick von Platen committed
228
229
230
231
232
        self.time_embedding: TimeEmbedding = TimeEmbedding(
            dim=self.config.text_config.hidden_size
        )

        audio_config = self.tokenizer.instruct.audio_encoder.audio_config
233
        self.n_delay_tokens = audio_config.get_num_delay_tokens()
234
235
236
237
238
239
240
241
242
243
244
245
246

    # for realtime transcription
    @classmethod
    async def buffer_realtime_audio(
        cls,
        audio_stream: AsyncGenerator[np.ndarray, None],
        input_stream: asyncio.Queue[list[int]],
        model_config: ModelConfig,
    ) -> AsyncGenerator[PromptType, None]:
        tokenizer = cached_tokenizer_from_config(model_config)
        audio_encoder = tokenizer.instruct.audio_encoder
        config = audio_encoder.audio_config

247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
        # Get prompt tokens (streaming prefix tokens) without encoding audio
        prompt_tokens = (
            tokenizer.instruct.start() + audio_encoder.encode_streaming_tokens()
        )

        # Get left/right padding audio
        left_pad, right_pad = audio_encoder.get_padding_audio()

        buffer = VoxtralRealtimeBuffer(config, prompt_tokens)

        # Feed audio with padding into buffer in background
        async def feed_audio():
            yielded_first_chunk = False
            async for audio_chunk in audio_stream:
                if not yielded_first_chunk:
                    yielded_first_chunk = True
                    # Prepend left padding before first real audio
                    await buffer.append_audio(left_pad.audio_array)
                await buffer.append_audio(audio_chunk)
            # Append right padding at the end
            await buffer.append_audio(right_pad.audio_array)
            await buffer.append_audio(None)  # signal end

        # Feed output tokens back into buffer in background
        async def feed_tokens():
            while True:
                all_outputs = await asyncio.wait_for(
                    input_stream.get(),
                    timeout=VLLM_ENGINE_ITERATION_TIMEOUT_S,
276
                )
277
278
279
280
281
282
283
284
285
286
287
                await buffer.append_tokens(all_outputs[-1:])

        audio_task = asyncio.create_task(feed_audio())
        token_task = asyncio.create_task(feed_tokens())

        try:
            async for streaming_input in buffer.get_input_stream():
                yield streaming_input.prompt
        finally:
            audio_task.cancel()
            token_task.cancel()
Patrick von Platen's avatar
Patrick von Platen committed
288
289
290
291
292
293
294
295
296
297
298
299
300

    @property
    def audio_config(self):
        return self.tokenizer.instruct.audio_encoder.audio_config

    def embed_input_ids(
        self,
        input_ids: torch.Tensor,
        multimodal_embeddings: MultiModalEmbeddings | None = None,
        *,
        is_multimodal: torch.Tensor | None = None,
        # Multi-modal token ID may exceed vocab size
    ) -> torch.Tensor:
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
        """Pass post-conv embeddings directly as input.

        For realtime models, multimodal embeddings are required at every
        decode step.  If they are missing (e.g. due to an empty audio
        commit, encoder-cache eviction under GPU memory pressure, or a
        client disconnect), return zero embeddings instead of crashing
        the engine so that all other in-flight requests stay alive.
        """
        if multimodal_embeddings is None or len(multimodal_embeddings) == 0:
            logger.warning(
                "Realtime model received empty multimodal embeddings "
                "for %d input tokens. Returning zero embeddings to "
                "avoid engine crash.",
                input_ids.shape[0],
            )
            pool_size = self.config.audio_config.block_pool_size
            embed_dim = self.config.audio_config.d_model * pool_size
            return torch.zeros(
                input_ids.shape[0],
                embed_dim,
                dtype=self.whisper_encoder.dtype,
                device=input_ids.device,
            )
Patrick von Platen's avatar
Patrick von Platen committed
324
325
326
327
328
        mm_embeds_flat = _flatten_embeddings(multimodal_embeddings)
        return mm_embeds_flat

    def forward(
        self,
329
        input_ids: torch.Tensor | None,
Patrick von Platen's avatar
Patrick von Platen committed
330
331
332
333
334
335
336
337
338
        positions: torch.Tensor,
        intermediate_tensors: IntermediateTensors | None = None,
        inputs_embeds: torch.Tensor | None = None,
        **kwargs: object,
    ) -> torch.Tensor | IntermediateTensors:
        assert inputs_embeds is not None
        assert input_ids is not None

        pool_size = self.config.audio_config.block_pool_size
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
        if is_torch_equal_or_newer("2.11"):
            inputs_embeds = inputs_embeds.view(
                inputs_embeds.shape[0] * pool_size, inputs_embeds.shape[1] // pool_size
            )
        else:
            # TODO Use reshape + clone to break the view chain and avoid output
            # aliasing input bug in torch.compile's AOT autograd cache.
            # Without clone(), if any downstream operation returns a view that's
            # connected to this view of inputs_embeds, the AOT autograd cache
            # fails to pickle the ViewMetaSequence containing SymInt shapes.
            # This will be fixed in pytorch 2.11 and beyond.
            # issue: https://github.com/pytorch/pytorch/issues/174299
            inputs_embeds = inputs_embeds.reshape(
                inputs_embeds.shape[0] * pool_size, inputs_embeds.shape[1] // pool_size
            ).clone()
Patrick von Platen's avatar
Patrick von Platen committed
354

355
356
357
        whisper_positions = _expand_tensor(positions, pool_size)
        audio_hidden_states = self.whisper_encoder.whisper_encoder(
            inputs_embeds, whisper_positions
Patrick von Platen's avatar
Patrick von Platen committed
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
        )

        num_tokens, audio_hidden_size = audio_hidden_states.shape
        assert num_tokens % self.downsample_factor == 0
        audio_hidden_states = audio_hidden_states.reshape(
            num_tokens // self.downsample_factor,
            audio_hidden_size * self.downsample_factor,
        )
        audio_text_embeds = self.audio_language_adapter(audio_hidden_states)

        text_embeds = self.language_model.embed_input_ids(input_ids)

        # sum pool text and audio embeddings
        inputs_embeds = audio_text_embeds + text_embeds

373
374
375
        time_tensor = torch.full(
            (1,),
            fill_value=self.n_delay_tokens,
Patrick von Platen's avatar
Patrick von Platen committed
376
377
378
            device=inputs_embeds.device,
            dtype=inputs_embeds.dtype,
        )
379
        t_cond = self.time_embedding(time_tensor)
Patrick von Platen's avatar
Patrick von Platen committed
380
381

        hidden_states = self.language_model.model(
382
383
384
385
386
            input_ids,
            positions,
            intermediate_tensors,
            inputs_embeds=inputs_embeds,
            t_cond=t_cond,
Patrick von Platen's avatar
Patrick von Platen committed
387
388
389
390
391
392
393
394
395
396
        )

        return hidden_states

    def embed_multimodal(
        self, **kwargs
    ) -> list[torch.Tensor] | torch.Tensor | tuple[torch.Tensor, ...] | None:
        """Transform audio waveforms -> initial whisper post-conv embeddings"""
        audio_inputs = self._parse_and_validate_audio_arrays(**kwargs)

397
398
399
400
401
402
        if audio_inputs is None:
            logger.warning(
                "Realtime model received no audio inputs in "
                "embed_multimodal. Returning empty embeddings."
            )
            return []
Patrick von Platen's avatar
Patrick von Platen committed
403

404
405
406
407
408
409
410
411
412
413
414
        def _truncate_left(
            sample: torch.Tensor, mult_of: int, pos: int
        ) -> torch.Tensor:
            assert pos in [0, 1], pos
            if (ctx := sample.shape[pos] % mult_of) != 0:
                sample = sample[ctx:] if pos == 0 else sample[:, ctx:]
                assert sample.shape[pos] > 0, (
                    f"Sample is empty after truncation with ctx {ctx}"
                )

            return sample
Patrick von Platen's avatar
Patrick von Platen committed
415
416
417
418
419
420
421

        mel_features = [
            self.whisper_encoder.compute_whisper_melspec(audio).to(
                self.whisper_encoder.dtype
            )
            for audio in audio_inputs
        ]
422
423
424
425
426

        # we truncate the left most mel feature
        # if the sequence length in impair
        mel_features = [_truncate_left(mel, 2, 1) for mel in mel_features]

Patrick von Platen's avatar
Patrick von Platen committed
427
428
429
430
        seq_lens = [mel.shape[1] for mel in mel_features]
        # [total_num_20ms_frames, hidden_size]
        audio_embeddings = self.whisper_encoder.whisper_encoder.forward_conv(
            mel_features
431
        )
Patrick von Platen's avatar
Patrick von Platen committed
432
433
434
435
436
437
438
        conv_stride = self.whisper_encoder.whisper_encoder.total_stride
        audio_embeddings_per_sample = audio_embeddings.split(
            [s // conv_stride for s in seq_lens], dim=0
        )

        # audio_embeddings per sample need to be divisible by 4
        pool_size = self.config.audio_config.block_pool_size
439
440
441

        audio_embeddings_per_sample = [
            _truncate_left(sample, pool_size, 0)
Patrick von Platen's avatar
Patrick von Platen committed
442
            for sample in audio_embeddings_per_sample
443
        ]
Patrick von Platen's avatar
Patrick von Platen committed
444
445
446
447
448
449

        audio_embeddings_per_sample = [
            e.view(e.shape[0] // pool_size, e.shape[1] * pool_size)
            for e in audio_embeddings_per_sample
        ]
        return audio_embeddings_per_sample
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467

    @classmethod
    def get_speech_to_text_config(
        cls, model_config: ModelConfig, task_type: str
    ) -> SpeechToTextConfig:
        tokenizer = cached_tokenizer_from_config(model_config)
        audio_config = tokenizer.instruct.audio_encoder.audio_config
        sample_rate = audio_config.sampling_rate
        return SpeechToTextConfig(
            max_audio_clip_s=None,  # only limited by memory
            sample_rate=sample_rate,
            min_energy_split_window_size=None,
        )

    @classmethod
    # for speech-to-text transcription
    def get_generation_prompt(
        cls,
468
        stt_params: SpeechToTextParams,
469
    ) -> PromptType:
470
471
472
473
474
        audio = stt_params.audio
        model_config = stt_params.model_config
        stt_config = stt_params.stt_config
        language = stt_params.language

475
476
477
478
479
480
481
482
483
484
485
        tokenizer = cached_tokenizer_from_config(model_config)
        audio = Audio(audio, int(stt_config.sample_rate), format="wav")  # lossless

        req = TranscriptionRequest(
            model=model_config.model,
            audio=RawAudio.from_audio(audio),
            language=language,
            streaming=StreamingMode.OFFLINE,
        )

        tokenized = tokenizer.instruct.encode_transcription(req)
486
487
488
489
490
491
492

        return TokensPrompt(
            prompt_token_ids=tokenized.tokens,
            multi_modal_data={
                "audio": (tokenized.audios[0].audio_array, stt_config.sample_rate)
            },
        )