Unverified commit 3e417022, authored by Yan Ru Pei and committed by GitHub
Browse files

chore: remove and unify bindings in kv.rs (#6016)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent b0f54344
......@@ -17,8 +17,8 @@ if TYPE_CHECKING:
from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.llm import (
KvEventPublisher,
WorkerMetricsPublisher,
ZmqKvEventPublisher,
ZmqKvEventPublisherConfig,
)
from dynamo.runtime import Component, Endpoint
......@@ -88,7 +88,7 @@ class DynamoSglangPublisher:
self.dp_rank = 0
self._running = True
self.kv_publishers: List[ZmqKvEventPublisher] = []
self.kv_publishers: List[KvEventPublisher] = []
# ZMQ setup for receiving scheduler metrics (leader node only)
# Non-leader nodes don't receive scheduler metrics via this socket - they only
......@@ -169,7 +169,7 @@ class DynamoSglangPublisher:
logging.info("Sending dummy metrics to initialize")
self.metrics_publisher.publish(self.dp_rank, 0)
def init_kv_event_publish(self) -> List[ZmqKvEventPublisher]:
def init_kv_event_publish(self) -> List[KvEventPublisher]:
"""Initialize KV event publisher(s) if configured.
For DP attention mode, creates one subscriber per LOCAL DP rank port.
......@@ -184,7 +184,7 @@ class DynamoSglangPublisher:
- NATS handles cross-node event distribution
Returns:
List of ZmqKvEventPublisher instances if kv_events_config is set,
List of KvEventPublisher instances if kv_events_config is set,
empty list otherwise.
"""
if self.server_args.kv_events_config:
......@@ -239,8 +239,8 @@ class DynamoSglangPublisher:
f"Setting up ZMQ kv event subscriber for dp_rank={dp_rank} "
f"(connecting to {zmq_ep})"
)
publisher = ZmqKvEventPublisher(
component=self.component, config=zmq_config
publisher = KvEventPublisher(
component=self.component, zmq_config=zmq_config
)
self.kv_publishers.append(publisher)
......
......@@ -40,10 +40,10 @@ from dynamo.common.config_dump import dump_config
from dynamo.common.utils.endpoint_types import parse_endpoint_types
from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.llm import (
KvEventPublisher,
ModelInput,
ModelRuntimeConfig,
ModelType,
ZmqKvEventPublisher,
ZmqKvEventPublisherConfig,
register_llm,
)
......@@ -491,8 +491,8 @@ async def init(
zmq_endpoint=consolidator_output_connect_endpoint,
zmq_topic="", # Empty topic = all topics
)
consolidator_publisher = ZmqKvEventPublisher(
component, consolidator_config
consolidator_publisher = KvEventPublisher(
component, zmq_config=consolidator_config
)
logging.info(
f"Created worker-side publisher for consolidated events: "
......
......@@ -9,13 +9,13 @@ and publishes them either to ZMQ (for consolidator) or NATS (direct to router).
Key Components:
- ZmqKvEventPublisher: Pure Python ZMQ PUBLISHER that publishes TensorRT-LLM KV events
to ZMQ (so the consolidator can subscribe). This is different from the ZmqKvEventPublisher
in dynamo.llm, which is a Rust-based ZMQ SUBSCRIBER that subscribes from consolidator
and publishes to NATS.
to ZMQ (so the consolidator can subscribe). This is different from KvEventPublisher
in dynamo.llm, which is a Rust-based class that can optionally subscribe from a ZMQ
source and publishes to NATS.
- Publisher: Main class that coordinates event publishing (ZMQ or NATS) and metrics publishing.
Event Flow:
- With Consolidator: Engine → ZmqKvEventPublisher (ZMQ PUB) → Consolidator → ZmqKvEventPublisher (dynamo.llm, ZMQ SUB) → NATS → Router
- With Consolidator: Engine → ZmqKvEventPublisher (ZMQ PUB) → Consolidator → KvEventPublisher (dynamo.llm, ZMQ SUB) → NATS → Router
- Without Consolidator: Engine → KvEventPublisher (NATS PUB) → Router
"""
......@@ -65,9 +65,9 @@ class ZmqKvEventPublisher:
Pure Python ZMQ PUBLISHER for TensorRT-LLM KV events.
This class publishes TensorRT-LLM's KV cache events to ZMQ so that the consolidator
can subscribe to them. This is different from the ZmqKvEventPublisher in dynamo.llm,
which is a Rust-based ZMQ SUBSCRIBER that subscribes from the consolidator's ZMQ
output and publishes to NATS.
can subscribe to them. This is different from KvEventPublisher in dynamo.llm,
which is a Rust-based class that can optionally subscribe from a ZMQ source
and publishes to NATS.
Event Format: [timestamp, [events], data_parallel_rank]
Message Format: multipart ZMQ message [topic, sequence, payload] where payload is
......@@ -278,7 +278,7 @@ class Publisher:
- If zmq_endpoint None: Uses KvEventPublisher (NATS PUB) → Router directly
Note: The ZmqKvEventPublisher used here is the pure Python ZMQ publisher defined
in this module, not the Rust-based ZmqKvEventPublisher from dynamo.llm (which is
in this module, not the Rust-based KvEventPublisher from dynamo.llm (which is
used in main.py as the worker-side subscriber from consolidator to NATS).
"""
......@@ -357,7 +357,7 @@ class Publisher:
# Publisher selection based on consolidator configuration:
# - With consolidator: Use ZmqKvEventPublisher (this module) → ZMQ → Consolidator → NATS → Router
# - Without consolidator: Use KvEventPublisher → NATS → Router (direct)
# Note: The worker-side ZmqKvEventPublisher (from dynamo.llm) that subscribes from
# Note: The worker-side KvEventPublisher (from dynamo.llm) that subscribes from
# consolidator and publishes to NATS is created separately in main.py, not here.
if self.zmq_kv_event_publisher:
logging.info(
......
......@@ -26,9 +26,9 @@ from dynamo.common.utils.input_params import InputParamManager
from dynamo.common.utils.media_nixl import read_decoded_media_via_nixl
from dynamo.common.utils.otel_tracing import build_trace_headers
from dynamo.llm import (
KvEventPublisher,
ModelInput,
ModelType,
ZmqKvEventPublisher,
lora_name_to_id,
register_llm,
unregister_llm,
......@@ -252,7 +252,7 @@ class BaseWorkerHandler(ABC):
self.component = component
self.engine_client = engine
self.default_sampling_params = default_sampling_params
self.kv_publishers: list[ZmqKvEventPublisher] | None = None
self.kv_publishers: list[KvEventPublisher] | None = None
self.generate_endpoint = generate_endpoint
self.config = config
self.engine_monitor = VllmEngineMonitor(runtime, engine, shutdown_event)
......
......@@ -19,10 +19,10 @@ from dynamo.common.config_dump import dump_config
from dynamo.common.utils.endpoint_types import parse_endpoint_types
from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.llm import (
KvEventPublisher,
ModelInput,
ModelRuntimeConfig,
ModelType,
ZmqKvEventPublisher,
ZmqKvEventPublisherConfig,
fetch_llm,
register_llm,
......@@ -347,7 +347,7 @@ def setup_kv_event_publisher(
vllm_config,
consolidator_enabled: bool = False,
consolidator_port: Optional[int] = 5558,
) -> Optional[ZmqKvEventPublisher]:
) -> Optional[KvEventPublisher]:
"""
Set up KV event publishers for prefix caching if enabled.
Creates one publisher per dp_rank since each dp_rank publishes to a different port.
......@@ -360,7 +360,7 @@ def setup_kv_event_publisher(
consolidator_port: Port where kv event consolidator publishes (default: 5558)
Returns:
List of ZmqKvEventPublisher instances (one per dp_rank) if prefix caching is enabled, None otherwise.
List of KvEventPublisher instances (one per dp_rank) if prefix caching is enabled, None otherwise.
"""
if not config.engine_args.enable_prefix_caching:
return None
......@@ -408,7 +408,7 @@ def setup_kv_event_publisher(
enable_local_indexer=config.enable_local_indexer,
dp_rank=dp_rank,
)
kv_publisher = ZmqKvEventPublisher(component=component, config=zmq_config)
kv_publisher = KvEventPublisher(component=component, zmq_config=zmq_config)
kv_publishers.append(kv_publisher)
logger.info(
......
......@@ -185,10 +185,10 @@ flowchart LR
### Part 1: ZMQ Subscriber (Dynamo Bindings)
If your engine already publishes to ZMQ, use `ZmqKvEventPublisher` to subscribe and forward to NATS:
If your engine already publishes to ZMQ, use `KvEventPublisher` with a `ZmqKvEventPublisherConfig` to subscribe and forward to NATS:
```python
from dynamo.llm import ZmqKvEventPublisher, ZmqKvEventPublisherConfig
from dynamo.llm import KvEventPublisher, ZmqKvEventPublisherConfig
# Configure the ZMQ subscriber
config = ZmqKvEventPublisherConfig(
......@@ -200,9 +200,9 @@ config = ZmqKvEventPublisherConfig(
)
# Create publisher - it automatically subscribes to ZMQ and forwards to NATS
kv_publisher = ZmqKvEventPublisher(
kv_publisher = KvEventPublisher(
component=component,
config=config,
zmq_config=config,
)
```
......
......@@ -23,7 +23,7 @@ from vllm.utils.argparse_utils import FlexibleArgumentParser
from vllm.v1.engine.async_llm import AsyncLLM
import dynamo.nixl_connect as connect
from dynamo.llm import ZmqKvEventPublisher, ZmqKvEventPublisherConfig
from dynamo.llm import KvEventPublisher, ZmqKvEventPublisherConfig
from dynamo.runtime import Component, DistributedRuntime, Endpoint, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
......@@ -168,7 +168,7 @@ class VllmBaseWorker:
kv_block_size=vllm_config.cache_config.block_size,
zmq_endpoint=zmq_endpoint,
)
self.kv_publisher = ZmqKvEventPublisher(component=component, config=zmq_config)
self.kv_publisher = KvEventPublisher(component=component, zmq_config=zmq_config)
logger.info(f"Reading Events from {zmq_endpoint}")
......
......@@ -167,9 +167,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<llm::kv::KvEventPublisher>()?;
m.add_class::<llm::kv::RadixTree>()?;
m.add_class::<llm::kv::ZmqKvEventListener>()?;
m.add_class::<llm::kv::ZmqKvEventPublisher>()?;
m.add_class::<llm::kv::ZmqKvEventPublisherConfig>()?;
m.add_class::<llm::kv::KvRecorder>()?;
m.add_class::<llm::lora::LoRADownloader>()?;
m.add_class::<http::HttpService>()?;
m.add_class::<http::HttpAsyncEngine>()?;
......
......@@ -13,7 +13,6 @@ use crate::Component;
use llm_rs::kv_router::indexer::KvIndexerInterface;
use llm_rs::kv_router::protocols::compute_block_hash_for_seq;
use rs::pipeline::{AsyncEngine, SingleIn};
use rs::transports::event_plane::EventSubscriber;
use tracing;
use llm_rs::kv_router::protocols::*;
......@@ -149,34 +148,6 @@ impl ZmqKvEventPublisherConfig {
}
}
// Python-visible publisher that wraps the Rust `KvEventPublisher` and always
// configures it with a ZMQ event source.
// (Plain `//` comments are used so the generated Python docstrings are unchanged.)
#[pyclass]
pub(crate) struct ZmqKvEventPublisher {
// Underlying Rust publisher; every method below delegates to it.
inner: llm_rs::kv_router::publisher::KvEventPublisher,
}
#[pymethods]
impl ZmqKvEventPublisher {
// Constructor exposed to Python as `ZmqKvEventPublisher(component, config)`.
//
// Builds the inner publisher via `new_with_local_indexer`, passing:
//   - the component's inner handle,
//   - `config.kv_block_size` narrowed to `u32` with `as`,
//   - a `KvEventSourceConfig::Zmq` built from `config.zmq_endpoint` / `config.zmq_topic`,
//   - `config.enable_local_indexer` and `config.dp_rank`.
// Any error from construction is converted with `to_pyerr`.
#[new]
fn new(component: Component, config: ZmqKvEventPublisherConfig) -> PyResult<Self> {
let inner = llm_rs::kv_router::publisher::KvEventPublisher::new_with_local_indexer(
component.inner,
config.kv_block_size as u32,
Some(KvEventSourceConfig::Zmq {
endpoint: config.zmq_endpoint,
topic: config.zmq_topic,
}),
config.enable_local_indexer,
config.dp_rank,
)
.map_err(to_pyerr)?;
Ok(Self { inner })
}
// Shuts down the wrapped publisher by delegating to `inner.shutdown()`.
fn shutdown(&mut self) {
self.inner.shutdown()
}
}
/// A ZMQ-based key-value cache event listener that operates independently
/// of the dynamo runtime or event plane infrastructure.
#[pyclass]
......@@ -261,26 +232,42 @@ pub(crate) struct KvEventPublisher {
#[pymethods]
impl KvEventPublisher {
#[new]
#[pyo3(signature = (component, worker_id, kv_block_size, dp_rank=0, enable_local_indexer=false))]
#[pyo3(signature = (component, worker_id=0, kv_block_size=0, dp_rank=0, enable_local_indexer=false, zmq_config=None))]
fn new(
component: Component,
worker_id: WorkerId,
kv_block_size: usize,
dp_rank: DpRank,
enable_local_indexer: bool,
zmq_config: Option<ZmqKvEventPublisherConfig>,
) -> PyResult<Self> {
// worker_id is not used; connection_id is inferred from the component.
let _ = worker_id;
// When zmq_config is provided, use its fields for kv_block_size/dp_rank/enable_local_indexer
let (kv_block_size, dp_rank, enable_local_indexer, source_config) =
if let Some(ref cfg) = zmq_config {
(
cfg.kv_block_size,
cfg.dp_rank,
cfg.enable_local_indexer,
Some(KvEventSourceConfig::Zmq {
endpoint: cfg.zmq_endpoint.clone(),
topic: cfg.zmq_topic.clone(),
}),
)
} else {
(kv_block_size, dp_rank, enable_local_indexer, None)
};
if kv_block_size == 0 {
return Err(to_pyerr(anyhow::anyhow!("kv_block_size cannot be 0")));
}
// Note: worker_id parameter matches the Python stub (_core.pyi) signature but is not used.
// The actual worker_id is inferred from component's connection_id in the Rust implementation.
let _ = worker_id;
let inner = llm_rs::kv_router::publisher::KvEventPublisher::new_with_local_indexer(
component.inner,
kv_block_size as u32,
None,
source_config,
enable_local_indexer,
dp_rank,
)
......@@ -371,6 +358,14 @@ impl KvEventPublisher {
inner.publish(event).map_err(to_pyerr)
})
}
// Best-effort eager shutdown of the wrapped publisher.
//
// `Arc::get_mut` only yields a mutable reference when this handle is the
// sole owner of `inner`; when it returns `None` this call silently does
// nothing and returns normally.
fn shutdown(&mut self) {
// If no other Arc clones exist, shut down eagerly.
// Otherwise the Drop impl handles cleanup when the last reference is freed.
if let Some(inner) = Arc::get_mut(&mut self.inner) {
inner.shutdown();
}
}
}
#[pyclass]
......@@ -863,133 +858,6 @@ impl ApproxKvIndexer {
}
}
// Python-visible wrapper around the Rust `KvRecorder`.
// (Plain `//` comments are used so the generated Python docstrings are unchanged.)
#[pyclass]
pub(crate) struct KvRecorder {
// Held in an `Arc` so the async methods below can clone a handle into their futures.
inner: Arc<llm_rs::kv_router::recorder::KvRecorder>,
}
#[pymethods]
impl KvRecorder {
// Constructor exposed to Python.
//
// Blocks the calling thread on the tokio runtime while it:
//   1. creates the recorder (writing to `output_path`, or to
//      `<temp dir>/kv_events.jsonl` when no path is given),
//   2. subscribes to `KV_EVENT_SUBJECT` events for the component, and
//   3. spawns a background task that forwards decoded events to the recorder.
// Errors from creating the recorder or the subscription are converted with `to_pyerr`.
#[new]
#[pyo3(signature = (component, output_path=None, max_lines_per_file=None, max_count=None, max_time=None))]
fn new(
component: Component,
output_path: Option<String>,
max_lines_per_file: Option<usize>,
max_count: Option<usize>,
max_time: Option<f64>,
) -> PyResult<Self> {
let runtime = pyo3_async_runtimes::tokio::get_runtime();
runtime.block_on(async {
// Child token derived from the component's runtime; handed to the recorder below.
let token = component.inner.drt().runtime().child_token();
// Create a temp path if none provided
let path = match output_path {
Some(p) => p,
None => {
let temp_dir = std::env::temp_dir();
temp_dir
.join("kv_events.jsonl")
.to_string_lossy()
.to_string()
}
};
let inner = llm_rs::kv_router::recorder::KvRecorder::new(
token.clone(),
path,
max_lines_per_file,
max_count,
max_time,
)
.await
.map_err(to_pyerr)?;
// Subscribe to KV events
let mut kv_events_rx = EventSubscriber::for_component(
&component.inner,
llm_rs::kv_router::KV_EVENT_SUBJECT,
)
.await
.map_err(to_pyerr)?
.typed::<llm_rs::kv_router::protocols::RouterEvent>();
let event_tx = inner.event_sender();
// Spawn a task to forward events to the recorder
tokio::spawn(async move {
while let Some(result) = kv_events_rx.next().await {
// Decode failures are logged and skipped; they do not stop the loop.
let event = match result {
Ok((_envelope, event)) => event,
Err(e) => {
tracing::warn!("KvRecorder failed to decode kv event: {:?}", e);
continue;
}
};
tracing::debug!("KvRecorder received kv event: {:?}", event);
// A failed send ends the forwarding task.
if let Err(e) = event_tx.send(event).await {
tracing::trace!(
"KvRecorder failed to send kv event; shutting down: {:?}",
e
);
break;
}
}
});
Ok(Self {
inner: Arc::new(inner),
})
})
}
// Returns a Python awaitable that resolves to the recorder's `event_count()`.
fn event_count<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let recorder = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let count = recorder.event_count().await;
Ok(count)
})
}
// Returns a Python awaitable that resolves to the recorder's elapsed time in
// seconds (as `f64`), or to 0.0 when `elapsed_time()` returns an error.
fn elapsed_time<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let recorder = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
match recorder.elapsed_time().await {
Ok(elapsed) => Ok(elapsed.as_secs_f64()),
Err(_) => Ok(0.0), // Return 0.0 when no events have been received yet
}
})
}
// Returns a Python awaitable that calls `KvRecorder::send_events` with the
// indexer's event sender and resolves to the value it returns.
// NOTE(review): this method never reads `self.inner`, and the path handed to
// `send_events` is the literal "dummy_path" — confirm `send_events` does not
// actually open that path.
#[pyo3(signature = (indexer, timed=false, max_count=None, max_time=None))]
fn replay_events<'py>(
&self,
py: Python<'py>,
indexer: &KvIndexer,
timed: bool,
max_count: Option<usize>,
max_time: Option<f64>,
) -> PyResult<Bound<'py, PyAny>> {
let event_tx = indexer.inner.event_sender();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let count = llm_rs::kv_router::recorder::KvRecorder::send_events(
"dummy_path", // This doesn't matter as we'll use the provided event_tx
&event_tx,
timed,
max_count,
max_time,
)
.await
.map_err(to_pyerr)?;
Ok(count)
})
}
// Shuts down the recorder by delegating to `inner.shutdown()`; always returns `Ok(())`.
fn shutdown(&self) -> PyResult<()> {
self.inner.shutdown();
Ok(())
}
}
/// Helper function to create a KV router from an endpoint using the ModelManager
/// to ensure proper etcd registration.
/// Infers worker type using endpoint naming and router config:
......
......@@ -657,9 +657,9 @@ class ApproxKvIndexer:
...
class KvRecorder:
class KvEventPublisher:
"""
A recorder for KV Router events.
A KV event publisher will publish KV events corresponding to the component.
"""
...
......@@ -667,88 +667,28 @@ class KvRecorder:
def __init__(
self,
component: Component,
output_path: Optional[str] = None,
max_lines_per_file: Optional[int] = None,
max_count: Optional[int] = None,
max_time: Optional[float] = None,
worker_id: int = 0,
kv_block_size: int = 0,
dp_rank: int = 0,
enable_local_indexer: bool = False,
zmq_config: Optional[ZmqKvEventPublisherConfig] = None,
) -> None:
"""
Create a new KvRecorder instance.
Args:
component: The component to associate with this recorder
output_path: Path to the JSONL file to write events to
max_lines_per_file: Maximum number of lines per file before rotating to a new file
max_count: Maximum number of events to record before shutting down
max_time: Maximum duration in seconds to record before shutting down
"""
...
def event_count(self) -> int:
"""
Get the count of recorded events.
Returns:
The number of events recorded
"""
...
def elapsed_time(self) -> float:
"""
Get the elapsed time since the recorder was started.
Create a `KvEventPublisher` object.
Returns:
The elapsed time in seconds as a float
"""
...
When zmq_config is provided, the publisher subscribes to a ZMQ socket for
incoming engine events (e.g. from SGLang/vLLM) and relays them to NATS.
The zmq_config fields override kv_block_size, dp_rank, and enable_local_indexer.
def replay_events(
self,
indexer: KvIndexer,
timed: bool = False,
max_count: Optional[int] = None,
max_time: Optional[float] = None,
) -> int:
"""
Populate an indexer with the recorded events.
Args:
indexer: The KvIndexer to populate with events
timed: If true, events will be sent according to their recorded timestamps.
If false, events will be sent without any delay in between.
max_count: Maximum number of events to send before stopping
max_time: Maximum duration in seconds to send events before stopping
Returns:
The number of events sent to the indexer
"""
...
def shutdown(self) -> None:
"""
Shutdown the recorder.
"""
...
class KvEventPublisher:
"""
A KV event publisher will publish KV events corresponding to the component.
"""
...
def __init__(
self, component: Component, worker_id: int, kv_block_size: int, dp_rank: int = 0, enable_local_indexer: bool = False
) -> None:
"""
Create a `KvEventPublisher` object
When zmq_config is None, events are pushed manually via publish_stored/publish_removed.
Args:
component: The component to publish events for
worker_id: The worker ID
kv_block_size: The KV block size (must be > 0)
dp_rank: The data parallel rank (defaults to 0)
enable_local_indexer: Enable worker-local KV indexer (defaults to False)
worker_id: The worker ID (unused, inferred from component)
kv_block_size: The KV block size (must be > 0; ignored if zmq_config is set)
dp_rank: The data parallel rank (defaults to 0; ignored if zmq_config is set)
enable_local_indexer: Enable worker-local KV indexer (ignored if zmq_config is set)
zmq_config: Optional ZMQ configuration for relay mode
"""
def publish_stored(
......@@ -784,6 +724,12 @@ class KvEventPublisher:
"""
...
def shutdown(self) -> None:
"""
Shuts down the event publisher, stopping any background tasks.
"""
...
class ZmqKvEventPublisherConfig:
def __init__(
self,
......@@ -795,7 +741,7 @@ class ZmqKvEventPublisherConfig:
dp_rank: int = 0
) -> None:
"""
Configuration for the ZmqKvEventPublisher.
ZMQ configuration for KvEventPublisher relay mode.
:param worker_id: The worker ID.
:param kv_block_size: The block size for the key-value store.
......@@ -806,22 +752,6 @@ class ZmqKvEventPublisherConfig:
"""
...
class ZmqKvEventPublisher:
def __init__(self, component: Component, config: ZmqKvEventPublisherConfig) -> None:
"""
Initializes a new ZmqKvEventPublisher instance.
:param component: The component to be used.
:param config: Configuration for the event publisher.
"""
...
def shutdown(self) -> None:
"""
Shuts down the event publisher, stopping any background tasks.
"""
...
class HttpService:
"""
A HTTP service for dynamo applications.
......
......@@ -14,7 +14,6 @@ from dynamo._core import KserveGrpcService as KserveGrpcService
from dynamo._core import KvEventPublisher as KvEventPublisher
from dynamo._core import KvIndexer as KvIndexer
from dynamo._core import KvPushRouter as KvPushRouter
from dynamo._core import KvRecorder as KvRecorder
from dynamo._core import KvRouterConfig as KvRouterConfig
from dynamo._core import LoRADownloader as LoRADownloader
from dynamo._core import MediaDecoder as MediaDecoder
......@@ -30,7 +29,6 @@ from dynamo._core import RouterConfig as RouterConfig
from dynamo._core import RouterMode as RouterMode
from dynamo._core import WorkerMetricsPublisher as WorkerMetricsPublisher
from dynamo._core import ZmqKvEventListener as ZmqKvEventListener
from dynamo._core import ZmqKvEventPublisher as ZmqKvEventPublisher
from dynamo._core import ZmqKvEventPublisherConfig as ZmqKvEventPublisherConfig
from dynamo._core import compute_block_hash_for_seq_py as compute_block_hash_for_seq_py
from dynamo._core import fetch_llm as fetch_llm
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment