kv_router.rs 22.4 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
use std::collections::HashMap;
5
use std::sync::Arc;
6
use std::time::Duration;
7

8
use anyhow::Result;
9
use derive_builder::Builder;
10
use dynamo_runtime::{
11
    component::Component,
12
    discovery::{DiscoveryQuery, watch_and_extract_field},
13
    pipeline::{
14
15
        AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter, ResponseStream,
        SingleIn, async_trait,
16
17
    },
    protocols::annotated::Annotated,
18
    traits::DistributedRuntimeProvider,
19
20
};
use futures::stream::{self, StreamExt};
21
use serde::{Deserialize, Serialize};
22

23
pub mod approx;
24
pub mod indexer;
25
pub mod prefill_router;
26
27
pub mod protocols;
pub mod publisher;
28
pub mod recorder;
29
30
pub mod scheduler;
pub mod scoring;
31
pub mod sequence;
32
pub mod subscriber;
33

34
35
pub use prefill_router::PrefillRouter;

36
37
use crate::{
    kv_router::{
38
39
        approx::ApproxKvIndexer,
        indexer::{
40
41
            KvIndexer, KvIndexerInterface, KvRouterError, OverlapScores, RouterEvent,
            compute_block_hash_for_seq, compute_seq_hash_for_block,
42
        },
Yan Ru Pei's avatar
Yan Ru Pei committed
43
44
45
        protocols::{
            LocalBlockHash, RouterRequest, RouterResponse, WorkerSelectionResult, WorkerWithDpRank,
        },
46
        scheduler::{KvScheduler, KvSchedulerError, PotentialLoad, SchedulingRequest},
47
        subscriber::start_kv_router_background,
48
    },
49
    local_model::runtime_config::ModelRuntimeConfig,
50
    model_card::ModelDeploymentCard,
51
    preprocessor::PreprocessedRequest,
52
    protocols::common::llm_backend::LLMEngineOutput,
53
54
};

55
56
// [gluo TODO] shouldn't need to be public
// this should be discovered from the component

/// Endpoint name used for pull-based metric scraping.
pub const KV_METRICS_ENDPOINT: &str = "load_metrics";

// ---- Subjects used for push-based metric / event publishing ----

/// Subject carrying KV events.
pub const KV_EVENT_SUBJECT: &str = "kv_events";
/// Subject carrying KV hit-rate reports.
pub const KV_HIT_RATE_SUBJECT: &str = "kv-hit-rate";
/// Subject carrying KV metrics.
pub const KV_METRICS_SUBJECT: &str = "kv_metrics";

// ---- Subjects used for router-to-router communication ----

/// Subject carrying prefill events between routers.
pub const PREFILL_SUBJECT: &str = "prefill_events";
/// Subject carrying active-sequence events between routers.
pub const ACTIVE_SEQUENCES_SUBJECT: &str = "active_sequences_events";

// ---- Storage locations for radix-tree snapshots ----

/// Bucket holding radix-tree snapshots.
pub const RADIX_STATE_BUCKET: &str = "radix-bucket";
/// Object name of the radix-tree snapshot within the bucket.
pub const RADIX_STATE_FILE: &str = "radix-state";

/// A trait that users can implement to define custom selection logic
pub trait WorkerSelector {
    fn select_worker(
        &self,
Yan Ru Pei's avatar
Yan Ru Pei committed
78
        workers: &HashMap<protocols::WorkerId, Option<ModelRuntimeConfig>>,
79
        request: &SchedulingRequest,
80
        block_size: u32,
81
82
    ) -> Result<WorkerSelectionResult, KvSchedulerError>;
}
83

84
85
86
87
88
89
90
91
92
93
/// Override configuration for router settings that can be specified per-request
///
/// Passed to the scheduler alongside each routing decision. Presumably a field left as
/// `None` keeps the router-wide value from `KvRouterConfig`; that fallback is applied
/// outside this file (TODO confirm in the scheduler).
#[derive(Debug, Clone, Default, Builder, Serialize, Deserialize)]
pub struct RouterConfigOverride {
    /// Per-request counterpart of `KvRouterConfig::overlap_score_weight`.
    #[builder(default)]
    pub overlap_score_weight: Option<f64>,

    /// Per-request counterpart of `KvRouterConfig::router_temperature`.
    #[builder(default)]
    pub router_temperature: Option<f64>,
}

94
/// KV Router configuration parameters
95
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
96
97
98
pub struct KvRouterConfig {
    pub overlap_score_weight: f64,

99
    pub router_temperature: f64,
100

101
102
    pub use_kv_events: bool,

103
104
    pub router_replica_sync: bool,

105
106
107
    /// Whether to track active blocks in the router (default: true)
    pub router_track_active_blocks: bool,

108
109
110
    /// Threshold for triggering snapshots. If None, no snapshots will be performed.
    pub router_snapshot_threshold: Option<u32>,

111
    /// Whether to reset the router state on startup (default: false)
112
    pub router_reset_states: bool,
113
114
115
116
117
}

impl Default for KvRouterConfig {
    fn default() -> Self {
        Self {
118
            overlap_score_weight: 1.0,
119
            router_temperature: 0.0,
120
            use_kv_events: true,
121
            router_replica_sync: false,
122
            router_track_active_blocks: true,
123
            router_snapshot_threshold: Some(1000000),
124
            router_reset_states: false,
125
126
127
128
129
130
131
        }
    }
}

impl KvRouterConfig {
    /// Create a new KvRouterConfig with optional weight values.
    /// If a weight is None, the default value will be used.
132
    #[allow(clippy::too_many_arguments)]
133
134
    pub fn new(
        overlap_score_weight: Option<f64>,
135
        temperature: Option<f64>,
136
        use_kv_events: Option<bool>,
137
        replica_sync: Option<bool>,
138
        track_active_blocks: Option<bool>,
139
140
        router_snapshot_threshold: Option<Option<u32>>,
        router_reset_states: Option<bool>,
141
142
143
144
    ) -> Self {
        let default = Self::default();
        Self {
            overlap_score_weight: overlap_score_weight.unwrap_or(default.overlap_score_weight),
145
            router_temperature: temperature.unwrap_or(default.router_temperature),
146
            use_kv_events: use_kv_events.unwrap_or(default.use_kv_events),
147
            router_replica_sync: replica_sync.unwrap_or(default.router_replica_sync),
148
149
            router_track_active_blocks: track_active_blocks
                .unwrap_or(default.router_track_active_blocks),
150
151
152
            router_snapshot_threshold: router_snapshot_threshold
                .unwrap_or(default.router_snapshot_threshold),
            router_reset_states: router_reset_states.unwrap_or(default.router_reset_states),
153
154
155
156
        }
    }
}

157
158
159
// TODO: is there a way (macro) to auto-derive the KvIndexerInterface trait for this
// since both variants implement it
pub enum Indexer {
160
161
    /// Updates itself based on KV events emitted by backend workers.
    /// Has the ability to persist and snapshot states.
162
    KvIndexer(KvIndexer),
163
164
165

    /// Predicts the cached blocks based on requests on a TTL basis.
    /// Currently does not persist or snapshot states (WIP to enable that).
166
    ApproxKvIndexer(ApproxKvIndexer),
167
168
169
170

    /// Used when we do not wish to use the indexer at all (e.g., when overlap_score_weight is 0).
    /// Note: This will cause KV events to accumulate in JetStream as we do not regularly purge them.
    None,
171
172
173
174
175
176
177
178
179
180
}

impl Indexer {
    async fn find_matches(
        &self,
        sequence: Vec<LocalBlockHash>,
    ) -> Result<OverlapScores, KvRouterError> {
        match self {
            Indexer::KvIndexer(indexer) => indexer.find_matches(sequence).await,
            Indexer::ApproxKvIndexer(indexer) => indexer.find_matches(sequence).await,
181
182
183
184
            Indexer::None => Ok(OverlapScores {
                scores: HashMap::new(),
                frequencies: Vec::new(),
            }),
185
186
        }
    }
187
188
189
190
191

    async fn dump_events(&self) -> Result<Vec<RouterEvent>, KvRouterError> {
        match self {
            Indexer::KvIndexer(indexer) => indexer.dump_events().await,
            Indexer::ApproxKvIndexer(indexer) => indexer.dump_events().await,
192
193
194
195
196
            Indexer::None => {
                panic!(
                    "Cannot dump events: indexer does not exist (is overlap_score_weight set to 0?)"
                );
            }
197
198
        }
    }
199
200
}

201
202
/// A KvRouter only decides which worker you should use. It doesn't send you there.
/// TODO: Rename this to indicate it only selects a worker, it does not route.
203
pub struct KvRouter {
204
205
206
    indexer: Indexer,

    // How about a Box<dyn KvIndexerInterface>
207
    scheduler: KvScheduler,
208

209
    block_size: u32,
210
211

    kv_router_config: KvRouterConfig,
Yan Ru Pei's avatar
Yan Ru Pei committed
212
213

    cancellation_token: tokio_util::sync::CancellationToken,
214
215
216
217
}

impl KvRouter {
    pub async fn new(
218
        component: Component,
219
        block_size: u32,
220
        selector: Option<Box<dyn WorkerSelector + Send + Sync>>,
221
        kv_router_config: Option<KvRouterConfig>,
222
        consumer_uuid: String,
223
    ) -> Result<Self> {
224
        let kv_router_config = kv_router_config.unwrap_or_default();
225
        let cancellation_token = component.drt().primary_token();
226
227
228
        let generate_endpoint = component.endpoint("generate");
        let client = generate_endpoint.client().await?;

229
        let instances_rx = client.instance_source.as_ref().clone();
230

231
232
233
234
235
236
237
238
239
240
241
242
243
244
        // Watch for runtime config updates via discovery interface
        let discovery = component.drt().discovery();
        let discovery_key = DiscoveryQuery::EndpointModels {
            namespace: component.namespace().name().to_string(),
            component: component.name().to_string(),
            endpoint: "generate".to_string(),
        };
        let discovery_stream = discovery
            .list_and_watch(discovery_key, Some(cancellation_token.clone()))
            .await?;
        let runtime_configs_rx =
            watch_and_extract_field(discovery_stream, |card: ModelDeploymentCard| {
                card.runtime_config
            });
245

246
247
248
249
        let indexer = if kv_router_config.overlap_score_weight == 0.0 {
            // When overlap_score_weight is zero, we don't need to track prefixes
            Indexer::None
        } else if kv_router_config.use_kv_events {
250
251
252
253
254
255
            let kv_indexer_metrics = indexer::KvIndexerMetrics::from_component(&component);
            Indexer::KvIndexer(KvIndexer::new(
                cancellation_token.clone(),
                block_size,
                kv_indexer_metrics,
            ))
256
257
258
259
260
261
262
263
        } else {
            // hard code 120 seconds for now
            Indexer::ApproxKvIndexer(ApproxKvIndexer::new(
                cancellation_token.clone(),
                block_size,
                Duration::from_secs(120),
            ))
        };
264

265
        let scheduler = KvScheduler::start(
266
            component.clone(),
267
            block_size,
268
            instances_rx,
269
            runtime_configs_rx,
270
            selector,
271
            kv_router_config.router_replica_sync,
272
            consumer_uuid.clone(),
273
274
        )
        .await?;
275

276
        // Start unified background process if using KvIndexer
277
        if let Indexer::KvIndexer(ref kv_indexer) = indexer {
278
279
280
281
            start_kv_router_background(
                component.clone(),
                consumer_uuid,
                kv_indexer.event_sender(),
282
                kv_indexer.remove_worker_sender(),
283
284
285
                kv_router_config
                    .router_snapshot_threshold
                    .map(|_| kv_indexer.get_workers_sender()),
286
287
288
289
290
291
292
293
                kv_router_config
                    .router_snapshot_threshold
                    .map(|_| kv_indexer.snapshot_event_sender()),
                cancellation_token.clone(),
                kv_router_config.router_snapshot_threshold,
                kv_router_config.router_reset_states,
            )
            .await?;
294
        }
295

296
        tracing::info!("KV Routing initialized");
297
        Ok(Self {
298
            indexer,
299
            scheduler,
300
            block_size,
301
            kv_router_config,
Yan Ru Pei's avatar
Yan Ru Pei committed
302
            cancellation_token,
303
        })
304
305
    }

306
    /// Give these tokens, find the worker with the best match in it's KV cache.
Yan Ru Pei's avatar
Yan Ru Pei committed
307
    /// Returns the best worker (with dp_rank) and overlap amount in number of blocks.
Yan Ru Pei's avatar
Yan Ru Pei committed
308
309
    /// Now also takes optional context_id for request tracking
    pub async fn find_best_match(
310
        &self,
Yan Ru Pei's avatar
Yan Ru Pei committed
311
        context_id: Option<&str>,
312
        tokens: &[u32],
313
        router_config_override: Option<&RouterConfigOverride>,
314
        update_states: bool,
Yan Ru Pei's avatar
Yan Ru Pei committed
315
    ) -> anyhow::Result<(WorkerWithDpRank, u32)> {
Yan Ru Pei's avatar
Yan Ru Pei committed
316
317
318
319
320
        // Validate that context_id is provided when update_states is true
        if update_states && context_id.is_none() {
            panic!("context_id must be provided if update_states is true");
        }

321
        let isl_tokens = tokens.len();
322

323
324
325
326
        let block_hashes = compute_block_hash_for_seq(tokens, self.block_size);
        let seq_hashes = compute_seq_hash_for_block(&block_hashes);

        let overlap_scores = self.indexer.find_matches(block_hashes.clone()).await?;
327

328
329
330
331
332
333
334
335
336
337
338
339
340
        // Determine who needs seq_hashes
        let approx_indexer_needs_it = matches!(self.indexer, Indexer::ApproxKvIndexer(_));
        let scheduler_needs_it = self.kv_router_config.router_track_active_blocks;

        // Optimize cloning: only clone if both need it, otherwise move
        let (maybe_seq_hashes_1, maybe_seq_hashes_2) =
            match (approx_indexer_needs_it, scheduler_needs_it) {
                (true, true) => (Some(seq_hashes.clone()), Some(seq_hashes)),
                (true, false) => (Some(seq_hashes), None),
                (false, true) => (None, Some(seq_hashes)),
                (false, false) => (None, None),
            };

Yan Ru Pei's avatar
Yan Ru Pei committed
341
        let best_worker = self
342
            .scheduler
343
            .schedule(
Yan Ru Pei's avatar
Yan Ru Pei committed
344
                context_id.map(|s| s.to_string()),
345
                isl_tokens,
346
                maybe_seq_hashes_2,
347
                overlap_scores.clone(),
348
                router_config_override,
349
                update_states,
350
            )
351
            .await?;
352

353
354
        if let Indexer::ApproxKvIndexer(ref indexer) = self.indexer {
            indexer
Yan Ru Pei's avatar
Yan Ru Pei committed
355
                .process_routing_decision(best_worker, block_hashes, maybe_seq_hashes_1.unwrap())
356
357
358
359
                .await
                .unwrap();
        };

360
361
        let overlap_amount = overlap_scores
            .scores
Yan Ru Pei's avatar
Yan Ru Pei committed
362
            .get(&best_worker)
363
364
            .copied()
            .unwrap_or(0);
Yan Ru Pei's avatar
Yan Ru Pei committed
365
        Ok((best_worker, overlap_amount))
366
367
    }

368
369
370
371
372
    pub async fn add_request(
        &self,
        request_id: String,
        tokens: &[u32],
        overlap_blocks: u32,
Yan Ru Pei's avatar
Yan Ru Pei committed
373
        worker: WorkerWithDpRank,
374
375
    ) {
        let isl_tokens = tokens.len();
376
377
378
379
380

        let maybe_seq_hashes = self.kv_router_config.router_track_active_blocks.then(|| {
            let block_hashes = compute_block_hash_for_seq(tokens, self.block_size);
            compute_seq_hash_for_block(&block_hashes)
        });
381
382
383
384

        self.scheduler
            .add_request(
                request_id,
385
                maybe_seq_hashes,
386
387
                isl_tokens,
                overlap_blocks,
Yan Ru Pei's avatar
Yan Ru Pei committed
388
                worker,
389
390
391
392
            )
            .await;
    }

393
    pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<()> {
394
        self.scheduler.mark_prefill_completed(request_id).await
395
396
    }

397
    pub async fn free(&self, request_id: &str) -> Result<()> {
398
        self.scheduler.free(request_id).await
399
    }
400

401
    pub fn block_size(&self) -> u32 {
402
403
        self.block_size
    }
404

405
406
407
408
409
410
    /// Get potential prefill and decode loads for all workers
    pub async fn get_potential_loads(&self, tokens: &[u32]) -> Result<Vec<PotentialLoad>> {
        let isl_tokens = tokens.len();
        let block_hashes = compute_block_hash_for_seq(tokens, self.block_size);
        let overlap_scores = self.indexer.find_matches(block_hashes).await?;

411
412
413
414
415
        let maybe_seq_hashes = self.kv_router_config.router_track_active_blocks.then(|| {
            let block_hashes = compute_block_hash_for_seq(tokens, self.block_size);
            compute_seq_hash_for_block(&block_hashes)
        });

416
417
        Ok(self
            .scheduler
418
            .get_potential_loads(maybe_seq_hashes, isl_tokens, overlap_scores)
419
420
421
            .await)
    }

422
423
424
425
    /// Dump all events from the indexer
    pub async fn dump_events(&self) -> Result<Vec<RouterEvent>, KvRouterError> {
        self.indexer.dump_events().await
    }
426
427
}

Michael Feil's avatar
Michael Feil committed
428
429
// NOTE: KVRouter works like a PushRouter,
// but without the reverse proxy functionality, but based on contract of 3 request types
430
431
432
433
434
435
436
#[async_trait]
impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Error> for KvRouter {
    async fn generate(
        &self,
        request: SingleIn<RouterRequest>,
    ) -> Result<ManyOut<Annotated<RouterResponse>>> {
        let (request, ctx) = request.into_parts();
Michael Feil's avatar
Michael Feil committed
437
438
439
440
        let context_id = ctx.context().id().to_string();
        // Handle different request types
        let response = match request {
            RouterRequest::New { tokens } => {
Yan Ru Pei's avatar
Yan Ru Pei committed
441
                let (best_worker, overlap_blocks) = self
Yan Ru Pei's avatar
Yan Ru Pei committed
442
                    .find_best_match(Some(&context_id), &tokens, None, true)
Michael Feil's avatar
Michael Feil committed
443
444
445
                    .await?;

                RouterResponse::New {
Yan Ru Pei's avatar
Yan Ru Pei committed
446
447
                    worker_id: best_worker.worker_id,
                    dp_rank: best_worker.dp_rank,
Michael Feil's avatar
Michael Feil committed
448
449
450
                    overlap_blocks,
                }
            }
451
452
453
454
455
456
            RouterRequest::MarkPrefill => RouterResponse::PrefillMarked {
                success: self.mark_prefill_completed(&context_id).await.is_ok(),
            },
            RouterRequest::MarkFree => RouterResponse::FreeMarked {
                success: self.free(&context_id).await.is_ok(),
            },
Michael Feil's avatar
Michael Feil committed
457
        };
458
459
460
461
462
463

        let response = Annotated::from_data(response);
        let stream = stream::iter(vec![response]);
        Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
    }
}
464
465

pub struct KvPushRouter {
466
    inner: PushRouter<PreprocessedRequest, Annotated<LLMEngineOutput>>,
467
    pub chooser: Arc<KvRouter>,
468
469
470
471
}

impl KvPushRouter {
    pub fn new(
472
        inner: PushRouter<PreprocessedRequest, Annotated<LLMEngineOutput>>,
473
474
475
476
477
478
479
        chooser: Arc<KvRouter>,
    ) -> Self {
        KvPushRouter { inner, chooser }
    }
}

#[async_trait]
480
impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutput>>, Error>
481
482
    for KvPushRouter
{
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
    /// Generate method that handles KV-aware routing with three distinct behaviors:
    ///
    /// 1. **If `query_instance_id` annotation is set**:
    ///    - Returns the best matching worker ID without routing the request
    ///    - Does NOT update any router local states
    ///    - Response includes worker_instance_id and token_data annotations
    ///
    /// 2. **If `backend_instance_id` is set in the request**:
    ///    - Routes directly to the specified backend instance
    ///    - DOES update router states to track this request (unless query_instance_id is also set)
    ///    - Bypasses the normal KV matching logic
    ///
    /// 3. **If neither are set (default behavior)**:
    ///    - Finds the best worker based on KV cache overlap
    ///    - Updates router states to track the request
    ///    - Routes to the selected worker
    ///
    /// The router state updates include tracking active sequences and managing
    /// prefill/completion lifecycle for proper KV cache management.
502
503
    async fn generate(
        &self,
504
        request: SingleIn<PreprocessedRequest>,
505
    ) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> {
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
        // Extract context ID for request tracking
        let context_id = request.context().id().to_string();

        // Check if this is a query_instance_id request first
        let query_instance_id = request.has_annotation("query_instance_id");

        let (instance_id, dp_rank, overlap_amount) = if let Some(id) = request.backend_instance_id {
            // If instance_id is set, use it and compute actual overlap
            let dp_rank = request.dp_rank.unwrap_or(0);
            if query_instance_id {
                tracing::debug!(
                    "backend_instance_id is set, routing to instance {id} with dp_rank {dp_rank} and ignoring query_instance_id annotation"
                );
            }

            // Compute actual overlap blocks by querying the indexer
            let block_hashes =
                compute_block_hash_for_seq(&request.token_ids, self.chooser.block_size());
            let overlap_scores = self.chooser.indexer.find_matches(block_hashes).await?;
            let worker = WorkerWithDpRank::new(id, dp_rank);
            let overlap_blocks = overlap_scores.scores.get(&worker).copied().unwrap_or(0);

            self.chooser
                .add_request(
                    context_id.clone(),
                    &request.token_ids,
                    overlap_blocks,
                    worker,
                )
                .await;
            (id, dp_rank, overlap_blocks)
        } else {
            // Otherwise, find the best match
            let (best_worker, overlap_amount) = self
                .chooser
                .find_best_match(
                    Some(&context_id),
                    &request.token_ids,
                    request.router_config_override.as_ref(),
                    !query_instance_id, // Don't update states if query_instance_id
                )
                .await?;
            (best_worker.worker_id, best_worker.dp_rank, overlap_amount)
        };

        // if request has the annotation "query_instance_id",
        // then the request will not be routed to the worker,
        // and instead the worker_instance_id will be returned.
        let stream_context = request.context().clone();
        if query_instance_id {
            let instance_id_str = instance_id.to_string();
            let response = Annotated::from_annotation("worker_instance_id", &instance_id_str)?;

            // Return the tokens in nvext.token_data format
            let response_tokens = Annotated::from_annotation("token_data", &request.token_ids)?;
            tracing::trace!(
                "Tokens requested in the response through the query_instance_id annotation: {:?}",
                response_tokens
            );
            let stream = stream::iter(vec![response, response_tokens]);
            return Ok(ResponseStream::new(Box::pin(stream), stream_context));
        }
        let (mut backend_input, context) = request.into_parts();
        backend_input.estimated_prefix_hit_num_blocks = Some(overlap_amount);
        backend_input.dp_rank = Some(dp_rank);
        let updated_request = context.map(|_| backend_input);

        let mut response_stream = self.inner.direct(updated_request, instance_id).await?;
        let stream_context = response_stream.context();
        let chooser = self.chooser.clone();
        let context_for_monitoring = stream_context.clone();

        let wrapped_stream = Box::pin(async_stream::stream! {
            let mut prefill_marked = false;

            loop {
                tokio::select! {
                    biased;

                    _ = context_for_monitoring.stopped() => {
                        tracing::debug!("Request {context_id} cancelled, ending stream");
                        break;
588
                    }
Yan Ru Pei's avatar
Yan Ru Pei committed
589

590
591
592
593
                    item = response_stream.next() => {
                        let Some(item) = item else {
                            break;
                        };
594

595
596
597
                        if !prefill_marked {
                            if let Err(e) = chooser.mark_prefill_completed(&context_id).await {
                                tracing::warn!("Failed to mark prefill completed for request {context_id}: {e:?}");
598
                            }
599
                            prefill_marked = true;
600
                        }
601
                        yield item;
602
                    }
603
604
                }
            }
605

606
607
            if let Err(e) = chooser.free(&context_id).await {
                tracing::warn!("Failed to free request {context_id}: {e:?}");
608
            }
609
610
        });
        Ok(ResponseStream::new(wrapped_stream, stream_context))
611
612
    }
}
Yan Ru Pei's avatar
Yan Ru Pei committed
613
614
615
616
617
618
619

impl Drop for KvRouter {
    /// Cancels the router's cancellation token. Constructing the router hands clones of
    /// that token to its indexer, discovery watch and background task.
    ///
    /// NOTE(review): clones of a token share cancellation, so anything else holding this
    /// token is cancelled too — confirm the token is router-scoped.
    fn drop(&mut self) {
        let token = &self.cancellation_token;
        tracing::info!("Dropping KvRouter - cancelling background tasks");
        token.cancel();
    }
}