Unverified Commit c3984bc2 authored by Indrajit Bhosale's avatar Indrajit Bhosale Committed by GitHub
Browse files

fix: Move trtllm multimodal worker to use ModelInput.Token (#4373)


Signed-off-by: default avatarIndrajit Bhosale <iamindrajitb@gmail.com>
parent 4d3269cd
......@@ -202,7 +202,10 @@ class EncodeHelper:
Response dictionary with NIXL metadata and embeddings info, or error response
"""
# Load embeddings first to get the actual shape
messages = request.get("messages", [])
# Extract messages from extra_args (set by Rust preprocessor for multimodal) or fall back to direct field
messages = request.get("extra_args", {}).get(
"messages", request.get("messages", [])
)
_, _, embedding_paths = multimodal_processor.extract_prompt_and_media(messages)
if not embedding_paths:
......
......@@ -261,7 +261,6 @@ async def init(runtime: DistributedRuntime, config: Config):
if modality == "multimodal":
engine_args["skip_tokenizer_init"] = False
model_input = ModelInput.Text
model_config = AutoConfig.from_pretrained(
config.model_path, trust_remote_code=True
)
......
......@@ -63,6 +63,9 @@ class MultimodalRequestProcessor:
def is_url(self, path: str) -> bool:
"""Check if a path is a URL."""
parsed = urlparse(path)
# file:// URLs have scheme but no netloc, treat them as local paths
if parsed.scheme == "file":
return False
return bool(parsed.scheme and parsed.netloc)
def load_tensor_from_path_or_url(self, path: str) -> torch.Tensor:
......@@ -97,7 +100,10 @@ class MultimodalRequestProcessor:
)
raise RuntimeError("Failed to load tensor")
resolved_path = Path(path).resolve()
# Strip file:// prefix if present
local_path = path.removeprefix("file://")
resolved_path = Path(local_path).resolve()
allowed_path = Path(self.allowed_local_media_path).resolve()
# Secure path validation: Check if the resolved path is actually within allowed directory
......@@ -151,21 +157,11 @@ class MultimodalRequestProcessor:
self, request: Dict, embeddings: Any
) -> Optional[Any]:
"""Process OpenAI request and return with multimodal data."""
# Normalize the request to handle OpenAI format
if "stop_conditions" not in request:
request["stop_conditions"] = {}
if "max_tokens" in request and "max_tokens" not in request["stop_conditions"]:
request["stop_conditions"]["max_tokens"] = request.pop("max_tokens")
if "sampling_options" not in request:
request["sampling_options"] = {}
if (
"temperature" in request
and "temperature" not in request["sampling_options"]
):
request["sampling_options"]["temperature"] = request.pop("temperature")
messages = request.get("messages", [])
# Extract messages - check extra_args first (from Rust preprocessor for multimodal)
# Fall back to direct messages field for backward compatibility
messages = request.get("extra_args", {}).get(
"messages", request.get("messages", [])
)
text_prompt, image_urls, embedding_paths = self.extract_prompt_and_media(
messages
)
......@@ -241,19 +237,3 @@ class MultimodalRequestProcessor:
"object": "chat.completion.chunk",
"choices": [choice],
}
def get_stop_response(self, request_id: str, model_name: str) -> Dict[str, Any]:
"""Creates the final stop response chunk for multimodal streaming."""
final_choice = {
"index": 0,
"delta": {},
"finish_reason": "stop",
}
return {
"id": request_id,
"model": model_name,
"created": int(time.time()),
"object": "chat.completion.chunk",
"choices": [final_choice],
"finish_reason": "stop",
}
......@@ -251,7 +251,6 @@ class HandlerBase:
)
request_id = request.get("id") or request.get("request_id", "unknown-id")
model_name = request.get("model", "unknown_model")
# Optional test-only logits processing (enable with DYNAMO_ENABLE_TEST_LOGITS_PROCESSOR=1)
if os.getenv("DYNAMO_ENABLE_TEST_LOGITS_PROCESSOR") == "1":
......@@ -277,18 +276,6 @@ class HandlerBase:
self.publisher.start()
self.first_generation = False
# Upon completion, send a final chunk with "stop" as the finish reason.
# This signals to the client that the stream has ended.
if (
res.finished
and self.disaggregation_mode != DisaggregationMode.PREFILL
):
if self.multimodal_processor:
final_out = self.multimodal_processor.get_stop_response(
request_id, model_name
)
yield final_out
# If we are not done generating, but there are no outputs, return an error
if not res.outputs and not res.finished:
yield {"finish_reason": "error", "token_ids": []}
......@@ -298,12 +285,9 @@ class HandlerBase:
# The engine returns all tokens generated so far. We must calculate the new
# tokens generated in this iteration to create the "delta".
next_total_toks = len(output.token_ids)
if self.multimodal_processor:
out = self.multimodal_processor.create_response_chunk(
output, num_output_tokens_so_far, request_id, model_name
)
else:
out = {"token_ids": output.token_ids[num_output_tokens_so_far:]}
if output.finish_reason:
out["finish_reason"] = output.finish_reason
if output.stop_reason:
......
......@@ -111,8 +111,12 @@ class PrefillHandler(HandlerBase):
embeddings_tensor = None
if self.multimodal_processor:
# Extract messages from extra_args (set by Rust preprocessor) or fall back to direct field
messages = request.get("extra_args", {}).get(
"messages", request.get("messages", [])
)
_, _, embedding_paths = self.multimodal_processor.extract_prompt_and_media(
request.get("messages", [])
messages
)
if embedding_paths:
if self.encode_client and self.connector:
......
......@@ -23,10 +23,14 @@ To enable it build the dynamo container with the `--tensorrtllm-commit` flag, fo
```bash
cd $DYNAMO_HOME/examples/backends/trtllm
# Launch 3-worker EPD flow with NIXL
# Launch 3-worker EPD flow with NIXL.
./launch/epd_disagg.sh
```
## Pre-requsites
This script is specifically designed to work on 8 node H200 and `Llama-4-Maverick-17B-128E-Instruct` model with assumption that you already have a model specific embedding file ready.
## Configuration
The EPD flow uses a dedicated **Encode Worker** that runs separately from the Prefill and Decode workers. The `ENCODE_ENDPOINT` environment variable specifies how the Prefill worker communicates with the Encode worker:
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
tensor_parallel_size: 4
moe_expert_parallel_size: 1
enable_attention_dp: false
max_num_tokens: 8192
max_batch_size: 16
trust_remote_code: true
backend: pytorch
enable_chunked_prefill: true
disable_overlap_scheduler: false
kv_cache_config:
free_gpu_memory_fraction: 0.30
enable_block_reuse: false
cache_transceiver_config:
backend: DEFAULT
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
tensor_parallel_size: 1
moe_expert_parallel_size: 1
enable_attention_dp: false
max_num_tokens: 8192
max_batch_size: 16
trust_remote_code: true
backend: pytorch
enable_chunked_prefill: true
disable_overlap_scheduler: false
kv_cache_config:
free_gpu_memory_fraction: 0.30
enable_block_reuse: false
cache_transceiver_config:
backend: DEFAULT
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
tensor_parallel_size: 4
moe_expert_parallel_size: 1
enable_attention_dp: false
max_num_tokens: 8192
max_batch_size: 16
trust_remote_code: true
backend: pytorch
enable_chunked_prefill: true
# Overlap scheduler not currently supported in prefill only workers.
disable_overlap_scheduler: true
kv_cache_config:
free_gpu_memory_fraction: 0.30
enable_block_reuse: false
cache_transceiver_config:
backend: DEFAULT
\ No newline at end of file
......@@ -4,14 +4,15 @@
# Environment variables with defaults
export DYNAMO_HOME=${DYNAMO_HOME:-"/workspace"}
export MODEL_PATH=${MODEL_PATH:-"Qwen/Qwen2-VL-7B-Instruct"}
export SERVED_MODEL_NAME=${SERVED_MODEL_NAME:-"Qwen/Qwen2-VL-7B-Instruct"}
export PREFILL_ENGINE_ARGS=${PREFILL_ENGINE_ARGS:-"$DYNAMO_HOME/examples/backends/trtllm/engine_configs/qwen2-vl-7b-instruct/prefill.yaml"}
export DECODE_ENGINE_ARGS=${DECODE_ENGINE_ARGS:-"$DYNAMO_HOME/examples/backends/trtllm/engine_configs/qwen2-vl-7b-instruct/decode.yaml"}
export ENCODE_ENGINE_ARGS=${ENCODE_ENGINE_ARGS:-"$DYNAMO_HOME/examples/backends/trtllm/engine_configs/qwen2-vl-7b-instruct/encode.yaml"}
export PREFILL_CUDA_VISIBLE_DEVICES=${PREFILL_CUDA_VISIBLE_DEVICES:-"0"}
export DECODE_CUDA_VISIBLE_DEVICES=${DECODE_CUDA_VISIBLE_DEVICES:-"1"}
export ENCODE_CUDA_VISIBLE_DEVICES=${ENCODE_CUDA_VISIBLE_DEVICES:-"2"}
export MODEL_PATH=${MODEL_PATH:-"meta-llama/Llama-4-Scout-17B-16E-Instruct"}
export SERVED_MODEL_NAME=${SERVED_MODEL_NAME:-"meta-llama/Llama-4-Scout-17B-16E-Instruct"}
export PREFILL_ENGINE_ARGS=${PREFILL_ENGINE_ARGS:-"$DYNAMO_HOME/examples/backends/trtllm/engine_configs/llama4/multimodal/llama4-Scout/prefill.yaml"}
export DECODE_ENGINE_ARGS=${DECODE_ENGINE_ARGS:-"$DYNAMO_HOME/examples/backends/trtllm/engine_configs/llama4/multimodal/llama4-Scout/decode.yaml"}
# Placeholder for now, this is NO-OP as encoder just loads embeddings path, done to maintain consistency with other workers adn future api enhancements
export ENCODE_ENGINE_ARGS=${ENCODE_ENGINE_ARGS:-"$DYNAMO_HOME/examples/backends/trtllm/engine_configs/llama4/multimodal/llama4-Scout/encode.yaml"}
export PREFILL_CUDA_VISIBLE_DEVICES=${PREFILL_CUDA_VISIBLE_DEVICES:-"0,1,2,3"}
export DECODE_CUDA_VISIBLE_DEVICES=${DECODE_CUDA_VISIBLE_DEVICES:-"4,5,6,7"}
export ENCODE_CUDA_VISIBLE_DEVICES=${ENCODE_CUDA_VISIBLE_DEVICES:-"0"}
export ENCODE_ENDPOINT=${ENCODE_ENDPOINT:-"dyn://dynamo.tensorrt_llm_encode.generate"}
export MODALITY=${MODALITY:-"multimodal"}
export ALLOWED_LOCAL_MEDIA_PATH=${ALLOWED_LOCAL_MEDIA_PATH:-"/tmp"}
......
......@@ -339,6 +339,14 @@ impl OpenAIPreprocessor {
if !media_map.is_empty() {
builder.multi_modal_data(Some(media_map));
// Preserve original messages in extra_args for multimodal workers that need them
// (e.g., TRT-LLM multimodal processor needs raw messages for proper tokenization)
let messages_json = serde_json::to_value(&messages)?;
let extra_args = serde_json::json!({
"messages": messages_json
});
builder.extra_args(Some(extra_args));
}
Ok(())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment