Unverified Commit 8e236375 authored by KrishnanPrash's avatar KrishnanPrash Committed by GitHub
Browse files

fix: restore E/P/D multimodal disagg serving and add Qwen3-VL-30B-A3B support (#6533)


Signed-off-by: default avatarKrishnan Prashanth <kprashanth@nvidia.com>
Signed-off-by: default avatarKrishnanPrash <140860868+KrishnanPrash@users.noreply.github.com>
parent 5c64ffc3
...@@ -1041,8 +1041,8 @@ class BaseWorkerHandler(ABC): ...@@ -1041,8 +1041,8 @@ class BaseWorkerHandler(ABC):
prompt_tokens + completion_tokens if prompt_tokens is not None else None prompt_tokens + completion_tokens if prompt_tokens is not None else None
), ),
"prompt_tokens_details": ( "prompt_tokens_details": (
{"cached_tokens": request_output.num_cached_tokens} {"cached_tokens": num_cached}
if request_output.num_cached_tokens if (num_cached := getattr(request_output, "num_cached_tokens", None))
else None else None
), ),
} }
......
...@@ -199,22 +199,6 @@ class MultimodalPDWorkerHandler(BaseWorkerHandler): ...@@ -199,22 +199,6 @@ class MultimodalPDWorkerHandler(BaseWorkerHandler):
logger.debug(f"Prepared multimodal data size: {len(multi_modal_data['image'])}") logger.debug(f"Prepared multimodal data size: {len(multi_modal_data['image'])}")
logger.debug("Multimodal data keys: %s", list(multi_modal_data.keys())) logger.debug("Multimodal data keys: %s", list(multi_modal_data.keys()))
# ── Response serialization ───────────────────────────────────────
@staticmethod
def _serialize_response(response) -> str:
"""Build a JSON-serialized ``MyRequestOutput`` from an engine response."""
return MyRequestOutput(
request_id=response.request_id,
prompt=response.prompt,
prompt_token_ids=response.prompt_token_ids,
prompt_logprobs=response.prompt_logprobs,
outputs=response.outputs,
finished=response.finished,
metrics=response.metrics,
kv_transfer_params=response.kv_transfer_params,
).model_dump_json()
@staticmethod @staticmethod
def _format_engine_output( def _format_engine_output(
response, num_output_tokens_so_far: int response, num_output_tokens_so_far: int
...@@ -346,13 +330,16 @@ class MultimodalPDWorkerHandler(BaseWorkerHandler): ...@@ -346,13 +330,16 @@ class MultimodalPDWorkerHandler(BaseWorkerHandler):
f"— ensure the same adapter is loaded on the decode worker." f"— ensure the same adapter is loaded on the decode worker."
) )
num_output_tokens_so_far = 0
async for ( async for (
decode_response decode_response
) in await self.decode_worker_client.round_robin( # type: ignore[union-attr] ) in await self.decode_worker_client.round_robin( # type: ignore[union-attr]
request.model_dump_json() request.model_dump_json()
): ):
output = MyRequestOutput.model_validate_json(decode_response.data()) # type: ignore[attr-defined] output = MyRequestOutput.model_validate_json(decode_response.data()) # type: ignore[attr-defined]
yield self._serialize_response(output) yield self._format_engine_output(output, num_output_tokens_so_far)
if output.outputs:
num_output_tokens_so_far = len(output.outputs[0].token_ids)
# ── Public entry point ─────────────────────────────────────────── # ── Public entry point ───────────────────────────────────────────
......
...@@ -38,6 +38,8 @@ class SupportedModels: ...@@ -38,6 +38,8 @@ class SupportedModels:
QWEN_2_5_VL_3B = "Qwen/Qwen2.5-VL-3B-Instruct" QWEN_2_5_VL_3B = "Qwen/Qwen2.5-VL-3B-Instruct"
QWEN_2_5_VL_7B = "Qwen/Qwen2.5-VL-7B-Instruct" QWEN_2_5_VL_7B = "Qwen/Qwen2.5-VL-7B-Instruct"
QWEN_2_5_VL_32B = "Qwen/Qwen2.5-VL-32B-Instruct" QWEN_2_5_VL_32B = "Qwen/Qwen2.5-VL-32B-Instruct"
QWEN_3_VL_2B = "Qwen/Qwen3-VL-2B-Instruct"
QWEN_3_VL_30B_A3B = "Qwen/Qwen3-VL-30B-A3B-Instruct"
QWEN_3_VL_30B_A3B_FP8 = "Qwen/Qwen3-VL-30B-A3B-Instruct-FP8" QWEN_3_VL_30B_A3B_FP8 = "Qwen/Qwen3-VL-30B-A3B-Instruct-FP8"
QWEN_3_VL_8B_FP8 = "Qwen/Qwen3-VL-8B-Instruct-FP8" QWEN_3_VL_8B_FP8 = "Qwen/Qwen3-VL-8B-Instruct-FP8"
LLAVA_NEXT_VIDEO_7B = "llava-hf/LLaVA-NeXT-Video-7B-hf" LLAVA_NEXT_VIDEO_7B = "llava-hf/LLaVA-NeXT-Video-7B-hf"
...@@ -118,6 +120,8 @@ QWEN_VL_MODELS = [ ...@@ -118,6 +120,8 @@ QWEN_VL_MODELS = [
SupportedModels.QWEN_2_5_VL_3B, SupportedModels.QWEN_2_5_VL_3B,
SupportedModels.QWEN_2_5_VL_7B, SupportedModels.QWEN_2_5_VL_7B,
SupportedModels.QWEN_2_5_VL_32B, SupportedModels.QWEN_2_5_VL_32B,
SupportedModels.QWEN_3_VL_2B,
SupportedModels.QWEN_3_VL_30B_A3B,
SupportedModels.QWEN_3_VL_30B_A3B_FP8, SupportedModels.QWEN_3_VL_30B_A3B_FP8,
SupportedModels.QWEN_3_VL_8B_FP8, SupportedModels.QWEN_3_VL_8B_FP8,
] ]
......
...@@ -15,7 +15,6 @@ from dynamo.common.memory.multimodal_embedding_cache_manager import ( ...@@ -15,7 +15,6 @@ from dynamo.common.memory.multimodal_embedding_cache_manager import (
) )
from dynamo.vllm.multimodal_handlers import multimodal_pd_worker_handler as mod from dynamo.vllm.multimodal_handlers import multimodal_pd_worker_handler as mod
from dynamo.vllm.multimodal_utils.protocol import ( from dynamo.vllm.multimodal_utils.protocol import (
MyRequestOutput,
PatchedTokensPrompt, PatchedTokensPrompt,
vLLMMultimodalRequest, vLLMMultimodalRequest,
) )
...@@ -105,7 +104,7 @@ def _make_vllm_request(request_id: str = "req-1") -> vLLMMultimodalRequest: ...@@ -105,7 +104,7 @@ def _make_vllm_request(request_id: str = "req-1") -> vLLMMultimodalRequest:
def _make_engine_response(request_id: str = "req-1", finished: bool = True): def _make_engine_response(request_id: str = "req-1", finished: bool = True):
"""Create a mock engine response with the fields _serialize_response needs.""" """Create a mock engine response with the fields _format_engine_output needs."""
resp = MagicMock() resp = MagicMock()
resp.request_id = request_id resp.request_id = request_id
resp.prompt = "test" resp.prompt = "test"
...@@ -274,16 +273,28 @@ class TestGenerateDisagg: ...@@ -274,16 +273,28 @@ class TestGenerateDisagg:
handler.engine_client.generate = fake_generate handler.engine_client.generate = fake_generate
decode_output = MyRequestOutput( decode_json = json.dumps(
request_id="req-1", {
prompt="test", "request_id": "req-1",
prompt_token_ids=[1, 2, 3], "prompt": "test",
outputs=[], "prompt_token_ids": [1, 2, 3],
finished=True, "outputs": [
kv_transfer_params={"block_ids": [0, 1]}, {
"index": 0,
"text": "",
"token_ids": [42],
"cumulative_logprob": None,
"logprobs": None,
"finish_reason": "stop",
"stop_reason": None,
}
],
"finished": True,
"kv_transfer_params": {"block_ids": [0, 1]},
}
) )
decode_resp = MagicMock() decode_resp = MagicMock()
decode_resp.data.return_value = decode_output.model_dump_json() decode_resp.data.return_value = decode_json
async def fake_round_robin(payload): async def fake_round_robin(payload):
async def _stream(): async def _stream():
...@@ -299,6 +310,6 @@ class TestGenerateDisagg: ...@@ -299,6 +310,6 @@ class TestGenerateDisagg:
chunks.append(chunk) chunks.append(chunk)
assert len(chunks) == 1 assert len(chunks) == 1
parsed = json.loads(chunks[0]) assert isinstance(chunks[0], dict)
assert parsed["request_id"] == "req-1" assert chunks[0]["token_ids"] == [42]
assert parsed["finished"] is True assert chunks[0]["finish_reason"] == "stop"
...@@ -187,19 +187,19 @@ class WorkerFactory: ...@@ -187,19 +187,19 @@ class WorkerFactory:
if kv_publisher: if kv_publisher:
handler.kv_publisher = kv_publisher handler.kv_publisher = kv_publisher
# Register model with the frontend so it can route requests if not config.multimodal_decode_worker:
model_type = parse_endpoint_types(config.endpoint_types) model_type = parse_endpoint_types(config.endpoint_types)
model_input = ( model_input = (
ModelInput.Text if config.use_vllm_tokenizer else ModelInput.Tokens ModelInput.Text if config.use_vllm_tokenizer else ModelInput.Tokens
) )
await self.register_vllm_model( await self.register_vllm_model(
model_input, model_input,
model_type, model_type,
generate_endpoint, generate_endpoint,
config, config,
engine_client, engine_client,
vllm_config, vllm_config,
) )
metrics_labels = [("model", config.served_model_name or config.model)] metrics_labels = [("model", config.served_model_name or config.model)]
try: try:
......
...@@ -71,7 +71,7 @@ VLLM_NIXL_SIDE_CHANNEL_PORT=20097 CUDA_VISIBLE_DEVICES=$DYN_ENCODE_WORKER_GPU py ...@@ -71,7 +71,7 @@ VLLM_NIXL_SIDE_CHANNEL_PORT=20097 CUDA_VISIBLE_DEVICES=$DYN_ENCODE_WORKER_GPU py
# Start prefill worker (also handles encode routing via --route-to-encoder) # Start prefill worker (also handles encode routing via --route-to-encoder)
echo "Starting prefill worker on GPU $DYN_PREFILL_WORKER_GPU (GPU mem: $DYN_PREFILL_GPU_MEM)..." echo "Starting prefill worker on GPU $DYN_PREFILL_WORKER_GPU (GPU mem: $DYN_PREFILL_GPU_MEM)..."
VLLM_NIXL_SIDE_CHANNEL_PORT=20098 \ VLLM_NIXL_SIDE_CHANNEL_PORT=20098 \
CUDA_VISIBLE_DEVICES=$DYN_PREFILL_WORKER_GPU python -m dynamo.vllm --route-to-encoder --disaggregation-mode prefill --enable-multimodal --enable-mm-embeds --model $MODEL_NAME --gpu-memory-utilization $DYN_PREFILL_GPU_MEM $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20081"}' & CUDA_VISIBLE_DEVICES=$DYN_PREFILL_WORKER_GPU python -m dynamo.vllm --multimodal-worker --route-to-encoder --disaggregation-mode prefill --enable-multimodal --enable-mm-embeds --model $MODEL_NAME --gpu-memory-utilization $DYN_PREFILL_GPU_MEM $EXTRA_ARGS --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20081"}' &
# Start decode worker # Start decode worker
echo "Starting decode worker on GPU $DYN_DECODE_WORKER_GPU (GPU mem: $DYN_DECODE_GPU_MEM)..." echo "Starting decode worker on GPU $DYN_DECODE_WORKER_GPU (GPU mem: $DYN_DECODE_GPU_MEM)..."
......
...@@ -276,13 +276,13 @@ vllm_configs = { ...@@ -276,13 +276,13 @@ vllm_configs = {
completion_payload_default(), completion_payload_default(),
], ],
), ),
"multimodal_disagg_qwen2vl_2b_e_pd": VLLMConfig( "multimodal_disagg_qwen3vl_2b_e_pd": VLLMConfig(
name="multimodal_disagg_qwen2vl_2b_e_pd", name="multimodal_disagg_qwen3vl_2b_e_pd",
directory=vllm_dir, directory=vllm_dir,
script_name="disagg_multimodal_e_pd.sh", script_name="disagg_multimodal_e_pd.sh",
marks=[pytest.mark.gpu_1, pytest.mark.pre_merge], marks=[pytest.mark.gpu_1, pytest.mark.pre_merge],
model="Qwen/Qwen2-VL-2B-Instruct", model="Qwen/Qwen3-VL-2B-Instruct",
script_args=["--model", "Qwen/Qwen2-VL-2B-Instruct", "--single-gpu"], script_args=["--model", "Qwen/Qwen3-VL-2B-Instruct", "--single-gpu"],
request_payloads=[ request_payloads=[
chat_payload( chat_payload(
[ [
...@@ -335,13 +335,21 @@ vllm_configs = { ...@@ -335,13 +335,21 @@ vllm_configs = {
) )
], ],
), ),
"multimodal_agg_llava_epd": VLLMConfig( "multimodal_disagg_qwen3vl_2b_epd": VLLMConfig(
name="multimodal_agg_llava_epd", name="multimodal_disagg_qwen3vl_2b_epd",
directory=vllm_dir, directory=vllm_dir,
script_name="agg_multimodal_epd.sh", script_name="disagg_multimodal_epd.sh",
marks=[pytest.mark.gpu_2, pytest.mark.nightly], marks=[pytest.mark.gpu_2, pytest.mark.pre_merge],
model="llava-hf/llava-1.5-7b-hf", model="Qwen/Qwen3-VL-2B-Instruct",
script_args=["--model", "llava-hf/llava-1.5-7b-hf"], script_args=["--model", "Qwen/Qwen3-VL-2B-Instruct"],
env={
"DYN_ENCODE_WORKER_GPU": "0",
"DYN_PREFILL_WORKER_GPU": "0",
"DYN_DECODE_WORKER_GPU": "1",
"DYN_ENCODE_GPU_MEM": "0.4",
"DYN_PREFILL_GPU_MEM": "0.4",
"DYN_DECODE_GPU_MEM": "0.85",
},
request_payloads=[ request_payloads=[
chat_payload( chat_payload(
[ [
...@@ -355,39 +363,12 @@ vllm_configs = { ...@@ -355,39 +363,12 @@ vllm_configs = {
}, },
], ],
repeat_count=1, repeat_count=1,
expected_response=["purple"], expected_response=["green"],
temperature=0.0, temperature=0.0,
max_tokens=100, max_tokens=100,
) )
], ],
), ),
"multimodal_agg_qwen_epd": VLLMConfig(
name="multimodal_agg_qwen_epd",
directory=vllm_dir,
script_name="agg_multimodal_epd.sh",
marks=[pytest.mark.gpu_2, pytest.mark.nightly],
model="Qwen/Qwen2.5-VL-7B-Instruct",
delayed_start=0,
script_args=["--model", "Qwen/Qwen2.5-VL-7B-Instruct"],
timeout=360,
request_payloads=[
chat_payload(
[
{
"type": "text",
"text": "What colors are in the following image? Respond only with the colors.",
},
{
"type": "image_url",
"image_url": {"url": MULTIMODAL_IMG_URL},
},
],
repeat_count=1,
expected_response=["purple"],
max_tokens=100,
)
],
),
"multimodal_agg_qwen": VLLMConfig( "multimodal_agg_qwen": VLLMConfig(
name="multimodal_agg_qwen", name="multimodal_agg_qwen",
directory=vllm_dir, directory=vllm_dir,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment