Unverified Commit f9918f61 authored by Ayush Agarwal's avatar Ayush Agarwal Committed by GitHub
Browse files

chore: support for muliple MM requests for vLLM Encoder + Rust Based Preprocessor (#5463)


Signed-off-by: default avatarayushag <ayushag@nvidia.com>
parent 50c54695
......@@ -861,9 +861,9 @@ async def init_ec_processor(runtime: DistributedRuntime, config: Config):
await encoder_client.wait_for_instances()
await pd_client.wait_for_instances()
# Register the endpoint as entrypoint to a model (same as regular processor)
# Register the endpoint as entrypoint to a model (same as preprocessed_handler)
await register_llm(
ModelInput.Text, # Custom processor is used and this type bypasses SDK processor
ModelInput.Tokens, # Use Rust tokenization for better performance and multi-image support
ModelType.Chat,
generate_endpoint,
config.model,
......
......@@ -5,8 +5,10 @@ from dynamo.vllm.multimodal_handlers.encode_worker_handler import (
EncodeWorkerHandler,
VLLMEncodeWorkerHandler,
)
from dynamo.vllm.multimodal_handlers.preprocessed_handler import PreprocessedHandler
from dynamo.vllm.multimodal_handlers.preprocessor_handler import ECProcessorHandler
from dynamo.vllm.multimodal_handlers.preprocessed_handler import (
ECProcessorHandler,
PreprocessedHandler,
)
from dynamo.vllm.multimodal_handlers.worker_handler import (
MultimodalDecodeWorkerHandler,
MultimodalPDWorkerHandler,
......@@ -16,7 +18,7 @@ __all__ = [
"EncodeWorkerHandler",
"VLLMEncodeWorkerHandler",
"PreprocessedHandler",
"ECProcessorHandler",
"MultimodalPDWorkerHandler",
"MultimodalDecodeWorkerHandler",
"ECProcessorHandler",
]
......@@ -10,7 +10,7 @@ from typing import AsyncGenerator, AsyncIterator
import safetensors
from transformers import AutoImageProcessor
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.inputs.data import TextPrompt
from vllm.inputs import TokensPrompt
from vllm.multimodal.hasher import MultiModalHasher
from vllm.sampling_params import SamplingParams
......@@ -255,11 +255,11 @@ class VLLMEncodeWorkerHandler:
Process encoder request and trigger vLLM encoder execution.
Args:
request: VLLMNativeEncoderRequest with multimodal_input
request: VLLMNativeEncoderRequest with multimodal_inputs (list of MultiModalGroup)
context: Request context from Dynamo runtime
Yields:
JSON-encoded VLLMNativeEncoderResponse with mm_hash and connector metadata
JSON-encoded VLLMNativeEncoderResponse for each processed item
"""
# Parse request
if not isinstance(request, VLLMNativeEncoderRequest):
......@@ -268,16 +268,32 @@ class VLLMEncodeWorkerHandler:
else:
request = VLLMNativeEncoderRequest.model_validate(request)
if not request.multimodal_inputs:
raise ValueError("No multimodal inputs provided in request")
logger.info(
f"Processing {len(request.multimodal_inputs)} multimodal item(s) "
f"for request_id={request.request_id}"
)
# Process each multimodal input
for idx, mm_group in enumerate(request.multimodal_inputs):
mm_input = mm_group.multimodal_input
item_request_id = f"{request.request_id}_mm_{idx}"
# Load media (image/video/audio)
# TODO: Add support for video_url and audio
if request.multimodal_input.image_url:
media = await self.image_loader.load_image(
request.multimodal_input.image_url
)
if mm_input.image_url:
media = await self.image_loader.load_image(mm_input.image_url)
media_key = "image"
modality = "image"
elif mm_input.video_url:
# TODO: Implement video loading
raise NotImplementedError("Video encoding not yet supported")
else:
raise ValueError(
"No media URL provided. Specify image_url in multimodal_input."
f"No media URL provided in multimodal_input[{idx}]. "
"Specify image_url or video_url."
)
# Compute mm_hash using vLLM's hasher
......@@ -285,21 +301,21 @@ class VLLMEncodeWorkerHandler:
mm_hash = MultiModalHasher.hash_kwargs(
model_id=self.config.model, **{media_key: media}
)
logger.debug(f"Computed mm_hash: {mm_hash}")
logger.debug(f"[{item_request_id}] Computed mm_hash: {mm_hash}")
except Exception as e:
logger.error(f"Failed to compute mm_hash: {e}")
logger.error(f"[{item_request_id}] Failed to compute mm_hash: {e}")
raise
try:
# Prompt can be a random string as the encoder is only interested in the multimodal data
prompt_dict = TextPrompt(
prompt=request.prompt, multi_modal_data={media_key: media}
prompt_dict = TokensPrompt(
prompt_token_ids=request.token_ids,
multi_modal_data={media_key: media},
)
gen = self.engine_client.generate(
prompt=prompt_dict,
sampling_params=SamplingParams(max_tokens=1, min_tokens=0),
request_id=request.request_id,
request_id=item_request_id,
)
# Consume generator to trigger encoder execution
......@@ -307,27 +323,34 @@ class VLLMEncodeWorkerHandler:
pass
logger.info(
f"Encoder execution completed for request_id={request.request_id}"
f"[{item_request_id}] Encoder execution completed "
f"({idx + 1}/{len(request.multimodal_inputs)})"
)
except Exception as e:
logger.error(f"Encoder execution failed: {e}")
logger.error(f"[{item_request_id}] Encoder execution failed: {e}")
raise
# Return metadata for PD workers
# Yield metadata for each item (PD workers can use these to lookup from cache)
# Right now this is not used. Can be used for logging purpose later.
response = VLLMNativeEncoderResponse(
request_id=request.request_id,
request_id=item_request_id,
mm_hash=mm_hash,
modality=request.modality,
modality=modality,
connector_metadata={
"ec_connector": self.config.ec_connector_backend,
"storage_path": self.config.ec_storage_path,
},
)
logger.debug(f"Returning response: {response}")
logger.debug(f"[{item_request_id}] Returning response: {response}")
yield response.model_dump_json()
logger.info(
f"All {len(request.multimodal_inputs)} multimodal items processed "
f"for request_id={request.request_id}"
)
def cleanup(self):
"""Cleanup resources."""
logger.info("Cleaning up VLLMNativeEncoderWorkerHandler")
......
......@@ -21,6 +21,7 @@ from ..multimodal_utils import (
MyRequestOutput,
PatchedTokensPrompt,
ProcessMixIn,
VLLMNativeEncoderRequest,
vLLMMultimodalRequest,
)
......@@ -243,3 +244,140 @@ class PreprocessedHandler(ProcessMixIn):
async for response in self._generate(request, multimodal_inputs, context):
yield response
class ECProcessorHandler(PreprocessedHandler):
"""
Processor handler for ECConnector-based encoder with pre-tokenized input support.
Inherits from PreprocessedHandler to reuse common pre-tokenized processing logic.
Uses ECConnector (vLLM-native encoder) instead of custom RDMA-based encoder.
"""
def __init__(
self,
engine_args: AsyncEngineArgs,
encoder_worker_client: Client,
pd_worker_client: Client,
prompt_template: str = None,
):
"""
Initialize the ECConnector processor.
Args:
engine_args: vLLM engine arguments for model config
encoder_worker_client: Client for vLLM-native encoder worker endpoints
pd_worker_client: Client for PD worker endpoints (ECConnector consumer)
prompt_template: Optional prompt template (for reference, tokenization done by Rust)
"""
# Initialize base class
super().__init__(engine_args, encoder_worker_client, pd_worker_client)
self.prompt_template = prompt_template
logger.info(
"ECProcessorHandler initialized (inherits PreprocessedHandler, uses ECConnector)"
)
async def _generate(
self,
raw_request,
multimodal_inputs,
context,
):
"""
Generate responses using ECConnector encoder.
Overrides PreprocessedHandler._generate to use VLLMNativeEncoderRequest
instead of custom encoder protocol.
"""
# Extract token_ids from request (these contain placeholder tokens like 32000 for <image>)
token_ids = raw_request.get("token_ids", [])
if not token_ids:
raise ValueError("token_ids not found in request")
logger.info(
f"ECProcessor using token_ids (length={len(token_ids)}) with placeholders. "
f"Sample: {token_ids[:min(20, len(token_ids))]}"
)
# Check video not supported yet
if VIDEO_URL_KEY in multimodal_inputs and multimodal_inputs[VIDEO_URL_KEY]:
raise ValueError("Video URL not supported in ECConnector encoder yet")
request_id = str(uuid.uuid4().hex)
# Build sampling params from request
sampling_params = build_sampling_params(
raw_request, self.default_sampling_params
)
# Create multimodal groups for encoder
multimodal_groups = []
for mm_type, urls in multimodal_inputs.items():
for url in urls:
multimodal_input = MultiModalInput()
if mm_type == IMAGE_URL_KEY:
multimodal_input.image_url = url
elif mm_type == VIDEO_URL_KEY:
multimodal_input.video_url = url
multimodal_groups.append(
MultiModalGroup(multimodal_input=multimodal_input)
)
logger.info(
f"[{request_id}] Encoding {len(multimodal_groups)} multimodal item(s) "
f"via vLLM-native encoder (ECConnector)..."
)
# Send to vLLM-native encoder using VLLMNativeEncoderRequest
# Pass token_ids which already contain placeholder tokens (e.g., 32000 for <image> in LLaVA)
# The encoder worker will use TokensPrompt so vLLM can match placeholder token IDs
try:
encoder_request = VLLMNativeEncoderRequest(
request_id=request_id,
token_ids=token_ids, # Pass pre-tokenized input with placeholder tokens
multimodal_inputs=multimodal_groups,
)
request_json = encoder_request.model_dump_json()
response_stream = await self.encode_worker_client.round_robin(request_json)
# Consume encoder responses (embeddings written to ECConnector cache)
async for chunk in response_stream:
logger.debug(
f"[{request_id}] Received encoder response (embeddings cached)"
)
logger.info(f"[{request_id}] Encoder completed successfully for all items")
except Exception as e:
logger.error(f"[{request_id}] Encoder processing failed: {e}")
raise
# Create worker request with pre-tokenized prompt and ALL multimodal inputs
worker_request = vLLMMultimodalRequest(
engine_prompt=PatchedTokensPrompt(
prompt_token_ids=raw_request["token_ids"] # Pre-tokenized by Rust!
),
sampling_params=sampling_params,
request_id=request_id,
multimodal_inputs=multimodal_groups, # ALL images at once
)
logger.info(
f"[{request_id}] Sending request with {len(multimodal_groups)} "
f"multimodal item(s) to PD worker (ECConnector consumer)..."
)
# Send single request to PD worker with ALL images
response_generator = await self.pd_worker_client.round_robin(
worker_request.model_dump_json(), context=context
)
# Stream responses back to client (reuse base class method)
async for output in self._generate_responses(response_generator):
yield output
logger.info(
f"[{request_id}] Completed processing all {len(multimodal_groups)} item(s)"
)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import json
import logging
import uuid
from enum import Enum
from typing import Any, AsyncIterator, Dict, List, Union
from transformers import AutoTokenizer
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.entrypoints.openai.protocol import ChatCompletionRequest, CompletionRequest
from vllm.outputs import RequestOutput
from vllm.tokenizers import TokenizerLike as AnyTokenizer
from dynamo.runtime import Client
from ..multimodal_utils import (
ChatProcessor,
CompletionsProcessor,
MultiModalGroup,
MultiModalInput,
MultiModalRequest,
MyRequestOutput,
ProcessMixIn,
extract_user_text,
vLLMMultimodalRequest,
)
logger = logging.getLogger(__name__)
class RequestType(Enum):
CHAT = "chat"
COMPLETION = "completion"
class ProcessorHandler(ProcessMixIn):
"""
vLLM pre and post processing for multimodal requests
"""
def __init__(
self,
engine_args: AsyncEngineArgs,
encode_worker_client: Client,
prompt_template: str,
):
self.encode_worker_client = encode_worker_client
self.prompt_template = prompt_template
self.engine_args = engine_args
self.model_config = self.engine_args.create_model_config()
self.default_sampling_params = self.model_config.get_diff_sampling_param()
self.tokenizer = self._create_tokenizer(self.engine_args)
self.chat_processor = ChatProcessor(self.tokenizer, self.model_config)
self.completions_processor = CompletionsProcessor(
self.tokenizer, self.model_config
)
def cleanup(self):
pass
def _create_tokenizer(self, engine_args: AsyncEngineArgs) -> AnyTokenizer:
"""Create a TokenizerGroup using engine arguments similar to VLLM's approach"""
model_path = engine_args.model
# Create the base tokenizer with VLLM's typical settings
base_tokenizer = AutoTokenizer.from_pretrained(
model_path,
trust_remote_code=True,
padding_side="left",
truncation_side="left",
use_fast=True, # VLLM might use the fast tokenizer for efficiency
)
return base_tokenizer
# Main method to parse the request and send the request to the vllm worker.
async def _generate(
self,
raw_request: Union[CompletionRequest, ChatCompletionRequest],
multimodal_input: MultiModalInput,
request_type: RequestType,
context,
):
request_id = str(uuid.uuid4().hex)
logger.debug(f"Got raw request: {raw_request}")
(
request,
conversation,
engine_prompt,
sampling_params,
) = await self._parse_raw_request(raw_request)
worker_request = vLLMMultimodalRequest(
engine_prompt=engine_prompt,
sampling_params=sampling_params,
request_id=request_id,
multimodal_input=multimodal_input,
)
# model_dump_json() serializes the request to JSON string
# This API could accept Pydantic class, but SamplingParams
# in vLLMMultimodalRequest is not a Pydantic class and will
# cause TypeError: unsupported type SamplingParams
response_generator = await self.encode_worker_client.round_robin(
worker_request.model_dump_json()
)
output = self._generate_responses(response_generator, request_type)
# Stream the processed responses
async for response in await self._stream_response(
request, output, request_id, conversation
):
yield response
# This method is used to process the responses from the engine generator.
async def _generate_responses(
self,
response_generator: AsyncIterator[RequestOutput],
request_type: RequestType,
):
async for resp in response_generator:
# Deserialize the response from the engine
# Creates correct vLLM objects for each field
output = MyRequestOutput.model_validate_json(resp.data())
# OpenAIServingChat.chat_completion_stream_generator() method expects a RequestOutput object
request_output = RequestOutput(
request_id=output.request_id,
prompt=output.prompt,
prompt_token_ids=output.prompt_token_ids,
prompt_logprobs=output.prompt_logprobs,
outputs=output.outputs,
finished=output.finished,
metrics=output.metrics,
)
if request_type == RequestType.CHAT:
# For chat requests, yield the request_output directly.
yield request_output
else:
raise NotImplementedError(
f"Request type {request_type} not implemented"
)
# The generate endpoint will be used by the frontend to handle incoming requests.
async def generate(self, raw_request: MultiModalRequest, context):
logger.debug(f"Got raw request: {raw_request}")
if not isinstance(raw_request, MultiModalRequest):
# If the request is not MultiModalRequest, convert it to MultiModalRequest
raw_request = MultiModalRequest.model_validate(raw_request)
# Ensure the configured template includes the placeholder
template = self.prompt_template
if "<prompt>" not in template:
raise ValueError("prompt_template must contain '<prompt>' placeholder")
# Safely extract user text
user_text = extract_user_text(raw_request.messages)
prompt = template.replace("<prompt>", user_text)
msg = {
"role": "user",
"content": prompt,
}
# Set stream=True - the http frontend will handle aggregation of
# streamed chunks into a single http response, or stream them
# back as SSE responses based on the stream flag in the request.
chat_request = ChatCompletionRequest(
model=raw_request.model,
messages=[msg],
stream=True,
max_tokens=raw_request.max_tokens,
temperature=raw_request.temperature,
request_id=str(uuid.uuid4()),
)
multimodal_input = MultiModalInput()
for message in raw_request.messages:
for item in message.content:
if item.type == "image_url":
multimodal_input.image_url = item.image_url.url
elif item.type == "video_url":
if multimodal_input.image_url is not None:
raise ValueError("Cannot provide both image and video URLs")
multimodal_input.video_url = item.video_url.url
if multimodal_input.image_url is None and multimodal_input.video_url is None:
raise ValueError("Either image URL or video URL is required")
async for response in self._generate(
chat_request, multimodal_input, RequestType.CHAT, context
):
logger.debug(
f"Generated response type {type(response)}, content: {response}"
)
# reconstructing back the OpenAI chat response as dynamo egress expects it
if response.startswith("data: [DONE]"):
break
response = json.loads(response.lstrip("data: "))
yield response
class ECProcessorHandler(ProcessorHandler):
"""
Processor handler for ECConnector-based encoder
"""
def __init__(
self,
engine_args: AsyncEngineArgs,
encoder_worker_client: Client,
pd_worker_client: Client,
prompt_template: str,
):
"""
Initialize the ECConnector processor.
Args:
engine_args: vLLM engine arguments for model config
encoder_worker_client: Client for encoder worker endpoints
pd_worker_client: Client for PD worker endpoints
prompt_template: Multimodal prompt template
"""
# Initialize base class with encoder client
super().__init__(engine_args, encoder_worker_client, prompt_template)
# Store additional PD client for disaggregated architecture
self.encoder_client = encoder_worker_client
self.pd_client = pd_worker_client
logger.info("ECProcessorHandler initialized with disaggregated architecture")
@staticmethod
def _extract_multimodal_items(request_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Extract all multimodal items (images/videos) from the request messages.
Args:
request_data: The request dictionary
Returns:
List of multimodal content items
"""
items = []
messages = request_data.get("messages", [])
for msg in messages:
content = msg.get("content")
if not isinstance(content, list):
continue
for item in content:
item_type = item.get("type")
if item_type in ("image_url", "video_url"):
items.append(item)
return items
@staticmethod
def _create_encoder_request(
prompt: str,
mm_item: Dict[str, Any],
model: str,
request_id: str,
) -> Dict[str, Any]:
# Create MultiModalInput from the item
multimodal_input = {}
modality = None
if mm_item.get("type") == "image_url":
multimodal_input["image_url"] = mm_item["image_url"]["url"]
modality = "image"
elif mm_item.get("type") == "video_url":
multimodal_input["video_url"] = mm_item["video_url"]["url"]
modality = "video"
else:
raise ValueError(f"Unsupported multimodal type: {mm_item.get('type')}")
return {
"prompt": prompt,
"request_id": request_id,
"multimodal_input": multimodal_input,
"modality": modality,
}
async def _encode_multimodal_items(
self,
prompt: str,
mm_items: List[Dict[str, Any]],
model: str,
request_id: str,
) -> None:
"""
Send all multimodal items to encoder workers concurrently.
Each item is sent as a separate request to an encoder worker.
The encoder processes the item and stores embeddings to shared storage.
"""
if not mm_items:
logger.debug(f"[{request_id}] No multimodal items to encode")
return
logger.info(f"[{request_id}] Encoding {len(mm_items)} multimodal item(s)")
tasks = []
for idx, mm_item in enumerate(mm_items):
# Create unique request ID for each item
item_request_id = f"{request_id}_mm_{idx}"
# Build encoder request
encoder_request = self._create_encoder_request(
prompt=prompt,
mm_item=mm_item,
model=model,
request_id=item_request_id,
)
# Create task for this encoder request
task = self._send_to_encoder(encoder_request, item_request_id)
tasks.append(task)
# Wait for all encoder requests to complete
try:
await asyncio.gather(*tasks)
logger.info(f"[{request_id}] All encoders completed successfully")
except Exception as e:
logger.error(f"[{request_id}] Encoder encoding failed: {e}")
raise
async def _send_to_encoder(
self,
encoder_request: Dict[str, Any],
request_id: str,
) -> None:
"""
Send a single request to an encoder worker and wait for completion.
"""
try:
# Convert to JSON
request_json = json.dumps(encoder_request)
# Send to encoder worker (round-robin)
response_stream = await self.encoder_client.round_robin(request_json)
# Consume the response stream
async for chunk in response_stream:
pass
logger.debug(f"[{request_id}] Encoder completed successfully")
except Exception as e:
logger.error(f"[{request_id}] Encoder request failed: {e}")
raise
async def generate(self, raw_request: MultiModalRequest, context):
"""
Main endpoint handler for chat completion requests with ECConnector.
"""
logger.debug(f"ECProcessor received request: {raw_request}")
if not isinstance(raw_request, MultiModalRequest):
raw_request = MultiModalRequest.model_validate(raw_request)
# Ensure the configured template includes the placeholder
template = self.prompt_template
if "<prompt>" not in template:
raise ValueError("prompt_template must contain '<prompt>' placeholder")
# Safely extract user text
user_text = None
for message in raw_request.messages:
for item in message.content:
if item.type == "text":
user_text = item.text
break
if not user_text:
raise ValueError("No text content found in request")
prompt = template.replace("<prompt>", user_text)
msg = {
"role": "user",
"content": prompt,
}
# Generate single request ID for entire flow
request_id = str(uuid.uuid4().hex)
# Create chat request for preprocessing
chat_request = ChatCompletionRequest(
model=raw_request.model,
messages=[msg],
stream=True,
max_tokens=raw_request.max_tokens,
temperature=raw_request.temperature,
request_id=request_id,
)
# Step 1: Extract multimodal input (needed for PD worker to generate mm_hash)
multimodal_input = MultiModalInput()
for message in raw_request.messages:
for item in message.content:
if item.type == "image_url":
multimodal_input.image_url = item.image_url.url
elif item.type == "video_url":
if multimodal_input.image_url is not None:
raise ValueError("Cannot provide both image and video URLs")
multimodal_input.video_url = item.video_url.url
if multimodal_input.image_url is None and multimodal_input.video_url is None:
raise ValueError("Either image URL or video URL is required")
# Step 2: Send multimodal items to encoder (ECConnector producer)
mm_items = self._extract_multimodal_items(raw_request.model_dump())
if mm_items:
logger.info(
f"[{request_id}] Encoding {len(mm_items)} multimodal item(s) via encoder..."
)
try:
await self._encode_multimodal_items(
prompt=prompt,
mm_items=mm_items,
model=raw_request.model,
request_id=request_id,
)
except Exception as e:
logger.error(f"[{request_id}] Encoder processing failed: {e}")
error_response = {
"error": {
"message": f"Encoder processing failed: {str(e)}",
"type": "encoder_error",
"code": 500,
}
}
yield error_response
return
# Step 2: Preprocess request (parse chat, tokenize, create engine prompt)
logger.debug(f"[{request_id}] Preprocessing request...")
(
request,
conversation,
engine_prompt,
sampling_params,
) = await self._parse_raw_request(chat_request)
# Step 3: Create worker request for PD worker (WITH multimodal_input)
# PD worker needs multimodal_input to generate mm_hash and lookup EC cache
# vLLM will see the multimodal items, generate mm_hash, and load from cache
worker_request = vLLMMultimodalRequest(
engine_prompt=engine_prompt,
sampling_params=sampling_params,
request_id=request_id,
multimodal_inputs=[
MultiModalGroup(multimodal_input=multimodal_input)
], # ✓ Keep this so vLLM can generate mm_hash
)
logger.debug(
f"[{request_id}] Forwarding to PD worker (vLLM will load from ECConnector cache using mm_hash)..."
)
# Step 4: Send to PD worker (ECConnector consumer - will load from storage)
response_generator = await self.pd_client.round_robin(
worker_request.model_dump_json()
)
# Step 5: Generate and stream responses (reuse base class method)
output = self._generate_responses(response_generator, RequestType.CHAT)
async for response in await self._stream_response(
request, output, request_id, conversation
):
logger.debug(f"[{request_id}] Generated response: {type(response)}")
# Reconstruct OpenAI chat response
if response.startswith("data: [DONE]"):
break
response = json.loads(response.lstrip("data: "))
yield response
......@@ -163,10 +163,13 @@ class VLLMNativeEncoderRequest(BaseModel):
"""Request for vLLM-native encoder worker using ECConnector"""
request_id: str
prompt: str
multimodal_input: MultiModalInput
modality: Literal["image", "video", "audio"]
batch_items: Optional[List[MultiModalInput]] = None # For future batch processing
token_ids: List[
int
] # Pre-tokenized prompt with placeholder tokens (for TokensPrompt)
multimodal_inputs: List[MultiModalGroup] = Field(default_factory=list)
modality: Optional[
Literal["image", "video", "audio"]
] = None # Can be inferred from inputs
class VLLMNativeEncoderResponse(BaseModel):
......
......@@ -6,8 +6,6 @@ trap 'echo Cleaning up...; kill 0' EXIT
# Default values
MODEL_NAME="llava-hf/llava-1.5-7b-hf"
PROMPT_TEMPLATE="USER: <image>\n<prompt> ASSISTANT:"
PROVIDED_PROMPT_TEMPLATE=""
EC_STORAGE_PATH="/tmp/dynamo_ec_cache"
EC_CONNECTOR_BACKEND="ECExampleConnector"
......@@ -18,10 +16,6 @@ while [[ $# -gt 0 ]]; do
MODEL_NAME=$2
shift 2
;;
--prompt-template)
PROVIDED_PROMPT_TEMPLATE=$2
shift 2
;;
--ec-storage-path)
EC_STORAGE_PATH=$2
shift 2
......@@ -37,13 +31,12 @@ while [[ $# -gt 0 ]]; do
echo ""
echo "This script launches:"
echo " - Frontend server"
echo " - Processor component"
echo " - Processor component (uses pre-tokenized input with ModelInput.Tokens)"
echo " - vLLM-native encoder worker (producer using ECConnector)"
echo " - Multimodal worker (consumer using ECConnector, aggregated P+D)"
echo ""
echo "Options:"
echo " --model <model_name> Specify the VLM model to use (default: $MODEL_NAME)"
echo " --prompt-template <template> Specify the multi-modal prompt template to use"
echo " --ec-storage-path <path> Path for ECConnector storage (default: $EC_STORAGE_PATH)"
echo " --ec-connector-backend <backend> ECConnector backend class (default: $EC_CONNECTOR_BACKEND)"
echo " -h, --help Show this help message"
......@@ -63,23 +56,6 @@ while [[ $# -gt 0 ]]; do
esac
done
# Set PROMPT_TEMPLATE based on the MODEL_NAME if not provided
if [[ -n "$PROVIDED_PROMPT_TEMPLATE" ]]; then
PROMPT_TEMPLATE="$PROVIDED_PROMPT_TEMPLATE"
elif [[ "$MODEL_NAME" == "meta-llama/Llama-3.2-11B-Vision-Instruct" ]]; then
PROMPT_TEMPLATE="<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n<|image|><prompt><|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"
elif [[ "$MODEL_NAME" == "llava-hf/llava-1.5-7b-hf" ]]; then
PROMPT_TEMPLATE="USER: <image>\n<prompt> ASSISTANT:"
elif [[ "$MODEL_NAME" == "microsoft/Phi-3.5-vision-instruct" ]]; then
PROMPT_TEMPLATE="<|user|>\n<|image_1|>\n<prompt><|end|>\n<|assistant|>\n"
elif [[ "$MODEL_NAME" == "Qwen/Qwen2.5-VL-7B-Instruct" ]]; then
PROMPT_TEMPLATE="<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\n<|vision_start|><|image_pad|><|vision_end|><prompt><|im_end|>\n<|im_start|>assistant\n"
else
echo "No multi-modal prompt template is defined for the model: $MODEL_NAME"
echo "Please provide a prompt template using --prompt-template option."
exit 1
fi
# Create storage directory if it doesn't exist
mkdir -p "$EC_STORAGE_PATH"
......@@ -87,7 +63,6 @@ echo "=================================================="
echo "Aggregated Multimodal Serving (vLLM-Native Encoder with ECConnector)"
echo "=================================================="
echo "Model: $MODEL_NAME"
echo "Prompt Template: $PROMPT_TEMPLATE"
echo "ECConnector Backend: $EC_CONNECTOR_BACKEND"
echo "Storage Path: $EC_STORAGE_PATH"
echo "=================================================="
......@@ -96,13 +71,12 @@ echo "=================================================="
echo "Starting frontend..."
python -m dynamo.frontend &
# Start EC Processor (simple processor for ECConnector mode)
# Start EC Processor (uses pre-tokenized input with placeholder tokens)
echo "Starting EC Processor..."
python -m dynamo.vllm \
--ec-processor \
--enable-multimodal \
--model $MODEL_NAME \
--mm-prompt-template "$PROMPT_TEMPLATE" &
--model $MODEL_NAME &
# Start vLLM-native encoder worker (ECConnector producer)
echo "Starting vLLM-native encoder worker (ECConnector producer) on GPU 0..."
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment