Unverified Commit 85e0512f authored by Ayush Agarwal's avatar Ayush Agarwal Committed by GitHub
Browse files

feat: ec connector handler (#5162)


Signed-off-by: default avatarAyush Agarwal <ayushag@nvidia.com>
parent e994caeb
...@@ -61,12 +61,23 @@ class Config: ...@@ -61,12 +61,23 @@ class Config:
# multimodal options # multimodal options
multimodal_processor: bool = False multimodal_processor: bool = False
# Emebdding Cache Processor is different from the regular processor
# TODO: Have a single processor for all cases and adopting rust based processor
ec_processor: bool = False
multimodal_encode_worker: bool = False multimodal_encode_worker: bool = False
multimodal_worker: bool = False multimodal_worker: bool = False
multimodal_decode_worker: bool = False multimodal_decode_worker: bool = False
enable_multimodal: bool = False enable_multimodal: bool = False
multimodal_encode_prefill_worker: bool = False multimodal_encode_prefill_worker: bool = False
mm_prompt_template: str = "USER: <image>\n<prompt> ASSISTANT:" mm_prompt_template: str = "USER: <image>\n<prompt> ASSISTANT:"
# vLLM-native encoder worker (ECConnector mode)
vllm_native_encoder_worker: bool = False
ec_connector_backend: Optional[str] = "ECExampleConnector"
ec_storage_path: Optional[str] = None
ec_extra_config: Optional[str] = None
ec_consumer_mode: bool = False
# dump config to file # dump config to file
dump_config_to: Optional[str] = None dump_config_to: Optional[str] = None
...@@ -153,6 +164,11 @@ def parse_args() -> Config: ...@@ -153,6 +164,11 @@ def parse_args() -> Config:
action="store_true", action="store_true",
help="Run as multimodal processor component for handling multimodal requests", help="Run as multimodal processor component for handling multimodal requests",
) )
parser.add_argument(
"--ec-processor",
action="store_true",
help="Run as ECConnector processor (routes multimodal requests to encoder then PD workers)",
)
parser.add_argument( parser.add_argument(
"--multimodal-encode-worker", "--multimodal-encode-worker",
action="store_true", action="store_true",
...@@ -191,6 +207,34 @@ def parse_args() -> Config: ...@@ -191,6 +207,34 @@ def parse_args() -> Config:
"'USER: <image> please describe the image ASSISTANT:'." "'USER: <image> please describe the image ASSISTANT:'."
), ),
) )
parser.add_argument(
"--vllm-native-encoder-worker",
action="store_true",
help="Run as vLLM-native encoder worker using ECConnector for encoder disaggregation (requires shared storage). The following flags only work when this flag is enabled: --ec-connector-backend, --ec-storage-path, --ec-extra-config, --ec-consumer-mode.",
)
parser.add_argument(
"--ec-connector-backend",
type=str,
default="ECExampleConnector",
help="ECConnector implementation class for encoder disaggregation. Default: ECExampleConnector (disk-based)",
)
parser.add_argument(
"--ec-storage-path",
type=str,
default=None,
help="Storage path for ECConnector (required for ECExampleConnector, optional for other backends)",
)
parser.add_argument(
"--ec-extra-config",
type=str,
default=None,
help="Additional ECConnector configuration as JSON string",
)
parser.add_argument(
"--ec-consumer-mode",
action="store_true",
help="Configure as ECConnector consumer for receiving encoder embeddings (for PD workers)",
)
parser.add_argument( parser.add_argument(
"--store-kv", "--store-kv",
type=str, type=str,
...@@ -271,27 +315,42 @@ def parse_args() -> Config: ...@@ -271,27 +315,42 @@ def parse_args() -> Config:
# Check multimodal role exclusivity # Check multimodal role exclusivity
mm_flags = ( mm_flags = (
int(bool(args.multimodal_processor)) int(bool(args.multimodal_processor))
+ int(bool(args.ec_processor))
+ int(bool(args.multimodal_encode_worker)) + int(bool(args.multimodal_encode_worker))
+ int(bool(args.multimodal_worker)) + int(bool(args.multimodal_worker))
+ int(bool(args.multimodal_decode_worker)) + int(bool(args.multimodal_decode_worker))
+ int(bool(args.multimodal_encode_prefill_worker)) + int(bool(args.multimodal_encode_prefill_worker))
+ int(bool(args.vllm_native_encoder_worker))
) )
if mm_flags > 1: if mm_flags > 1:
raise ValueError( raise ValueError(
"Use only one of --multimodal-processor, --multimodal-encode-worker, --multimodal-worker, --multimodal-decode-worker, or --multimodal-encode-prefill-worker" "Use only one of --multimodal-processor, --ec-processor, --multimodal-encode-worker, --multimodal-worker, "
"--multimodal-decode-worker, --multimodal-encode-prefill-worker, or --vllm-native-encoder-worker"
) )
if mm_flags == 1 and not args.enable_multimodal: if mm_flags == 1 and not args.enable_multimodal:
raise ValueError("Use --enable-multimodal to enable multimodal processing") raise ValueError("Use --enable-multimodal to enable multimodal processing")
# Validate vLLM-native encoder worker config
if args.vllm_native_encoder_worker:
if (
args.ec_connector_backend == "ECExampleConnector"
and not args.ec_storage_path
):
raise ValueError(
"--ec-storage-path is required when using ECExampleConnector backend. "
"Specify a shared storage path for encoder cache."
)
# Set component and endpoint based on worker type # Set component and endpoint based on worker type
if args.multimodal_processor: if args.multimodal_processor or args.ec_processor:
config.component = "processor" config.component = "processor"
config.endpoint = "generate" config.endpoint = "generate"
elif args.multimodal_encode_worker: elif (
config.component = "encoder" args.vllm_native_encoder_worker
config.endpoint = "generate" or args.multimodal_encode_worker
elif args.multimodal_encode_prefill_worker: or args.multimodal_encode_prefill_worker
):
config.component = "encoder" config.component = "encoder"
config.endpoint = "generate" config.endpoint = "generate"
elif args.multimodal_decode_worker: elif args.multimodal_decode_worker:
...@@ -319,12 +378,18 @@ def parse_args() -> Config: ...@@ -319,12 +378,18 @@ def parse_args() -> Config:
config.custom_jinja_template = args.custom_jinja_template config.custom_jinja_template = args.custom_jinja_template
config.dyn_endpoint_types = args.dyn_endpoint_types config.dyn_endpoint_types = args.dyn_endpoint_types
config.multimodal_processor = args.multimodal_processor config.multimodal_processor = args.multimodal_processor
config.ec_processor = args.ec_processor
config.multimodal_encode_worker = args.multimodal_encode_worker config.multimodal_encode_worker = args.multimodal_encode_worker
config.multimodal_worker = args.multimodal_worker config.multimodal_worker = args.multimodal_worker
config.multimodal_decode_worker = args.multimodal_decode_worker config.multimodal_decode_worker = args.multimodal_decode_worker
config.multimodal_encode_prefill_worker = args.multimodal_encode_prefill_worker config.multimodal_encode_prefill_worker = args.multimodal_encode_prefill_worker
config.enable_multimodal = args.enable_multimodal config.enable_multimodal = args.enable_multimodal
config.mm_prompt_template = args.mm_prompt_template config.mm_prompt_template = args.mm_prompt_template
config.vllm_native_encoder_worker = args.vllm_native_encoder_worker
config.ec_connector_backend = args.ec_connector_backend
config.ec_storage_path = args.ec_storage_path
config.ec_extra_config = args.ec_extra_config
config.ec_consumer_mode = args.ec_consumer_mode
config.store_kv = args.store_kv config.store_kv = args.store_kv
config.request_plane = args.request_plane config.request_plane = args.request_plane
config.enable_local_indexer = args.enable_local_indexer config.enable_local_indexer = args.enable_local_indexer
...@@ -510,7 +575,9 @@ def overwrite_args(config): ...@@ -510,7 +575,9 @@ def overwrite_args(config):
setattr(config.engine_args, key, value) setattr(config.engine_args, key, value)
logger.debug(f" engine_args.{key} = {value}") logger.debug(f" engine_args.{key} = {value}")
else: else:
raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.") logger.debug(
f" Skipping engine_args.{key} (not available in this vLLM version)"
)
def get_host_ip() -> str: def get_host_ip() -> str:
......
...@@ -30,11 +30,14 @@ from dynamo.llm import ( ...@@ -30,11 +30,14 @@ from dynamo.llm import (
from dynamo.runtime import DistributedRuntime from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.vllm.multimodal_handlers import ( from dynamo.vllm.multimodal_handlers import (
ECProcessorHandler,
EncodeWorkerHandler, EncodeWorkerHandler,
MultimodalDecodeWorkerHandler, MultimodalDecodeWorkerHandler,
MultimodalPDWorkerHandler, MultimodalPDWorkerHandler,
ProcessorHandler, ProcessorHandler,
VLLMEncodeWorkerHandler,
) )
from dynamo.vllm.multimodal_utils.encode_utils import create_ec_transfer_config
from .args import Config, overwrite_args, parse_args from .args import Config, overwrite_args, parse_args
from .handlers import DecodeWorkerHandler, PrefillWorkerHandler from .handlers import DecodeWorkerHandler, PrefillWorkerHandler
...@@ -85,7 +88,13 @@ async def worker(): ...@@ -85,7 +88,13 @@ async def worker():
config.model = config.engine_args.model = await fetch_llm(config.model) config.model = config.engine_args.model = await fetch_llm(config.model)
# Route to appropriate initialization based on config flags # Route to appropriate initialization based on config flags
if config.multimodal_processor: if config.vllm_native_encoder_worker:
await init_vllm_native_encoder(runtime, config)
logger.debug("init_vllm_native_encoder completed")
elif config.ec_processor:
await init_ec_processor(runtime, config)
logger.debug("init_ec_processor completed")
elif config.multimodal_processor:
await init_multimodal_processor(runtime, config) await init_multimodal_processor(runtime, config)
logger.debug("init_multimodal_processor completed") logger.debug("init_multimodal_processor completed")
elif config.multimodal_encode_worker: elif config.multimodal_encode_worker:
...@@ -717,7 +726,138 @@ async def init_multimodal_encode_worker(runtime: DistributedRuntime, config: Con ...@@ -717,7 +726,138 @@ async def init_multimodal_encode_worker(runtime: DistributedRuntime, config: Con
), ),
) )
except Exception as e: except Exception as e:
logger.error(f"Failed to serve endpoints: {e}") logger.error(f"Failed to serve encode worker endpoint: {e}")
raise
finally:
handler.cleanup()
async def init_vllm_native_encoder(runtime: DistributedRuntime, config: Config):
"""
Initialize vLLM-native encoder worker component (ECConnector mode).
In this mode, vLLM handles encoder execution, caching, and storage automatically.
"""
# Create component and endpoint
component = runtime.namespace(config.namespace).component(config.component)
generate_endpoint = component.endpoint(config.endpoint)
# Configure ECTransferConfig for producer role
instance_id = 0
engine_id = f"{config.namespace}.{config.component}.encoder.{instance_id}"
# Configure encoder with producer role, it will be responsible for creating embeddings and storing them in the shared storage
ec_transfer_config = create_ec_transfer_config(
engine_id=engine_id,
ec_role="ec_producer",
ec_connector_backend=config.ec_connector_backend,
ec_storage_path=config.ec_storage_path,
ec_extra_config=config.ec_extra_config,
)
# Set ECTransferConfig on engine args
config.engine_args.ec_transfer_config = ec_transfer_config
# Setup vLLM engine
(
engine_client,
vllm_config,
default_sampling_params,
prometheus_temp_dir,
) = setup_vllm_engine(config)
# Initialize vLLM Native Encoder Worker Handler
handler = VLLMEncodeWorkerHandler(
runtime,
component,
engine_client,
config,
)
handler.add_temp_dir(prometheus_temp_dir)
# 5. No async init needed - vLLM handles everything
# await handler.async_init(runtime) # Not needed for ECConnector mode
logger.info("Starting to serve vLLM-native encoder endpoint...")
# 6. Serve endpoint
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate, metrics_labels=[("model", config.model)]
),
)
except Exception as e:
logger.error(f"Failed to serve vLLM-native encoder endpoint: {e}")
raise
finally:
handler.cleanup()
async def init_ec_processor(runtime: DistributedRuntime, config: Config):
"""
Initialize ECConnector processor component.
Simple processor that routes multimodal requests using ECConnector pattern:
1. Preprocess request (same as regular processor)
2. Send multimodal items to encoder workers (stores to shared storage)
3. Forward preprocessed request to PD worker (loads from shared storage)
4. Stream response back to client
"""
# Create component and endpoint
component = runtime.namespace(config.namespace).component(config.component)
generate_endpoint = component.endpoint(config.endpoint)
# Get encoder worker client
encoder_client = (
await runtime.namespace(config.namespace)
.component("encoder")
.endpoint("generate")
.client()
)
# Get PD worker client
pd_client = (
await runtime.namespace(config.namespace)
.component("backend")
.endpoint("generate")
.client()
)
# Get prompt template from args (must be passed via environment or command line)
mm_prompt_template = config.mm_prompt_template
# Create EC processor handler (with preprocessing like regular processor)
handler = ECProcessorHandler(
config.engine_args,
encoder_worker_client=encoder_client,
pd_worker_client=pd_client,
prompt_template=mm_prompt_template,
)
logger.info("Waiting for encoder and PD worker instances...")
await encoder_client.wait_for_instances()
await pd_client.wait_for_instances()
# Register the endpoint as entrypoint to a model (same as regular processor)
await register_llm(
ModelInput.Text, # Custom processor is used and this type bypasses SDK processor
ModelType.Chat,
generate_endpoint,
config.model,
config.served_model_name,
kv_cache_block_size=config.engine_args.block_size,
)
logger.info("Starting to serve EC processor endpoint...")
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate, metrics_labels=[("model", config.model)]
),
)
except Exception as e:
logger.error(f"Failed to serve EC processor endpoint: {e}")
raise raise
finally: finally:
handler.cleanup() handler.cleanup()
...@@ -732,12 +872,34 @@ async def init_multimodal_worker(runtime: DistributedRuntime, config: Config): ...@@ -732,12 +872,34 @@ async def init_multimodal_worker(runtime: DistributedRuntime, config: Config):
2. --multimodal-encode-prefill-worker: Handles inline encoding (e.g., Llama 4) 2. --multimodal-encode-prefill-worker: Handles inline encoding (e.g., Llama 4)
Both can operate in aggregated (P+D) or disaggregated (P→D) mode. Both can operate in aggregated (P+D) or disaggregated (P→D) mode.
When --ec-consumer-mode is enabled, configures as ECConnector consumer
to load encoder embeddings from shared storage.
""" """
component = runtime.namespace(config.namespace).component(config.component) component = runtime.namespace(config.namespace).component(config.component)
generate_endpoint = component.endpoint(config.endpoint) generate_endpoint = component.endpoint(config.endpoint)
clear_endpoint = component.endpoint("clear_kv_blocks") clear_endpoint = component.endpoint("clear_kv_blocks")
# Configure ECConnector consumer mode if enabled
if config.ec_consumer_mode:
logger.info("Configuring as ECConnector consumer for encoder embeddings")
instance_id = 0
engine_id = f"{config.namespace}.{config.component}.backend.{instance_id}"
# The PD Worker just load the embeddings from the shared storage, so it is a consumer
ec_transfer_config = create_ec_transfer_config(
engine_id=engine_id,
ec_role="ec_consumer",
ec_connector_backend=config.ec_connector_backend,
ec_storage_path=config.ec_storage_path,
ec_extra_config=config.ec_extra_config,
)
# Set ECTransferConfig on engine args
config.engine_args.ec_transfer_config = ec_transfer_config
logger.info(f"Configured as ECConnector consumer with engine_id={engine_id}")
( (
engine_client, engine_client,
vllm_config, vllm_config,
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from dynamo.vllm.multimodal_handlers.encode_worker_handler import EncodeWorkerHandler from dynamo.vllm.multimodal_handlers.encode_worker_handler import (
from dynamo.vllm.multimodal_handlers.processor_handler import ProcessorHandler EncodeWorkerHandler,
VLLMEncodeWorkerHandler,
)
from dynamo.vllm.multimodal_handlers.processor_handler import (
ECProcessorHandler,
ProcessorHandler,
)
from dynamo.vllm.multimodal_handlers.worker_handler import ( from dynamo.vllm.multimodal_handlers.worker_handler import (
MultimodalDecodeWorkerHandler, MultimodalDecodeWorkerHandler,
MultimodalPDWorkerHandler, MultimodalPDWorkerHandler,
...@@ -10,7 +16,9 @@ from dynamo.vllm.multimodal_handlers.worker_handler import ( ...@@ -10,7 +16,9 @@ from dynamo.vllm.multimodal_handlers.worker_handler import (
__all__ = [ __all__ = [
"EncodeWorkerHandler", "EncodeWorkerHandler",
"VLLMEncodeWorkerHandler",
"ProcessorHandler", "ProcessorHandler",
"MultimodalPDWorkerHandler", "MultimodalPDWorkerHandler",
"MultimodalDecodeWorkerHandler", "MultimodalDecodeWorkerHandler",
"ECProcessorHandler",
] ]
...@@ -2,10 +2,14 @@ ...@@ -2,10 +2,14 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import logging import logging
from typing import AsyncIterator import shutil
from typing import AsyncGenerator, AsyncIterator
from transformers import AutoImageProcessor from transformers import AutoImageProcessor
from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.inputs.data import TextPrompt
from vllm.multimodal.hasher import MultiModalHasher
from vllm.sampling_params import SamplingParams
import dynamo.nixl_connect as connect import dynamo.nixl_connect as connect
from dynamo.runtime import Client, DistributedRuntime from dynamo.runtime import Client, DistributedRuntime
...@@ -13,6 +17,8 @@ from dynamo.runtime import Client, DistributedRuntime ...@@ -13,6 +17,8 @@ from dynamo.runtime import Client, DistributedRuntime
from ..multimodal_utils import ( from ..multimodal_utils import (
ImageLoader, ImageLoader,
MyRequestOutput, MyRequestOutput,
VLLMNativeEncoderRequest,
VLLMNativeEncoderResponse,
encode_image_embeddings, encode_image_embeddings,
get_encoder_components, get_encoder_components,
load_vision_model, load_vision_model,
...@@ -156,3 +162,127 @@ class EncodeWorkerHandler: ...@@ -156,3 +162,127 @@ class EncodeWorkerHandler:
except Exception as e: except Exception as e:
logger.error(f"Error processing request {request_id}: {e}") logger.error(f"Error processing request {request_id}: {e}")
raise raise
class VLLMEncodeWorkerHandler:
"""
Handler for vLLM-native encoder worker using ECConnector.
"""
def __init__(self, runtime, component, engine_client, config):
"""
Initialize the handler.
Args:
runtime: Dynamo distributed runtime
component: Dynamo component instance
engine_client: vLLM AsyncLLM instance
config: Dynamo Config object with CLI arguments
"""
self.runtime = runtime
self.component = component
self.engine_client = engine_client
self.config = config
self.temp_dirs = []
self.image_loader = ImageLoader()
logger.info(
f"VLLMNativeEncoderWorkerHandler initialized with "
f"backend={config.ec_connector_backend}, "
f"storage_path={config.ec_storage_path}"
)
def add_temp_dir(self, temp_dir):
"""Add temporary directory for cleanup."""
if temp_dir:
self.temp_dirs.append(temp_dir)
async def generate(self, request, context) -> AsyncGenerator[str, None]:
"""
Process encoder request and trigger vLLM encoder execution.
Args:
request: VLLMNativeEncoderRequest with multimodal_input
context: Request context from Dynamo runtime
Yields:
JSON-encoded VLLMNativeEncoderResponse with mm_hash and connector metadata
"""
# Parse request
if not isinstance(request, VLLMNativeEncoderRequest):
if isinstance(request, str):
request = VLLMNativeEncoderRequest.model_validate_json(request)
else:
request = VLLMNativeEncoderRequest.model_validate(request)
# Load media (image/video/audio)
# TODO: Add support for video_url and audio
if request.multimodal_input.image_url:
media = await self.image_loader.load_image(
request.multimodal_input.image_url
)
media_key = "image"
else:
raise ValueError(
"No media URL provided. Specify image_url in multimodal_input."
)
# Compute mm_hash using vLLM's hasher
try:
mm_hash = MultiModalHasher.hash_kwargs(
model_id=self.config.model, **{media_key: media}
)
logger.debug(f"Computed mm_hash: {mm_hash}")
except Exception as e:
logger.error(f"Failed to compute mm_hash: {e}")
raise
try:
# Prompt can be a random string as the encoder is only interested in the multimodal data
prompt_dict = TextPrompt(
prompt="<image>", multi_modal_data={media_key: media}
)
gen = self.engine_client.generate(
prompt=prompt_dict,
sampling_params=SamplingParams(max_tokens=1, min_tokens=0),
request_id=request.request_id,
)
# Consume generator to trigger encoder execution
async for _ in gen:
pass
logger.info(
f"Encoder execution completed for request_id={request.request_id}"
)
except Exception as e:
logger.error(f"Encoder execution failed: {e}")
raise
# Return metadata for PD workers
response = VLLMNativeEncoderResponse(
request_id=request.request_id,
mm_hash=mm_hash,
modality=request.modality,
connector_metadata={
"ec_connector": self.config.ec_connector_backend,
"storage_path": self.config.ec_storage_path,
},
)
logger.debug(f"Returning response: {response}")
yield response.model_dump_json()
def cleanup(self):
"""Cleanup resources."""
logger.info("Cleaning up VLLMNativeEncoderWorkerHandler")
# Clean up temporary directories
for temp_dir in self.temp_dirs:
try:
shutil.rmtree(temp_dir, ignore_errors=True)
logger.debug(f"Cleaned up temp directory: {temp_dir}")
except Exception as e:
logger.warning(f"Failed to cleanup {temp_dir}: {e}")
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import asyncio
import json import json
import logging import logging
import uuid import uuid
from enum import Enum from enum import Enum
from typing import AsyncIterator, Union from typing import Any, AsyncIterator, Dict, List, Union
from transformers import AutoTokenizer from transformers import AutoTokenizer
from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.arg_utils import AsyncEngineArgs
...@@ -203,3 +204,276 @@ class ProcessorHandler(ProcessMixIn): ...@@ -203,3 +204,276 @@ class ProcessorHandler(ProcessMixIn):
break break
response = json.loads(response.lstrip("data: ")) response = json.loads(response.lstrip("data: "))
yield response yield response
class ECProcessorHandler(ProcessorHandler):
"""
Processor handler for ECConnector-based encoder
"""
def __init__(
self,
engine_args: AsyncEngineArgs,
encoder_worker_client: Client,
pd_worker_client: Client,
prompt_template: str,
):
"""
Initialize the ECConnector processor.
Args:
engine_args: vLLM engine arguments for model config
encoder_worker_client: Client for encoder worker endpoints
pd_worker_client: Client for PD worker endpoints
prompt_template: Multimodal prompt template
"""
# Initialize base class with encoder client
super().__init__(engine_args, encoder_worker_client, prompt_template)
# Store additional PD client for disaggregated architecture
self.encoder_client = encoder_worker_client
self.pd_client = pd_worker_client
logger.info("ECProcessorHandler initialized with disaggregated architecture")
@staticmethod
def _extract_multimodal_items(request_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Extract all multimodal items (images/videos) from the request messages.
Args:
request_data: The request dictionary
Returns:
List of multimodal content items
"""
items = []
messages = request_data.get("messages", [])
for msg in messages:
content = msg.get("content")
if not isinstance(content, list):
continue
for item in content:
item_type = item.get("type")
if item_type in ("image_url", "video_url"):
items.append(item)
return items
@staticmethod
def _create_encoder_request(
mm_item: Dict[str, Any],
model: str,
request_id: str,
) -> Dict[str, Any]:
# Create MultiModalInput from the item
multimodal_input = {}
modality = None
if mm_item.get("type") == "image_url":
multimodal_input["image_url"] = mm_item["image_url"]["url"]
modality = "image"
elif mm_item.get("type") == "video_url":
multimodal_input["video_url"] = mm_item["video_url"]["url"]
modality = "video"
else:
raise ValueError(f"Unsupported multimodal type: {mm_item.get('type')}")
return {
"request_id": request_id,
"multimodal_input": multimodal_input,
"modality": modality,
}
async def _encode_multimodal_items(
self,
mm_items: List[Dict[str, Any]],
model: str,
request_id: str,
) -> None:
"""
Send all multimodal items to encoder workers concurrently.
Each item is sent as a separate request to an encoder worker.
The encoder processes the item and stores embeddings to shared storage.
"""
if not mm_items:
logger.debug(f"[{request_id}] No multimodal items to encode")
return
logger.info(f"[{request_id}] Encoding {len(mm_items)} multimodal item(s)")
tasks = []
for idx, mm_item in enumerate(mm_items):
# Create unique request ID for each item
item_request_id = f"{request_id}_mm_{idx}"
# Build encoder request
encoder_request = self._create_encoder_request(
mm_item=mm_item,
model=model,
request_id=item_request_id,
)
# Create task for this encoder request
task = self._send_to_encoder(encoder_request, item_request_id)
tasks.append(task)
# Wait for all encoder requests to complete
try:
await asyncio.gather(*tasks)
logger.info(f"[{request_id}] All encoders completed successfully")
except Exception as e:
logger.error(f"[{request_id}] Encoder encoding failed: {e}")
raise
async def _send_to_encoder(
self,
encoder_request: Dict[str, Any],
request_id: str,
) -> None:
"""
Send a single request to an encoder worker and wait for completion.
"""
try:
# Convert to JSON
request_json = json.dumps(encoder_request)
# Send to encoder worker (round-robin)
response_stream = await self.encoder_client.round_robin(request_json)
# Consume the response stream
async for chunk in response_stream:
pass
logger.debug(f"[{request_id}] Encoder completed successfully")
except Exception as e:
logger.error(f"[{request_id}] Encoder request failed: {e}")
raise
async def generate(self, raw_request: MultiModalRequest, context):
"""
Main endpoint handler for chat completion requests with ECConnector.
"""
logger.debug(f"ECProcessor received request: {raw_request}")
if not isinstance(raw_request, MultiModalRequest):
raw_request = MultiModalRequest.model_validate(raw_request)
# Ensure the configured template includes the placeholder
template = self.prompt_template
if "<prompt>" not in template:
raise ValueError("prompt_template must contain '<prompt>' placeholder")
# Safely extract user text
user_text = None
for message in raw_request.messages:
for item in message.content:
if item.type == "text":
user_text = item.text
break
if not user_text:
raise ValueError("No text content found in request")
prompt = template.replace("<prompt>", user_text)
msg = {
"role": "user",
"content": prompt,
}
# Generate single request ID for entire flow
request_id = str(uuid.uuid4().hex)
# Create chat request for preprocessing
chat_request = ChatCompletionRequest(
model=raw_request.model,
messages=[msg],
stream=True,
max_tokens=raw_request.max_tokens,
temperature=raw_request.temperature,
request_id=request_id,
)
# Step 1: Extract multimodal input (needed for PD worker to generate mm_hash)
multimodal_input = MultiModalInput()
for message in raw_request.messages:
for item in message.content:
if item.type == "image_url":
multimodal_input.image_url = item.image_url.url
elif item.type == "video_url":
if multimodal_input.image_url is not None:
raise ValueError("Cannot provide both image and video URLs")
multimodal_input.video_url = item.video_url.url
if multimodal_input.image_url is None and multimodal_input.video_url is None:
raise ValueError("Either image URL or video URL is required")
# Step 2: Send multimodal items to encoder (ECConnector producer)
mm_items = self._extract_multimodal_items(raw_request.model_dump())
if mm_items:
logger.info(
f"[{request_id}] Encoding {len(mm_items)} multimodal item(s) via encoder..."
)
try:
await self._encode_multimodal_items(
mm_items=mm_items,
model=raw_request.model,
request_id=request_id,
)
except Exception as e:
logger.error(f"[{request_id}] Encoder processing failed: {e}")
error_response = {
"error": {
"message": f"Encoder processing failed: {str(e)}",
"type": "encoder_error",
"code": 500,
}
}
yield error_response
return
# Step 2: Preprocess request (parse chat, tokenize, create engine prompt)
logger.debug(f"[{request_id}] Preprocessing request...")
(
request,
conversation,
engine_prompt,
sampling_params,
) = await self._parse_raw_request(chat_request)
# Step 3: Create worker request for PD worker (WITH multimodal_input)
# PD worker needs multimodal_input to generate mm_hash and lookup EC cache
# vLLM will see the multimodal items, generate mm_hash, and load from cache
worker_request = vLLMMultimodalRequest(
engine_prompt=engine_prompt,
sampling_params=sampling_params,
request_id=request_id,
multimodal_input=multimodal_input, # ✓ Keep this so vLLM can generate mm_hash
)
logger.debug(
f"[{request_id}] Forwarding to PD worker (vLLM will load from ECConnector cache using mm_hash)..."
)
# Step 4: Send to PD worker (ECConnector consumer - will load from storage)
response_generator = await self.pd_client.round_robin(
worker_request.model_dump_json()
)
# Step 5: Generate and stream responses (reuse base class method)
output = self._generate_responses(response_generator, RequestType.CHAT)
async for response in await self._stream_response(
request, output, request_id, conversation
):
logger.debug(f"[{request_id}] Generated response: {type(response)}")
# Reconstruct OpenAI chat response
if response.startswith("data: [DONE]"):
break
response = json.loads(response.lstrip("data: "))
yield response
...@@ -148,11 +148,38 @@ class MultimodalPDWorkerHandler(BaseWorkerHandler): ...@@ -148,11 +148,38 @@ class MultimodalPDWorkerHandler(BaseWorkerHandler):
request = vLLMMultimodalRequest.model_validate(request) request = vLLMMultimodalRequest.model_validate(request)
logger.debug(f"Received PD request: {{ id: {request.request_id} }}.") logger.debug(f"Received PD request: {{ id: {request.request_id} }}.")
if ( # ECConnector consumer mode: vLLM loads embeddings automatically from disk
request.multimodal_input.image_url is None # We need to pass multimodal_input so vLLM can generate mm_hash and look up cache
if self.config.ec_consumer_mode:
logger.debug(
f"[{request.request_id}] ECConnector consumer mode: "
f"vLLM will load embeddings from cache using mm_hash"
)
# Use PIL image loading - vLLM will detect it's already in EC cache
# and load from disk instead of reprocessing
if request.multimodal_input and request.multimodal_input.image_url:
multi_modal_data = {
"image": await self.image_loader.load_image(
request.multimodal_input.image_url
)
}
elif request.multimodal_input and request.multimodal_input.video_url:
# For video, load as image placeholder (vLLM will use EC cache)
multi_modal_data = {
"image": await self.image_loader.load_image(
request.multimodal_input.video_url
)
}
else:
raise ValueError(
"ECConnector mode requires multimodal_input with image/video URL"
)
elif (
request.multimodal_input is not None
and request.multimodal_input.image_url is None
and request.multimodal_input.video_url is None and request.multimodal_input.video_url is None
): ):
# Process embeddings using the connector # Network transfer mode: receive embeddings via connector from encoder worker
# Create a descriptor based on the embedding shape. # Create a descriptor based on the embedding shape.
embeddings = torch.empty( embeddings = torch.empty(
request.embeddings_shape, request.embeddings_shape,
...@@ -184,15 +211,20 @@ class MultimodalPDWorkerHandler(BaseWorkerHandler): ...@@ -184,15 +211,20 @@ class MultimodalPDWorkerHandler(BaseWorkerHandler):
image_embeds=embeddings, image_embeds=embeddings,
image_grid_thw=request.image_grid_thw, image_grid_thw=request.image_grid_thw,
) )
else: elif request.multimodal_input is not None:
# Use PIL image instead of image embeddings # Native mode: Use PIL image instead of image embeddings
multi_modal_data = { multi_modal_data = {
"image": await self.image_loader.load_image( "image": await self.image_loader.load_image(
request.multimodal_input.image_url request.multimodal_input.image_url
) )
} }
else:
raise ValueError(
"Invalid request: multimodal_input is None but not in ec_consumer_mode"
)
# Remove the image features from the request as they are not required # Clear multimodal_input fields if present (not needed for engine)
if request.multimodal_input is not None:
request.multimodal_input.image_url = None request.multimodal_input.image_url = None
request.multimodal_input.video_url = None request.multimodal_input.video_url = None
request.serialized_request = None request.serialized_request = None
......
...@@ -21,6 +21,8 @@ from dynamo.vllm.multimodal_utils.protocol import ( ...@@ -21,6 +21,8 @@ from dynamo.vllm.multimodal_utils.protocol import (
MultiModalInput, MultiModalInput,
MultiModalRequest, MultiModalRequest,
MyRequestOutput, MyRequestOutput,
VLLMNativeEncoderRequest,
VLLMNativeEncoderResponse,
vLLMMultimodalRequest, vLLMMultimodalRequest,
) )
...@@ -39,4 +41,6 @@ __all__ = [ ...@@ -39,4 +41,6 @@ __all__ = [
"MultiModalRequest", "MultiModalRequest",
"MyRequestOutput", "MyRequestOutput",
"vLLMMultimodalRequest", "vLLMMultimodalRequest",
"VLLMNativeEncoderRequest",
"VLLMNativeEncoderResponse",
] ]
...@@ -13,10 +13,12 @@ ...@@ -13,10 +13,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
import logging import logging
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import torch import torch
from vllm.config import ECTransferConfig
from .model import SupportedModels, is_model_supported, is_qwen_vl_model from .model import SupportedModels, is_model_supported, is_qwen_vl_model
...@@ -130,3 +132,51 @@ def get_encoder_components( ...@@ -130,3 +132,51 @@ def get_encoder_components(
else: else:
raise NotImplementedError(f"Model not supported: {model_name}") raise NotImplementedError(f"Model not supported: {model_name}")
def create_ec_transfer_config(
engine_id: str,
ec_role: str,
ec_connector_backend: str = "ECExampleConnector",
ec_storage_path: Optional[str] = None,
ec_extra_config: Optional[str] = None,
) -> ECTransferConfig:
"""
Create ECTransferConfig for vLLM encoder disaggregation.
Args:
engine_id: Unique identifier for this engine instance
ec_role: Role of this instance - "ec_producer" (encoder) or "ec_consumer" (PD worker)
ec_connector_backend: ECConnector implementation class name
ec_storage_path: Storage path for disk-based connectors
ec_extra_config: Additional connector config as JSON string
Returns:
ECTransferConfig configured for the specified role
"""
# Parse extra config if provided
extra_config: Dict[str, Any] = {}
if ec_extra_config:
try:
extra_config = json.loads(ec_extra_config)
logger.debug(f"Parsed ec_extra_config: {extra_config}")
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in --ec-extra-config: {e}")
# Add storage path to config if provided
if ec_storage_path:
extra_config["shared_storage_path"] = ec_storage_path
else:
raise ValueError("ec_storage_path is not provided")
logger.info(
f"Creating ECTransferConfig: engine_id={engine_id}, role={ec_role}, "
f"backend={ec_connector_backend}, config={extra_config}"
)
return ECTransferConfig(
engine_id=engine_id,
ec_role=ec_role,
ec_connector=ec_connector_backend,
ec_connector_extra_config=extra_config,
)
...@@ -150,6 +150,24 @@ class vLLMMultimodalRequest(vLLMGenerateRequest): ...@@ -150,6 +150,24 @@ class vLLMMultimodalRequest(vLLMGenerateRequest):
serialized_request: Optional[connect.RdmaMetadata] = None serialized_request: Optional[connect.RdmaMetadata] = None
class VLLMNativeEncoderRequest(BaseModel):
"""Request for vLLM-native encoder worker using ECConnector"""
request_id: str
multimodal_input: MultiModalInput
modality: Literal["image", "video", "audio"]
batch_items: Optional[List[MultiModalInput]] = None # For future batch processing
class VLLMNativeEncoderResponse(BaseModel):
"""Response from vLLM-native encoder worker (ECConnector mode)"""
request_id: str
mm_hash: str # vLLM's multimodal hash identifier
modality: str # "image", "video", "audio"
connector_metadata: dict[str, Any] # ECConnector config info for PD workers
class MyRequestOutput(BaseModel): class MyRequestOutput(BaseModel):
""" """
RequestOutput from vLLM is not serializable by default RequestOutput from vLLM is not serializable by default
......
...@@ -49,6 +49,7 @@ vLLM supports all multimodal deployment patterns. See [Architecture Patterns](in ...@@ -49,6 +49,7 @@ vLLM supports all multimodal deployment patterns. See [Architecture Patterns](in
| E/PD (Encode Separate) | ✅ | `agg_multimodal_epd.sh` | Separate encode worker | | E/PD (Encode Separate) | ✅ | `agg_multimodal_epd.sh` | Separate encode worker |
| E/P/D (Full Disaggregation) | ✅ | `disagg_multimodal_epd.sh` | All stages separate | | E/P/D (Full Disaggregation) | ✅ | `disagg_multimodal_epd.sh` | All stages separate |
| EP/D (Traditional Disaggregated) | ✅ | `disagg_multimodal_llama.sh` | For Llama 4 models | | EP/D (Traditional Disaggregated) | ✅ | `disagg_multimodal_llama.sh` | For Llama 4 models |
| E/PD (EC Connector) | ✅ | `agg_multimodal_ec_connector.sh` | vLLM-native encoder with ECConnector |
### Component Flags ### Component Flags
...@@ -60,6 +61,7 @@ vLLM supports all multimodal deployment patterns. See [Architecture Patterns](in ...@@ -60,6 +61,7 @@ vLLM supports all multimodal deployment patterns. See [Architecture Patterns](in
| Prefill Worker | `--multimodal-worker --is-prefill-worker` | Prefill only | | Prefill Worker | `--multimodal-worker --is-prefill-worker` | Prefill only |
| Decode Worker | `--multimodal-decode-worker` | Decode only | | Decode Worker | `--multimodal-decode-worker` | Decode only |
| Encode+Prefill Worker | `--multimodal-encode-prefill-worker --is-prefill-worker` | Combined (Llama 4) | | Encode+Prefill Worker | `--multimodal-encode-prefill-worker --is-prefill-worker` | Combined (Llama 4) |
| vLLM Native Encoder | `--vllm-native-encoder-worker` | vLLM-native encoding with ECConnector |
## Use the Latest Release ## Use the Latest Release
...@@ -172,6 +174,34 @@ bash launch/disagg_multimodal_epd.sh --model llava-hf/llava-1.5-7b-hf ...@@ -172,6 +174,34 @@ bash launch/disagg_multimodal_epd.sh --model llava-hf/llava-1.5-7b-hf
> [!NOTE] Disaggregation is currently only confirmed to work with LLaVA. Qwen2.5-VL is not confirmed to be supported. > [!NOTE] Disaggregation is currently only confirmed to work with LLaVA. Qwen2.5-VL is not confirmed to be supported.
## ECConnector Serving
ECConnector is vLLM's native connector for transferring multimodal embeddings via an Embedding Cache. The encoder worker acts as a **producer** (writes embeddings), while the PD worker acts as a **consumer** (reads embeddings).
**Workflow:**
```mermaid
flowchart LR
HTTP --> processor[EC Processor]
processor --image_url--> encoder[vLLM Native Encoder<br/>Producer]
encoder --writes--> cache[(Embedding Cache)]
cache --reads--> pd[PD Worker<br/>Consumer]
pd --> processor
processor --> HTTP
```
**Launch:**
```bash
cd $DYNAMO_HOME/examples/backends/vllm
bash launch/agg_multimodal_ec_connector.sh --model llava-hf/llava-1.5-7b-hf
# Custom storage path for Embedding Cache
bash launch/agg_multimodal_ec_connector.sh --ec-storage-path /shared/encoder-cache
```
**Client:** Same as [E/PD Serving](#epd-serving-encode-separate)
## Llama 4 Serving ## Llama 4 Serving
The Llama 4 model family is natively multimodal. Unlike LLaVA, they do not directly consume image embeddings as input (see the [vLLM support matrix](https://docs.vllm.ai/en/latest/models/supported_models.html#text-generation_1)). Therefore, the encoder worker is not used and encoding is done alongside prefill. The Llama 4 model family is natively multimodal. Unlike LLaVA, they do not directly consume image embeddings as input (see the [vLLM support matrix](https://docs.vllm.ai/en/latest/models/supported_models.html#text-generation_1)). Therefore, the encoder worker is not used and encoding is done alongside prefill.
...@@ -431,6 +461,7 @@ bash launch/audio_disagg.sh ...@@ -431,6 +461,7 @@ bash launch/audio_disagg.sh
| E/PD (Encode Separate) | `agg_multimodal_epd.sh` | Yes | Encoder → PD (embeddings) | | E/PD (Encode Separate) | `agg_multimodal_epd.sh` | Yes | Encoder → PD (embeddings) |
| E/P/D (Full Disaggregation) | `disagg_multimodal_epd.sh` | Yes | Encoder → Prefill (embeddings), Prefill → Decode (KV cache) | | E/P/D (Full Disaggregation) | `disagg_multimodal_epd.sh` | Yes | Encoder → Prefill (embeddings), Prefill → Decode (KV cache) |
| EP/D (Llama 4) | `disagg_multimodal_llama.sh` | Yes | Prefill → Decode (KV cache) | | EP/D (Llama 4) | `disagg_multimodal_llama.sh` | Yes | Prefill → Decode (KV cache) |
| E/PD (EC Connector) | `agg_multimodal_ec_connector.sh` | No | ECConnector via Embedding Cache |
## ModelInput Types and Registration ## ModelInput Types and Registration
...@@ -487,5 +518,5 @@ For a complete list of multimodal models supported by vLLM, see [vLLM Supported ...@@ -487,5 +518,5 @@ For a complete list of multimodal models supported by vLLM, see [vLLM Supported
| `components/src/dynamo/vllm/main.py` | Worker initialization and setup | | `components/src/dynamo/vllm/main.py` | Worker initialization and setup |
| `components/src/dynamo/vllm/args.py` | Command-line argument parsing | | `components/src/dynamo/vllm/args.py` | Command-line argument parsing |
| `components/src/dynamo/vllm/multimodal_handlers/processor_handler.py` | Processor implementation | | `components/src/dynamo/vllm/multimodal_handlers/processor_handler.py` | Processor implementation |
| `components/src/dynamo/vllm/multimodal_handlers/encode_worker_handler.py` | Encode worker implementation | | `components/src/dynamo/vllm/multimodal_handlers/encode_worker_handler.py` | Encode worker implementations (custom and vLLM-native) |
| `components/src/dynamo/vllm/multimodal_handlers/worker_handler.py` | PD/Prefill/Decode worker implementation | | `components/src/dynamo/vllm/multimodal_handlers/worker_handler.py` | PD/Prefill/Decode worker implementation |
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
set -e
trap 'echo Cleaning up...; kill 0' EXIT
# Default values
MODEL_NAME="llava-hf/llava-1.5-7b-hf"
PROMPT_TEMPLATE="USER: <image>\n<prompt> ASSISTANT:"
PROVIDED_PROMPT_TEMPLATE=""
EC_STORAGE_PATH="/tmp/dynamo_ec_cache"
EC_CONNECTOR_BACKEND="ECExampleConnector"
# Parse command line arguments
while [[ $# -gt 0 ]]; do
case $1 in
--model)
MODEL_NAME=$2
shift 2
;;
--prompt-template)
PROVIDED_PROMPT_TEMPLATE=$2
shift 2
;;
--ec-storage-path)
EC_STORAGE_PATH=$2
shift 2
;;
--ec-connector-backend)
EC_CONNECTOR_BACKEND=$2
shift 2
;;
-h|--help)
echo "Usage: $0 [OPTIONS]"
echo ""
echo "Aggregated multimodal serving with vLLM-native encoder (ECConnector mode)"
echo ""
echo "This script launches:"
echo " - Frontend server"
echo " - Processor component"
echo " - vLLM-native encoder worker (producer using ECConnector)"
echo " - Multimodal worker (consumer using ECConnector, aggregated P+D)"
echo ""
echo "Options:"
echo " --model <model_name> Specify the VLM model to use (default: $MODEL_NAME)"
echo " --prompt-template <template> Specify the multi-modal prompt template to use"
echo " --ec-storage-path <path> Path for ECConnector storage (default: $EC_STORAGE_PATH)"
echo " --ec-connector-backend <backend> ECConnector backend class (default: $EC_CONNECTOR_BACKEND)"
echo " -h, --help Show this help message"
echo ""
echo "Examples:"
echo " $0"
echo " $0 --model llava-hf/llava-1.5-7b-hf"
echo " $0 --ec-storage-path /shared/encoder-cache"
echo ""
exit 0
;;
*)
echo "Unknown option: $1"
echo "Use --help for usage information"
exit 1
;;
esac
done
# Set PROMPT_TEMPLATE based on the MODEL_NAME if not provided
if [[ -n "$PROVIDED_PROMPT_TEMPLATE" ]]; then
PROMPT_TEMPLATE="$PROVIDED_PROMPT_TEMPLATE"
elif [[ "$MODEL_NAME" == "meta-llama/Llama-3.2-11B-Vision-Instruct" ]]; then
PROMPT_TEMPLATE="<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n<|image|><prompt><|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n"
elif [[ "$MODEL_NAME" == "llava-hf/llava-1.5-7b-hf" ]]; then
PROMPT_TEMPLATE="USER: <image>\n<prompt> ASSISTANT:"
elif [[ "$MODEL_NAME" == "microsoft/Phi-3.5-vision-instruct" ]]; then
PROMPT_TEMPLATE="<|user|>\n<|image_1|>\n<prompt><|end|>\n<|assistant|>\n"
elif [[ "$MODEL_NAME" == "Qwen/Qwen2.5-VL-7B-Instruct" ]]; then
PROMPT_TEMPLATE="<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\n<|vision_start|><|image_pad|><|vision_end|><prompt><|im_end|>\n<|im_start|>assistant\n"
else
echo "No multi-modal prompt template is defined for the model: $MODEL_NAME"
echo "Please provide a prompt template using --prompt-template option."
exit 1
fi
# Create storage directory if it doesn't exist
mkdir -p "$EC_STORAGE_PATH"
echo "=================================================="
echo "Aggregated Multimodal Serving (vLLM-Native Encoder with ECConnector)"
echo "=================================================="
echo "Model: $MODEL_NAME"
echo "Prompt Template: $PROMPT_TEMPLATE"
echo "ECConnector Backend: $EC_CONNECTOR_BACKEND"
echo "Storage Path: $EC_STORAGE_PATH"
echo "=================================================="
# Start frontend
echo "Starting frontend..."
python -m dynamo.frontend &
# Start EC Processor (simple processor for ECConnector mode)
echo "Starting EC Processor..."
python -m dynamo.vllm \
--ec-processor \
--enable-multimodal \
--model $MODEL_NAME \
--mm-prompt-template "$PROMPT_TEMPLATE" &
# Start vLLM-native encoder worker (ECConnector producer)
echo "Starting vLLM-native encoder worker (ECConnector producer) on GPU 0..."
CUDA_VISIBLE_DEVICES=0 python -m dynamo.vllm \
--vllm-native-encoder-worker \
--enable-multimodal \
--model $MODEL_NAME \
--ec-connector-backend $EC_CONNECTOR_BACKEND \
--ec-storage-path $EC_STORAGE_PATH \
--connector none \
--enforce-eager \
--max-num-batched-tokens 114688 \
--no-enable-prefix-caching &
# Start aggregated multimodal worker (ECConnector consumer, P+D combined)
echo "Starting aggregated multimodal worker (ECConnector consumer) on GPU 1..."
CUDA_VISIBLE_DEVICES=1 python -m dynamo.vllm \
--multimodal-worker \
--enable-multimodal \
--model $MODEL_NAME \
--ec-consumer-mode \
--ec-connector-backend $EC_CONNECTOR_BACKEND \
--ec-storage-path $EC_STORAGE_PATH \
--enable-mm-embeds \
--connector none \
--enforce-eager &
# Wait for all background processes to complete
wait
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment