Unverified Commit 0f5dd2b7 authored by Yuekai Zhang's avatar Yuekai Zhang Committed by GitHub
Browse files

feat: Add vLLM multimodal audio support (#2760)


Signed-off-by: default avatarYuekai Zhang <zhangyuekai@foxmail.com>
Signed-off-by: default avatarkrishung5 <krish@nvidia.com>
Co-authored-by: default avatarKris Hung <krish@nvidia.com>
parent 1bec3555
......@@ -203,7 +203,7 @@ of the model per node.
#### Workflow
In this workflow, we have [VllmPDWorker](components/worker.py) which will encode the image, prefill and decode the prompt, just like the [LLM aggregated serving](/docs/backends/vllm/README.md) example.
In this workflow, we have [VllmPDWorker](components/worker.py) which will encode the image, prefill and decode the prompt, just like the [LLM aggregated serving](../../docs/backends/vllm/README.md) example.
This figure illustrates the workflow:
```mermaid
......@@ -342,7 +342,7 @@ This example demonstrates deploying an aggregated multimodal model that can proc
In this workflow, we have two workers, [VideoEncodeWorker](components/video_encode_worker.py) and [VllmPDWorker](components/worker.py).
The VideoEncodeWorker is responsible for decoding the video into a series of frames. Unlike the image pipeline which generates embeddings,
this pipeline passes the raw frames directly to the VllmPDWorker via a combination of NATS and RDMA.
Its VllmPDWorker then prefills and decodes the prompt, just like the [LLM aggregated serving](/docs/backends/vllm/README.md) example.
Its VllmPDWorker then prefills and decodes the prompt, just like the [LLM aggregated serving](../../docs/backends/vllm/README.md) example.
By separating the video processing from the prefill and decode stages, we can have a more flexible deployment and scale the
VideoEncodeWorker independently from the prefill and decode workers if needed.
......@@ -502,3 +502,134 @@ You should see a response describing the video's content similar to
"usage": null
}
```
## Multimodal Aggregated Audio Serving
This example demonstrates deploying an aggregated multimodal model that can process audio inputs.
### Components
- workers: For audio serving, we use the [AudioEncodeWorker](components/audio_encode_worker.py) for decoding audio into audio embeddings, and send the embeddings to [VllmPDWorker](components/worker.py) for prefilling and decoding.
- processor: Tokenizes the prompt and passes it to the AudioEncodeWorker.
- frontend: HTTP endpoint to handle incoming requests.
### Workflow
In this workflow, we have two workers, [AudioEncodeWorker](components/audio_encode_worker.py) and [VllmPDWorker](components/worker.py).
The AudioEncodeWorker is responsible for decoding the audio into embeddings.
Its VllmPDWorker then prefills and decodes the prompt, just like the [LLM aggregated serving](../../docs/backends/vllm/README.md) example.
By separating the audio processing from the prefill and decode stages, we can have a more flexible deployment and scale the
AudioEncodeWorker independently from the prefill and decode workers if needed.
This figure illustrates the workflow:
```mermaid
flowchart LR
HTTP --> processor
processor --> HTTP
processor --audio_url--> audio_encode_worker
audio_encode_worker --> processor
audio_encode_worker --embeddings--> pd_worker
pd_worker --> audio_encode_worker
```
```bash
pip install vllm["audio"] accelerate # multimodal audio models dependency
cd $DYNAMO_HOME/examples/multimodal
bash launch/audio_agg.sh
```
### Client
In another terminal:
```bash
curl http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "Qwen/Qwen2-Audio-7B-Instruct",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "What is recited in the audio?"
},
{
"type": "audio_url",
"audio_url": {
"url": "https://raw.githubusercontent.com/yuekaizhang/Triton-ASR-Client/main/datasets/mini_en/wav/1221-135766-0002.wav"
}
}
]
}
],
"max_tokens": 6000,
"temperature": 0.8,
"stream": false
}' | jq
```
You should see a response describing the audio's content similar to
```json
{
"id": "e2d8d67c37634b309400974eaa058ce8",
"choices": [
{
"index": 0,
"message": {
"content": "The original content of this audio is:'yet these thoughts affected Hester Pynne less with hope than apprehension.'",
"refusal": null,
"tool_calls": null,
"role": "assistant",
"function_call": null,
"audio": null
},
"finish_reason": "stop",
"logprobs": null
}
],
"created": 1756368148,
"model": "Qwen/Qwen2-Audio-7B-Instruct",
"service_tier": null,
"system_fingerprint": null,
"object": "chat.completion",
"usage": null
}
```
## Multimodal Disaggregated Audio Serving
This example demonstrates deploying a disaggregated multimodal model that can process audio inputs.
### Components
- workers: For disaggregated audio serving, we have three workers, [AudioEncodeWorker](components/audio_encode_worker.py) for decoding audio into embeddings,
[VllmDecodeWorker](components/worker.py) for decoding, and [VllmPDWorker](components/worker.py) for prefilling.
- processor: Tokenizes the prompt and passes it to the AudioEncodeWorker.
- frontend: HTTP endpoint to handle incoming requests.
### Workflow
In this workflow, we have three workers, [AudioEncodeWorker](components/audio_encode_worker.py), [VllmDecodeWorker](components/worker.py), and [VllmPDWorker](components/worker.py).
For the Qwen/Qwen2-Audio-7B-Instruct model, audio embeddings are only required during the prefill stage. As such, the AudioEncodeWorker is connected directly to the prefill worker.
The AudioEncodeWorker is responsible for decoding the audio into embeddings and passing them to the prefill worker via RDMA.
The prefill worker performs the prefilling step and forwards the KV cache to the decode worker for decoding.
For more details on the roles of the prefill and decode workers, refer to the [LLM disaggregated serving](../../docs/backends/vllm/README.md) example.
This figure illustrates the workflow:
```mermaid
flowchart LR
HTTP --> processor
processor --> HTTP
processor --audio_url--> audio_encode_worker
audio_encode_worker --> processor
audio_encode_worker --embeddings--> prefill_worker
prefill_worker --> audio_encode_worker
prefill_worker --> decode_worker
decode_worker --> prefill_worker
```
```bash
pip install vllm["audio"] accelerate # multimodal audio models dependency
cd $DYNAMO_HOME/examples/multimodal
bash launch/audio_disagg.sh
```
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import asyncio
import logging
import os
import signal
import sys
from typing import AsyncIterator, Tuple
import torch
import uvloop
from transformers import AutoProcessor, Qwen2AudioForConditionalGeneration
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.utils import FlexibleArgumentParser
import dynamo.nixl_connect as connect
from dynamo.runtime import Client, DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
from utils.args import Config, base_parse_args, parse_endpoint
from utils.audio_loader import AudioLoader
from utils.protocol import MyRequestOutput, vLLMMultimodalRequest
configure_dynamo_logging()
logger = logging.getLogger(__name__)
try:
import cupy as array_module
if not array_module.cuda.is_available():
raise ImportError("CUDA is not available.")
DEVICE = "cuda"
logger.info("Using cupy for array operations (GPU mode).")
except ImportError as e:
logger.warning(f"Failed to import cupy, falling back to numpy: {e}.")
import numpy as array_module
DEVICE = "cpu"
CACHE_SIZE_MAXIMUM = 8
class VllmEncodeWorker:
def __init__(
self,
args: argparse.Namespace,
engine_args: AsyncEngineArgs,
pd_worker_client: Client,
) -> None:
self.pd_worker_client = pd_worker_client
self.engine_args = engine_args
self.model = self.engine_args.model
self.audio_loader = AudioLoader(cache_size=CACHE_SIZE_MAXIMUM)
self.audio_processor = AutoProcessor.from_pretrained(
self.model, trust_remote_code=True
)
self.audio_model = Qwen2AudioForConditionalGeneration.from_pretrained(
self.model, device_map="auto", dtype=torch.float16
).eval()
def get_audio_embeddings(self, audio_features):
input_features, feature_attention_mask = (
audio_features.input_features,
audio_features.feature_attention_mask,
)
with torch.no_grad():
(
audio_feat_lengths,
audio_output_lengths,
) = self.audio_model.audio_tower._get_feat_extract_output_lengths(
feature_attention_mask.sum(-1)
)
batch_size, _, max_mel_seq_len = input_features.shape
max_seq_len = (max_mel_seq_len - 2) // 2 + 1
# Create a sequence tensor of shape (batch_size, max_seq_len)
seq_range = (
torch.arange(
0,
max_seq_len,
dtype=audio_feat_lengths.dtype,
device=audio_feat_lengths.device,
)
.unsqueeze(0)
.expand(batch_size, max_seq_len)
)
lengths_expand = audio_feat_lengths.unsqueeze(1).expand(
batch_size, max_seq_len
)
# Create mask
padding_mask = seq_range >= lengths_expand
audio_attention_mask_ = padding_mask.view(
batch_size, 1, 1, max_seq_len
).expand(batch_size, 1, max_seq_len, max_seq_len)
audio_attention_mask = audio_attention_mask_.to(
dtype=self.audio_model.audio_tower.conv1.weight.dtype,
device=self.audio_model.audio_tower.conv1.weight.device,
)
audio_attention_mask[audio_attention_mask_] = float("-inf")
audio_outputs = self.audio_model.audio_tower(
input_features, attention_mask=audio_attention_mask
)
selected_audio_feature = audio_outputs.last_hidden_state
audio_features = self.audio_model.multi_modal_projector(
selected_audio_feature
)
num_audios, max_audio_tokens, embed_dim = audio_features.shape
audio_features_mask = torch.arange(
max_audio_tokens, device=audio_output_lengths.device
)[None, :]
audio_features_mask = audio_features_mask < audio_output_lengths[:, None]
audio_features = audio_features[audio_features_mask]
return audio_features
def cleanup(self):
pass
async def generate(
self, request: vLLMMultimodalRequest
) -> AsyncIterator[MyRequestOutput]:
logger.debug(f"Got raw request: {request}")
if not isinstance(request, vLLMMultimodalRequest):
if isinstance(request, str):
request = vLLMMultimodalRequest.model_validate_json(request)
else:
request = vLLMMultimodalRequest.model_validate(request)
logger.debug(f"Received encode request: {{ id: {request.request_id} }}.")
request_id = request.request_id
# The following steps encode the requested audio and provided useful embeddings.
# 1. Open the audio from the provided URL.
# 2. Process the audio using the audio processor.
# 3. Run the audio through the audio model's audio tower.
# 4. Run the results of the audio tower through the multi-modal projector.
# 5. Create a descriptor for the embeddings.
# 6. Create a write operation using the serialized request and the descriptor.
# 7. Await for the write operation to complete.
# 8. Yield the encode response.
try:
audio, sr = await self.audio_loader.load_audio(
request.multimodal_input.audio_url
)
audio_features = self.audio_processor(
text="test<|AUDIO|>", audio=audio, return_tensors="pt", padding=False
)
with torch.no_grad():
audio_embeddings = self.get_audio_embeddings(audio_features)
descriptor = connect.Descriptor(audio_embeddings)
with self._connector.create_readable(descriptor) as readable:
request.serialized_request = readable.metadata()
# Clear the audio URL as hint that the audio is passed as embeddings.
request.multimodal_input.audio_url = None
request.embeddings_shape = tuple(audio_embeddings.shape)
logger.debug(f"Request: {request.model_dump_json()}")
response_generator = await self.pd_worker_client.round_robin(
request.model_dump_json()
)
await readable.wait_for_completion()
async for response in response_generator:
output = MyRequestOutput.model_validate_json(response.data())
yield MyRequestOutput(
request_id=output.request_id,
prompt=output.prompt,
prompt_token_ids=output.prompt_token_ids,
prompt_logprobs=output.prompt_logprobs,
outputs=output.outputs,
finished=output.finished,
).model_dump_json()
except Exception as e:
logger.error(f"Error processing request {request_id}: {e}")
raise
async def async_init(self, runtime: DistributedRuntime):
logger.info("Startup started.")
# Create and initialize a dynamo connector for this worker.
# We'll needs this to move data between this worker and remote workers efficiently.
self._connector = connect.Connector()
await self._connector.initialize()
logger.info("Startup completed.")
@classmethod
def parse_args(cls) -> Tuple[argparse.Namespace, Config]:
DYN_NAMESPACE = os.environ.get("DYN_NAMESPACE", "dynamo")
DEFAULT_ENDPOINT = f"dyn://{DYN_NAMESPACE}.encoder.generate"
DEFAULT_DOWNSTREAM_ENDPOINT = f"dyn://{DYN_NAMESPACE}.llm.generate"
parser = FlexibleArgumentParser(
description="vLLM based encoder for Dynamo LLM."
)
parser.add_argument(
"--endpoint",
type=str,
default=DEFAULT_ENDPOINT,
help=f"Dynamo endpoint string in 'dyn://namespace.component.endpoint' format. Default: '{DEFAULT_ENDPOINT}'",
)
parser.add_argument(
"--downstream-endpoint",
type=str,
default=DEFAULT_DOWNSTREAM_ENDPOINT,
help=f"The endpoint string of the downstream LLM in 'dyn://namespace.component.endpoint' format. Default: '{DEFAULT_DOWNSTREAM_ENDPOINT}'",
)
args, config = base_parse_args(parser)
return args, config
async def graceful_shutdown(runtime):
"""
By calling `runtime.shutdown()`, the endpoints will immediately be unavailable.
However, in-flight requests will still be processed until they are finished.
After all in-flight requests are finished, the `serve_endpoint` functions will return
and the engine will be shutdown by Python's garbage collector.
"""
logging.info("Received shutdown signal, shutting down DistributedRuntime")
runtime.shutdown()
logging.info("DistributedRuntime shutdown complete")
@dynamo_worker()
async def worker(runtime: DistributedRuntime):
# Runtime setup
# Set up signal handler for graceful shutdown
loop = asyncio.get_running_loop()
def signal_handler():
asyncio.create_task(graceful_shutdown(runtime))
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, signal_handler)
logging.info("Signal handlers set up for graceful shutdown")
# worker setup
args, config = VllmEncodeWorker.parse_args()
await init(runtime, args, config)
async def init(runtime: DistributedRuntime, args: argparse.Namespace, config: Config):
"""
Instantiate and serve
"""
component = runtime.namespace(config.namespace).component(config.component)
await component.create_service()
generate_endpoint = component.endpoint(config.endpoint)
parsed_namespace, parsed_component_name, parsed_endpoint_name = parse_endpoint(
args.downstream_endpoint
)
pd_worker_client = (
await runtime.namespace(parsed_namespace)
.component(parsed_component_name)
.endpoint(parsed_endpoint_name)
.client()
)
handler = VllmEncodeWorker(args, config.engine_args, pd_worker_client)
await handler.async_init(runtime)
logger.info("Waiting for PD Worker Instances ...")
await pd_worker_client.wait_for_instances()
logger.info(f"Starting to serve the {args.endpoint} endpoint...")
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate, metrics_labels=[("model", config.model)]
),
)
except Exception as e:
logger.error(f"Failed to serve endpoints: {e}")
raise
finally:
handler.cleanup()
if __name__ == "__main__":
uvloop.install()
asyncio.run(worker())
......@@ -238,9 +238,20 @@ class Processor(ProcessMixIn):
if multimodal_input.image_url is not None:
raise ValueError("Cannot provide both image and video URLs")
multimodal_input.video_url = item.video_url.url
if multimodal_input.image_url is None and multimodal_input.video_url is None:
raise ValueError("Either image URL or video URL is required")
elif item.type == "audio_url":
if (
multimodal_input.image_url is not None
or multimodal_input.video_url is not None
):
raise ValueError("Cannot mix image, video and audio URLs")
multimodal_input.audio_url = item.audio_url.url
if (
multimodal_input.image_url is None
and multimodal_input.video_url is None
and multimodal_input.audio_url is None
):
raise ValueError("Either image URL or video URL or audio URL is required")
async for response in self._generate(
chat_request, multimodal_input, RequestType.CHAT
......
......@@ -269,6 +269,7 @@ class VllmPDWorker(VllmBaseWorker):
if (
request.multimodal_input.image_url is None
and request.multimodal_input.video_url is None
and request.multimodal_input.audio_url is None
):
# Process embeddings using the connector
# Create a descriptor based on the embedding shape.
......@@ -295,6 +296,12 @@ class VllmPDWorker(VllmBaseWorker):
self.EMBEDDINGS_DTYPE,
video_numpy=video_numpy,
)
elif "audio" in self.engine_args.model.lower():
multi_modal_data = construct_mm_data(
self.engine_args.model,
self.EMBEDDINGS_DTYPE,
audio_embeds=embeddings,
)
else:
multi_modal_data = construct_mm_data(
self.engine_args.model,
......@@ -313,6 +320,7 @@ class VllmPDWorker(VllmBaseWorker):
# Remove the image features from the request as they are not required
request.multimodal_input.image_url = None
request.multimodal_input.video_url = None
request.multimodal_input.audio_url = None
request.serialized_request = None
pd_request = copy.deepcopy(request)
......
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
set -e
trap 'echo Cleaning up...; kill 0' EXIT
# Default values
MODEL_NAME="Qwen/Qwen2-Audio-7B-Instruct"
PROMPT_TEMPLATE=""
PROVIDED_PROMPT_TEMPLATE=""
# Parse command line arguments
while [[ $# -gt 0 ]]; do
case $1 in
--model)
MODEL_NAME=$2
shift 2
;;
--prompt-template)
PROVIDED_PROMPT_TEMPLATE=$2
shift 2
;;
-h|--help)
echo "Usage: $0 [OPTIONS]"
echo "Options:"
echo " --model <model_name> Specify the model to use (default: $MODEL_NAME)"
echo " --prompt-template <template> Specify the multi-modal prompt template to use. LLaVA 1.5 7B, Qwen2.5-VL, and Phi3V models have predefined templates."
echo " -h, --help Show this help message"
exit 0
;;
*)
echo "Unknown option: $1"
echo "Use --help for usage information"
exit 1
;;
esac
done
# Set PROMPT_TEMPLATE based on the MODEL_NAME
if [[ -n "$PROVIDED_PROMPT_TEMPLATE" ]]; then
PROMPT_TEMPLATE="$PROVIDED_PROMPT_TEMPLATE"
elif [[ "$MODEL_NAME" == "Qwen/Qwen2-Audio-7B-Instruct" ]]; then
PROMPT_TEMPLATE="<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\nAudio 1: <|audio_bos|><|AUDIO|><|audio_eos|>\n<prompt><|im_end|>\n<|im_start|>assistant\n"
else
echo "No multi-modal prompt template is defined for the model: $MODEL_NAME"
echo "Please provide a prompt template using --prompt-template option."
echo "Example: --prompt-template 'USER: <image>\n<prompt> ASSISTANT:'"
exit 1
fi
# Check and install required dependencies for audio multimodal models
echo "Checking audio multimodal dependencies..."
DEPS_MISSING=false
# Check for accelerate
if ! python -c "import accelerate" &> /dev/null; then
echo " accelerate not found"
DEPS_MISSING=true
else
echo " ✓ accelerate is installed"
fi
# Check for vllm with audio support
if ! python -c "import vllm" &> /dev/null; then
echo " vllm not found"
DEPS_MISSING=true
else
# Check if audio dependencies are available (librosa is a key audio dependency)
if ! python -c "import librosa" &> /dev/null; then
echo " vllm audio dependencies not found"
DEPS_MISSING=true
else
echo " ✓ vllm with audio support is installed"
fi
fi
# Install missing dependencies
if [ "$DEPS_MISSING" = true ]; then
echo "Installing missing dependencies..."
pip install 'vllm[audio]' accelerate
echo "Dependencies installed successfully"
else
echo "All required dependencies are already installed"
fi
# run ingress
python -m dynamo.frontend --http-port 8000 &
# run processor
python3 components/processor.py --model $MODEL_NAME --prompt-template "$PROMPT_TEMPLATE" &
# run E/P/D workers
CUDA_VISIBLE_DEVICES=0 python3 components/audio_encode_worker.py --model $MODEL_NAME &
CUDA_VISIBLE_DEVICES=1 python3 components/worker.py --model $MODEL_NAME --worker-type prefill &
# Wait for all background processes to complete
wait
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
set -e
trap 'echo Cleaning up...; kill 0' EXIT
# Default values
MODEL_NAME="Qwen/Qwen2-Audio-7B-Instruct"
PROMPT_TEMPLATE=""
PROVIDED_PROMPT_TEMPLATE=""
# Parse command line arguments
while [[ $# -gt 0 ]]; do
case $1 in
--model)
MODEL_NAME=$2
shift 2
;;
--prompt-template)
PROVIDED_PROMPT_TEMPLATE=$2
shift 2
;;
-h|--help)
echo "Usage: $0 [OPTIONS]"
echo "Options:"
echo " --model <model_name> Specify the model to use (default: $MODEL_NAME)"
echo " --prompt-template <template> Specify the multi-modal prompt template to use. LLaVA 1.5 7B, Qwen2.5-VL, and Phi3V models have predefined templates."
echo " -h, --help Show this help message"
exit 0
;;
*)
echo "Unknown option: $1"
echo "Use --help for usage information"
exit 1
;;
esac
done
# Set PROMPT_TEMPLATE based on the MODEL_NAME
if [[ -n "$PROVIDED_PROMPT_TEMPLATE" ]]; then
PROMPT_TEMPLATE="$PROVIDED_PROMPT_TEMPLATE"
elif [[ "$MODEL_NAME" == "Qwen/Qwen2-Audio-7B-Instruct" ]]; then
PROMPT_TEMPLATE="<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\nAudio 1: <|audio_bos|><|AUDIO|><|audio_eos|>\n<prompt><|im_end|>\n<|im_start|>assistant\n"
else
echo "No multi-modal prompt template is defined for the model: $MODEL_NAME"
echo "Please provide a prompt template using --prompt-template option."
echo "Example: --prompt-template 'USER: <image>\n<prompt> ASSISTANT:'"
exit 1
fi
# Check and install required dependencies for audio multimodal models
echo "Checking audio multimodal dependencies..."
DEPS_MISSING=false
# Check for accelerate
if ! python -c "import accelerate" &> /dev/null; then
echo " accelerate not found"
DEPS_MISSING=true
else
echo " ✓ accelerate is installed"
fi
# Check for vllm with audio support
if ! python -c "import vllm" &> /dev/null; then
echo " vllm not found"
DEPS_MISSING=true
else
# Check if audio dependencies are available (librosa is a key audio dependency)
if ! python -c "import librosa" &> /dev/null; then
echo " vllm audio dependencies not found"
DEPS_MISSING=true
else
echo " ✓ vllm with audio support is installed"
fi
fi
# Install missing dependencies
if [ "$DEPS_MISSING" = true ]; then
echo "Installing missing dependencies..."
pip install 'vllm[audio]' accelerate
echo "Dependencies installed successfully"
else
echo "All required dependencies are already installed"
fi
# run ingress
python -m dynamo.frontend --http-port 8000 &
# run processor
python3 components/processor.py --model $MODEL_NAME --prompt-template "$PROMPT_TEMPLATE" &
# run E/P/D workers
CUDA_VISIBLE_DEVICES=0 python3 components/audio_encode_worker.py --model $MODEL_NAME &
CUDA_VISIBLE_DEVICES=1 python3 components/worker.py --model $MODEL_NAME --worker-type prefill --enable-disagg &
CUDA_VISIBLE_DEVICES=2 python3 components/worker.py --model $MODEL_NAME --worker-type decode --enable-disagg &
# Wait for all background processes to complete
wait
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import functools
import logging
from io import BytesIO
from typing import Tuple
from urllib.parse import urlparse
import httpx
import librosa
import numpy as np
logger = logging.getLogger(__name__)
class AudioLoader:
CACHE_SIZE_MAXIMUM = 8
def __init__(self, cache_size: int = CACHE_SIZE_MAXIMUM):
self._http_timeout = 30.0
# functools.lru_cache is not directly compatible with async methods.
# We create a synchronous method for fetching and processing audio,
# and then apply lru_cache to it. This cached synchronous method
# is then called from our async method using asyncio.to_thread.
self._load_and_process_audio_cached = functools.lru_cache(maxsize=cache_size)(
self._load_and_process_audio
)
def _load_and_process_audio(
self, audio_url: str, sampling_rate: int
) -> Tuple[np.ndarray, float]:
"""
Synchronously loads and processes audio from a URL.
This method is memoized using lru_cache.
"""
with httpx.Client(timeout=self._http_timeout) as client:
response = client.get(audio_url)
response.raise_for_status()
if not response.content:
raise ValueError("Empty response content from audio URL")
audio_data_stream = BytesIO(response.content)
audio_data, sr = librosa.load(audio_data_stream, sr=sampling_rate)
return audio_data, sr
async def load_audio(
self, audio_url: str, sampling_rate: int = 16000
) -> Tuple[np.ndarray, float]:
parsed_url = urlparse(audio_url)
if parsed_url.scheme not in ("http", "https"):
raise ValueError(f"Invalid audio source scheme: {parsed_url.scheme}")
try:
# Offload the synchronous, cached function to a separate thread
# to avoid blocking the asyncio event loop.
return await asyncio.to_thread(
self._load_and_process_audio_cached, audio_url, sampling_rate
)
except httpx.HTTPError as e:
logger.error(f"HTTP error loading audio: {e}")
raise
except Exception as e:
logger.error(f"Error loading audio: {e}")
raise ValueError(f"Failed to load audio: {e}")
......@@ -28,6 +28,7 @@ class SupportedModels:
LLAVA_1_5_7B = "llava-hf/llava-1.5-7b-hf"
QWEN_2_5_VL_7B = "Qwen/Qwen2.5-VL-7B-Instruct"
LLAVA_NEXT_VIDEO_7B = "llava-hf/LLaVA-NeXT-Video-7B-hf"
QWEN_2_AUDIO_7B = "Qwen/Qwen2-Audio-7B-Instruct"
def load_vision_model(model_id: str) -> torch.nn.Module:
......@@ -46,9 +47,13 @@ def construct_mm_data(
image_embeds: Optional[torch.Tensor] = None,
video_numpy: Optional[Any] = None,
image_grid_thw: Optional[List[Any]] = None,
audio_embeds: Optional[torch.Tensor] = None,
) -> Dict[str, torch.Tensor | Dict[str, Any]]:
"""Construct multimodal data for a vLLM request for models that require additional parameters alongside the embeddings"""
if model == SupportedModels.QWEN_2_AUDIO_7B:
audio_embeds = audio_embeds.to(torch.bfloat16)
assert audio_embeds.ndim == 2, "Audio embeddings must be 2D"
return {"audio": [audio_embeds]}
# Handle video models
if model == SupportedModels.LLAVA_NEXT_VIDEO_7B:
if video_numpy is None:
......
......@@ -109,6 +109,15 @@ class ImageContent(BaseModel):
image_url: ImageURLDetail
class AudioURLDetail(BaseModel):
url: str
class AudioContent(BaseModel):
type: Literal["audio_url"]
audio_url: AudioURLDetail
class VideoURLDetail(BaseModel):
url: str
......@@ -118,7 +127,7 @@ class VideoContent(BaseModel):
video_url: VideoURLDetail
MessageContent = Union[TextContent, ImageContent, VideoContent]
MessageContent = Union[TextContent, ImageContent, AudioContent, VideoContent]
class ChatMessage(BaseModel):
......@@ -138,6 +147,7 @@ class MultiModalRequest(BaseModel):
class MultiModalInput(BaseModel):
image_url: Optional[str] = None
video_url: Optional[str] = None
audio_url: Optional[str] = None
class vLLMMultimodalRequest(vLLMGenerateRequest):
......@@ -145,7 +155,7 @@ class vLLMMultimodalRequest(vLLMGenerateRequest):
multimodal_input: Optional[MultiModalInput] = Field(default_factory=MultiModalInput)
image_grid_thw: Optional[List[Any]] = None
embeddings_shape: Optional[
Union[Tuple[int, int, int], Tuple[int, int, int, int]]
Union[Tuple[int, int, int], Tuple[int, int, int, int], Tuple[int, int]]
] = None
serialized_request: Optional[connect.RdmaMetadata] = None
......
......@@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
use crate::error::OpenAIError;
use super::{ImageDetail, ImageUrl, VideoUrl};
use super::{AudioUrl, ImageDetail, ImageUrl, VideoUrl};
#[derive(Clone, Serialize, Debug, Deserialize, PartialEq, Default)]
#[serde(rename_all = "lowercase")]
......@@ -115,6 +115,7 @@ pub enum MessageContent {
ImageFile(MessageContentImageFileObject),
ImageUrl(MessageContentImageUrlObject),
VideoUrl(MessageContentVideoUrlObject),
AudioUrl(MessageContentAudioUrlObject),
Refusal(MessageContentRefusalObject),
}
......@@ -205,6 +206,11 @@ pub struct MessageContentVideoUrlObject {
pub video_url: VideoUrl,
}
#[derive(Clone, Serialize, Debug, Deserialize, PartialEq)]
pub struct MessageContentAudioUrlObject {
pub audio_url: AudioUrl,
}
#[derive(Clone, Serialize, Debug, Deserialize, PartialEq)]
pub struct MessageRequestContentTextObject {
/// Text content to be sent to the model
......@@ -228,6 +234,7 @@ pub enum MessageContentInput {
ImageFile(MessageContentImageFileObject),
ImageUrl(MessageContentImageUrlObject),
VideoUrl(MessageContentVideoUrlObject),
AudioUrl(MessageContentAudioUrlObject),
}
#[derive(Clone, Serialize, Default, Debug, Deserialize, Builder, PartialEq)]
#[builder(name = "CreateMessageRequestArgs")]
......@@ -298,6 +305,7 @@ pub enum MessageDeltaContent {
ImageFile(MessageDeltaContentImageFileObject),
ImageUrl(MessageDeltaContentImageUrlObject),
VideoUrl(MessageDeltaContentVideoUrlObject),
AudioUrl(MessageDeltaContentAudioUrlObject),
Text(MessageDeltaContentTextObject),
Refusal(MessageDeltaContentRefusalObject),
}
......@@ -380,3 +388,11 @@ pub struct MessageDeltaContentVideoUrlObject {
pub video_url: Option<VideoUrl>,
}
#[derive(Clone, Serialize, Debug, Deserialize, PartialEq)]
pub struct MessageDeltaContentAudioUrlObject {
/// The index of the content part in the message.
pub index: u32,
pub audio_url: Option<AudioUrl>,
}
......@@ -102,6 +102,8 @@ pub enum ContentType {
InputImage(InputImage),
/// A video input to the model.
InputVideo(InputVideo),
/// An audio input to the model.
InputAudio(InputAudio),
/// A file input to the model.
InputFile(InputFile),
}
......@@ -151,6 +153,24 @@ pub struct InputVideo {
video_url: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default, Builder)]
#[builder(
name = "InputAudioArgs",
pattern = "mutable",
setter(into, strip_option),
default
)]
#[builder(build_fn(error = "OpenAIError"))]
pub struct InputAudio {
/// The ID of the file to be sent to the model.
#[serde(skip_serializing_if = "Option::is_none")]
file_id: Option<String>,
/// The URL of the audio to be sent to the model. A fully qualified URL or base64 encoded audio
/// in a data URL.
#[serde(skip_serializing_if = "Option::is_none")]
audio_url: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default, Builder)]
#[builder(
name = "InputFileArgs",
......
......@@ -244,6 +244,34 @@ vllm_configs = {
)
],
),
"multimodal_audio_agg": VLLMConfig(
name="multimodal_audio_agg",
directory="/workspace/examples/multimodal",
script_name="audio_agg.sh",
marks=[pytest.mark.gpu_2],
model="Qwen/Qwen2-Audio-7B-Instruct",
delayed_start=0,
script_args=["--model", "Qwen/Qwen2-Audio-7B-Instruct"],
timeout=500,
request_payloads=[
chat_payload(
[
{"type": "text", "text": "What is recited in the audio?"},
{
"type": "audio_url",
"audio_url": {
"url": "https://raw.githubusercontent.com/yuekaizhang/Triton-ASR-Client/main/datasets/mini_en/wav/1221-135766-0002.wav"
},
},
],
repeat_count=1,
expected_response=[
"The original content of this audio is:'yet these thoughts affected Hester Pynne less with hope than apprehension.'"
],
temperature=0.8,
)
],
),
# TODO: Enable this test case when we have 4 GPUs runners.
# "multimodal_disagg": VLLMConfig(
# name="multimodal_disagg",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment