Unverified Commit f67dc38b authored by Alec's avatar Alec Committed by GitHub
Browse files

fix: Renamed event publisher classes and configuration (#1273)

parent d3a7587a
......@@ -5,11 +5,10 @@
# Can also be used standalone: `python3 vllm_inc.py` - lots of optional cmd line params
# Setup checklist:
# - We are in a virtualenv with vllm installed. Must be newer than v0.9.0 (currently pre-release)
# 1f079540db5f1080a2f61a730da50d3009934c5a - this commit is working for me
# - We are in a virtualenv with vllm installed. V1 is compatible with v0.9.0
# Steps:
# git clone https://github.com/vllm-project/vllm.git
# cd vllm && git checkout 1f079540db5f1080a2f61a730da50d3009934c5a
# cd vllm && git checkout v0.9.0
# uv pip uninstall ai-dynamo-vllm
# VLLM_USE_PRECOMPILED=1 uv pip install --editable .
......@@ -34,10 +33,10 @@ from vllm.v1.metrics.loggers import StatLoggerBase
from vllm.v1.metrics.stats import IterationStats, SchedulerStats
from dynamo.llm import (
KvEventPublisherFromZmq,
KvEventPublisherFromZmqConfig,
KvMetricsPublisher,
ModelType,
ZmqKvEventPublisher,
ZmqKvEventPublisherConfig,
register_llm,
)
from dynamo.runtime import Component, DistributedRuntime, dynamo_worker
......@@ -248,11 +247,11 @@ async def init(runtime: DistributedRuntime, config: Config):
logger.info("VllmWorker has been initialized")
zmq_config = KvEventPublisherFromZmqConfig(
zmq_config = ZmqKvEventPublisherConfig(
worker_id=endpoint.lease_id(), kv_block_size=engine_args.block_size
)
_ = KvEventPublisherFromZmq(component=component, config=zmq_config)
_ = ZmqKvEventPublisher(component=component, config=zmq_config)
handler = RequestHandler(component, engine_client, default_sampling_params)
......
......@@ -61,8 +61,8 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<llm::kv::AggregatedMetrics>()?;
m.add_class::<llm::kv::KvMetricsAggregator>()?;
m.add_class::<llm::kv::KvEventPublisher>()?;
m.add_class::<llm::kv::KvEventPublisherFromZmq>()?;
m.add_class::<llm::kv::KvEventPublisherFromZmqConfig>()?;
m.add_class::<llm::kv::ZmqKvEventPublisher>()?;
m.add_class::<llm::kv::ZmqKvEventPublisherConfig>()?;
m.add_class::<llm::kv::KvRecorder>()?;
m.add_class::<llm::nats::NatsQueue>()?;
m.add_class::<http::HttpService>()?;
......
......@@ -128,7 +128,7 @@ impl KvMetricsPublisher {
#[pyclass]
#[derive(Clone)]
pub struct KvEventPublisherFromZmqConfig {
pub struct ZmqKvEventPublisherConfig {
#[pyo3(get, set)]
pub worker_id: i64,
#[pyo3(get, set)]
......@@ -140,7 +140,7 @@ pub struct KvEventPublisherFromZmqConfig {
}
#[pymethods]
impl KvEventPublisherFromZmqConfig {
impl ZmqKvEventPublisherConfig {
#[new]
#[pyo3(signature = (
worker_id,
......@@ -164,16 +164,16 @@ impl KvEventPublisherFromZmqConfig {
}
#[pyclass]
pub(crate) struct KvEventPublisherFromZmq {
inner: llm_rs::kv_router::publisher::KvEventPublisherFromZmq,
pub(crate) struct ZmqKvEventPublisher {
inner: llm_rs::kv_router::publisher::ZmqKvEventPublisher,
}
#[pymethods]
impl KvEventPublisherFromZmq {
impl ZmqKvEventPublisher {
#[new]
fn new(component: Component, config: KvEventPublisherFromZmqConfig) -> PyResult<Self> {
fn new(component: Component, config: ZmqKvEventPublisherConfig) -> PyResult<Self> {
let mut inner =
llm_rs::kv_router::publisher::KvEventPublisherFromZmq::new(config.kv_block_size);
llm_rs::kv_router::publisher::ZmqKvEventPublisher::new(config.kv_block_size);
inner.start_background_task(
component.inner,
config.worker_id,
......
......@@ -579,7 +579,7 @@ class KvEventPublisher:
"""
...
class KvEventPublisherFromZmqConfig:
class ZmqKvEventPublisherConfig:
def __init__(
self,
worker_id: int,
......@@ -588,7 +588,7 @@ class KvEventPublisherFromZmqConfig:
zmq_topic: str = ""
) -> None:
"""
Configuration for the KvEventPublisherFromZmq.
Configuration for the ZmqKvEventPublisher.
:param worker_id: The worker ID.
:param kv_block_size: The block size for the key-value store.
......@@ -597,10 +597,10 @@ class KvEventPublisherFromZmqConfig:
"""
...
class KvEventPublisherFromZmq:
def __init__(self, component: Component, config: KvEventPublisherFromZmqConfig) -> None:
class ZmqKvEventPublisher:
def __init__(self, component: Component, config: ZmqKvEventPublisherConfig) -> None:
"""
Initializes a new KvEventPublisherFromZmq instance.
Initializes a new ZmqKvEventPublisher instance.
:param component: The component to be used.
:param config: Configuration for the event publisher.
......
......@@ -27,8 +27,6 @@ from dynamo._core import HttpAsyncEngine as HttpAsyncEngine
from dynamo._core import HttpError as HttpError
from dynamo._core import HttpService as HttpService
from dynamo._core import KvEventPublisher as KvEventPublisher
from dynamo._core import KvEventPublisherFromZmq as KvEventPublisherFromZmq
from dynamo._core import KvEventPublisherFromZmqConfig as KvEventPublisherFromZmqConfig
from dynamo._core import KvIndexer as KvIndexer
from dynamo._core import KvMetricsAggregator as KvMetricsAggregator
from dynamo._core import KvMetricsPublisher as KvMetricsPublisher
......@@ -36,6 +34,8 @@ from dynamo._core import KvRecorder as KvRecorder
from dynamo._core import KvRouter as KvRouter
from dynamo._core import ModelType as ModelType
from dynamo._core import OverlapScores as OverlapScores
from dynamo._core import ZmqKvEventPublisher as ZmqKvEventPublisher
from dynamo._core import ZmqKvEventPublisherConfig as ZmqKvEventPublisherConfig
from dynamo._core import register_llm as register_llm
try:
......
......@@ -41,7 +41,9 @@ pub struct WorkerSelectionResult {
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ForwardPassMetrics {
pub data_parallel_rank: Option<u32>, // backwards compatible
// https://lmsys.org/blog/2024-12-04-sglang-v0-4/#data-parallelism-attention-for-deepseek-models
// Data parallel ranks are semi-independent, so we need to track metrics at the DP level
pub data_parallel_rank: Option<u32>, // Optional for backwards compatibility
pub request_active_slots: u64,
pub request_total_slots: u64,
pub kv_active_blocks: u64,
......
......@@ -92,7 +92,7 @@ fn start_publish_task(
// For more info on zmq: https://zeromq.org/
// This publisher reads those events and publishes them to NATS
// The indexer will get the events from NATS and put them in the global prefix tree.
pub struct KvEventPublisherFromZmq {
pub struct ZmqKvEventPublisher {
kv_block_size: usize,
processor_handle: Option<tokio::task::JoinHandle<()>>,
zmq_handle: Option<tokio::task::JoinHandle<()>>,
......@@ -100,7 +100,7 @@ pub struct KvEventPublisherFromZmq {
warning_count: Arc<AtomicU32>,
}
impl KvEventPublisherFromZmq {
impl ZmqKvEventPublisher {
pub fn new(kv_block_size: usize) -> Self {
Self {
kv_block_size,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment