Unverified Commit 2f8da9ad authored by Alec's avatar Alec Committed by GitHub
Browse files

refactor: rename KvMetricsPublisher to WorkerMetricsPublisher (#1284)

parent 9210a26d
...@@ -82,7 +82,7 @@ metrics --component VllmWorker --endpoint load_metrics ...@@ -82,7 +82,7 @@ metrics --component VllmWorker --endpoint load_metrics
**NOTE**: `load_metrics` is currently a **NOTE**: `load_metrics` is currently a
[hard-coded](https://github.com/ai-dynamo/dynamo/blob/d5220c7b1151372ba3d2a061c7d0a7ed72724789/lib/llm/src/kv_router/publisher.rs#L108) [hard-coded](https://github.com/ai-dynamo/dynamo/blob/d5220c7b1151372ba3d2a061c7d0a7ed72724789/lib/llm/src/kv_router/publisher.rs#L108)
endpoint name used for python-based workers that register a `KvMetricsPublisher`. endpoint name used for python-based workers that register a `WorkerMetricsPublisher`.
## Visualization ## Visualization
......
...@@ -113,7 +113,7 @@ In the above image, our cost function is (KV match - Load) so we select Worker 2 ...@@ -113,7 +113,7 @@ In the above image, our cost function is (KV match - Load) so we select Worker 2
## Events ## Events
In Dynamo, we want to support KV Cache Routing and load balancing for many backends that have different implementations of KV Cache and record different metrics. To that end, we built a KVPublisher that can be plugged into any framework to publish KV Events and a KvMetricsPublisher that can publish Metric Events. In Dynamo, we want to support KV Cache Routing and load balancing for many backends that have different implementations of KV Cache and record different metrics. To that end, we built a KVPublisher that can be plugged into any framework to publish KV Events and a WorkerMetricsPublisher that can publish Metric Events.
On the receiving side we have a KVIndexer which accepts events from the KVPublisher and puts them into a global prefix tree and a KvMetricsAggregator which aggregates metric events by worker. On the receiving side we have a KVIndexer which accepts events from the KVPublisher and puts them into a global prefix tree and a KvMetricsAggregator which aggregates metric events by worker.
...@@ -174,7 +174,7 @@ Sample Output: ...@@ -174,7 +174,7 @@ Sample Output:
This example is designed to help you understand KV cache routing; it won't run outside of the context of dynamo serve. See the examples/ directory for runnable examples. This example is designed to help you understand KV cache routing; it won't run outside of the context of dynamo serve. See the examples/ directory for runnable examples.
``` ```
### KvMetricsPublisher ### WorkerMetricsPublisher
We added a KvMetrics Publisher which sends the following metrics to the KvMetricsAggregator: We added a KvMetrics Publisher which sends the following metrics to the KvMetricsAggregator:
- num_requests_waiting - num_requests_waiting
- gpu_cache_usage_perc - gpu_cache_usage_perc
...@@ -184,7 +184,7 @@ We added a KvMetrics Publisher which sends the following metrics to the KvMetric ...@@ -184,7 +184,7 @@ We added a KvMetrics Publisher which sends the following metrics to the KvMetric
- kv_active_blocks - kv_active_blocks
- kv_total_blocks - kv_total_blocks
Currently, the KvMetricsPublisher exists as a Python binding. Currently, the WorkerMetricsPublisher exists as a Python binding.
### KvMetricsAggregator ### KvMetricsAggregator
The KvMetricsAggregator receives these metrics and aggregates them. It has a method `get_metrics` which returns an object of `AggregatedMetrics`. The KvMetricsAggregator receives these metrics and aggregates them. It has a method `get_metrics` which returns an object of `AggregatedMetrics`.
......
...@@ -210,7 +210,7 @@ KV-aware routing is a powerful feature of Dynamo that optimizes for routing ...@@ -210,7 +210,7 @@ KV-aware routing is a powerful feature of Dynamo that optimizes for routing
requests to specific workers while minimizing a specific KV-cache based cost function. requests to specific workers while minimizing a specific KV-cache based cost function.
In its simplest form, all a worker needs to do to enable KV-aware routing is to In its simplest form, all a worker needs to do to enable KV-aware routing is to
publish KV metrics through the `KvMetricsPublisher`, which is consumed publish KV metrics through the `WorkerMetricsPublisher`, which is consumed
by a Dynamo KV Router through the `KvMetricsAggregator`: by a Dynamo KV Router through the `KvMetricsAggregator`:
```python ```python
...@@ -221,7 +221,7 @@ import logging ...@@ -221,7 +221,7 @@ import logging
import random import random
from pydantic import BaseModel from pydantic import BaseModel
from dynamo.llm import KvMetricsPublisher from dynamo.llm import WorkerMetricsPublisher
from dynamo.sdk import endpoint, service, dynamo_context from dynamo.sdk import endpoint, service, dynamo_context
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -241,7 +241,7 @@ class YourWorker: ...@@ -241,7 +241,7 @@ class YourWorker:
def __init__(self): def __init__(self):
# Initialize metrics publisher from Dynamo # Initialize metrics publisher from Dynamo
self.component = dynamo_context["component"] self.component = dynamo_context["component"]
self.metrics_publisher = KvMetricsPublisher() self.metrics_publisher = WorkerMetricsPublisher()
# Register an endpoint for consumers of the KV Metrics # Register an endpoint for consumers of the KV Metrics
# (KvMetricsAggregator) to listen/gather on. # (KvMetricsAggregator) to listen/gather on.
self.metrics_publisher.create_endpoint(self.component) self.metrics_publisher.create_endpoint(self.component)
......
...@@ -31,7 +31,7 @@ from vllm.entrypoints.openai.api_server import ( ...@@ -31,7 +31,7 @@ from vllm.entrypoints.openai.api_server import (
from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest
from vllm.sampling_params import RequestOutputKind from vllm.sampling_params import RequestOutputKind
from dynamo.llm import KvMetricsPublisher from dynamo.llm import WorkerMetricsPublisher
from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -87,7 +87,7 @@ class VllmWorker: ...@@ -87,7 +87,7 @@ class VllmWorker:
os.environ["VLLM_KV_NAMESPACE"] = "dynamo" os.environ["VLLM_KV_NAMESPACE"] = "dynamo"
os.environ["VLLM_KV_COMPONENT"] = class_name os.environ["VLLM_KV_COMPONENT"] = class_name
self.metrics_publisher = KvMetricsPublisher() self.metrics_publisher = WorkerMetricsPublisher()
signal.signal(signal.SIGTERM, self.shutdown_vllm_engine) signal.signal(signal.SIGTERM, self.shutdown_vllm_engine)
signal.signal(signal.SIGINT, self.shutdown_vllm_engine) signal.signal(signal.SIGINT, self.shutdown_vllm_engine)
......
...@@ -42,7 +42,7 @@ from tensorrt_llm.llmapi.disagg_utils import ( ...@@ -42,7 +42,7 @@ from tensorrt_llm.llmapi.disagg_utils import (
) )
from tensorrt_llm.serve.openai_protocol import DisaggregatedParams from tensorrt_llm.serve.openai_protocol import DisaggregatedParams
from dynamo.llm import KvEventPublisher, KvMetricsPublisher from dynamo.llm import KvEventPublisher, WorkerMetricsPublisher
from dynamo.sdk import dynamo_context from dynamo.sdk import dynamo_context
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -134,7 +134,7 @@ class BaseTensorrtLLMEngine: ...@@ -134,7 +134,7 @@ class BaseTensorrtLLMEngine:
self._publish_events = False self._publish_events = False
if self._publish_stats: if self._publish_stats:
self._kv_metrics_publisher = KvMetricsPublisher() self._kv_metrics_publisher = WorkerMetricsPublisher()
if self._publish_events: if self._publish_events:
if self._worker_id is None: if self._worker_id is None:
......
...@@ -34,7 +34,7 @@ from vllm.inputs import TokensPrompt ...@@ -34,7 +34,7 @@ from vllm.inputs import TokensPrompt
from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest
from vllm.sampling_params import RequestOutputKind from vllm.sampling_params import RequestOutputKind
from dynamo.llm import KvMetricsPublisher, ModelType, register_llm from dynamo.llm import ModelType, WorkerMetricsPublisher, register_llm
from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -90,7 +90,7 @@ class VllmWorker: ...@@ -90,7 +90,7 @@ class VllmWorker:
os.environ["VLLM_KV_NAMESPACE"] = "dynamo" os.environ["VLLM_KV_NAMESPACE"] = "dynamo"
os.environ["VLLM_KV_COMPONENT"] = class_name os.environ["VLLM_KV_COMPONENT"] = class_name
self.metrics_publisher = KvMetricsPublisher() self.metrics_publisher = WorkerMetricsPublisher()
model_config = self.engine_args.create_model_config() model_config = self.engine_args.create_model_config()
self.default_sampling_params = model_config.get_diff_sampling_param() self.default_sampling_params = model_config.get_diff_sampling_param()
......
...@@ -26,7 +26,7 @@ from vllm.entrypoints.openai.api_server import ( ...@@ -26,7 +26,7 @@ from vllm.entrypoints.openai.api_server import (
) )
from vllm.inputs import TokensPrompt from vllm.inputs import TokensPrompt
from dynamo.llm import KvMetricsPublisher, ModelType, register_llm from dynamo.llm import ModelType, WorkerMetricsPublisher, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker from dynamo.runtime import DistributedRuntime, dynamo_worker
# Only used if you run it manually from the command line # Only used if you run it manually from the command line
...@@ -59,7 +59,7 @@ class RequestHandler: ...@@ -59,7 +59,7 @@ class RequestHandler:
self.component = component self.component = component
self.engine_client = engine self.engine_client = engine
self.default_sampling_params = default_sampling_params self.default_sampling_params = default_sampling_params
self.metrics_publisher = KvMetricsPublisher() self.metrics_publisher = WorkerMetricsPublisher()
def setup_kv_metrics(self): def setup_kv_metrics(self):
if not hasattr(self.engine_client, "set_metrics_publisher"): if not hasattr(self.engine_client, "set_metrics_publisher"):
......
...@@ -33,8 +33,8 @@ from vllm.v1.metrics.loggers import StatLoggerBase ...@@ -33,8 +33,8 @@ from vllm.v1.metrics.loggers import StatLoggerBase
from vllm.v1.metrics.stats import IterationStats, SchedulerStats from vllm.v1.metrics.stats import IterationStats, SchedulerStats
from dynamo.llm import ( from dynamo.llm import (
KvMetricsPublisher,
ModelType, ModelType,
WorkerMetricsPublisher,
ZmqKvEventPublisher, ZmqKvEventPublisher,
ZmqKvEventPublisherConfig, ZmqKvEventPublisherConfig,
register_llm, register_llm,
...@@ -64,10 +64,10 @@ class Config: ...@@ -64,10 +64,10 @@ class Config:
class DynamoStatLoggerPublisher(StatLoggerBase): class DynamoStatLoggerPublisher(StatLoggerBase):
"""Stat logger publisher. Wrapper for the KvMetricsPublisher to match the StatLoggerBase interface.""" """Stat logger publisher. Wrapper for the WorkerMetricsPublisher to match the StatLoggerBase interface."""
def __init__(self, component: Component, dp_rank: int) -> None: def __init__(self, component: Component, dp_rank: int) -> None:
self.inner = KvMetricsPublisher() self.inner = WorkerMetricsPublisher()
self.inner.create_endpoint(component) self.inner.create_endpoint(component)
self.dp_rank = dp_rank self.dp_rank = dp_rank
......
...@@ -51,7 +51,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { ...@@ -51,7 +51,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<AsyncResponseStream>()?; m.add_class::<AsyncResponseStream>()?;
m.add_class::<llm::kv::KvRouter>()?; m.add_class::<llm::kv::KvRouter>()?;
m.add_class::<llm::disagg_router::DisaggregatedRouter>()?; m.add_class::<llm::disagg_router::DisaggregatedRouter>()?;
m.add_class::<llm::kv::KvMetricsPublisher>()?; m.add_class::<llm::kv::WorkerMetricsPublisher>()?;
m.add_class::<llm::model_card::ModelDeploymentCard>()?; m.add_class::<llm::model_card::ModelDeploymentCard>()?;
m.add_class::<llm::preprocessor::OAIChatPreprocessor>()?; m.add_class::<llm::preprocessor::OAIChatPreprocessor>()?;
m.add_class::<llm::backend::Backend>()?; m.add_class::<llm::backend::Backend>()?;
......
...@@ -63,15 +63,16 @@ impl KvRouter { ...@@ -63,15 +63,16 @@ impl KvRouter {
} }
#[pyclass] #[pyclass]
pub(crate) struct KvMetricsPublisher { pub(crate) struct WorkerMetricsPublisher {
inner: Arc<llm_rs::kv_router::publisher::KvMetricsPublisher>, inner: Arc<llm_rs::kv_router::publisher::WorkerMetricsPublisher>,
} }
#[pymethods] #[pymethods]
impl KvMetricsPublisher { impl WorkerMetricsPublisher {
#[new] #[new]
fn new() -> PyResult<Self> { fn new() -> PyResult<Self> {
let inner = llm_rs::kv_router::publisher::KvMetricsPublisher::new().map_err(to_pyerr)?; let inner =
llm_rs::kv_router::publisher::WorkerMetricsPublisher::new().map_err(to_pyerr)?;
Ok(Self { Ok(Self {
inner: inner.into(), inner: inner.into(),
}) })
......
...@@ -344,16 +344,16 @@ class DisaggregatedRouter: ...@@ -344,16 +344,16 @@ class DisaggregatedRouter:
""" """
... ...
class KvMetricsPublisher: class WorkerMetricsPublisher:
""" """
A metrics publisher will provide KV metrics to the router. A metrics publisher will provide metrics to the router.
""" """
... ...
def __init__(self) -> None: def __init__(self) -> None:
""" """
Create a `KvMetricsPublisher` object Create a `WorkerMetricsPublisher` object
""" """
def create_service(self, component: Component) -> None: def create_service(self, component: Component) -> None:
......
...@@ -29,11 +29,11 @@ from dynamo._core import HttpService as HttpService ...@@ -29,11 +29,11 @@ from dynamo._core import HttpService as HttpService
from dynamo._core import KvEventPublisher as KvEventPublisher from dynamo._core import KvEventPublisher as KvEventPublisher
from dynamo._core import KvIndexer as KvIndexer from dynamo._core import KvIndexer as KvIndexer
from dynamo._core import KvMetricsAggregator as KvMetricsAggregator from dynamo._core import KvMetricsAggregator as KvMetricsAggregator
from dynamo._core import KvMetricsPublisher as KvMetricsPublisher
from dynamo._core import KvRecorder as KvRecorder from dynamo._core import KvRecorder as KvRecorder
from dynamo._core import KvRouter as KvRouter from dynamo._core import KvRouter as KvRouter
from dynamo._core import ModelType as ModelType from dynamo._core import ModelType as ModelType
from dynamo._core import OverlapScores as OverlapScores from dynamo._core import OverlapScores as OverlapScores
from dynamo._core import WorkerMetricsPublisher as WorkerMetricsPublisher
from dynamo._core import ZmqKvEventPublisher as ZmqKvEventPublisher from dynamo._core import ZmqKvEventPublisher as ZmqKvEventPublisher
from dynamo._core import ZmqKvEventPublisherConfig as ZmqKvEventPublisherConfig from dynamo._core import ZmqKvEventPublisherConfig as ZmqKvEventPublisherConfig
from dynamo._core import register_llm as register_llm from dynamo._core import register_llm as register_llm
......
...@@ -11,7 +11,7 @@ from contextlib import asynccontextmanager ...@@ -11,7 +11,7 @@ from contextlib import asynccontextmanager
from queue import Queue from queue import Queue
from typing import Callable, Optional, Union from typing import Callable, Optional, Union
from dynamo.llm import KvEventPublisher, KvMetricsPublisher from dynamo.llm import KvEventPublisher, WorkerMetricsPublisher
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
...@@ -113,7 +113,7 @@ class Publisher: ...@@ -113,7 +113,7 @@ class Publisher:
def initialize(self): def initialize(self):
# Setup the metrics publisher # Setup the metrics publisher
self.metrics_publisher = KvMetricsPublisher() self.metrics_publisher = WorkerMetricsPublisher()
self._init_publish_metrics_thread() self._init_publish_metrics_thread()
task = asyncio.create_task(self._create_metrics_publisher_endpoint()) task = asyncio.create_task(self._create_metrics_publisher_endpoint())
task.add_done_callback( task.add_done_callback(
......
...@@ -28,7 +28,7 @@ from dynamo.llm import ( ...@@ -28,7 +28,7 @@ from dynamo.llm import (
KvEventPublisher, KvEventPublisher,
KvIndexer, KvIndexer,
KvMetricsAggregator, KvMetricsAggregator,
KvMetricsPublisher, WorkerMetricsPublisher,
) )
from dynamo.runtime import Component, DistributedRuntime from dynamo.runtime import Component, DistributedRuntime
...@@ -256,7 +256,7 @@ async def test_metrics_aggregator(distributed_runtime): ...@@ -256,7 +256,7 @@ async def test_metrics_aggregator(distributed_runtime):
async def metrics_publisher_task(kv_listener, expected_metrics): async def metrics_publisher_task(kv_listener, expected_metrics):
metrics_publisher = KvMetricsPublisher() metrics_publisher = WorkerMetricsPublisher()
metrics_publisher.publish( metrics_publisher.publish(
expected_metrics["request_active_slots"], expected_metrics["request_active_slots"],
expected_metrics["request_total_slots"], expected_metrics["request_total_slots"],
......
...@@ -463,15 +463,15 @@ enum RawKvEvent { ...@@ -463,15 +463,15 @@ enum RawKvEvent {
// Metrics Publishers ------------------------------------------------------ // Metrics Publishers ------------------------------------------------------
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
pub struct KvMetricsPublisher { pub struct WorkerMetricsPublisher {
tx: tokio::sync::watch::Sender<Arc<ForwardPassMetrics>>, tx: tokio::sync::watch::Sender<Arc<ForwardPassMetrics>>,
rx: tokio::sync::watch::Receiver<Arc<ForwardPassMetrics>>, rx: tokio::sync::watch::Receiver<Arc<ForwardPassMetrics>>,
} }
impl KvMetricsPublisher { impl WorkerMetricsPublisher {
pub fn new() -> Result<Self> { pub fn new() -> Result<Self> {
let (tx, rx) = tokio::sync::watch::channel(Arc::new(ForwardPassMetrics::default())); let (tx, rx) = tokio::sync::watch::channel(Arc::new(ForwardPassMetrics::default()));
Ok(KvMetricsPublisher { tx, rx }) Ok(WorkerMetricsPublisher { tx, rx })
} }
pub fn publish( pub fn publish(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment