Unverified Commit 6067c59d authored by Kris Hung's avatar Kris Hung Committed by GitHub
Browse files

feat: Extend the KV Event Consolidator to support TensorRT-LLM (#4533)


Signed-off-by: krishung5 <krish@nvidia.com>
parent ca264beb
......@@ -39,7 +39,14 @@ import dynamo.nixl_connect as nixl_connect
from dynamo.common.config_dump import dump_config
from dynamo.common.utils.endpoint_types import parse_endpoint_types
from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.llm import ModelInput, ModelRuntimeConfig, ModelType, register_llm
from dynamo.llm import (
ModelInput,
ModelRuntimeConfig,
ModelType,
ZmqKvEventPublisher,
ZmqKvEventPublisherConfig,
register_llm,
)
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.trtllm.engine import Backend, TensorRTLLMEngine, get_llm_engine
......@@ -235,6 +242,39 @@ async def init(runtime: DistributedRuntime, config: Config):
)
sys.exit(1)
trtllm_zmq_bind_endpoint = None # Endpoint for TensorRT-LLM to bind and publish
consolidator_output_endpoint = (
None # Endpoint where consolidator publishes (workers subscribe to this)
)
try:
from kvbm.trtllm_integration.consolidator_config import (
get_consolidator_endpoints,
should_enable_consolidator,
)
if should_enable_consolidator(arg_map):
# get_consolidator_endpoints returns (trtllm_bind_endpoint, output_bind_endpoint, output_connect_endpoint)
consolidator_endpoints = get_consolidator_endpoints()
trtllm_zmq_bind_endpoint = consolidator_endpoints[0] # TRTLLM bind endpoint
consolidator_output_endpoint = consolidator_endpoints[
1
] # Consolidator output bind endpoint (for KVBM connector)
consolidator_output_connect_endpoint = consolidator_endpoints[
2
] # Consolidator output connect endpoint (for worker publisher)
except ImportError:
# kvbm package is not installed
logging.info(
"kvbm package not installed - skipping KV event consolidator setup."
)
except Exception as e:
logging.error(
f"Failed to set up consolidator endpoints: {e}. "
"Continuing without KV event consolidation.",
exc_info=True,
)
logging.info(f"TensorRT-LLM engine args: {arg_map}")
engine_args = arg_map
......@@ -400,6 +440,26 @@ async def init(runtime: DistributedRuntime, config: Config):
# Use model_path as fallback if served_model_name is not provided
model_name_for_metrics = config.served_model_name or config.model_path
metrics_labels = [("model", model_name_for_metrics)]
# Create worker-side publisher for consolidated events if consolidator is enabled
# This subscribes to consolidator's ZMQ output and publishes to NATS with worker_id
consolidator_publisher = None
if consolidator_output_endpoint:
# Use the connect endpoint directly (already provided by get_consolidator_endpoints)
consolidator_config = ZmqKvEventPublisherConfig(
worker_id=int(endpoint.connection_id()),
kv_block_size=config.kv_block_size,
zmq_endpoint=consolidator_output_connect_endpoint,
zmq_topic="", # Empty topic = all topics
)
consolidator_publisher = ZmqKvEventPublisher(
component, consolidator_config
)
logging.info(
f"Created worker-side publisher for consolidated events: "
f"subscribing to {consolidator_output_connect_endpoint}, worker_id={endpoint.connection_id()}"
)
async with get_publisher(
component,
engine,
......@@ -407,6 +467,7 @@ async def init(runtime: DistributedRuntime, config: Config):
int(endpoint.connection_id()),
config.kv_block_size,
metrics_labels,
zmq_endpoint=trtllm_zmq_bind_endpoint,
) as publisher:
handler_config.publisher = publisher
handler = RequestHandlerFactory().get_request_handler(handler_config)
......@@ -415,6 +476,10 @@ async def init(runtime: DistributedRuntime, config: Config):
metrics_labels=metrics_labels,
health_check_payload=health_check_payload,
)
# Shutdown consolidator publisher if it was created
if consolidator_publisher:
consolidator_publisher.shutdown()
else:
handler = RequestHandlerFactory().get_request_handler(handler_config)
await endpoint.serve_endpoint(
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
TensorRT-LLM KV Event Publisher Module
This module contains the Publisher class that retrieves KV cache events from TensorRT-LLM
and publishes them either to ZMQ (for consolidator) or NATS (direct to router).
Key Components:
- ZmqKvEventPublisher: Pure Python ZMQ PUBLISHER that publishes TensorRT-LLM KV events
to ZMQ (so the consolidator can subscribe to them). This is different from the
ZmqKvEventPublisher in dynamo.llm, which is a Rust-based ZMQ SUBSCRIBER that subscribes
to the consolidator's output and publishes to NATS.
- Publisher: Main class that coordinates event publishing (ZMQ or NATS) and metrics publishing.
Event Flow:
- With Consolidator: Engine → ZmqKvEventPublisher (ZMQ PUB) → Consolidator → ZmqKvEventPublisher (dynamo.llm, ZMQ SUB) → NATS → Router
- Without Consolidator: Engine → KvEventPublisher (NATS PUB) → Router
"""
import asyncio
import concurrent.futures
import logging
import threading
import time
import traceback
import weakref
from contextlib import asynccontextmanager
from queue import Queue
from typing import Awaitable, Callable, Optional, Union
import msgpack
import zmq
from dynamo.llm import (
ForwardPassMetrics,
KvEventPublisher,
......@@ -34,6 +56,128 @@ def _to_signed_i64(value: int | None) -> int | None:
return value
class ZmqKvEventPublisher:
    """
    Pure Python ZMQ PUBLISHER for TensorRT-LLM KV events.

    This class publishes TensorRT-LLM's KV cache events to ZMQ so that the consolidator
    can subscribe to them. This is different from the ZmqKvEventPublisher in dynamo.llm,
    which is a Rust-based ZMQ SUBSCRIBER that subscribes to the consolidator's ZMQ
    output and publishes to NATS.

    Event Format: [timestamp, [events], data_parallel_rank]
    Message Format: multipart ZMQ message [topic, sequence, payload] where payload is
        the msgpack-serialized batch and sequence is an 8-byte big-endian counter.

    Usage:
        Used by Publisher class when consolidator is enabled (zmq_endpoint provided).
        Publishes events from TensorRT-LLM engine to ZMQ for consolidator to consume.
    """

    def __init__(self, zmq_endpoint: str, kv_block_size: int, topic: str = ""):
        """
        Initialize ZMQ publisher.

        Args:
            zmq_endpoint: ZMQ endpoint to bind to (e.g., "tcp://*:20081")
            kv_block_size: Size of KV cache blocks in tokens
            topic: ZMQ topic to publish on (empty string for all topics)

        Raises:
            Any exception raised while creating or binding the socket is re-raised
            after the partially created socket/context have been released.
        """
        self.zmq_endpoint = zmq_endpoint
        self.kv_block_size = kv_block_size
        self.topic = topic
        self.sequence = 0
        self.data_parallel_rank = 0  # TensorRT-LLM doesn't use DP for now

        # Pre-set both handles so _release() is safe at any point below.
        self.ctx = None
        self.socket = None
        try:
            self.ctx = zmq.Context()
            self.socket = self.ctx.socket(zmq.PUB)
            self.socket.bind(zmq_endpoint)
        except Exception:
            # Do not leak the context/socket when the endpoint cannot be bound
            # (e.g. the port is already in use).
            self._release()
            raise

        logging.info(
            f"TensorRT-LLM: ZMQ KV event publisher initialized - bound to {zmq_endpoint} "
            f"with topic '{topic}', kv_block_size={kv_block_size}"
        )

    def publish_stored(
        self,
        event_id: int,
        token_ids: list[int],
        num_block_tokens: list[int],
        block_hashes: list[int],
        lora_id: int = 0,
        parent_hash: Optional[int] = None,
    ):
        """
        Publish a BlockStored event.

        `event_id` and `num_block_tokens` are accepted so the call signature matches
        the one used for the NATS publisher, but they are not part of the ZMQ payload:
        every block is reported with the fixed `kv_block_size`.
        """
        # Convert block hashes to signed i64 format
        block_hashes_signed = [_to_signed_i64(h) for h in block_hashes]
        parent_hash_signed = (
            _to_signed_i64(parent_hash) if parent_hash is not None else None
        )

        # Create event in the same format as vLLM's ZmqEventPublisher:
        # All blocks should have the same size (kv_block_size)
        event = {
            "type": "BlockStored",
            "block_hashes": block_hashes_signed,
            "parent_block_hash": parent_hash_signed,
            "token_ids": token_ids,
            "block_size": self.kv_block_size,
            # A lora_id of 0 is sent as None.
            "lora_id": lora_id if lora_id != 0 else None,
        }
        self._publish_event(event)

    def publish_removed(self, event_id: int, block_hashes: list[int]):
        """Publish a BlockRemoved event (`event_id` is not part of the payload)."""
        # Convert block hashes to signed i64 format (vLLM compatibility)
        block_hashes_signed = [_to_signed_i64(h) for h in block_hashes]
        event = {
            "type": "BlockRemoved",
            "block_hashes": block_hashes_signed,
        }
        self._publish_event(event)

    def publish_all_cleared(self):
        """Publish an AllBlocksCleared event."""
        event = {"type": "AllBlocksCleared"}
        self._publish_event(event)

    def _publish_event(self, event: dict):
        """
        Publish a single event to ZMQ in vLLM batch format.

        Failures are logged and swallowed so that a publishing error does not
        propagate into the caller's event loop.
        """
        if self.socket is None:
            # shutdown() already ran; there is nothing to send on.
            logging.warning(
                "ZMQ KV event publisher is shut down; dropping "
                f"{event.get('type', 'Unknown')} event"
            )
            return

        try:
            # Create batch in vLLM format: [timestamp, [events], data_parallel_rank]
            timestamp = time.time()
            batch = [timestamp, [event], self.data_parallel_rank]

            event_type = event.get("type", "Unknown")
            logging.debug(
                f"TensorRT-LLM: ZMQ publisher sending {event_type} event to {self.zmq_endpoint}"
            )

            # Serialize with msgpack (vLLM uses msgpack/rmp_serde compatible format)
            payload = msgpack.packb(batch, use_bin_type=True)

            # Create multipart message: [topic, sequence, payload]
            # Format matches what consolidator expects: 3 frames [topic, sequence, payload]
            sequence_bytes = self.sequence.to_bytes(8, byteorder="big")
            self.sequence += 1

            # Send multipart message (flags=0, i.e. no NOBLOCK flag)
            # Topic is empty string for "all topics" (vLLM compatibility)
            self.socket.send_multipart(
                [self.topic.encode(), sequence_bytes, payload], flags=0
            )
        except Exception as e:
            logging.error(f"Failed to publish ZMQ event: {e}", exc_info=True)

    def _release(self) -> bool:
        """Close the socket and terminate the context; return True if anything was released."""
        socket, ctx = self.socket, self.ctx
        # Clear the attributes first so that a repeated call is a no-op.
        self.socket = None
        self.ctx = None
        try:
            if socket is not None:
                socket.close()
        finally:
            # Terminate the context even if closing the socket failed.
            if ctx is not None:
                ctx.term()
        return socket is not None or ctx is not None

    def shutdown(self):
        """Shutdown the ZMQ publisher. Safe to call more than once."""
        if self._release():
            logging.info("ZMQ KV event publisher shut down")
class ManagedThread(threading.Thread):
"""
A thread that runs a task and handles errors.
......@@ -108,11 +252,30 @@ class ManagedThread(threading.Thread):
class Publisher:
"""
A class to retrieve stats and kv cache events from TRTLLM engine and publish them to the metrics and events publishers.
Main publisher class for TensorRT-LLM KV events and metrics.
Retrieves KV cache events and stats from TensorRT-LLM engine and publishes them:
- KV Events: Routes to either ZMQ (if consolidator enabled) or NATS (if no consolidator)
- Metrics: Always publishes to NATS via WorkerMetricsPublisher
Publisher Selection Logic:
- If zmq_endpoint provided: Uses ZmqKvEventPublisher (ZMQ PUB) → Consolidator → NATS
- If zmq_endpoint None: Uses KvEventPublisher (NATS PUB) → Router directly
Note: The ZmqKvEventPublisher used here is the pure Python ZMQ publisher defined
in this module, not the Rust-based ZmqKvEventPublisher from dynamo.llm (which is
used in main.py as the worker-side subscriber from consolidator to NATS).
"""
def __init__(
self, component, engine, kv_listener, worker_id, kv_block_size, metrics_labels
self,
component,
engine,
kv_listener,
worker_id,
kv_block_size,
metrics_labels,
zmq_endpoint: Optional[str] = None,
):
self.component = component
self.engine = engine
......@@ -130,14 +293,28 @@ class Publisher:
# Needed by the events and metrics publishers
self.metrics_publisher = None
self.kv_event_publisher = None
self.publish_kv_cache_events_thread = None
self.publish_stats_thread = None
self.zmq_kv_event_publisher = None # ZMQ publisher for consolidator
self.publish_kv_cache_events_thread: Optional[ManagedThread] = None
self.publish_stats_thread: Optional[ManagedThread] = None
# A set to store the block hash of partial block (i.e. block containing less than kv_block_size tokens) hashes.
# It is used to prevent sending remove event to kv router since partial blocks are not stored.
self.partial_block_hashes = set()
self.partial_block_hashes: set[int] = set()
self.error_queue: Queue = Queue()
self._stop_event = threading.Event()
# Initialize ZMQ publisher if endpoint is provided (consolidator enabled)
if zmq_endpoint:
logging.info(
f"TensorRT-LLM: Initializing ZMQ KV event publisher with endpoint={zmq_endpoint}"
)
self.zmq_kv_event_publisher = ZmqKvEventPublisher(
zmq_endpoint, self.kv_block_size
)
else:
logging.info(
"TensorRT-LLM: ZMQ endpoint not provided, ZMQ publisher will not be initialized"
)
async def _create_metrics_publisher_endpoint(self):
logging.debug("Creating metrics publisher endpoint")
if self.metrics_publisher is None:
......@@ -157,9 +334,24 @@ class Publisher:
)
# Setup the kv cache events publisher
self.kv_event_publisher = KvEventPublisher(
self.kv_listener, self.worker_id, self.kv_block_size
)
# Publisher selection based on consolidator configuration:
# - With consolidator: Use ZmqKvEventPublisher (this module) → ZMQ → Consolidator → NATS → Router
# - Without consolidator: Use KvEventPublisher → NATS → Router (direct)
# Note: The worker-side ZmqKvEventPublisher (from dynamo.llm) that subscribes from
# consolidator and publishes to NATS is created separately in main.py, not here.
if self.zmq_kv_event_publisher:
logging.info(
"KV Event Consolidator enabled - using ZMQ publisher only. "
"Consolidator will publish consolidated events to NATS."
)
self.kv_event_publisher = None
else:
# No consolidator: use NATS publisher (router subscribes directly)
self.kv_event_publisher = KvEventPublisher(
self.kv_listener, self.worker_id, self.kv_block_size, dp_rank=0
)
# Always initialize the thread - it routes to either ZMQ or NATS publisher
self._init_publish_kv_cache_events_thread()
def _init_publish_metrics_thread(self):
......@@ -212,10 +404,7 @@ class Publisher:
)
def _init_publish_kv_cache_events_thread(self):
if self.kv_event_publisher is None:
logging.error("KV event publisher not initialized!")
return
# The _publish_kv_cache_events_task will route to the appropriate publisher
# Prepare threads for publishing kv cache events but don't start them yet.
# TRTLLM needs to start generating tokens first before kv cache events
# can be retrieved.
......@@ -288,13 +477,15 @@ class Publisher:
async def _publish_kv_cache_events_task(self):
"""
Publish kv cache events to the events publisher.
Routes to ZMQ (if kv event consolidation is enabled) or NATS (if no kv event consolidation).
"""
if self.engine is None:
logging.error("LLM engine not initialized!")
return
if self.kv_event_publisher is None:
logging.error("KV event publisher not initialized!")
# Check that at least one publisher is available
if self.kv_event_publisher is None and self.zmq_kv_event_publisher is None:
logging.error("No KV event publisher initialized (neither NATS nor ZMQ)!")
return
events = self.engine.llm.get_kv_cache_events_async(timeout=5)
......@@ -309,9 +500,9 @@ class Publisher:
if data["type"] == "stored":
self.processing_initial_created_events = False
parent_hash = _to_signed_i64(data["parent_hash"])
token_ids = []
num_block_tokens = []
block_hashes = []
token_ids: list[int] = []
num_block_tokens: list[int] = []
block_hashes: list[int] = []
for block in data["blocks"]:
token_num_in_block = len(block["tokens"])
block_hash = _to_signed_i64(block["block_hash"])
......@@ -320,6 +511,11 @@ class Publisher:
f"Block {block_hash} contains {token_num_in_block} tokens, which is greater than kv_block_size {self.kv_block_size}"
)
return
if block_hash is None:
logging.warning(
f"Skipping block with None hash containing {token_num_in_block} tokens"
)
continue
if token_num_in_block < self.kv_block_size:
logging.debug(
f"Early stop when block {block_hash} containing {token_num_in_block} tokens not equal to kv_block_size {self.kv_block_size}"
......@@ -339,31 +535,56 @@ class Publisher:
logging.debug(
f"publish stored event: event_id: {event_id}, token_ids: {token_ids}, num_block_tokens: {num_block_tokens}, block_hashes: {block_hashes}, lora_id: {lora_id}, parent_hash: {parent_hash}"
)
self.kv_event_publisher.publish_stored(
event_id,
token_ids,
num_block_tokens,
block_hashes,
lora_id,
parent_hash,
)
# Publish to ZMQ if consolidator is enabled, otherwise publish to NATS
if self.zmq_kv_event_publisher:
# Consolidator enabled: publish to ZMQ only
self.zmq_kv_event_publisher.publish_stored(
event_id,
token_ids,
num_block_tokens,
block_hashes,
lora_id,
parent_hash,
)
elif self.kv_event_publisher:
# No consolidator: publish to NATS (router subscribes directly)
self.kv_event_publisher.publish_stored(
event_id,
token_ids,
num_block_tokens,
block_hashes,
lora_id,
parent_hash,
)
elif data["type"] == "removed":
self.processing_initial_created_events = False
block_hashes = []
removed_block_hashes: list[int] = []
for block_hash in data["block_hashes"]:
block_hash = _to_signed_i64(block_hash)
if block_hash is None:
continue
if block_hash in self.partial_block_hashes:
logging.debug(
f"Skipping removing block hash {block_hash} since it is a partial block"
)
self.partial_block_hashes.remove(block_hash)
continue
block_hashes.append(block_hash)
removed_block_hashes.append(block_hash)
logging.debug(
f"publish removed event: event_id: {event_id}, block_hashes: {block_hashes}"
f"publish removed event: event_id: {event_id}, block_hashes: {removed_block_hashes}"
)
self.kv_event_publisher.publish_removed(event_id, block_hashes)
# Publish to ZMQ if consolidator is enabled, otherwise publish to NATS
if self.zmq_kv_event_publisher:
# Consolidator enabled: publish to ZMQ only
self.zmq_kv_event_publisher.publish_removed(
event_id, removed_block_hashes
)
elif self.kv_event_publisher:
# No consolidator: publish to NATS (router subscribes directly)
self.kv_event_publisher.publish_removed(
event_id, removed_block_hashes
)
elif data["type"] == "created" and self.processing_initial_created_events:
self.update_max_window_size(event)
......@@ -414,6 +635,10 @@ class Publisher:
if self.publish_kv_cache_events_thread.is_alive():
logging.warning("KV cache events thread did not stop within timeout")
# Shutdown ZMQ publisher if it exists
if self.zmq_kv_event_publisher:
self.zmq_kv_event_publisher.shutdown()
def update_max_window_size(self, event):
if "window_size" in event:
window_size = event["window_size"]
......@@ -453,10 +678,22 @@ class Publisher:
@asynccontextmanager
async def get_publisher(
component, engine, kv_listener, worker_id, kv_block_size, metrics_labels
component,
engine,
kv_listener,
worker_id,
kv_block_size,
metrics_labels,
zmq_endpoint: Optional[str] = None,
):
publisher = Publisher(
component, engine, kv_listener, worker_id, kv_block_size, metrics_labels
component,
engine,
kv_listener,
worker_id,
kv_block_size,
metrics_labels,
zmq_endpoint=zmq_endpoint,
)
try:
publisher.initialize()
......
......@@ -25,6 +25,7 @@ kr8s==0.20.13
kubernetes==32.0.1
kubernetes_asyncio<=32.1.1 # May vary by platform
matplotlib==3.10.7
msgpack==1.1.2
msgspec==0.19.0
mypy==1.18.2
nvidia-ml-py<=13.580.65 # NVIDIA/CUDA related, may vary by driver version
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import os
from typing import List, Optional
from kvbm import KvbmLeader
from kvbm.trtllm_integration.consolidator_config import is_truthy
from kvbm.trtllm_integration.rust import KvbmRequest
from kvbm.trtllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader
from kvbm.trtllm_integration.rust import SchedulerOutput as RustSchedulerOutput
......@@ -16,13 +18,19 @@ from tensorrt_llm._torch.pyexecutor.kv_cache_connector import (
from tensorrt_llm.bindings.internal.batch_manager import LlmRequest
from tensorrt_llm.llmapi.llm_args import TorchLlmArgs
logger = logging.getLogger(__name__)
DistributedRuntime = None
if is_dyn_runtime_enabled():
from dynamo.runtime import DistributedRuntime
class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler):
def __init__(self, llm_args: TorchLlmArgs):
def __init__(
self,
llm_args: TorchLlmArgs,
consolidator_trtllm_endpoint: Optional[str] = None,
):
super().__init__(llm_args)
drt: Optional[object] = None
......@@ -39,9 +47,63 @@ class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler):
# Set bytes_per_block to 0, because we will retrieve the actual value from the worker side.
leader = KvbmLeader(world_size, drt=self.drt)
# Check if consolidator is enabled first
consolidator_enabled = is_truthy(
os.getenv("DYN_KVBM_KV_EVENTS_ENABLE_CONSOLIDATOR", "true")
)
trtllm_ep = None
consolidator_output_ep = None
if consolidator_enabled:
# Get consolidator endpoint from environment variable
# DYN_KVBM_TRTLLM_ZMQ_PORT contains just the port number (e.g., "20081")
zmq_port = os.getenv("DYN_KVBM_TRTLLM_ZMQ_PORT")
if zmq_port:
try:
port_num = int(zmq_port)
trtllm_ep = f"tcp://127.0.0.1:{port_num}"
# Calculate consolidator output endpoint
# Derive from KVBM leader port (default 56001) + 1000 offset
kvbm_pub_port_str = os.getenv(
"DYN_KVBM_LEADER_ZMQ_PUB_PORT", "56001"
)
kvbm_pub_port = int(kvbm_pub_port_str)
# Use 1000 as the offset. This needs to be aligned with the offset used in the consolidator config.
consolidator_port_offset = 1000
output_port = kvbm_pub_port + consolidator_port_offset
consolidator_output_ep = f"tcp://0.0.0.0:{output_port}"
logger.info(
f"KV Event Consolidator: Using ZMQ port from DYN_KVBM_TRTLLM_ZMQ_PORT - trtllm={trtllm_ep}, output={consolidator_output_ep} (derived from KVBM port {kvbm_pub_port})"
)
except ValueError as e:
logger.error(
f"KV Event Consolidator: Invalid port value - {e}. Consolidator will not be enabled."
)
trtllm_ep = None
consolidator_output_ep = None
else:
logger.error(
"KV Event Consolidator: No ZMQ port found - consolidator will not be enabled. "
"Set this environment variable before running TensorRT-LLM:\n"
" export DYN_KVBM_TRTLLM_ZMQ_PORT=20081"
)
trtllm_ep = None
consolidator_output_ep = None
else:
logger.info(
"KV Event Consolidator disabled via DYN_KVBM_KV_EVENTS_ENABLE_CONSOLIDATOR"
)
print(f"KvConnectorLeader initialized with rank: {mappings.rank}")
self._connector = RustKvConnectorLeader(
mappings.rank, self.drt, self.block_size, leader
mappings.rank,
self.drt,
self.block_size,
leader,
consolidator_trtllm_endpoint=trtllm_ep,
consolidator_output_endpoint=consolidator_output_ep,
)
def build_connector_meta(self, scheduler_output: SchedulerOutput) -> bytes:
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Helper functions for KV Event Consolidator configuration for TensorRT-LLM.
"""
import logging
import os
logger = logging.getLogger(__name__)
def is_truthy(val: str) -> bool:
    """
    Report whether a string spells one of the accepted "enabled" values.

    The comparison ignores case; the accepted spellings are
    "1", "true", "on" and "yes". No whitespace trimming is performed.

    Args:
        val: The string value to check

    Returns:
        True if the value is truthy, False otherwise
    """
    lowered = val.lower()
    return lowered in {"1", "true", "on", "yes"}
def should_enable_consolidator(arg_map) -> bool:
    """
    Determine if the KV Event Consolidator should be enabled for TensorRT-LLM.

    Two conditions must both hold:

    1. The DYN_KVBM_KV_EVENTS_ENABLE_CONSOLIDATOR environment variable is truthy
       ("1", "true", "on", "yes", case-insensitive). It defaults to "true" when
       unset; any other value disables the consolidator.
    2. ``arg_map["kv_connector_config"]["connector_module"]`` is a string that
       contains "kvbm.trtllm_integration.connector".

    Args:
        arg_map: Dictionary containing TensorRT-LLM engine arguments

    Returns:
        True if consolidator should be enabled, False otherwise
    """
    # Check environment variable override
    env_override = os.getenv("DYN_KVBM_KV_EVENTS_ENABLE_CONSOLIDATOR", "true")
    if not is_truthy(env_override):
        logger.info(
            "KV Event Consolidator disabled via DYN_KVBM_KV_EVENTS_ENABLE_CONSOLIDATOR environment variable"
        )
        return False

    # Check if KVBM connector is enabled
    if not isinstance(arg_map, dict):
        logger.warning("KV Event Consolidator is not enabled: arg_map is not a dict")
        return False

    kv_connector_config = arg_map.get("kv_connector_config", {})
    if not isinstance(kv_connector_config, dict):
        logger.warning(
            "KV Event Consolidator is not enabled: kv_connector_config is not a dict"
        )
        return False

    connector_module = kv_connector_config.get("connector_module", "")
    # The key may be present with a non-string value (e.g. None); a bare `in`
    # test would raise TypeError then, so treat it as "no KVBM connector".
    has_kvbm_connector = (
        isinstance(connector_module, str)
        and "kvbm.trtllm_integration.connector" in connector_module
    )
    if not has_kvbm_connector:
        logger.warning(
            f"KV Event Consolidator is not enabled: KVBM connector not found (current connector: {connector_module})"
        )
        return False

    logger.info("KV Event Consolidator auto-enabled (KVBM connector detected)")
    return True
def get_consolidator_endpoints() -> tuple[str, str, str]:
    """
    Get consolidator endpoints for TensorRT-LLM (matching vLLM pattern).

    Returns a tuple of (trtllm_endpoint, output_bind_endpoint, output_connect_endpoint):
    - trtllm_endpoint: ZMQ endpoint TRTLLM binds to and publishes its events on (tcp://*:PORT)
    - output_bind_endpoint: ZMQ endpoint for consolidator to bind and publish (tcp://0.0.0.0:PORT)
    - output_connect_endpoint: ZMQ endpoint for workers to connect (tcp://127.0.0.1:PORT)

    Port configuration (matching vLLM):
    - Derives TRTLLM port from KVBM leader ZMQ pub port (DYN_KVBM_LEADER_ZMQ_PUB_PORT, default 56001)
    - Uses offset of 1000 for consolidator output port (e.g., 56001 -> 57001)
    - Can override TRTLLM port with DYN_KVBM_TRTLLM_ZMQ_PORT if needed

    Returns:
        Tuple of (trtllm_endpoint, output_bind_endpoint, output_connect_endpoint)

    Raises:
        ValueError: If a port environment variable is not an integer in the range
            1-65535, or if the derived consolidator output port exceeds 65535.
    """
    # Get KVBM leader ZMQ pub port (default 56001, matching vLLM)
    kvbm_pub_port = _parse_tcp_port(
        "DYN_KVBM_LEADER_ZMQ_PUB_PORT",
        os.getenv("DYN_KVBM_LEADER_ZMQ_PUB_PORT", "56001"),
    )

    # Check for explicit TRTLLM port override (an empty value counts as unset)
    trtllm_port_env = os.getenv("DYN_KVBM_TRTLLM_ZMQ_PORT")
    if trtllm_port_env:
        trtllm_port = _parse_tcp_port("DYN_KVBM_TRTLLM_ZMQ_PORT", trtllm_port_env)
        logger.info(
            f"Using TRTLLM ZMQ port from DYN_KVBM_TRTLLM_ZMQ_PORT: {trtllm_port}"
        )
    else:
        # Derive TRTLLM port from KVBM port (use same port as vLLM pattern)
        # For TRTLLM, we use the base port directly (vLLM uses offset_endpoint_port for DP)
        # NOTE(review): this reuses the KVBM leader pub port number as the TRTLLM bind
        # port - confirm nothing else on the host binds it when the override is unset.
        trtllm_port = kvbm_pub_port
        logger.info(
            f"Using TRTLLM ZMQ port {trtllm_port} (derived from KVBM port {kvbm_pub_port})"
        )

    # Derive consolidator output port deterministically (matching vLLM)
    # Use 1000 as the offset. This needs to be aligned with the offset used in the kvbm connector leader.
    consolidator_port_offset = 1000
    output_port = kvbm_pub_port + consolidator_port_offset

    # Validate the derived port is within valid range
    if output_port > 65535:
        raise ValueError(
            f"Derived consolidator port {output_port} exceeds maximum (65535). "
            f"KVBM port {kvbm_pub_port} is too high. Use a lower base port."
        )

    # Build endpoints
    # TRTLLM binds to all interfaces
    trtllm_bind_endpoint = f"tcp://*:{trtllm_port}"
    # Consolidator output: bind to 0.0.0.0 (all interfaces), workers connect to 127.0.0.1
    output_bind_endpoint = f"tcp://0.0.0.0:{output_port}"
    output_connect_endpoint = f"tcp://127.0.0.1:{output_port}"

    logger.info(
        f"Consolidator endpoints: trtllm_bind={trtllm_bind_endpoint}, "
        f"output_bind={output_bind_endpoint}, output_connect={output_connect_endpoint} "
        f"(derived from KVBM port {kvbm_pub_port})"
    )

    # Return tuple format: (trtllm_bind_endpoint, output_bind_endpoint, output_connect_endpoint)
    return trtllm_bind_endpoint, output_bind_endpoint, output_connect_endpoint


def _parse_tcp_port(env_name: str, raw_value: str) -> int:
    """Convert an environment variable's text to a TCP port, naming the variable on failure."""
    try:
        port = int(raw_value)
    except ValueError as err:
        raise ValueError(
            f"{env_name} must be an integer TCP port, got {raw_value!r}"
        ) from err
    if not 1 <= port <= 65535:
        raise ValueError(f"{env_name} must be in the range 1-65535, got {port}")
    return port
......@@ -6,8 +6,9 @@ use anyhow::Result;
use dynamo_llm::block_manager::block::{
data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, locality::Logical,
};
use dynamo_llm::block_manager::kv_consolidator::KvEventConsolidatorConfig;
use dynamo_llm::block_manager::kv_consolidator::tracker::EventSource;
use dynamo_llm::block_manager::kv_consolidator::{
EventSource, KvEventConsolidatorConfig,
};
use dynamo_llm::block_manager::offload::filter::FrequencyFilter;
use dynamo_llm::block_manager::{BasicMetadata, BlockParallelismStrategy};
use dynamo_runtime::DistributedRuntime;
......@@ -16,7 +17,7 @@ use pyo3::PyResult;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
pub mod cache_stats;
mod cache_stats;
mod controller;
mod distributed;
......@@ -250,7 +251,7 @@ pub struct BlockManagerBuilder {
page_size: usize,
disable_device_pool: bool,
kvbm_metrics: Option<dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics>,
consolidator_config: Option<(String, String, EventSource)>, // (engine_endpoint, output_endpoint, engine_source)
consolidator_config: Option<(String, Option<String>, EventSource)>, // (engine_endpoint, output_endpoint (optional), engine_source)
}
impl BlockManagerBuilder {
......@@ -289,7 +290,7 @@ impl BlockManagerBuilder {
pub fn consolidator_config(
mut self,
engine_endpoint: String,
output_endpoint: String,
output_endpoint: Option<String>,
engine_source: EventSource,
) -> Self {
self.consolidator_config = Some((engine_endpoint, output_endpoint, engine_source));
......@@ -367,12 +368,7 @@ impl BlockManagerBuilder {
}
if let Some((engine_ep, output_ep, engine_source)) = self.consolidator_config {
let consolidator_config = KvEventConsolidatorConfig::new(
engine_ep,
output_ep,
engine_source,
);
config_builder = config_builder.consolidator_config(consolidator_config);
config_builder = config_builder.consolidator_config(engine_ep, output_ep, engine_source);
}
let config = config_builder.build()?;
......
......@@ -22,7 +22,7 @@ use dynamo_llm::block_manager::{
locality::Logical,
},
connector::*,
kv_consolidator::tracker::EventSource,
kv_consolidator::EventSource,
};
use dynamo_llm::tokens::{SaltHash, TokenBlockSequence, Tokens};
use dynamo_runtime::config::environment_names::kvbm as env_kvbm;
......@@ -144,8 +144,11 @@ impl KvConnectorLeader {
vllm_ep,
output_ep
);
block_manager_builder =
block_manager_builder.consolidator_config(vllm_ep, output_ep, EventSource::Vllm);
block_manager_builder = block_manager_builder.consolidator_config(
vllm_ep,
Some(output_ep),
EventSource::Vllm,
);
}
let block_manager = match block_manager_builder.build().await {
......
......@@ -3,7 +3,7 @@
use super::*;
use anyhow;
use dynamo_llm::block_manager::kv_consolidator::tracker::EventSource;
use dynamo_llm::block_manager::kv_consolidator::EventSource;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Action {
......@@ -153,7 +153,11 @@ impl KvConnectorLeaderRecorder {
(consolidator_vllm_ep, consolidator_output_ep)
{
block_manager_builder =
block_manager_builder.consolidator_config(vllm_ep, output_ep, EventSource::Vllm);
block_manager_builder.consolidator_config(
vllm_ep,
Some(output_ep),
EventSource::Vllm,
);
}
let block_manager = match block_manager_builder.build().await {
......@@ -169,7 +173,7 @@ impl KvConnectorLeaderRecorder {
block_manager.get_block_manager().clone(),
leader.clone(),
kvbm_metrics_clone.clone(),
None, // Recorder doesn't need identifier
Some(format!("worker-{}", worker_id)),
);
let _ = slot_manager_cell.set(sm);
......
......@@ -4,6 +4,7 @@
use super::*;
use crate::block_manager::BlockManagerBuilder;
use dynamo_llm::block_manager::kv_consolidator::EventSource;
use crate::block_manager::vllm::connector::leader::slot::{
ConnectorSlotManager, SlotManager, SlotState,
};
......@@ -63,7 +64,13 @@ pub struct KvConnectorLeader {
}
impl KvConnectorLeader {
fn new(worker_id: u64, page_size: usize, leader_py: PyKvbmLeader) -> Self {
fn new(
worker_id: u64,
page_size: usize,
leader_py: PyKvbmLeader,
consolidator_trtllm_endpoint: Option<String>,
consolidator_output_endpoint: Option<String>,
) -> Self {
tracing::info!(
"KvConnectorLeader initialized with worker_id: {}",
worker_id
......@@ -84,6 +91,9 @@ impl KvConnectorLeader {
{
let slot_manager_cell = slot_manager_cell.clone();
// Capture consolidator endpoints for the async block
let consolidator_trtllm_ep = consolidator_trtllm_endpoint.clone();
let consolidator_output_ep = consolidator_output_endpoint.clone();
handle.spawn(async move {
let ready = leader.wait_worker_sync_ready().await;
......@@ -94,14 +104,30 @@ impl KvConnectorLeader {
return;
}
let block_manager = match BlockManagerBuilder::new()
let mut block_manager_builder = BlockManagerBuilder::new()
.worker_id(0)
.leader(leader_py)
.page_size(page_size)
.disable_device_pool(false)
.kvbm_metrics(kvbm_metrics_clone.clone())
.build()
.await
.kvbm_metrics(kvbm_metrics_clone.clone());
// Add consolidator config if endpoint is provided
// For TRTLLM: engine_endpoint is where TRTLLM publishes, output_endpoint is where consolidator publishes
if let Some(trtllm_ep) = consolidator_trtllm_ep.clone() {
tracing::info!(
"Consolidator config: trtllm_endpoint={}, consolidated_output_endpoint={:?}",
trtllm_ep,
consolidator_output_ep
);
block_manager_builder = block_manager_builder.consolidator_config(
trtllm_ep,
consolidator_output_ep,
EventSource::Trtllm,
);
}
let block_manager = match block_manager_builder.build().await
{
Ok(bm) => bm,
Err(e) => {
......@@ -439,16 +465,24 @@ pub struct PyTrtllmKvConnectorLeader {
#[pymethods]
impl PyTrtllmKvConnectorLeader {
#[new]
#[pyo3(signature = (worker_id, drt, page_size, leader))]
#[pyo3(signature = (worker_id, drt, page_size, leader, consolidator_trtllm_endpoint=None, consolidator_output_endpoint=None))]
pub fn new(
worker_id: u64,
drt: Option<PyObject>,
page_size: usize,
leader: PyKvbmLeader,
consolidator_trtllm_endpoint: Option<String>,
consolidator_output_endpoint: Option<String>,
) -> PyResult<Self> {
let _ = &drt; // drt is currently un-used in leader
let connector_leader: Box<dyn Leader> =
Box::new(KvConnectorLeader::new(worker_id, page_size, leader));
let connector_leader: Box<dyn Leader> = Box::new(KvConnectorLeader::new(
worker_id,
page_size,
leader,
consolidator_trtllm_endpoint,
consolidator_output_endpoint,
));
Ok(Self { connector_leader })
}
......
......@@ -211,7 +211,7 @@ pub struct KvBlockManagerConfig {
/// If provided, KVBM will create a KV Event Consolidator that deduplicates
/// KV cache events from the inference engine (vLLM or TensorRT-LLM; G1) and
/// KVBM (G2/G3) before sending them to the router.
/// This is used when `--connector kvbm` is enabled with prefix caching.
#[builder(default, setter(strip_option))]
#[builder(default, setter(custom))]
pub consolidator_config:
Option<crate::block_manager::kv_consolidator::KvEventConsolidatorConfig>,
}
......@@ -223,6 +223,50 @@ impl KvBlockManagerConfig {
}
}
impl KvBlockManagerConfigBuilder {
    /// Install a KV Event Consolidator configuration assembled from its parts.
    ///
    /// * `engine_endpoint` - forwarded as the first argument of the
    ///   engine-specific `KvEventConsolidatorConfig` constructor.
    /// * `output_endpoint` - endpoint for the consolidated events; it is
    ///   mandatory for both supported engines.
    /// * `engine_source` - selects which constructor is used
    ///   (`new_vllm` for `Vllm`, `new_trtllm` for `Trtllm`).
    ///
    /// # Panics
    ///
    /// Panics when `output_endpoint` is `None` for `Vllm`/`Trtllm`, and when
    /// `engine_source` is `EventSource::Kvbm`, which this setter treats as
    /// unreachable.
    pub fn consolidator_config(
        mut self,
        engine_endpoint: String,
        output_endpoint: Option<String>,
        engine_source: crate::block_manager::kv_consolidator::EventSource,
    ) -> Self {
        use crate::block_manager::kv_consolidator::{EventSource, KvEventConsolidatorConfig};

        let built = match engine_source {
            EventSource::Vllm => KvEventConsolidatorConfig::new_vllm(
                engine_endpoint,
                output_endpoint.expect("output_endpoint is required for vLLM"),
            ),
            // The output endpoint is the ZMQ endpoint the consolidator publishes on;
            // worker-side publishers subscribe to it and forward to NATS.
            EventSource::Trtllm => KvEventConsolidatorConfig::new_trtllm(
                engine_endpoint,
                output_endpoint.expect(
                    "output_endpoint (consolidated_event_endpoint) is required for TensorRT-LLM",
                ),
            ),
            // KVBM hands its events straight to the consolidator handle, so it is
            // never the ZMQ-publishing engine this setter configures.
            EventSource::Kvbm => unreachable!(
                "consolidator_config() should never be called with EventSource::Kvbm. \
                 KVBM events are sent directly to the consolidator handle, not via ZMQ."
            ),
        };

        // `setter(custom)` leaves the builder field as Option<Option<T>>: the outer
        // `Some` marks the field as set, the inner `Some` is the value itself.
        self.consolidator_config = Some(Some(built));
        self
    }
}
/// Determines if CPU memory (G2) should be bypassed for direct G1->G3 (Device->Disk) offloading.
///
/// Returns `true` if:
......
......@@ -14,6 +14,7 @@ pub struct KvEventConsolidatorConfig {
pub engine_event_endpoint: String,
/// ZMQ endpoint to publish consolidated events (e.g., "tcp://*:5558")
/// Worker-side publishers subscribe to this and add worker_id before forwarding to NATS
pub consolidated_event_endpoint: String,
/// Engine source for events (vLLM or TensorRT-LLM)
......@@ -42,4 +43,22 @@ impl KvEventConsolidatorConfig {
engine_source,
}
}
/// Build a consolidator configuration whose engine source is vLLM.
///
/// Both endpoint strings are stored unchanged in the returned config.
pub fn new_vllm(engine_event_endpoint: String, consolidated_event_endpoint: String) -> Self {
    let engine_source = EventSource::Vllm;
    Self {
        engine_source,
        consolidated_event_endpoint,
        engine_event_endpoint,
    }
}
/// Build a consolidator configuration whose engine source is TensorRT-LLM.
///
/// Both endpoint strings are stored unchanged in the returned config.
pub fn new_trtllm(engine_event_endpoint: String, consolidated_event_endpoint: String) -> Self {
    let engine_source = EventSource::Trtllm;
    Self {
        engine_source,
        consolidated_event_endpoint,
        engine_event_endpoint,
    }
}
}
......@@ -101,19 +101,18 @@ impl KvEventConsolidator {
/// Start the KV Event Consolidator
pub async fn start(&mut self) -> Result<()> {
tracing::info!(
"Starting KV Event Consolidator: subscribe from {}, publish to {}",
"Starting KV Event Consolidator: subscribe from {}, publish to ZMQ at {}",
self.config.engine_event_endpoint,
self.config.consolidated_event_endpoint
);
// Start the publisher first
// Always publish to ZMQ (worker-side publishers will add worker_id and forward to NATS)
let publisher = KvEventConsolidatorPublisher::new(
&self.config.consolidated_event_endpoint,
self.tracker.clone(),
)?;
self.publisher = Some(publisher);
tracing::info!("Waiting for downstream subscribers to connect...");
tracing::info!("Waiting for downstream ZMQ subscribers to connect...");
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// Start the subscriber (connects to engine's publisher - vLLM or TensorRT-LLM)
......@@ -133,19 +132,19 @@ impl KvEventConsolidator {
}
/// Shutdown the KV Event Consolidator
pub async fn shutdown(&mut self) -> Result<()> {
pub async fn shutdown(self) -> Result<()> {
tracing::info!("Shutting down KV Event Consolidator");
// Cancel the ZMQ listener
self.cancellation_token.cancel();
// Wait for adapter task to finish
if let Some(handle) = self.subscriber_handle.take() {
if let Some(handle) = self.subscriber_handle {
handle.abort();
let _ = handle.await;
}
if let Some(publisher) = self.publisher.take() {
if let Some(publisher) = self.publisher {
publisher.shutdown().await?;
}
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! ZMQ Publisher for KV Events Consolidator
//! Publishers for KV Events Consolidator
//!
//! Publishes consolidated KV cache events to the router using the same format as vLLM.
//! Publishes consolidated KV cache events to ZMQ (in vLLM format).
//! Worker-side publishers subscribe to this ZMQ stream and add worker_id before publishing to NATS.
use anyhow::{Context, Result};
use bytes::Bytes;
......@@ -30,7 +31,7 @@ struct EventBatch(
);
/// Event types matching vLLM's format
/// Note: block_hashes are u64 to match vLLM's ExternalBlockHash type
/// Note: Uses i32 for token_ids, block_size, and lora_id to match vLLM's ZmqEventPublisher
#[derive(Debug, Serialize)]
#[serde(tag = "type")]
enum Event {
......@@ -41,15 +42,21 @@ enum Event {
token_ids: Vec<i32>,
block_size: i32,
lora_id: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
medium: Option<String>,
},
#[serde(rename = "BlockRemoved")]
BlockRemoved { block_hashes: Vec<u64> },
BlockRemoved {
block_hashes: Vec<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
medium: Option<String>,
},
#[serde(rename = "AllBlocksCleared")]
AllBlocksCleared {},
}
impl Event {
/// Convert from ConsolidatedEvent to vLLM Event format
/// Convert from ConsolidatedEvent to router Event format
/// Parses string block hashes back to u64 for router compatibility
/// Note: source field is kept in ConsolidatedEvent for internal logging but not sent to router
///
......@@ -98,12 +105,16 @@ impl Event {
i32::MAX
});
// lora_id is already Option<i32> in ConsolidatedEvent::Store
let lora_id_i32 = lora_id;
Ok(Event::BlockStored {
block_hashes: vec![parsed_hash],
parent_block_hash: parsed_parent,
token_ids: token_ids_i32,
block_size: block_size_i32,
lora_id,
lora_id: lora_id_i32,
medium: None, // Not provided by ConsolidatedEvent
})
}
ConsolidatedEvent::Remove {
......@@ -117,6 +128,7 @@ impl Event {
Ok(Event::BlockRemoved {
block_hashes: vec![parsed_hash],
medium: None, // Not provided by ConsolidatedEvent
})
}
ConsolidatedEvent::ClearAll => Ok(Event::AllBlocksCleared {}),
......@@ -267,7 +279,7 @@ impl KvEventConsolidatorPublisher {
tracing::error!("Failed to send consolidated events: {}", e);
} else {
tracing::debug!(
"Published batch with {} event(s) to router (seq={})",
"Consolidator: Published batch with {} event(s) to ZMQ (seq={})",
num_events,
seq
);
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Simple ZMQ Subscriber for vLLM/TensorRT-LLM KV Events
//! Simple ZMQ Subscriber for vLLM KV Events
//!
//! This is a simplified subscriber that deserializes raw vLLM/TensorRT-LLM events.
......@@ -43,23 +43,99 @@ impl VllmEventBatch {
}
/// Block hash can be either an integer or a string (bytes hex-encoded)
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
///
/// Note: Integers can be u64 or i64 (msgpack compatibility) but we convert to u64 for internal use.
/// - vLLM uses u64 block hashes
/// - TensorRT-LLM uses i64 block hashes (signed integers)
#[derive(Debug, Clone)]
enum BlockHash {
Int(u64),
IntU64(u64),
IntI64(i64), // Added for TensorRT-LLM support (uses signed i64 hashes)
Str(String),
}
impl<'de> serde::Deserialize<'de> for BlockHash {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::{self, Visitor};
use std::fmt;
struct BlockHashVisitor;
impl<'de> Visitor<'de> for BlockHashVisitor {
type Value = BlockHash;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("an integer or a string")
}
fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(BlockHash::IntU64(value))
}
fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(BlockHash::IntI64(value))
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(BlockHash::Str(value.to_string()))
}
fn visit_string<E>(self, value: String) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(BlockHash::Str(value))
}
}
deserializer.deserialize_any(BlockHashVisitor)
}
}
impl BlockHash {
/// Convert to u64, handling both signed and unsigned integers
/// Returns None if the hash cannot be converted (e.g., invalid hex string)
/// This avoids silently collapsing invalid hashes to 0, which could cause collisions
fn to_u64(&self) -> Option<u64> {
match self {
BlockHash::IntU64(n) => Some(*n),
BlockHash::IntI64(n) => {
// Convert signed i64 back to unsigned u64 (two's complement)
// Rust's `as u64` automatically handles two's complement conversion
Some(*n as u64)
}
BlockHash::Str(s) => {
// Try to parse as hex string, return None on failure
// This avoids silently mapping invalid hashes to 0, which could cause collisions
u64::from_str_radix(s, 16).ok()
}
}
}
}
impl std::fmt::Display for BlockHash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BlockHash::Int(n) => write!(f, "{}", n),
BlockHash::IntU64(n) => write!(f, "{}", n),
BlockHash::IntI64(n) => write!(f, "{}", n),
BlockHash::Str(s) => write!(f, "{}", s),
}
}
}
/// Raw vLLM/TensorRT-LLM event format (preserves all data including token_ids)
/// Raw vLLM event format (preserves all data including token_ids)
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type")]
enum VllmRawEvent {
......@@ -257,13 +333,34 @@ fn process_event(
// Process each block with its corresponding token chunk
// For batches, chain the blocks: each block's parent is the previous block in the batch
let mut current_parent = parent_block_hash.as_ref().map(|h| h.to_string());
let mut current_parent = parent_block_hash
.as_ref()
.and_then(|h| {
h.to_u64().or_else(|| {
tracing::warn!(
"Skipping parent block hash with unparsable string hash {:?}",
h
);
None
})
})
.map(|h| h.to_string());
for (i, block_hash) in block_hashes.iter().enumerate() {
let block_tokens = token_chunks[i].clone();
// Skip blocks with invalid/unparsable hashes to avoid collisions
let Some(block_hash_u64) = block_hash.to_u64() else {
tracing::warn!(
"Skipping block with unparsable string hash {:?} (index {})",
block_hash,
i
);
continue;
};
tracker.handle_store(
block_hash.to_string(),
block_hash_u64.to_string(),
engine_source,
block_tokens,
current_parent.clone(),
......@@ -273,8 +370,8 @@ fn process_event(
data_parallel_rank,
);
// Next block's parent is this block
current_parent = Some(block_hash.to_string());
// Next block's parent is this block (only if hash was valid)
current_parent = Some(block_hash_u64.to_string());
}
}
......@@ -294,7 +391,15 @@ fn process_event(
);
for block_hash in block_hashes {
tracker.handle_remove(&block_hash.to_string(), engine_source);
// Skip blocks with invalid/unparsable hashes to avoid collisions
let Some(block_hash_u64) = block_hash.to_u64() else {
tracing::warn!(
"Skipping removal of block with unparsable string hash {:?}",
block_hash
);
continue;
};
tracker.handle_remove(&block_hash_u64.to_string(), engine_source);
}
}
......
......@@ -338,10 +338,33 @@ impl CacheStatusTracker {
// Add to hash mapping so remove events can find the block by external hash
self.hash_mapping.insert(block_hash.clone(), sequence_hash);
// Resolve parent_hash to first_block_hash if parent was deduplicated
//
// Problem: When the same block is stored from multiple sources (deduplication),
// each source may use a different external hash for the same logical block.
// Example:
// - Source A (TRTLLM) stores parent with hash "hash_A"
// - Source B (KVBM) stores same parent with hash "hash_B" (different format/algorithm)
// - Router only received STORE event with "hash_A" (first source)
// - When Source B stores child with parent_hash="hash_B", router won't recognize it
//
// Resolve the parent's external hash to its first_block_hash (the hash
// that was sent to the router in the first STORE event) so the router can find it.
let resolved_parent_hash = parent_hash.and_then(|ph| {
// Look up parent's sequence hash from its external hash
self.hash_mapping.get(&ph).and_then(|&parent_seq_hash| {
// Get parent's metadata to find first_block_hash
self.blocks
.get(&parent_seq_hash)
.map(|parent_metadata| parent_metadata.first_block_hash.clone())
})
});
// Queue a STORE event with full metadata
// Use resolved_parent_hash (first_block_hash) so router can find the parent
self.event_queue.push(ConsolidatedEvent::Store {
block_hash: block_hash.clone(),
parent_hash,
parent_hash: resolved_parent_hash,
token_ids,
block_size,
lora_id,
......
......@@ -328,10 +328,10 @@ pub async fn start_zmq_listener(
zmq_endpoint,
batch.events.len(),
seq,
batch.data_parallel_rank
batch.data_parallel_rank.unwrap_or(0)
);
let dp_rank = batch.data_parallel_rank;
let dp_rank = batch.data_parallel_rank.unwrap_or(0) as u32;
for raw_event in batch.events.into_iter() {
let event = convert_event(raw_event, seq, kv_block_size, dp_rank, &warning_count);
if tx.send(event).is_err() {
......@@ -474,12 +474,27 @@ pub fn create_stored_blocks(
// Types mirroring the Python msgspec-defined structures -------------------
// -------------------------------------------------------------------------
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Serialize)]
struct KvEventBatch {
ts: f64,
events: Vec<RawKvEvent>,
#[serde(alias = "dp_rank")]
data_parallel_rank: u32, // we are ignoring this for now
data_parallel_rank: Option<i32>,
}
impl<'de> Deserialize<'de> for KvEventBatch {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
// Deserialize from array format: [timestamp, [events], data_parallel_rank]
let arr: (f64, Vec<RawKvEvent>, Option<i32>) = Deserialize::deserialize(deserializer)?;
Ok(KvEventBatch {
ts: arr.0,
events: arr.1,
data_parallel_rank: arr.2,
})
}
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
......@@ -498,7 +513,7 @@ impl BlockHashValue {
}
}
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone)]
#[serde(tag = "type")] // msgspec encodes variant tag as a string when `tag=True`
enum RawKvEvent {
BlockStored {
......@@ -1131,7 +1146,7 @@ mod tests_startup_helpers {
let batch = KvEventBatch {
ts: 0.0,
events,
data_parallel_rank: 1,
data_parallel_rank: Some(1),
};
let payload = Bytes::from(rmps::to_vec(&batch).unwrap());
......
......@@ -6,8 +6,8 @@
E2E test for KV Event Consolidator with Router integration.
This test validates that:
1. vLLM with KVBM correctly emits KV events to the consolidator
2. The consolidator correctly deduplicates events from vLLM and KVBM
1. vLLM/TensorRT-LLM with KVBM correctly emits KV events to the consolidator
2. The consolidator correctly deduplicates events from engine and KVBM
3. The router receives and processes consolidated events without warnings
"""
......@@ -22,12 +22,34 @@ from pathlib import Path
import pytest
import requests
import yaml
from tests.kvbm_integration.common import ApiTester, check_logs_for_patterns
from tests.utils.managed_process import ManagedProcess
# Check if vLLM is available
HAS_VLLM = importlib.util.find_spec("vllm") is not None
# Check if engines are available and build list of available engines
# Use find_spec first (fast check), then verify import works (functional check)
def _check_engine_available(module_name: str) -> bool:
"""Check if an engine module is available and importable."""
if importlib.util.find_spec(module_name) is None:
return False
try:
importlib.import_module(module_name)
return True
except ImportError:
return False
# Probe each supported engine once, at import time.
HAS_VLLM = _check_engine_available("vllm")
HAS_TRTLLM = _check_engine_available("tensorrt_llm")

# Engines the tests are parameterized over, in a fixed order
# (vLLM first, then TensorRT-LLM); empty when neither is importable.
AVAILABLE_ENGINES = [
    engine_name
    for engine_name, is_present in (("vllm", HAS_VLLM), ("trtllm", HAS_TRTLLM))
    if is_present
]
# Test markers
pytestmark = [
......@@ -35,8 +57,7 @@ pytestmark = [
pytest.mark.e2e,
pytest.mark.slow,
pytest.mark.gpu_1,
pytest.mark.nightly,
pytest.mark.skipif(not HAS_VLLM, reason="requires vllm"),
pytest.mark.skipif(not (HAS_VLLM or HAS_TRTLLM), reason="requires vllm or trtllm"),
]
logger = logging.getLogger(__name__)
......@@ -54,8 +75,32 @@ def test_directory(request):
# Cleanup handled by pytest (logs are kept for debugging)
def extract_consolidator_stats(log_path: Path) -> dict:
"""Extract consolidator event statistics from vLLM logs."""
def create_trtllm_config(test_directory: Path) -> Path:
    """Write the TensorRT-LLM engine YAML used by these tests and return its path.

    The file is created as ``trtllm_config.yaml`` inside *test_directory* and
    wires the KVBM connector classes into the engine configuration.

    Args:
        test_directory: Existing directory that receives the config file.

    Returns:
        Path of the YAML file that was written.
    """
    kv_cache_settings = {
        "enable_partial_reuse": False,
        "free_gpu_memory_fraction": 0.01,
    }
    connector_settings = {
        "connector_module": "kvbm.trtllm_integration.connector",
        "connector_scheduler_class": "DynamoKVBMConnectorLeader",
        "connector_worker_class": "DynamoKVBMConnectorWorker",
    }
    engine_config = {
        "backend": "pytorch",
        "cuda_graph_config": None,
        "kv_cache_config": kv_cache_settings,
        "build_config": {"max_seq_len": 4096},
        "kv_connector_config": connector_settings,
    }

    target = test_directory / "trtllm_config.yaml"
    with open(target, "w") as stream:
        yaml.dump(engine_config, stream)
    return target
def extract_consolidator_stats(log_path: Path, engine: str = "vllm") -> dict:
"""Extract consolidator event statistics from engine logs."""
stats = {
"store_events": 0,
"remove_events": 0,
......@@ -130,7 +175,7 @@ def wait_for_worker_registration(
health_data = response.json()
if health_data.get("instances"):
elapsed = time.time() - start_time
logger.info(f"vLLM worker registered after {elapsed:.1f}s")
logger.info(f"Worker registered after {elapsed:.1f}s")
return True
except Exception as e:
logger.debug(f"Health check failed: {e}")
......@@ -138,8 +183,8 @@ def wait_for_worker_registration(
time.sleep(poll_interval)
elapsed = time.time() - start_time
logger.error(f"vLLM worker failed to register after {elapsed:.1f}s")
logger.error("Check vLLM logs for initialization errors")
logger.error(f"Worker failed to register after {elapsed:.1f}s")
logger.error("Check worker logs for initialization errors")
return False
......@@ -199,24 +244,51 @@ def frontend_server(test_directory, runtime_services):
logger.info("Frontend server stopped")
@pytest.fixture(params=AVAILABLE_ENGINES)
def engine_type(request):
    """Provide each entry of ``AVAILABLE_ENGINES`` in turn as the engine under test."""
    selected_engine = request.param
    return selected_engine
@pytest.fixture
def vllm_worker(frontend_server, test_directory, runtime_services):
"""Start vLLM worker with KVBM connector and KV Event Consolidator."""
def llm_worker(frontend_server, test_directory, runtime_services, engine_type):
"""Start LLM worker (vLLM or TensorRT-LLM) with KVBM connector and KV Event Consolidator."""
model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")
engine = engine_type
logger.info(f"Starting vLLM worker with KVBM connector and model {model_id}")
logger.info(
f"Starting {engine.upper()} worker with KVBM connector and model {model_id}"
)
# vLLM worker command - consolidator is auto-enabled with KVBM
command = [
"python",
"-m",
"dynamo.vllm",
"--model",
model_id,
"--connector",
"kvbm",
"--enforce-eager", # For faster startup in tests
]
# Build command based on engine type
if engine == "vllm":
command = [
"python",
"-m",
"dynamo.vllm",
"--model",
model_id,
"--connector",
"kvbm",
"--enforce-eager", # For faster startup in tests
]
else: # trtllm
# Create TensorRT-LLM config file with KVBM connector
config_path = create_trtllm_config(test_directory)
command = [
"python",
"-m",
"dynamo.trtllm",
"--model-path",
model_id,
"--served-model-name",
model_id,
"--extra-engine-args",
str(
config_path.absolute()
), # Use absolute path to avoid working directory issues
"--publish-events-and-metrics",
]
# Environment
env = os.environ.copy()
......@@ -231,10 +303,14 @@ def vllm_worker(frontend_server, test_directory, runtime_services):
}
)
# Create separate log directory for vllm to avoid conflicts with frontend
vllm_log_dir = test_directory / "vllm"
vllm_log_dir.mkdir(parents=True, exist_ok=True)
log_file = vllm_log_dir / "python.log.txt"
# Set ZMQ port for TensorRT-LLM consolidator
if engine == "trtllm":
env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"
# Create separate log directory for worker to avoid conflicts with frontend
worker_log_dir = test_directory / engine
worker_log_dir.mkdir(parents=True, exist_ok=True)
log_file = worker_log_dir / "python.log.txt"
# Create managed process and start via context manager
with ManagedProcess(
......@@ -244,10 +320,12 @@ def vllm_worker(frontend_server, test_directory, runtime_services):
timeout=300, # Increased timeout for model loading and consolidator init
working_dir=str(test_directory),
display_output=False,
log_dir=str(vllm_log_dir), # Separate log directory
log_dir=str(worker_log_dir), # Separate log directory
terminate_existing=False,
) as vllm_process:
logger.info("Waiting for vLLM worker and consolidator to initialize...")
) as worker_process:
logger.info(
f"Waiting for {engine.upper()} worker and consolidator to initialize..."
)
# Wait for worker to register with frontend
worker_registered = wait_for_worker_registration(frontend_server["base_url"])
......@@ -259,29 +337,30 @@ def vllm_worker(frontend_server, test_directory, runtime_services):
time.sleep(5)
# Verify consolidator started by checking logs
stats = extract_consolidator_stats(log_file)
stats = extract_consolidator_stats(log_file, engine)
if not stats["consolidator_started"]:
logger.warning("Consolidator may not have started - check logs")
else:
logger.info("Consolidator detected in logs")
yield {
"process": vllm_process,
"process": worker_process,
"model_id": model_id,
"log_file": log_file,
"consolidator_stats": stats,
"engine": engine,
}
# Cleanup happens automatically via context manager __exit__
logger.info("vLLM worker stopped")
logger.info(f"{engine.upper()} worker stopped")
@pytest.fixture
def tester(frontend_server, vllm_worker):
def tester(frontend_server, llm_worker):
"""Provides a test client that sends requests to frontend."""
return ApiTester(
base_url=frontend_server["base_url"],
model_id=vllm_worker["model_id"],
model_id=llm_worker["model_id"],
)
......@@ -298,12 +377,17 @@ class TestConsolidatorRouterE2E:
r"fatal",
]
def assert_no_errors_in_logs(self, vllm_log: Path, frontend_log: Path):
"""Helper to check both vLLM and frontend logs for errors."""
vllm_errors = check_logs_for_patterns(
vllm_log, self.ERROR_PATTERNS, "vLLM Worker"
def assert_no_errors_in_logs(
self, worker_log: Path, frontend_log: Path, engine: str = "vllm"
):
"""Helper to check both worker and frontend logs for errors."""
engine_name = engine.upper()
worker_errors = check_logs_for_patterns(
worker_log, self.ERROR_PATTERNS, f"{engine_name} Worker"
)
assert not vllm_errors, f"Errors in vLLM Worker logs: {vllm_errors}"
assert (
not worker_errors
), f"Errors in {engine_name} Worker logs: {worker_errors}"
frontend_errors = check_logs_for_patterns(
frontend_log, self.ERROR_PATTERNS, "Frontend/Router"
......@@ -336,14 +420,15 @@ class TestConsolidatorRouterE2E:
logger.info(f"Concurrent requests: {successes}/{num_requests} succeeded")
return successes, results
def test_basic_consolidator_flow(self, tester, vllm_worker, frontend_server):
def test_basic_consolidator_flow(self, tester, llm_worker, frontend_server):
"""
Test basic consolidator flow:
1. Send requests
2. Verify consolidator starts and processes events
3. Verify router receives events without errors
"""
logger.info("TEST: Basic Consolidator Flow")
engine = llm_worker["engine"]
logger.info(f"TEST: Basic Consolidator Flow ({engine.upper()})")
# Send 3 requests to frontend
requests_data = [
......@@ -362,9 +447,9 @@ class TestConsolidatorRouterE2E:
# Wait for logs to flush
time.sleep(5)
# Check vLLM worker logs for consolidator
vllm_log = vllm_worker["log_file"]
consolidator_stats = extract_consolidator_stats(vllm_log)
# Check worker logs for consolidator
worker_log = llm_worker["log_file"]
consolidator_stats = extract_consolidator_stats(worker_log, engine)
logger.info(f"Consolidator stats: {consolidator_stats}")
......@@ -374,12 +459,12 @@ class TestConsolidatorRouterE2E:
assert consolidator_stats["published_events"] > 0, "No events published"
# Check for errors in logs
self.assert_no_errors_in_logs(vllm_log, frontend_server["log_file"])
self.assert_no_errors_in_logs(worker_log, frontend_server["log_file"], engine)
logger.info("Basic consolidator flow test passed")
logger.info(f"Basic consolidator flow test passed ({engine.upper()})")
def test_consolidator_handles_concurrent_requests(
self, tester, vllm_worker, frontend_server
self, tester, llm_worker, frontend_server
):
"""
Test consolidator under concurrent load:
......@@ -387,7 +472,8 @@ class TestConsolidatorRouterE2E:
2. Verify no crashes or critical errors
3. Verify all events processed
"""
logger.info("TEST: Concurrent Request Handling")
engine = llm_worker["engine"]
logger.info(f"TEST: Concurrent Request Handling ({engine.upper()})")
# Send 10 concurrent requests
num_requests = 10
......@@ -408,27 +494,27 @@ class TestConsolidatorRouterE2E:
# Check for errors in logs
self.assert_no_errors_in_logs(
vllm_worker["log_file"], frontend_server["log_file"]
llm_worker["log_file"], frontend_server["log_file"], engine
)
# Verify events were processed
stats = extract_consolidator_stats(vllm_worker["log_file"])
stats = extract_consolidator_stats(llm_worker["log_file"], engine)
assert stats["store_events"] > 0, "No events processed during concurrent load"
logger.info("Concurrent request handling test passed")
logger.info(f"Concurrent request handling test passed ({engine.upper()})")
def test_store_deduplication_across_sources(
self, tester, vllm_worker, frontend_server
self, tester, llm_worker, frontend_server
):
"""
Test STORE event deduplication across vLLM (G1) and KVBM (G2/G3):
Test STORE event deduplication across engine (G1) and KVBM (G2/G3):
When a block is stored in G1 (GPU), it's automatically offloaded
to G2 (CPU) and G3 (Disk). This triggers STORE events from both vLLM and KVBM.
to G2 (CPU) and G3 (Disk). This triggers STORE events from both engine and KVBM.
Test Scenario:
1. Send requests → blocks stored in vLLM (G1)
2. Consolidator receives vLLM STORE events → queues them for publishing
1. Send requests → blocks stored in engine (G1)
2. Consolidator receives engine STORE events → queues them for publishing
3. KVBM replicates blocks to G2/G3 → emits STORE events
4. Consolidator sees blocks already exist → logs DEDUP message → does NOT publish again
5. Result: Router receives ONE STORE event per unique block (from step 2)
......@@ -437,7 +523,8 @@ class TestConsolidatorRouterE2E:
even though the block exists in multiple storage tiers (G1, G2, G3).
KVBM replications are deduplicated and don't trigger duplicate router updates.
"""
logger.info("Starting STORE deduplication test")
engine = llm_worker["engine"]
logger.info(f"Starting STORE deduplication test ({engine.upper()})")
# Send requests to generate STORE events
logger.info("Sending concurrent requests to generate STORE events")
......@@ -454,60 +541,69 @@ class TestConsolidatorRouterE2E:
# Phase 2: Analyze consolidator logs
logger.info("Phase 2: Analyzing STORE event deduplication")
vllm_log = vllm_worker["log_file"]
log_content = vllm_log.read_text()
worker_log = llm_worker["log_file"]
log_content = worker_log.read_text()
# Count STORE events received from vLLM (first source = will publish)
vllm_stores = len(
# Count STORE events - order-agnostic approach
# First source stores (will publish) - could be engine or KVBM depending on timing
first_source_stores = len(
re.findall(
r"stored in first source Vllm.*will publish STORE event", log_content
r"stored in first source \w+.*will publish STORE event", log_content
)
)
# Count STORE events received from KVBM (they appear as DEDUP messages)
kvbm_stores = len(
# Second source stores (DEDUP) - could be engine or KVBM depending on timing
# Pattern: "DEDUP: Block ... added to source X"
dedup_stores = len(
re.findall(
r"DEDUP: Block \d+ \(seq_hash=\d+\) added to source Kvbm", log_content
r"DEDUP: Block \d+ \(seq_hash=\d+\) added to source \w+", log_content
)
)
# Count total STORE events received (from both sources)
total_stores_received = vllm_stores + kvbm_stores
total_stores_received = first_source_stores + dedup_stores
# Count STORE events actually published to router
published_stores = len(re.findall(r"will publish STORE event", log_content))
logger.info(f"STORE events received from vLLM: {vllm_stores}")
logger.info(f"STORE events received from KVBM: {kvbm_stores}")
logger.info(
f"STORE events from first source (will publish): {first_source_stores}"
)
logger.info(f"STORE events from second source (DEDUP): {dedup_stores}")
logger.info(f"Total STORE events received: {total_stores_received}")
logger.info(f"STORE events published to router: {published_stores}")
# Assertions:
# 1. We should receive STORE events from both vLLM and KVBM
assert vllm_stores > 0, "Expected STORE events from vLLM"
assert kvbm_stores > 0, "Expected STORE events from KVBM (replication to G2/G3)"
# 1. We should receive STORE events from both sources (order doesn't matter)
assert (
first_source_stores > 0
), f"Expected STORE events from first source (could be {engine.upper()} or KVBM)"
assert (
dedup_stores > 0
), "Expected DEDUP STORE events from second source (proves deduplication working)"
# 2. Published stores should approximately equal vLLM stores
# (each unique block is published once when first stored in vLLM)
# 2. Published stores should equal first source stores
# (each unique block is published once when first stored, regardless of which source)
assert (
published_stores == vllm_stores
), f"Expected published events ({published_stores}) to equal vLLM stores ({vllm_stores})"
published_stores == first_source_stores
), f"Expected published events ({published_stores}) to equal first-source stores ({first_source_stores})"
# 3. Total stores should be vLLM + KVBM (each block stored in both)
# 3. Total stores should be first source + second source (each block stored in both)
assert (
total_stores_received == vllm_stores + kvbm_stores
), f"Total should be vLLM ({vllm_stores}) + KVBM ({kvbm_stores})"
total_stores_received == first_source_stores + dedup_stores
), f"Total should be first-source ({first_source_stores}) + second-source ({dedup_stores})"
# 4. Check for errors in logs
self.assert_no_errors_in_logs(vllm_log, frontend_server["log_file"])
self.assert_no_errors_in_logs(worker_log, frontend_server["log_file"], engine)
logger.info("STORE deduplication test passed")
logger.info(f"STORE deduplication test passed ({engine.upper()})")
@pytest.mark.parametrize("engine_type", AVAILABLE_ENGINES)
def test_remove_deduplication_across_sources(
self, test_directory, runtime_services
self, test_directory, runtime_services, engine_type
):
"""
Test REMOVE event deduplication across G1 (vLLM GPU), G2 (KVBM CPU), G3 (KVBM disk):
Test REMOVE event deduplication across G1 (engine GPU), G2 (KVBM CPU), G3 (KVBM disk):
When blocks are stored in G1 (GPU), they are AUTOMATICALLY
replicated to G2 (CPU) and G3 (Disk) simultaneously.
......@@ -515,14 +611,16 @@ class TestConsolidatorRouterE2E:
Test Scenario:
1. Configure very small GPU cache (30 blocks) and slightly larger KVBM caches (50 blocks each)
2. Send 100 requests with 100 tokens each → blocks stored in G1 AND offloaded to G2/G3
3. GPU fills up (30 blocks) → blocks evicted from G1 → consolidator receives REMOVE from vLLM
3. GPU fills up (30 blocks) → blocks evicted from G1 → consolidator receives REMOVE from engine
→ consolidator sees blocks still exist in G2/G3 → does NOT publish REMOVE to router
4. Some blocks only exist in G1 (not replicated) → when evicted → published to router
This verifies: REMOVE is only sent to router when a block is removed from ALL sources.
Deduplication prevents unnecessary REMOVE events when blocks are still cached in G2/G3.
"""
logger.info("Starting REMOVE deduplication test")
engine = engine_type
logger.info(f"Starting REMOVE deduplication test ({engine.upper()})")
# Start frontend with router
frontend_command = [
......@@ -561,25 +659,47 @@ class TestConsolidatorRouterE2E:
) as _frontend_process:
logger.info(f"Frontend started on port {FRONTEND_PORT}")
# Start vLLM worker with constrained GPU blocks but larger KVBM blocks
# Start worker with constrained GPU blocks but larger KVBM blocks
model_id = os.environ.get("CONSOLIDATOR_MODEL_ID", "Qwen/Qwen3-0.6B")
vllm_command = [
"python",
"-m",
"dynamo.vllm",
"--model",
model_id,
"--connector",
"kvbm",
"--enforce-eager",
"--enable-prefix-caching",
"--num-gpu-blocks-override",
"30", # Very small GPU cache to force evictions
]
vllm_env = os.environ.copy()
vllm_env.update(
# Build command based on engine type
if engine == "vllm":
worker_command = [
"python",
"-m",
"dynamo.vllm",
"--model",
model_id,
"--connector",
"kvbm",
"--enforce-eager",
"--enable-prefix-caching",
"--num-gpu-blocks-override",
"30", # Very small GPU cache to force evictions
]
else: # trtllm
# Create TensorRT-LLM config file with KVBM connector
# Use small GPU cache (0.01 = 1% of GPU memory) to force evictions
# This ensures blocks will be evicted from GPU while still in KVBM
# Small enough to trigger evictions but large enough to handle sequence requirements
config_path = create_trtllm_config(test_directory)
worker_command = [
"python",
"-m",
"dynamo.trtllm",
"--model-path",
model_id,
"--served-model-name",
model_id,
"--extra-engine-args",
str(
config_path.absolute()
), # Use absolute path to avoid working directory issues
"--publish-events-and-metrics",
]
worker_env = os.environ.copy()
worker_env.update(
{
"RUST_BACKTRACE": "1",
"NATS_SERVER": "nats://localhost:4222",
......@@ -590,21 +710,25 @@ class TestConsolidatorRouterE2E:
}
)
vllm_log_dir = test_directory / "vllm"
vllm_log_dir.mkdir(parents=True, exist_ok=True)
vllm_log = vllm_log_dir / "python.log.txt"
# Set ZMQ port for TensorRT-LLM consolidator
if engine == "trtllm":
worker_env["DYN_KVBM_TRTLLM_ZMQ_PORT"] = "20081"
worker_log_dir = test_directory / engine
worker_log_dir.mkdir(parents=True, exist_ok=True)
worker_log = worker_log_dir / "python.log.txt"
with ManagedProcess(
command=vllm_command,
env=vllm_env,
command=worker_command,
env=worker_env,
health_check_urls=[],
timeout=300,
working_dir=str(test_directory),
display_output=False,
log_dir=str(vllm_log_dir),
log_dir=str(worker_log_dir),
terminate_existing=False,
) as _vllm_process:
logger.info("Waiting for vLLM worker to initialize...")
) as _worker_process:
logger.info(f"Waiting for {engine.upper()} worker to initialize...")
# Wait for worker to register with frontend
worker_registered = wait_for_worker_registration(
......@@ -612,7 +736,9 @@ class TestConsolidatorRouterE2E:
)
if not worker_registered:
pytest.fail("vLLM worker failed to register with frontend")
pytest.fail(
f"{engine.upper()} worker failed to register with frontend"
)
# Additional wait for consolidator to fully initialize
time.sleep(5)
......@@ -624,30 +750,47 @@ class TestConsolidatorRouterE2E:
# Phase 1: Send requests to fill GPU cache
logger.info("Phase 1: Filling GPU cache with diverse prompts")
for i in range(25): # Send enough requests to trigger GPU eviction
num_requests = 100
for i in range(num_requests):
prompt = f"Tell me a unique story about topic {i}. Make it very long and detailed with many paragraphs."
response = tester.send_chat_request(
messages=[{"role": "user", "content": prompt}],
max_tokens=100, # Increase tokens to use more blocks per request
)
assert "content" in response["choices"][0]["message"]
logger.info(f"Request {i+1}/25 completed")
logger.info(f"Request {i+1}/{num_requests} completed")
# Wait for evictions to settle
time.sleep(5)
# Wait for requests to complete and blocks to be freed
# With GUARANTEED_NO_EVICT, blocks are freed when requests complete (not evicted)
# We need to wait long enough for requests to finish and blocks to be freed
# For vLLM with FIFO, evictions happen immediately when cache fills.
wait_time = 5 if engine == "trtllm" else 5
logger.info(
f"Waiting {wait_time}s for requests to complete and blocks to be freed..."
)
time.sleep(wait_time)
# Phase 2: Analyze consolidator logs
logger.info("Phase 2: Analyzing consolidator deduplication behavior")
log_content = vllm_log.read_text()
# Count blocks removed from vLLM but still in KVBM (deduplication working!)
vllm_removes_but_in_kvbm = len(
re.findall(r"removed from source Vllm, still in.*Kvbm", log_content)
log_content = worker_log.read_text()
# Count blocks removed but still in another source (deduplication working!)
# Order-agnostic: checks for any removal where block still exists in another source
# Pattern: "removed from source X, still in Y source(s): [sources]"
removes_but_still_in_other_source = len(
re.findall(
r"removed from source \w+, still in \d+ source\(s\)",
log_content,
)
)
# Count blocks removed from vLLM as last source (no KVBM copy)
vllm_removes_last_source = len(
re.findall(r"removed from last source Vllm", log_content)
# Count blocks removed from last source (will publish REMOVE)
# Order-agnostic: checks for any removal from last source, regardless of which source
removes_from_last_source = len(
re.findall(
r"removed from last source \w+.*will publish REMOVE event",
log_content,
)
)
# Count REMOVE events actually published to router
......@@ -656,24 +799,31 @@ class TestConsolidatorRouterE2E:
)
logger.info(
f"Blocks removed from vLLM (G1) but still in KVBM (G2/G3): {vllm_removes_but_in_kvbm}"
f"Blocks removed but still in another source (deduplication working): {removes_but_still_in_other_source}"
)
logger.info(
f"Blocks removed from vLLM as last source: {vllm_removes_last_source}"
f"Blocks removed from last source (will publish): {removes_from_last_source}"
)
logger.info(f"REMOVE events published to router: {published_removes}")
# Assertions:
# 1. We should see GPU evictions where blocks still exist in KVBM
# 1. We should see removals where blocks still exist in another source
# This proves deduplication is working (REMOVE not sent to router yet)
# Order doesn't matter - could be engine→KVBM or KVBM→engine
assert (
vllm_removes_but_in_kvbm > 0
), "Expected GPU evictions where blocks still exist in KVBM (deduplication working)"
removes_but_still_in_other_source > 0
), f"Expected removals where blocks still exist in another source (deduplication working) for {engine.upper()}"
# 2. REMOVE events should be published for last-source removals
# Order doesn't matter - could be engine or KVBM as last source
assert (
published_removes > 0
), "Expected REMOVE events to be published for last-source removals"
# 3. Published removes should equal removes from last source
assert (
published_removes == removes_from_last_source
), f"Expected published REMOVE events ({published_removes}) to equal last-source removals ({removes_from_last_source})"
# 4. Check for errors in logs
self.assert_no_errors_in_logs(vllm_log, frontend_log)
self.assert_no_errors_in_logs(worker_log, frontend_log, engine)
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment