Commit 7fabe7bf authored by Yan Ru Pei, committed by GitHub

fix: do not delete KV events jetstream (#2800)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 488c8709
......@@ -130,19 +130,13 @@ def parse_args():
default=0.0,
help="KV Router: Temperature for worker sampling via softmax. Higher values promote more randomness, and 0 fallbacks to deterministic.",
)
parser.add_argument(
"--kv-events",
action="store_true",
dest="use_kv_events",
help=" KV Router: Whether to use KV events to maintain the view of cached blocks. If false, would use ApproxKvRouter for predicting block creation / deletion based only on incoming requests at a timer.",
)
parser.add_argument(
"--no-kv-events",
action="store_false",
dest="use_kv_events",
help=" KV Router. Disable KV events.",
default=True,
help="KV Router: Disable KV events. When set, uses ApproxKvRouter for predicting block creation/deletion based only on incoming requests at a timer. By default, KV events are enabled.",
)
parser.set_defaults(use_kv_events=True)
parser.add_argument(
"--router-replica-sync",
action="store_true",
......@@ -156,11 +150,11 @@ def parse_args():
help="KV Router: Number of messages in stream before triggering a snapshot. Defaults to 10000.",
)
parser.add_argument(
"--router-persist-states",
action="store_false",
"--router-reset-states",
action="store_true",
dest="router_reset_states",
default=True,
help="KV Router: Persist router state on startup. Keep existing state from stream and object store (default: reset states).",
default=False,
help="KV Router: Reset router state on startup, purging stream and object store. By default, states are persisted. WARNING: This can affect existing router replicas.",
)
parser.add_argument(
"--busy-threshold",
......
......@@ -21,14 +21,17 @@ The main KV-aware routing arguments:
- `--router-temperature`: Controls worker selection randomness through softmax sampling of router cost logits. A value of 0 (default) ensures deterministic selection of the lowest-cost worker, while higher values introduce more randomness.
- `--use-kv-events`/`--no-kv-events`: Determines how the router tracks cached blocks. When enabled (default), uses `KvIndexer` to monitor block creation and deletion events. When disabled, uses `ApproxKvIndexer`, which estimates cache hits based on a fixed time window (120s). Disable this if your backend doesn't support KV events.
- `--no-kv-events`: Disables KV event tracking. By default (when this flag is not provided), the router uses `KvIndexer` to monitor block creation and deletion events. When disabled with this flag, uses `ApproxKvIndexer`, which estimates cache hits based on a fixed time window (120s). Use this flag if your backend doesn't support KV events (or you are not confident in the accuracy or responsiveness of the events).
- `--router-replica-sync`: Enables NATS-based synchronization of local routing decisions between router replicas. When enabled, routers share their active sequence information and local predictions of block usage, improving routing consistency across instances. Note that this does not sync the radix tree or cached KV block states themselves - those are synchronized through JetStream events. Disabled by default.
- `--router-replica-sync`: Disabled by default. Enables NATS-based synchronization of local routing decisions between router replicas. When enabled, routers share their active sequence information and local predictions of block usage, improving routing consistency across instances. Note that this does not sync the radix tree or cached KV block states themselves - those are synchronized through JetStream events.
- `--router-reset-states`/`--router-persist-states`: Controls whether the router state is reset on startup. When `--router-reset-states` is used (default), the router clears both the JetStream event stream and NATs object store, starting with a fresh state. When `--router-persist-states` is used, the router retains existing state from previous runs, downloading any available snapshot from NATs object store and continuing to consume events from where it left off. This enables routers to maintain KV cache awareness across restarts. **Note**: State persistence is only available when `--use-kv-events` is enabled (default). When using `--no-kv-events` with `ApproxKvIndexer`, state persistence is not supported.
- `--router-reset-states`: When specified, resets the router state on startup by clearing both the JetStream event stream and NATS object store, starting with a fresh state. By default (when this flag is not provided), the router persists state across restarts, downloading any available snapshot from NATS object store and continuing to consume events from where it left off. This enables routers to maintain KV cache awareness across restarts. **Warning**: Using `--router-reset-states` can bring existing router replicas into an inconsistent state. Only use this flag when launching the first router replica in a component, or consider using a different namespace/component for a clean slate.
- `--router-snapshot-threshold`: Sets the number of messages in the JetStream before triggering a snapshot. When the message count exceeds this threshold, a router will attempt to purge acknowledged messages from the stream and create a snapshot of the current radix tree state in NATS object store. Defaults to 10000. This helps manage stream size and provides faster initialization for routers that restart.
>[!Note]
> State persistence is only available when KV events are enabled (default). When using `--no-kv-events` with `ApproxKvIndexer`, state persistence is not currently supported.
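The default behavior of the two flags above can be illustrated with a minimal, self-contained `argparse` sketch. It mirrors the flag names, `dest` values, and defaults shown in this change, but `build_parser` and the `frontend-sketch` program name are hypothetical and the help strings are abbreviated; it is not the actual `parse_args()` of the frontend.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Illustrative parser containing only the two flags discussed above."""
    parser = argparse.ArgumentParser(prog="frontend-sketch")  # hypothetical name
    # KV events stay enabled unless --no-kv-events is passed.
    parser.add_argument(
        "--no-kv-events",
        action="store_false",
        dest="use_kv_events",
        default=True,
        help="Disable KV events and fall back to approximate tracking.",
    )
    # Router state is persisted unless --router-reset-states is passed.
    parser.add_argument(
        "--router-reset-states",
        action="store_true",
        dest="router_reset_states",
        default=False,
        help="Purge persisted router state on startup.",
    )
    return parser


defaults = build_parser().parse_args([])
overridden = build_parser().parse_args(["--no-kv-events", "--router-reset-states"])
print(defaults.use_kv_events, defaults.router_reset_states)
print(overridden.use_kv_events, overridden.router_reset_states)
```

With no flags, the sketch yields KV events enabled and state persisted; passing both flags inverts both settings.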
## Architecture
Colloquially, we refer to a Dynamo component that serves an endpoint for LLM inference as a **worker**.
......@@ -50,30 +53,52 @@ We can then use the default routing methods exposed by the client class to send
KV Cache routing uses direct routing with a special worker selection algorithm.
## Serving Two Router Replicas
## Serving Multiple Router Replicas
For improved fault tolerance, you can launch multiple frontend + router replicas. Since the frontend and router are currently tied together, you'll need to use different HTTP ports for each instance. (The separation of the frontend and Router is WIP.)
### Router State Management
For improved fault tolerance, you can launch two frontend + router replicas. Since the frontend and router are currently tied together, you'll need to use two different HTTP ports for each instance.
The KV Router tracks two types of state (see [KV Router Architecture](../components/router/README.md) for details):
To enable state sharing between the router replicas (which provides more accurate routing decisions), use the `--router-replica-sync` flag when starting the frontend. Router replicas are currently tied to a component, and state syncing and sharing can only happen within the component group. Here's an example of running multiple router replicas:
1. **Prefix blocks (cached KV blocks)**: Maintained in a radix tree, tracking which blocks are cached on each worker. This state is **persistent** - backed by NATS JetStream events and object store snapshots. New router replicas automatically sync this state on startup, ensuring consistent cache awareness across restarts.
2. **Active blocks (decoding blocks)**: Tracks blocks currently being used for active generation requests. This state is **ephemeral** - when a new router replica starts, it begins with zero active block knowledge but becomes eventually consistent as it handles requests.
### Enabling Router Replica Synchronization
```bash
# Router replica 1
python -m dynamo.frontend --router-mode kv --port 8000 --router-replica-sync
# Router replica 2 (can be started later, note the extra --router-persist-states arg)
python -m dynamo.frontend --router-mode kv --port 8001 --router-replica-sync --router-persist-states
# Router replica 2 (can be started later)
python -m dynamo.frontend --router-mode kv --port 8001 --router-replica-sync
```
After these two replicas are launched, they will share the same JetStream and snapshot state. The second replica can be started after the first has already been handling requests. As long as `--router-persist-states` is set, the new replica will sync its KV block indexer by consuming the JetStream events and/or downloading the latest snapshot, ensuring both replicas have the same view of cached blocks across workers. It's okay for one router to go down, or even both to go down - the state persistence ensures continuity (up to the message retention of an hour we set for the stream). When a third router starts (with `--router-persist-states`), the states will still persist:
The `--router-replica-sync` flag enables active block synchronization between replicas:
- Active blocks are shared via NATS core messaging (fire-and-forget)
- Replicas exchange routing decisions to maintain consistent load estimates
- A new replica starts with zero active blocks but quickly converges, both through the requests it handles itself and through active syncing with other replicas
Without this flag, each replica maintains its own isolated view of active blocks, potentially leading to suboptimal routing.
### Persistence and Recovery
**Prefix blocks persist by default:**
- Stored in NATS JetStream with 1-hour retention
- Snapshots saved to NATS object store at configurable thresholds
- New replicas automatically restore this state on startup
You can launch a third Router replica even if the first two Router replicas are down, and it will recover the full prefix state. (As mentioned above, the tracking of active blocks will not persist, but will become eventually consistent through request handling.)
```bash
# Router replica 3 (can be started even after replicas 1 and 2 have gone down)
python -m dynamo.frontend --router-mode kv --port 8002 --router-replica-sync --router-persist-states
python -m dynamo.frontend --router-mode kv --port 8002 --router-replica-sync
```
> **Note:** If a router replica is launched without the `--router-persist-states` flag, the entire stream and radix snapshot will be purged. If you want to serve a separate router (targeting a different set of workers) independently without affecting the current state, consider using a new namespace/component (see [Distributed Runtime](distributed_runtime.md)) which will start a new stream and NATS object store path.
When `--router-replica-sync` is enabled, the router replicas will additionally share their local routing decisions and active sequence predictions via NATS. Active blocks information is communicated between routers in a fire-and-forget manner, but the routers will quickly become consistent as this information is tied to the request cycle. This helps maintain consistent load estimates across instances even when requests are distributed between routers.
>[!Note]
> If you need to start with a fresh state, you have two options:
> 1. **Recommended**: Use a different namespace/component (see [Distributed Runtime](distributed_runtime.md)) which will start a new stream and NATS object store path
> 2. **Use with caution**: Launch a router with the `--router-reset-states` flag, which will purge the entire stream and radix snapshot. This should only be done when launching the first router replica in a component, as it can bring existing router replicas into an inconsistent state.
## Understanding KV Cache
The leading Large Language Models (LLMs) today are auto-regressive and based off of the [transformer architecture](https://proceedings.neurips.cc/paper_files/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf). One key inference optimization technique is to cache the already computed keys and values and to reuse them for the future tokens. This is called the [KV Cache](https://developer.nvidia.com/blog/mastering-llm-techniques-inference-optimization/#key-value_caching).
......@@ -197,6 +222,74 @@ Each event carries a unique router ID to prevent self-event processing. This asy
### Event Persistence and Recovery
KV cache events are persisted in NATS JetStream, allowing router replicas to maintain their global view of KV blocks across restarts. When a router starts with `--router-persist-states`, it downloads any available snapshot from NATs object store and continues consuming events from its last acknowledged position in the stream.
KV cache events are persisted in NATS JetStream, allowing router replicas to maintain their global view of KV blocks across restarts. By default, routers persist their state - they download any available snapshot from NATS object store and continue consuming events from their last acknowledged position in the stream. This default behavior ensures KV cache awareness is maintained across router restarts without any additional configuration.
To manage stream growth, when the message count exceeds `--router-snapshot-threshold`, a router acquires an etcd-based distributed lock, purges acknowledged messages from the stream, and uploads the current radix tree state to NATS object store. This snapshot serves as a checkpoint for faster initialization of future router instances.
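The snapshot-and-recover cycle described above can be sketched with in-memory stand-ins. Everything in this block is an assumption made for illustration: `SharedState`, `RouterSketch`, the `"stored"`/`"removed"` event kinds, and the use of a `threading.Lock` in place of the etcd-based distributed lock are hypothetical, and a flat per-worker set replaces the radix tree. It also purges based on a single router's position, which is a simplification of purging acknowledged messages.

```python
from dataclasses import dataclass, field
from threading import Lock
from typing import Dict, List, Optional, Set, Tuple

Event = Tuple[str, str, int]  # (kind, worker_id, block_hash); names are hypothetical


@dataclass
class SharedState:
    """Stand-in for the event stream, the object store, and the distributed lock."""
    stream: List[Tuple[int, Event]] = field(default_factory=list)
    next_seq: int = 1
    snapshot: Optional[Tuple[int, Dict[str, Set[int]]]] = None
    lock: Lock = field(default_factory=Lock)

    def publish(self, event: Event) -> None:
        self.stream.append((self.next_seq, event))
        self.next_seq += 1


class RouterSketch:
    def __init__(self, shared: SharedState, snapshot_threshold: int,
                 reset_states: bool = False) -> None:
        self.shared = shared
        self.snapshot_threshold = snapshot_threshold
        self.blocks: Dict[str, Set[int]] = {}
        self.last_seq = 0
        if reset_states:
            # Reset: drop all buffered events and any stored snapshot.
            shared.stream.clear()
            shared.snapshot = None
        elif shared.snapshot is not None:
            # Default: restore the latest snapshot, then replay newer events.
            self.last_seq, saved = shared.snapshot
            self.blocks = {w: set(b) for w, b in saved.items()}

    def consume(self) -> None:
        for seq, (kind, worker, block) in list(self.shared.stream):
            if seq <= self.last_seq:
                continue  # already reflected in local state
            held = self.blocks.setdefault(worker, set())
            if kind == "stored":
                held.add(block)
            else:
                held.discard(block)
            self.last_seq = seq
        if len(self.shared.stream) > self.snapshot_threshold:
            self._snapshot()

    def _snapshot(self) -> None:
        with self.shared.lock:
            copied = {w: set(b) for w, b in self.blocks.items()}
            self.shared.snapshot = (self.last_seq, copied)
            # Purge events that this router has already applied.
            self.shared.stream[:] = [
                (s, e) for s, e in self.shared.stream if s > self.last_seq
            ]


shared = SharedState()
first = RouterSketch(shared, snapshot_threshold=3)
for block in range(5):
    shared.publish(("stored", "worker-0", block))
first.consume()  # stream length exceeds the threshold, so a snapshot is taken
shared.publish(("removed", "worker-0", 0))
second = RouterSketch(shared, snapshot_threshold=3)  # starts later
second.consume()
print(sorted(second.blocks["worker-0"]))
```

The second router never saw the first five events directly; it restores them from the snapshot and applies only the one event published afterwards.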
## Using KvPushRouter Python API
Instead of launching the KV Router via command line, you can create a `KvPushRouter` object directly in Python. This allows per-request routing configuration overrides.
### Setup
First, launch your backend engines:
```bash
python -m dynamo.vllm --model meta-llama/Llama-2-7b-hf --endpoint dyn://inference.vllm.generate
```
### Example Script
```python
import asyncio
from dynamo._core import DistributedRuntime, KvPushRouter, KvRouterConfig


async def main():
    # Get runtime and create endpoint
    runtime = DistributedRuntime.detached()
    namespace = runtime.namespace("inference")
    component = namespace.component("vllm")
    endpoint = component.endpoint("generate")

    # Create KV router
    kv_router_config = KvRouterConfig()
    router = KvPushRouter(
        endpoint=endpoint,
        block_size=16,
        kv_router_config=kv_router_config
    )

    # Your input tokens
    token_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    # Generate with per-request routing override
    stream = await router.generate(
        token_ids=token_ids,
        model="meta-llama/Llama-2-7b-hf",
        stop_conditions={
            "max_tokens": 20,  # Generate exactly 20 tokens
            "ignore_eos": True,  # Don't stop at EOS token
        },
        sampling_options={
            "temperature": 0.7,
            "top_p": 0.9,
        },
        router_config_override={
            "overlap_score_weight": 2.0,  # Prioritize cache hits for this request
            "router_temperature": 0.5,  # Add routing randomness
        }
    )

    # Collect generated tokens
    generated_tokens = []
    async for response in stream:
        if isinstance(response, dict) and "token_ids" in response:
            generated_tokens.extend(response["token_ids"])

    print(f"Generated {len(generated_tokens)} tokens: {generated_tokens}")


if __name__ == "__main__":
    asyncio.run(main())
```
To manage stream growth, when the message count exceeds `--router-snapshot-threshold`, a router acquires an etcd-based distributed lock, purges acknowledged messages from the stream, and uploads the current radix tree state to NATs object store. This snapshot serves as a checkpoint for faster initialization of future router instances.
The `router_config_override` parameter allows you to adjust routing behavior per request without recreating the router. This is useful for implementing different routing strategies based on request characteristics.
......@@ -144,70 +144,3 @@ The `router_temperature` parameter controls routing randomness:
- To reduce TTFT: Increase the weight
- To reduce ITL: Decrease the weight
4. If you observe severe load imbalance, increase the temperature setting
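As a rough illustration of how a routing temperature trades determinism for load spreading, here is a minimal softmax-sampling sketch. The function name `select_worker`, the use of raw costs as negated logits, and the absence of any cost normalization are assumptions made for this example; the router's actual cost function and scaling may differ.

```python
import math
import random
from typing import Dict, Optional


def select_worker(
    costs: Dict[str, float],
    temperature: float,
    rng: Optional[random.Random] = None,
) -> str:
    """Pick a worker by softmax sampling over negated costs (illustrative only)."""
    if not costs:
        raise ValueError("at least one worker is required")
    if temperature <= 0.0:
        # Temperature 0: deterministic choice of the lowest-cost worker.
        return min(costs, key=costs.get)
    rng = rng or random.Random()
    lowest = min(costs.values())
    # Subtract the minimum cost before exponentiating for numerical stability.
    workers = list(costs)
    weights = [math.exp(-(costs[w] - lowest) / temperature) for w in workers]
    return rng.choices(workers, weights=weights, k=1)[0]


costs = {"worker-a": 3.0, "worker-b": 1.0}
greedy = select_worker(costs, temperature=0.0)
seeded = random.Random(0)
spread = {select_worker(costs, temperature=100.0, rng=seeded) for _ in range(200)}
print(greedy, sorted(spread))
```

At temperature 0 the cheaper worker always wins; at a high temperature the weights flatten, so both workers receive traffic, which is why raising the temperature can relieve load imbalance.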
## Using KvPushRouter Python API
Instead of launching the KV Router via command line, you can create a `KvPushRouter` object directly in Python. This allows per-request routing configuration overrides.
### Setup
First, launch your backend engines:
```bash
python -m dynamo.vllm --model meta-llama/Llama-2-7b-hf --endpoint dyn://inference.vllm.generate
```
### Example Script
```python
import asyncio
from dynamo._core import DistributedRuntime, KvPushRouter, KvRouterConfig


async def main():
    # Get runtime and create endpoint
    runtime = DistributedRuntime.detached()
    namespace = runtime.namespace("inference")
    component = namespace.component("vllm")
    endpoint = component.endpoint("generate")

    # Create KV router
    kv_router_config = KvRouterConfig()
    router = KvPushRouter(
        endpoint=endpoint,
        block_size=16,
        kv_router_config=kv_router_config
    )

    # Your input tokens
    token_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    # Generate with per-request routing override
    stream = await router.generate(
        token_ids=token_ids,
        model="meta-llama/Llama-2-7b-hf",
        stop_conditions={
            "max_tokens": 20,  # Generate exactly 20 tokens
            "ignore_eos": True,  # Don't stop at EOS token
        },
        sampling_options={
            "temperature": 0.7,
            "top_p": 0.9,
        },
        router_config_override={
            "overlap_score_weight": 2.0,  # Prioritize cache hits for this request
            "router_temperature": 0.5,  # Add routing randomness
        }
    )

    # Collect generated tokens
    generated_tokens = []
    async for response in stream:
        if isinstance(response, dict) and "token_ids" in response:
            generated_tokens.extend(response["token_ids"])

    print(f"Generated {len(generated_tokens)} tokens: {generated_tokens}")


if __name__ == "__main__":
    asyncio.run(main())
```
The `router_config_override` parameter allows you to adjust routing behavior per request without recreating the router. This is useful for implementing different routing strategies based on request characteristics.
\ No newline at end of file
......@@ -42,7 +42,7 @@ impl KvRouterConfig {
#[pymethods]
impl KvRouterConfig {
#[new]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, router_replica_sync=false, router_snapshot_threshold=10000, router_reset_states=true))]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, router_replica_sync=false, router_snapshot_threshold=10000, router_reset_states=false))]
fn new(
overlap_score_weight: f64,
router_temperature: f64,
......
......@@ -1198,6 +1198,61 @@ class ZmqKvEventListener:
"""
...
class KvPushRouter:
    """
    A KV-aware push router that performs intelligent routing based on KV cache overlap.
    """

    def __init__(
        self,
        endpoint: Endpoint,
        block_size: int,
        kv_router_config: KvRouterConfig,
    ) -> None:
        """
        Create a new KvPushRouter instance.

        Args:
            endpoint: The endpoint to connect to for routing requests
            block_size: The KV cache block size
            kv_router_config: Configuration for the KV router
        """
        ...

    async def generate(
        self,
        token_ids: List[int],
        model: str,
        stop_conditions: Optional[JsonLike] = None,
        sampling_options: Optional[JsonLike] = None,
        output_options: Optional[JsonLike] = None,
        router_config_override: Optional[JsonLike] = None,
    ) -> AsyncIterator[JsonLike]:
        """
        Generate text using the KV-aware router.

        Args:
            token_ids: Input token IDs
            model: Model name to use for generation
            stop_conditions: Optional stop conditions for generation
            sampling_options: Optional sampling configuration
            output_options: Optional output configuration
            router_config_override: Optional router configuration override

        Returns:
            An async iterator yielding generation responses
        """
        ...

    async def dump_events(self) -> str:
        """
        Dump all events from the KV router's indexer.

        Returns:
            A JSON string containing all indexer events
        """
        ...
class EntrypointArgs:
"""
Settings to connect an input to a worker and run them.
......
......@@ -109,7 +109,7 @@ pub struct KvRouterConfig {
/// Threshold for triggering snapshots. If None, no snapshots will be performed.
pub router_snapshot_threshold: Option<u32>,
/// Whether to reset the router state on startup (default: true)
/// Whether to reset the router state on startup (default: false)
pub router_reset_states: bool,
}
......@@ -122,7 +122,7 @@ impl Default for KvRouterConfig {
router_replica_sync: false,
max_num_batched_tokens: 8192,
router_snapshot_threshold: Some(10000),
router_reset_states: true,
router_reset_states: false,
}
}
}
......
......@@ -271,7 +271,7 @@ impl ActiveSequencesMultiWorker {
let component_clone = component.clone();
let router_id_clone = router_id;
component.drt().runtime().secondary().spawn(async move {
tokio::spawn(async move {
// NATS subscription loop
if let Err(e) = Self::subscribe_to_events(
senders_clone,
......
......@@ -170,7 +170,7 @@ pub async fn start_kv_router_background(
None
};
component.drt().runtime().secondary().spawn(async move {
tokio::spawn(async move {
let mut check_interval = tokio::time::interval(Duration::from_secs(1));
check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
......@@ -181,6 +181,7 @@ pub async fn start_kv_router_background(
_ = cancellation_token.cancelled() => {
tracing::debug!("KV Router background task received cancellation signal");
// Clean up the queue and remove the durable consumer
// TODO: durable consumer cannot cleanup if ungraceful shutdown (crash)
if let Err(e) = nats_queue.shutdown(None).await {
tracing::warn!("Failed to shutdown NatsQueue: {e}");
}
......
......@@ -526,25 +526,6 @@ impl NatsQueue {
let client = client_options.connect().await?;
// If reset_stream is true, delete the stream first
if reset_stream {
match client.jetstream().delete_stream(&self.stream_name).await {
Ok(_) => {
log::debug!(
"Successfully deleted NATS stream {} for reset",
self.stream_name
);
}
Err(e) => {
log::debug!(
"Failed to delete NATS stream '{}' (may not exist): {}",
self.stream_name,
e
);
}
}
}
// Always try to create the stream (removes the race condition)
let stream_config = jetstream::stream::Config {
name: self.stream_name.clone(),
......@@ -560,11 +541,35 @@ impl NatsQueue {
}
Err(e) => {
// Log warning but continue - stream likely already exists
log::warn!(
"Failed to create NATS stream '{}': {}. Stream likely already exists, continuing...",
self.stream_name,
e
log::debug!(
"Failed to create NATS stream '{}': {e}. Stream likely already exists, continuing...",
self.stream_name
);
// If reset_stream is true, purge all messages from the newly created stream
if reset_stream {
match client
.jetstream()
.get_stream(&self.stream_name)
.await?
.purge()
.await
{
Ok(purge_info) => {
log::debug!(
"Successfully purged {} messages from NATS stream {}",
purge_info.purged,
self.stream_name
);
}
Err(e) => {
log::warn!(
"Failed to purge NATS stream '{}': {e}",
self.stream_name
);
}
}
}
}
}
......@@ -572,6 +577,7 @@ impl NatsQueue {
if let Some(ref consumer_name) = self.consumer_name {
let consumer_config = jetstream::consumer::pull::Config {
durable_name: Some(consumer_name.clone()),
inactive_threshold: std::time::Duration::from_secs(3600), // 1 hour
..Default::default()
};
......
......@@ -163,7 +163,7 @@ where
}
}
tracing::info!("TypedPrefixWatcher for prefix '{}' stopped", prefix_str);
tracing::debug!("TypedPrefixWatcher for prefix '{}' stopped", prefix_str);
});
Ok(TypedPrefixWatcher { rx: watch_rx })
......
......@@ -856,10 +856,7 @@ def test_indexers_sync(request, runtime_services):
# Create first KV router
from dynamo._core import KvPushRouter, KvRouterConfig
# First router with default router_reset_states=True
kv_router_config = KvRouterConfig(
router_snapshot_threshold=20, router_reset_states=True
)
kv_router_config = KvRouterConfig(router_snapshot_threshold=20)
async def send_requests_to_router(router, num_requests, router_name):
# First, send a test request with retry to ensure router is ready
......@@ -947,11 +944,9 @@ def test_indexers_sync(request, runtime_services):
logger.info("Waiting for 1 second before creating second router")
await asyncio.sleep(1)
# Launch second router with router_reset_states=False
logger.info("Creating second KV router with router_reset_states=False")
kv_router_config2 = KvRouterConfig(
router_snapshot_threshold=20, router_reset_states=False
)
# Launch second router - will automatically sync with the first router's state
logger.info("Creating second KV router")
kv_router_config2 = KvRouterConfig(router_snapshot_threshold=20)
kv_push_router2 = KvPushRouter(
endpoint=endpoint,
block_size=BLOCK_SIZE,
......