Unverified commit c5458b99, authored by Graham King and committed by GitHub
Browse files

chore(router): Remove deprecated best_worker_id and metrics_labels (#5412)


Signed-off-by: Graham King <grahamk@nvidia.com>
parent e22bb037
...@@ -145,11 +145,11 @@ class StandaloneRouterHandler: ...@@ -145,11 +145,11 @@ class StandaloneRouterHandler:
logger.error("KvPushRouter not initialized - cannot get best worker") logger.error("KvPushRouter not initialized - cannot get best worker")
raise RuntimeError("Router not initialized") raise RuntimeError("Router not initialized")
result = await self.kv_push_router.best_worker_id( (worker_id, _dp_rank, _overlap_blocks) = await self.kv_push_router.best_worker(
token_ids, router_config_override token_ids, router_config_override
) )
yield result yield worker_id
def parse_args(): def parse_args():
......
...@@ -80,7 +80,7 @@ class DynamoSglangPublisher: ...@@ -80,7 +80,7 @@ class DynamoSglangPublisher:
self.generate_endpoint = generate_endpoint self.generate_endpoint = generate_endpoint
self.component = component self.component = component
self.metrics_publisher = WorkerMetricsPublisher() self.metrics_publisher = WorkerMetricsPublisher()
self.metrics_publisher.create_endpoint(component, metrics_labels) self.metrics_publisher.create_endpoint(component)
# Set default values (can be overridden later if needed) # Set default values (can be overridden later if needed)
self.request_total_slots = 1024 self.request_total_slots = 1024
......
...@@ -322,9 +322,7 @@ class Publisher: ...@@ -322,9 +322,7 @@ class Publisher:
if self.metrics_publisher is None: if self.metrics_publisher is None:
logging.error("KV metrics publisher not initialized!") logging.error("KV metrics publisher not initialized!")
return return
await self.metrics_publisher.create_endpoint( await self.metrics_publisher.create_endpoint(self.component)
self.component, self.metrics_labels
)
def initialize(self): def initialize(self):
# Setup the metrics publisher # Setup the metrics publisher
......
...@@ -47,7 +47,7 @@ class DynamoStatLoggerPublisher(StatLoggerBase): ...@@ -47,7 +47,7 @@ class DynamoStatLoggerPublisher(StatLoggerBase):
self.inner = WorkerMetricsPublisher() self.inner = WorkerMetricsPublisher()
# Use labels directly for the new create_endpoint signature # Use labels directly for the new create_endpoint signature
metrics_labels = metrics_labels or [] metrics_labels = metrics_labels or []
self.inner.create_endpoint(component, metrics_labels) self.inner.create_endpoint(component)
self.dp_rank = dp_rank self.dp_rank = dp_rank
self.num_gpu_block = 1 self.num_gpu_block = 1
self.request_total_slots = 1 self.request_total_slots = 1
......
...@@ -517,15 +517,11 @@ The `KvPushRouter` provides the following methods: ...@@ -517,15 +517,11 @@ The `KvPushRouter` provides the following methods:
- Without `request_id`: Query-only, doesn't update router state - Without `request_id`: Query-only, doesn't update router state
- With `request_id`: Updates router state to track the request. **Note**: If used with `request_id`, you must call `mark_prefill_complete()` and `free()` at the appropriate lifecycle points to maintain accurate load tracking - With `request_id`: Updates router state to track the request. **Note**: If used with `request_id`, you must call `mark_prefill_complete()` and `free()` at the appropriate lifecycle points to maintain accurate load tracking
- **`best_worker_id(token_ids, router_config_override=None, request_id=None)`**: **[DEPRECATED - use `best_worker()` instead]** Query which worker would be selected for given tokens. Returns `(worker_id, overlap_blocks)`.
- Without `request_id`: Query-only, doesn't update router state
- With `request_id`: Updates router state to track the request. **Note**: If used with `request_id`, you must call `mark_prefill_complete()` and `free()` at the appropriate lifecycle points to maintain accurate load tracking
- **`get_potential_loads(token_ids)`**: Get detailed load information for all workers, including potential prefill tokens and active decode blocks. Returns a list of load dictionaries. - **`get_potential_loads(token_ids)`**: Get detailed load information for all workers, including potential prefill tokens and active decode blocks. Returns a list of load dictionaries.
- **`mark_prefill_complete(request_id)`**: Signal that a request has completed its prefill phase. Only used for [manual lifecycle management](#2-manual-state-management-advanced) when using `best_worker_id()` for manual routing instead of `generate()`. - **`mark_prefill_complete(request_id)`**: Signal that a request has completed its prefill phase. Only used for [manual lifecycle management](#2-manual-state-management-advanced) when using `best_worker()` for manual routing instead of `generate()`.
- **`free(request_id)`**: Signal that a request has completed and its resources should be released. Only used for [manual lifecycle management](#2-manual-state-management-advanced) when using `best_worker_id()` for manual routing instead of `generate()`. - **`free(request_id)`**: Signal that a request has completed and its resources should be released. Only used for [manual lifecycle management](#2-manual-state-management-advanced) when using `best_worker()` for manual routing instead of `generate()`.
- **`dump_events()`**: Dump all KV cache events from the router's indexer as a JSON string. Useful for debugging and analysis. - **`dump_events()`**: Dump all KV cache events from the router's indexer as a JSON string. Useful for debugging and analysis.
...@@ -603,9 +599,9 @@ stream = await router.generate(token_ids=tokens, model="model-name") ...@@ -603,9 +599,9 @@ stream = await router.generate(token_ids=tokens, model="model-name")
- **Router automatically**: Selects best worker, updates state, routes request, tracks lifecycle - **Router automatically**: Selects best worker, updates state, routes request, tracks lifecycle
#### 2. Manual State Management (Advanced) #### 2. Manual State Management (Advanced)
Use `best_worker_id(request_id=...)` to select and track, then manage the request yourself: Use `best_worker(request_id=...)` to select and track, then manage the request yourself:
```python ```python
worker_id, overlap = await router.best_worker_id(tokens, request_id="req-123") worker_id, _dp_rank, overlap = await router.best_worker(tokens, request_id="req-123")
response = await client.generate(tokens, request_id="req-123") response = await client.generate(tokens, request_id="req-123")
# await anext(response) # Get first token # await anext(response) # Get first token
await router.mark_prefill_complete("req-123") # After first token await router.mark_prefill_complete("req-123") # After first token
...@@ -621,8 +617,8 @@ await router.free("req-123") # After completion ...@@ -621,8 +617,8 @@ await router.free("req-123") # After completion
Query without state updates, then route through a chosen router: Query without state updates, then route through a chosen router:
```python ```python
# Probe multiple routers without updating state # Probe multiple routers without updating state
worker_id_1, overlap_1 = await router_1.best_worker_id(tokens) # No request_id worker_id_1, dp_rank, overlap_1 = await router_1.best_worker(tokens) # No request_id
worker_id_2, overlap_2 = await router_2.best_worker_id(tokens) worker_id_2, dp_rank, overlap_2 = await router_2.best_worker(tokens)
# Pick the best router based on results # Pick the best router based on results
chosen_router = router_1 if overlap_1 > overlap_2 else router_2 chosen_router = router_1 if overlap_1 > overlap_2 else router_2
...@@ -698,7 +694,7 @@ if __name__ == "__main__": ...@@ -698,7 +694,7 @@ if __name__ == "__main__":
This approach gives you complete control over routing decisions, allowing you to optimize for different metrics based on your specific requirements. As some examples: This approach gives you complete control over routing decisions, allowing you to optimize for different metrics based on your specific requirements. As some examples:
- **Minimize TTFT**: Select worker with lowest `potential_prefill_tokens` - **Minimize TTFT**: Select worker with lowest `potential_prefill_tokens`
- **Maximize cache reuse**: Use `best_worker_id()` which considers both prefill and decode loads - **Maximize cache reuse**: Use `best_worker()` which considers both prefill and decode loads
- **Balance load**: Consider both `potential_prefill_tokens` and `potential_decode_blocks` together - **Balance load**: Consider both `potential_prefill_tokens` and `potential_decode_blocks` together
See [KV Router Architecture](../router/README.md) for performance tuning details. See [KV Router Architecture](../router/README.md) for performance tuning details.
......
...@@ -54,11 +54,9 @@ class DynamoStatLoggerPublisher(StatLoggerBase): ...@@ -54,11 +54,9 @@ class DynamoStatLoggerPublisher(StatLoggerBase):
self, self,
component: Component, component: Component,
dp_rank: int, dp_rank: int,
metrics_labels: Optional[List[Tuple[str, str]]] = None,
) -> None: ) -> None:
self.inner = WorkerMetricsPublisher() self.inner = WorkerMetricsPublisher()
metrics_labels = metrics_labels or [] self.inner.create_endpoint(component)
self.inner.create_endpoint(component, metrics_labels)
self.dp_rank = dp_rank self.dp_rank = dp_rank
self.num_gpu_block = 1 self.num_gpu_block = 1
self.request_total_slots = 1 self.request_total_slots = 1
...@@ -165,9 +163,7 @@ class StatLoggerFactory: ...@@ -165,9 +163,7 @@ class StatLoggerFactory:
def create_stat_logger(self, dp_rank: int) -> StatLoggerBase: def create_stat_logger(self, dp_rank: int) -> StatLoggerBase:
if self.dp_rank != dp_rank: if self.dp_rank != dp_rank:
return NullStatLogger() return NullStatLogger()
logger = DynamoStatLoggerPublisher( logger = DynamoStatLoggerPublisher(self.component, dp_rank)
self.component, dp_rank, metrics_labels=self.metrics_labels
)
self.created_logger = logger self.created_logger = logger
return logger return logger
......
...@@ -75,26 +75,12 @@ impl WorkerMetricsPublisher { ...@@ -75,26 +75,12 @@ impl WorkerMetricsPublisher {
}) })
} }
#[pyo3(signature = (component, metrics_labels = None))] #[pyo3(signature = (component))]
#[allow(unused_variables)]
fn create_endpoint<'p>( fn create_endpoint<'p>(
&self, &self,
py: Python<'p>, py: Python<'p>,
component: Component, component: Component,
metrics_labels: Option<Vec<(String, String)>>, // TODO: fully remove this
) -> PyResult<Bound<'p, PyAny>> { ) -> PyResult<Bound<'p, PyAny>> {
// Emit deprecation warning if metrics_labels is provided
if metrics_labels.is_some() {
let warnings = py.import("warnings")?;
warnings.call_method1(
"warn",
(
"The 'metrics_labels' parameter is deprecated and no longer used. It will be removed in a future version.",
py.get_type::<pyo3::exceptions::PyDeprecationWarning>(),
),
)?;
}
let rs_publisher = self.inner.clone(); let rs_publisher = self.inner.clone();
let rs_component = component.inner.clone(); let rs_component = component.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move { pyo3_async_runtimes::tokio::future_into_py(py, async move {
...@@ -1357,52 +1343,6 @@ impl KvPushRouter { ...@@ -1357,52 +1343,6 @@ impl KvPushRouter {
}) })
} }
/// Deprecated: Use `best_worker()` instead which returns (worker_id, dp_rank, overlap_blocks)
#[pyo3(signature = (token_ids, router_config_override=None, request_id=None))]
fn best_worker_id<'p>(
&self,
py: Python<'p>,
token_ids: Vec<u32>,
router_config_override: Option<PyObject>,
request_id: Option<String>,
) -> PyResult<Bound<'p, PyAny>> {
// Issue deprecation warning
let warnings = py.import("warnings")?;
warnings.call_method1(
"warn",
(
"best_worker_id() is deprecated. Use best_worker() instead which returns (worker_id, dp_rank, overlap_blocks)",
py.get_type::<pyo3::exceptions::PyDeprecationWarning>(),
),
)?;
let router_config_override = if let Some(obj) = router_config_override {
let override_config: llm_rs::kv_router::RouterConfigOverride =
depythonize(obj.bind(py)).map_err(to_pyerr)?;
Some(override_config)
} else {
None
};
let chooser = self.inner.chooser.clone();
let update_states = request_id.is_some();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let (best_worker, overlap_blocks) = chooser
.find_best_match(
request_id.as_deref(),
&token_ids,
router_config_override.as_ref(),
update_states,
)
.await
.map_err(to_pyerr)?;
// Return only worker_id and overlap_blocks for backward compatibility
Ok((best_worker.worker_id, overlap_blocks))
})
}
/// Mark prefill as completed for a request /// Mark prefill as completed for a request
fn mark_prefill_complete<'p>( fn mark_prefill_complete<'p>(
&self, &self,
......
...@@ -466,16 +466,12 @@ class WorkerMetricsPublisher: ...@@ -466,16 +466,12 @@ class WorkerMetricsPublisher:
Create a `WorkerMetricsPublisher` object Create a `WorkerMetricsPublisher` object
""" """
def create_endpoint(self, component: Component, metrics_labels: Optional[List[Tuple[str, str]]] = None) -> None: def create_endpoint(self, component: Component) -> None:
""" """
Only service created through this method will interact with KV router of the same component. Only service created through this method will interact with KV router of the same component.
Args: Args:
component: The component to create the endpoint for component: The component to create the endpoint for
metrics_labels: [DEPRECATED] This parameter is no longer used and will be removed in a future version
.. deprecated::
The metrics_labels parameter is deprecated and has no effect.
""" """
def publish( def publish(
...@@ -1540,34 +1536,6 @@ class KvPushRouter: ...@@ -1540,34 +1536,6 @@ class KvPushRouter:
""" """
... ...
async def best_worker_id(
self,
token_ids: List[int],
router_config_override: Optional[JsonLike] = None,
request_id: Optional[str] = None,
) -> Tuple[int, int]:
"""
[DEPRECATED] Use best_worker() instead which returns (worker_id, dp_rank, overlap_blocks).
Find the best matching worker for the given tokens.
Args:
token_ids: List of token IDs to find matches for
router_config_override: Optional router configuration override
request_id: Optional request ID. If provided, router states will be updated
to track this request (active blocks, lifecycle events). If not
provided, this is a query-only operation that doesn't affect state.
Returns:
A tuple of (worker_id, overlap_blocks) where:
- worker_id: The ID of the best matching worker
- overlap_blocks: The number of overlapping blocks found
.. deprecated::
Use :meth:`best_worker` instead which also returns dp_rank.
"""
...
async def get_potential_loads( async def get_potential_loads(
self, self,
token_ids: List[int], token_ids: List[int],
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment