Commit 7bbacce1 (unverified), authored by Alec, committed by GitHub
Browse files

feat: default kv-events-config to empty (align with vLLM defaults) (#6404)


Signed-off-by: alec-flowers <aflowers@nvidia.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
parent d6c49779
......@@ -98,7 +98,6 @@ mkdir -p $LOG_DIR
for ((i=0; i<GPUS_PER_NODE; i++)); do
dp_rank=$((i + NODE_RANK * GPUS_PER_NODE))
CUDA_VISIBLE_DEVICES=$i \
DYN_VLLM_KV_EVENT_PORT=$((20080 + i)) \
VLLM_NIXL_SIDE_CHANNEL_PORT=$((20096 + i)) \
VLLM_ALL2ALL_BACKEND="deepep_low_latency" \
VLLM_USE_DEEP_GEMM=1 \
......@@ -112,7 +111,8 @@ for ((i=0; i<GPUS_PER_NODE; i++)); do
--data-parallel-address $MASTER_ADDR \
--data-parallel-rpc-port 13345 \
--gpu-memory-utilization 0.91 \
--enforce-eager 2>&1 | tee $LOG_DIR/dsr1_dep_${dp_rank}.log &
--enforce-eager \
--kv-events-config "{\"publisher\":\"zmq\",\"topic\":\"kv-events\",\"endpoint\":\"tcp://*:$((20080 + i))\",\"enable_kv_cache_events\":true}" 2>&1 | tee $LOG_DIR/dsr1_dep_${dp_rank}.log &
done
echo "All workers starting. (press Ctrl+C to stop)..."
......
......@@ -81,9 +81,9 @@ Leave this terminal running - it will show Decode Worker logs.
```bash
export DYN_LOG=debug # Increase log verbosity to see disaggregation
DYN_VLLM_KV_EVENT_PORT=20081 \
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
CUDA_VISIBLE_DEVICES=1 python -m dynamo.vllm --model Qwen/Qwen3-0.6B --is-prefill-worker
CUDA_VISIBLE_DEVICES=1 python -m dynamo.vllm --model Qwen/Qwen3-0.6B --is-prefill-worker \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20081","enable_kv_cache_events":true}'
```
This starts a specialized prefill worker that:
......
......@@ -42,4 +42,4 @@ spec:
- /bin/sh
- -c
args:
- python3 -m dynamo.vllm --model Qwen/Qwen2.5-1.5B-Instruct
- python3 -m dynamo.vllm --model Qwen/Qwen2.5-1.5B-Instruct --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20080","enable_kv_cache_events":true}'
......@@ -69,4 +69,4 @@ spec:
- /bin/sh
- -c
args:
- python3 -m dynamo.vllm --model meta-llama/Llama-3.1-70B-Instruct -tp 4 --is-prefill-worker
- python3 -m dynamo.vllm --model meta-llama/Llama-3.1-70B-Instruct -tp 4 --is-prefill-worker --kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20080","enable_kv_cache_events":true}'
......@@ -108,6 +108,8 @@ spec:
- '{"rope_scaling":{"rope_type":"yarn","factor":4.0,"original_max_position_embeddings":32768},"max_position_embeddings":131072}'
- --max-model-len
- '131072'
- --kv-events-config
- '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:20080","enable_kv_cache_events":true}'
command:
- python3
- -m
......
......@@ -9,6 +9,7 @@ Test Execution Times (Last Run: 2025-12-09):
- Total: 161.65s (0:02:41)
"""
import json
import logging
import os
import shutil
......@@ -96,10 +97,22 @@ class DynamoWorkerProcess(ManagedProcess):
env["DYN_SYSTEM_PORT"] = str(system_port)
env["DYN_HTTP_PORT"] = str(frontend_port)
# Set KV event port and NIXL side channel port only for prefill worker
# Set KV events config and NIXL side channel port only for prefill worker
# to avoid conflicts with decode worker
if is_prefill:
env["DYN_VLLM_KV_EVENT_PORT"] = "20082" # TODO: use dynamic port allocation
command.extend(
[
"--kv-events-config",
json.dumps(
{
"publisher": "zmq",
"topic": "kv-events",
"endpoint": "tcp://*:20082",
"enable_kv_cache_events": True,
}
),
]
)
env[
"VLLM_NIXL_SIDE_CHANNEL_PORT"
] = "5601" # TODO: use dynamic port allocation
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import os
import shutil
......@@ -69,7 +70,19 @@ class DynamoWorkerProcess(ManagedProcess):
env["DYN_SYSTEM_PORT"] = port
if is_prefill:
env["DYN_VLLM_KV_EVENT_PORT"] = "20082"
command.extend(
[
"--kv-events-config",
json.dumps(
{
"publisher": "zmq",
"topic": "kv-events",
"endpoint": "tcp://*:20082",
"enable_kv_cache_events": True,
}
),
]
)
env["VLLM_NIXL_SIDE_CHANNEL_PORT"] = "5601"
# Set log directory based on worker type
......
......@@ -3,6 +3,7 @@
"""vLLM-specific utilities for GPU Memory Service tests."""
import json
import logging
import os
import shutil
......@@ -34,6 +35,14 @@ class VLLMWithGMSProcess(ManagedProcess):
log_dir = f"{request.node.name}_{engine_id}"
shutil.rmtree(log_dir, ignore_errors=True)
kv_events_cfg = json.dumps(
{
"publisher": "zmq",
"topic": "kv-events",
"endpoint": f"tcp://*:{kv_event_port}",
"enable_kv_cache_events": True,
}
)
super().__init__(
command=[
"python3",
......@@ -46,12 +55,13 @@ class VLLMWithGMSProcess(ManagedProcess):
"--enable-sleep-mode",
"--gpu-memory-utilization",
"0.8",
"--kv-events-config",
kv_events_cfg,
],
env={
**os.environ,
"DYN_LOG": "debug",
"DYN_SYSTEM_PORT": str(system_port),
"DYN_VLLM_KV_EVENT_PORT": str(kv_event_port),
"VLLM_NIXL_SIDE_CHANNEL_PORT": str(nixl_port),
},
health_check_urls=[
......
......@@ -9,6 +9,7 @@ Test Execution Times (Last Run: 2026-01-09):
- test_request_migration_vllm_decode: ~115s
"""
import json
import logging
import os
import shutil
......@@ -107,25 +108,32 @@ class DynamoWorkerProcess(ManagedProcess):
elif is_prefill is False:
command.append("--is-decode-worker")
# Aggregated mode and prefill workers publish KV events
if is_prefill is not False:
kv_event_port = f"2008{worker_id[-1]}" # TODO: use dynamic port allocation
command.extend(
[
"--kv-events-config",
json.dumps(
{
"publisher": "zmq",
"topic": "kv-events",
"endpoint": f"tcp://*:{kv_event_port}",
"enable_kv_cache_events": True,
}
),
]
)
# Set environment variables
env = os.environ.copy()
env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
# Set KV event and NIXL ports based on worker mode
# All workers need unique NIXL side channel ports for KV transfer
env[
"VLLM_NIXL_SIDE_CHANNEL_PORT"
] = f"560{worker_id[-1]}" # TODO: use dynamic port allocation
if is_prefill is False:
# Decode workers don't publish KV events
env.pop("DYN_VLLM_KV_EVENT_PORT", None)
else:
# Aggregated mode and prefill workers publish KV events
env[
"DYN_VLLM_KV_EVENT_PORT"
] = f"2008{worker_id[-1]}" # TODO: use dynamic port allocation
env["DYN_LOG"] = "debug"
# Disable canary health check - these tests expect full control over requests
# sent to the workers where canary health check intermittently sends dummy
......
......@@ -5,6 +5,7 @@
# - GPU-1 subset (`-m "gpu_1 and not gpu_2"`): 130.43s total for 3 tests.
# These tests load a real model and can be slow/flaky when GPU resources are contended,
# so we set explicit pytest timeouts to fail fast on hangs (see per-test markers below).
import json
import logging
import os
import time
......@@ -219,13 +220,23 @@ class VLLMProcess:
kv_event_port = self._kv_event_ports[worker_idx]
nixl_port = self._nixl_ports[worker_idx]
# Pass KV events config explicitly via CLI
kv_events_cfg = json.dumps(
{
"publisher": "zmq",
"topic": "kv-events",
"endpoint": f"tcp://*:{kv_event_port}",
"enable_kv_cache_events": True,
}
)
command.extend(["--kv-events-config", kv_events_cfg])
env = os.environ.copy() # Copy parent environment
env_vars = {
"CUDA_VISIBLE_DEVICES": gpu_device,
"DYN_NAMESPACE": self.namespace,
"DYN_REQUEST_PLANE": request_plane,
"DYN_SYSTEM_PORT": str(system_port),
"DYN_VLLM_KV_EVENT_PORT": str(kv_event_port),
"VLLM_NIXL_SIDE_CHANNEL_PORT": str(nixl_port),
"PYTHONHASHSEED": "0", # for deterministic event id's
}
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment