kv_router.rs 21.6 KB
Newer Older
1
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
// SPDX-License-Identifier: Apache-2.0

4
use std::sync::Arc;
5
use std::time::Instant;
6

7
use anyhow::Result;
8
use dynamo_kv_router::{
9
    PrefillLoadEstimator,
10
    config::{KvRouterConfig, RouterConfigOverride, min_initial_workers_from_env},
11
    indexer::KvRouterError,
12
13
    protocols::KV_EVENT_SUBJECT,
    protocols::{
14
15
16
        BlockExtraInfo, BlockHashOptions, DpRank, LocalBlockHash, PrefillLoadHint, RouterEvent,
        RouterRequest, RouterResponse, TokensWithHashes, WorkerId, WorkerWithDpRank,
        compute_block_hash_for_seq,
17
18
    },
};
19
use dynamo_runtime::{
20
    component::{Client, Endpoint},
21
    discovery::DiscoveryQuery,
22
    pipeline::{
23
24
        AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn,
        async_trait,
25
    },
26
    protocols::EndpointId,
27
    protocols::annotated::Annotated,
28
    traits::DistributedRuntimeProvider,
29
};
30
use futures::stream;
31
use tracing::Instrument;
32
use validator::Validate;
33

34
35
36
37
38
39
40
// Re-export from dynamo-kv-router crate
pub use dynamo_kv_router::approx;
pub use dynamo_kv_router::protocols;
pub use dynamo_kv_router::scheduling;
pub use dynamo_kv_router::selector;

pub mod agent_controller;
41
pub mod indexer;
42
pub mod metrics;
43
pub mod prefill_router;
44
pub mod publisher;
45
pub mod push_router;
46
pub mod scheduler;
47
pub mod sequence;
48
pub mod sticky_sessions;
49

50
pub use agent_controller::AgentController;
51
pub use indexer::{Indexer, ServedIndexerHandle, ServedIndexerMode, ensure_served_indexer_service};
52
pub use prefill_router::PrefillRouter;
53
pub use push_router::{DirectRoutingRouter, KvPushRouter};
54
pub use sticky_sessions::StickySessionRouter;
55

56
use crate::{
57
    discovery::RuntimeConfigWatch,
58
    kv_router::{
59
        scheduler::{DefaultWorkerSelector, KvScheduler, PotentialLoad},
60
        sequence::{SequenceError, SequenceRequest},
61
    },
62
    local_model::runtime_config::ModelRuntimeConfig,
63
64
};

65
66
use std::collections::HashSet;

67
68
// TODO(gluo): the names below should not need to be public;
// they should be discovered from the component instead.
69
70
71
72
73
74
75
76
77
78

/// Endpoint name used for pull-based metric scraping.
pub const KV_METRICS_ENDPOINT: &str = "load_metrics";

/// Subject name used for push-based metric publishing.
pub const KV_METRICS_SUBJECT: &str = "kv_metrics";

// Subjects used for inter-router communication.
/// Subject name for prefill events exchanged between routers.
pub const PREFILL_SUBJECT: &str = "prefill_events";
/// Subject name for active-sequence events exchanged between routers.
pub const ACTIVE_SEQUENCES_SUBJECT: &str = "active_sequences_events";
79

80
81
82
83
// Names used for radix tree snapshot storage.
/// Bucket name used for radix tree snapshot storage.
pub const RADIX_STATE_BUCKET: &str = "radix-bucket";
/// File name used for radix tree snapshot storage.
pub const RADIX_STATE_FILE: &str = "radix-state";

84
85
86
/// Buffer size for the worker-local KV indexer query path: the worker buffer
/// keeps this many of the most recent events.
pub const WORKER_KV_INDEXER_BUFFER_SIZE: usize = 1024; // store 1024 most recent events in worker buffer

87
88
89
90
91
92
/// Builds the name of the worker KV indexer query endpoint for one `dp_rank`.
///
/// Each dp_rank has its own LocalKvIndexer and query endpoint to ensure
/// per-dp_rank monotonicity, so the rank is appended to a fixed prefix.
pub fn worker_kv_indexer_query_endpoint(dp_rank: DpRank) -> String {
    const PREFIX: &str = "worker_kv_indexer_query_dp";
    format!("{PREFIX}{dp_rank}")
}

93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/// Emits a DEBUG-level `[ROUTING_INPUT]` event describing the local block
/// hashes computed for a request.
///
/// `request_id` is logged as an empty string when absent. Nothing is computed
/// or allocated unless DEBUG logging is enabled.
fn log_routing_input_hashes(
    request_id: Option<&str>,
    block_size: u32,
    tokens: &[u32],
    local_hashes: &[LocalBlockHash],
) {
    // Only pay for collecting the raw hash ids when the event will be emitted.
    if tracing::enabled!(tracing::Level::DEBUG) {
        let hash_ids = local_hashes.iter().map(|h| h.0).collect::<Vec<u64>>();

        tracing::debug!(
            request_id = request_id.unwrap_or(""),
            isl_tokens = tokens.len(),
            block_size,
            num_blocks = local_hashes.len(),
            local_hashes = ?hash_ids,
            "[ROUTING_INPUT] request local hashes"
        );
    }
}

115
// for router discovery registration
116
/// Endpoint name under which the KV router is registered for discovery.
pub const KV_ROUTER_ENDPOINT: &str = "router-discovery";
117
118

/// Creates an EndpointId for the KV router in the given namespace.
119
pub fn router_endpoint_id(namespace: String, component: String) -> EndpointId {
120
121
    EndpointId {
        namespace,
122
        component,
123
124
125
126
127
        name: KV_ROUTER_ENDPOINT.to_string(),
    }
}

/// Creates a DiscoveryQuery for the KV router in the given namespace.
128
pub fn router_discovery_query(namespace: String, component: String) -> DiscoveryQuery {
129
130
    DiscoveryQuery::Endpoint {
        namespace,
131
        component,
132
133
134
135
        endpoint: KV_ROUTER_ENDPOINT.to_string(),
    }
}

136
137
/// A KvRouter only decides which worker you should use. It doesn't send you there.
/// TODO: Rename this to indicate it only selects a worker, it does not route.
138
139
140
141
pub struct KvRouter<Sel = DefaultWorkerSelector>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig>,
{
142
    indexer: Indexer,
143
    scheduler: KvScheduler<Sel>,
144
    workers_with_configs: RuntimeConfigWatch,
145
    block_size: u32,
146
    kv_router_config: KvRouterConfig,
147
    prefill_load_estimator: Option<Arc<dyn PrefillLoadEstimator>>,
Yan Ru Pei's avatar
Yan Ru Pei committed
148
    cancellation_token: tokio_util::sync::CancellationToken,
149
    client: Client,
150
    is_eagle: bool,
151
    _served_indexer_handle: Option<ServedIndexerHandle>,
152
153
}

154
155
156
157
impl<Sel> KvRouter<Sel>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig> + Send + Sync + 'static,
{
158
    #[allow(clippy::too_many_arguments)]
159
    pub async fn new(
160
161
        endpoint: Endpoint,
        client: Client,
162
        workers_with_configs: RuntimeConfigWatch,
163
        block_size: u32,
164
        selector: Sel,
165
        kv_router_config: Option<KvRouterConfig>,
166
        prefill_load_estimator: Option<Arc<dyn PrefillLoadEstimator>>,
167
        worker_type: &'static str,
168
        model_name: Option<String>,
169
        is_eagle: bool,
170
    ) -> Result<Self> {
171
        let kv_router_config = kv_router_config.unwrap_or_default();
172
        kv_router_config.validate()?;
173
        let component = endpoint.component();
174
        let cancellation_token = component.drt().primary_token();
175
        let min_initial_workers = min_initial_workers_from_env()?;
176

177
178
179
180
181
182
183
        let indexer = Indexer::new(
            component,
            &kv_router_config,
            block_size,
            model_name.as_deref(),
        )
        .await?;
184

185
186
187
188
        if min_initial_workers > 0 && !kv_router_config.skip_initial_worker_wait {
            let mut startup_watch = workers_with_configs.clone();
            let _ = startup_watch
                .wait_for(|m| m.len() >= min_initial_workers)
189
190
                .await
                .map_err(|_| {
191
192
                    anyhow::anyhow!(
                        "runtime config watch closed before {} workers appeared",
193
                        min_initial_workers
194
                    )
195
196
                })?;
        }
197

198
        let scheduler = KvScheduler::start(
199
            component.clone(),
200
            block_size,
201
            workers_with_configs.clone(),
202
            selector,
203
            &kv_router_config,
204
            prefill_load_estimator.clone(),
205
            worker_type,
206
207
        )
        .await?;
208

209
210
        // Start KV event subscription if needed — skip when using a remote indexer.
        if kv_router_config.use_remote_indexer {
211
212
            tracing::info!("Skipping KV event subscription (using remote indexer)");
        } else if kv_router_config.should_subscribe_to_kv_events() {
213
            indexer::start_subscriber(component.clone(), &kv_router_config, indexer.clone())
214
                .await?;
215
        } else {
216
            tracing::info!(
217
218
219
                "Skipping KV event subscription (use_kv_events={}, overlap_score_weight={})",
                kv_router_config.use_kv_events,
                kv_router_config.overlap_score_weight,
220
            );
221
        }
222

223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
        let served_indexer_handle = if kv_router_config.serve_indexer {
            let model_name = model_name.clone().ok_or_else(|| {
                anyhow::anyhow!("model_name is required when serve_indexer is configured")
            })?;
            Some(
                ensure_served_indexer_service(
                    component.clone(),
                    ServedIndexerMode::from_use_kv_events(kv_router_config.use_kv_events),
                    model_name,
                    indexer.clone(),
                )
                .await?,
            )
        } else {
            None
        };

240
        tracing::info!("KV Routing initialized");
241
        Ok(Self {
242
            indexer,
243
            scheduler,
244
            workers_with_configs,
245
            block_size,
246
            kv_router_config,
247
            prefill_load_estimator,
Yan Ru Pei's avatar
Yan Ru Pei committed
248
            cancellation_token,
249
            client,
250
            is_eagle,
251
            _served_indexer_handle: served_indexer_handle,
252
        })
253
254
    }

255
256
257
258
259
    /// Returns a reference to the client this `KvRouter` was constructed with.
    pub fn client(&self) -> &Client {
        &self.client
    }

260
261
262
263
264
265
266
267
    /// Returns a reference to the indexer this router queries for overlap scores.
    pub fn indexer(&self) -> &Indexer {
        &self.indexer
    }

    /// Returns the router configuration that was validated at construction.
    pub fn kv_router_config(&self) -> &KvRouterConfig {
        &self.kv_router_config
    }

268
269
270
271
    /// Returns the `is_eagle` flag this router was constructed with; the same
    /// flag is forwarded in the block hash options when hashing tokens.
    pub fn is_eagle(&self) -> bool {
        self.is_eagle
    }

272
273
    pub async fn record_routing_decision(
        &self,
274
        mut tokens_with_hashes: TokensWithHashes,
275
276
277
278
279
280
281
        worker: WorkerWithDpRank,
    ) -> Result<(), KvRouterError> {
        self.indexer
            .process_routing_decision_for_request(&mut tokens_with_hashes, worker)
            .await
    }

282
    /// Give these tokens, find the worker with the best match in it's KV cache.
Yan Ru Pei's avatar
Yan Ru Pei committed
283
    /// Returns the best worker (with dp_rank) and overlap amount in number of blocks.
284
285
    /// Now also takes optional context_id for request tracking.
    ///
286
287
288
    /// When `pinned_worker` is Some, scheduling and queueing are constrained to
    /// that exact worker/rank.
    ///
289
    /// When `allowed_worker_ids` is Some, only workers in that set are considered for selection.
290
    #[allow(clippy::too_many_arguments)]
Yan Ru Pei's avatar
Yan Ru Pei committed
291
    pub async fn find_best_match(
292
        &self,
Yan Ru Pei's avatar
Yan Ru Pei committed
293
        context_id: Option<&str>,
294
        tokens: &[u32],
295
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
296
        router_config_override: Option<&RouterConfigOverride>,
297
        update_states: bool,
298
        lora_name: Option<String>,
299
        priority_jump: f64,
300
        expected_output_tokens: Option<u32>,
301
        pinned_worker: Option<WorkerWithDpRank>,
302
        allowed_worker_ids: Option<HashSet<WorkerId>>,
Yan Ru Pei's avatar
Yan Ru Pei committed
303
    ) -> anyhow::Result<(WorkerWithDpRank, u32)> {
304
305
        let start = Instant::now();

Yan Ru Pei's avatar
Yan Ru Pei committed
306
        if update_states && context_id.is_none() {
307
            anyhow::bail!("context_id must be provided when update_states is true");
Yan Ru Pei's avatar
Yan Ru Pei committed
308
309
        }

310
        let isl_tokens = tokens.len();
311
312
313
314
315
316
317
318
        let hash_options = BlockHashOptions {
            block_mm_infos,
            lora_name: lora_name.as_deref(),
            is_eagle: Some(self.is_eagle),
        };

        let block_hashes = tracing::info_span!("kv_router.compute_block_hashes")
            .in_scope(|| compute_block_hash_for_seq(tokens, self.block_size, hash_options));
319
        log_routing_input_hashes(context_id, self.block_size, tokens, &block_hashes);
320
321
322
323
        let hash_elapsed = start.elapsed();
        // Compute seq_hashes only if scheduler needs it for active blocks tracking
        let maybe_seq_hashes = tracing::info_span!("kv_router.compute_seq_hashes").in_scope(|| {
            self.kv_router_config.compute_seq_hashes_for_tracking(
324
325
                tokens,
                self.block_size,
326
327
328
                router_config_override,
                hash_options,
                Some(&block_hashes),
329
330
            )
        });
331
        let seq_hash_elapsed = start.elapsed();
332

333
        let overlap_scores = self
334
335
336
337
            .indexer
            .find_matches(block_hashes)
            .instrument(tracing::info_span!("kv_router.find_matches"))
            .await?;
338
        let find_matches_elapsed = start.elapsed();
339

340
        let response = self
341
            .scheduler
342
            .schedule(
Yan Ru Pei's avatar
Yan Ru Pei committed
343
                context_id.map(|s| s.to_string()),
344
                isl_tokens,
345
                maybe_seq_hashes,
346
                overlap_scores,
347
                router_config_override,
348
                update_states,
349
                lora_name,
350
                priority_jump,
351
                expected_output_tokens,
352
                pinned_worker,
353
                allowed_worker_ids,
354
            )
355
            .instrument(tracing::info_span!("kv_router.schedule"))
356
            .await?;
357
358
        let total_elapsed = start.elapsed();

359
360
361
362
        if let Some(m) = metrics::RoutingOverheadMetrics::get() {
            m.observe(
                hash_elapsed,
                seq_hash_elapsed,
363
                find_matches_elapsed,
364
365
366
                total_elapsed,
            );
        }
367

368
        #[cfg(feature = "bench")]
369
370
371
        tracing::info!(
            isl_tokens,
            hash_us = hash_elapsed.as_micros() as u64,
372
373
374
            seq_hash_us = (seq_hash_elapsed - hash_elapsed).as_micros() as u64,
            find_matches_us = (find_matches_elapsed - seq_hash_elapsed).as_micros() as u64,
            schedule_us = (total_elapsed - find_matches_elapsed).as_micros() as u64,
375
376
377
            total_us = total_elapsed.as_micros() as u64,
            "find_best_match completed"
        );
378

379
        Ok((response.best_worker, response.overlap_blocks))
380
381
    }

382
383
384
385
386
    /// Register externally-provided workers in the slot tracker.
    ///
    /// Forwards `worker_ids` unchanged to the scheduler's `register_workers`.
    pub fn register_workers(&self, worker_ids: &HashSet<WorkerId>) {
        self.scheduler.register_workers(worker_ids);
    }

387
    #[allow(clippy::too_many_arguments)]
388
389
390
391
    pub async fn add_request(
        &self,
        request_id: String,
        tokens: &[u32],
392
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
393
        overlap_blocks: u32,
394
        expected_output_tokens: Option<u32>,
Yan Ru Pei's avatar
Yan Ru Pei committed
395
        worker: WorkerWithDpRank,
396
        lora_name: Option<String>,
397
        router_config_override: Option<&RouterConfigOverride>,
398
399
    ) {
        let isl_tokens = tokens.len();
400
401
402
403
404
        let hash_options = BlockHashOptions {
            block_mm_infos,
            lora_name: lora_name.as_deref(),
            is_eagle: Some(self.is_eagle),
        };
405

406
407
408
409
        let maybe_seq_hashes = self.kv_router_config.compute_seq_hashes_for_tracking(
            tokens,
            self.block_size,
            router_config_override,
410
411
            hash_options,
            None,
412
        );
413
414
415
        let track_prefill_tokens = self
            .kv_router_config
            .track_prefill_tokens(router_config_override);
416
417
        let prefill_load_hint =
            self.prefill_load_hint_for(isl_tokens, overlap_blocks, track_prefill_tokens);
418

419
420
        if let Err(e) = self
            .scheduler
421
422
423
            .add_request(SequenceRequest {
                request_id: request_id.clone(),
                token_sequence: maybe_seq_hashes,
424
                track_prefill_tokens,
425
                expected_output_tokens,
426
                prefill_load_hint,
Yan Ru Pei's avatar
Yan Ru Pei committed
427
                worker,
428
                lora_name,
429
            })
430
431
432
433
            .await
        {
            tracing::warn!("Failed to add request {request_id}: {e}");
        }
434
435
    }

436
    /// Tells the scheduler that prefill has completed for `request_id`.
    ///
    /// # Errors
    ///
    /// Returns the [`SequenceError`] reported by the scheduler.
    pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<(), SequenceError> {
        self.scheduler.mark_prefill_completed(request_id).await
    }

440
    /// Asks the scheduler to free the state it tracks for `request_id`.
    ///
    /// # Errors
    ///
    /// Returns the [`SequenceError`] reported by the scheduler.
    pub async fn free(&self, request_id: &str) -> Result<(), SequenceError> {
        self.scheduler.free(request_id).await
    }
443

444
445
446
447
448
    /// Number of requests currently parked in the scheduler queue.
    ///
    /// The value is read from the scheduler's `pending_count`.
    pub fn pending_count(&self) -> usize {
        self.scheduler.pending_count()
    }

449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
    fn prefill_load_hint_for(
        &self,
        isl_tokens: usize,
        overlap_blocks: u32,
        track_prefill_tokens: bool,
    ) -> Option<PrefillLoadHint> {
        if !track_prefill_tokens {
            return None;
        }

        let prefix = (overlap_blocks as usize) * (self.block_size as usize);
        let effective_isl = isl_tokens.saturating_sub(prefix);
        if effective_isl == 0 {
            return None;
        }

465
466
467
468
469
470
471
472
473
474
475
476
477
        let expected_prefill_duration = match &self.prefill_load_estimator {
            Some(estimator) => match estimator.predict_prefill_duration(1, effective_isl, prefix) {
                Ok(expected_prefill_duration) => Some(expected_prefill_duration),
                Err(error) => {
                    tracing::warn!(
                        effective_isl,
                        prefix,
                        "failed to predict prefill duration for direct add_request path: {error}"
                    );
                    None
                }
            },
            None => None,
478
479
        };

480
481
482
483
        Some(PrefillLoadHint {
            initial_effective_prefill_tokens: effective_isl,
            expected_prefill_duration,
        })
484
485
    }

486
487
488
489
490
491
    /// Get the worker type for this router ("prefill" or "decode").
    /// Used for Prometheus metric labeling.
    ///
    /// The value is the one reported by the scheduler's `worker_type`.
    pub fn worker_type(&self) -> &'static str {
        self.scheduler.worker_type()
    }

492
493
494
495
496
497
498
    /// Looks up `worker_id` in the current runtime configs and, if that worker
    /// has a data-parallel size of exactly one, returns its start rank.
    ///
    /// Yields `None` for unknown workers and for workers owning several ranks.
    pub fn unique_dp_rank_for_worker(&self, worker_id: WorkerId) -> Option<u32> {
        let known_workers = self.workers_with_configs.borrow();
        known_workers
            .get(&worker_id)
            .filter(|cfg| cfg.data_parallel_size == 1)
            .map(|cfg| cfg.data_parallel_start_rank)
    }

499
    pub fn add_output_block(
500
501
502
503
        &self,
        request_id: &str,
        decay_fraction: Option<f64>,
    ) -> Result<(), SequenceError> {
504
        self.scheduler.add_output_block(request_id, decay_fraction)
505
506
    }

507
    pub fn block_size(&self) -> u32 {
508
509
        self.block_size
    }
510

511
512
513
514
515
    /// Compute the overlap blocks for a given token sequence and worker.
    /// This queries the indexer to find how many blocks are already cached.
    pub async fn get_overlap_blocks(
        &self,
        tokens: &[u32],
516
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
517
        worker: WorkerWithDpRank,
518
        lora_name: Option<&str>,
519
    ) -> Result<u32, KvRouterError> {
520
521
522
523
524
525
526
527
528
        let block_hashes = compute_block_hash_for_seq(
            tokens,
            self.block_size,
            BlockHashOptions {
                block_mm_infos,
                lora_name,
                is_eagle: Some(self.is_eagle),
            },
        );
529
        log_routing_input_hashes(None, self.block_size, tokens, &block_hashes);
530
531
532
533
        let overlap_scores = self.indexer.find_matches(block_hashes).await?;
        Ok(overlap_scores.scores.get(&worker).copied().unwrap_or(0))
    }

534
    /// Get potential prefill and decode loads for all workers
535
536
537
538
    pub async fn get_potential_loads(
        &self,
        tokens: &[u32],
        router_config_override: Option<&RouterConfigOverride>,
539
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
540
        lora_name: Option<&str>,
541
    ) -> Result<Vec<PotentialLoad>> {
542
        let isl_tokens = tokens.len();
543
544
545
546
547
548
        let hash_options = BlockHashOptions {
            block_mm_infos,
            lora_name,
            is_eagle: Some(self.is_eagle),
        };
        let block_hashes = compute_block_hash_for_seq(tokens, self.block_size, hash_options);
549

550
551
552
553
        let maybe_seq_hashes = self.kv_router_config.compute_seq_hashes_for_tracking(
            tokens,
            self.block_size,
            router_config_override,
554
555
            hash_options,
            Some(&block_hashes),
556
        );
557
558
559
        let track_prefill_tokens = self
            .kv_router_config
            .track_prefill_tokens(router_config_override);
560
561
        let overlap_scores = self.indexer.find_matches(block_hashes).await?;

562
563
564
565
566
567
        Ok(self.scheduler.get_potential_loads(
            maybe_seq_hashes,
            isl_tokens,
            overlap_scores,
            track_prefill_tokens,
        ))
568
569
    }

570
571
572
573
    /// Dump all events from the indexer.
    ///
    /// # Errors
    ///
    /// Returns the [`KvRouterError`] reported by the indexer's `dump_events`.
    pub async fn dump_events(&self) -> Result<Vec<RouterEvent>, KvRouterError> {
        self.indexer.dump_events().await
    }
574
575
}

Michael Feil's avatar
Michael Feil committed
576
577
// NOTE: KVRouter works like a PushRouter,
// but without the reverse proxy functionality, but based on contract of 3 request types
578
#[async_trait]
579
580
581
582
583
impl<Sel> AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Error>
    for KvRouter<Sel>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig> + Send + Sync + 'static,
{
584
585
586
587
588
    async fn generate(
        &self,
        request: SingleIn<RouterRequest>,
    ) -> Result<ManyOut<Annotated<RouterResponse>>> {
        let (request, ctx) = request.into_parts();
Michael Feil's avatar
Michael Feil committed
589
590
591
        let context_id = ctx.context().id().to_string();
        // Handle different request types
        let response = match request {
592
593
594
595
            RouterRequest::New {
                tokens,
                block_mm_infos,
            } => {
Yan Ru Pei's avatar
Yan Ru Pei committed
596
                let (best_worker, overlap_blocks) = self
597
598
599
600
601
602
603
604
                    .find_best_match(
                        Some(&context_id),
                        &tokens,
                        block_mm_infos.as_deref(),
                        None,
                        true,
                        None,
                        0.0,
605
                        None,
606
                        None,
607
                        None,
608
                    )
Michael Feil's avatar
Michael Feil committed
609
610
611
                    .await?;

                RouterResponse::New {
Yan Ru Pei's avatar
Yan Ru Pei committed
612
613
                    worker_id: best_worker.worker_id,
                    dp_rank: best_worker.dp_rank,
Michael Feil's avatar
Michael Feil committed
614
615
616
                    overlap_blocks,
                }
            }
617
618
619
            RouterRequest::MarkPrefill => RouterResponse::PrefillMarked {
                success: self.mark_prefill_completed(&context_id).await.is_ok(),
            },
620
621
622
623
624
625
626
627
628
            RouterRequest::MarkFree { request_id } => {
                let request_id = match request_id.as_deref() {
                    Some(request_id) if !request_id.trim().is_empty() => request_id,
                    _ => &context_id,
                };
                RouterResponse::FreeMarked {
                    success: self.free(request_id).await.is_ok(),
                }
            }
Michael Feil's avatar
Michael Feil committed
629
        };
630
631
632
633
634
635

        let response = Annotated::from_data(response);
        let stream = stream::iter(vec![response]);
        Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
    }
}
636

637
638
639
640
impl<Sel> Drop for KvRouter<Sel>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig>,
{
Yan Ru Pei's avatar
Yan Ru Pei committed
641
642
643
644
645
    fn drop(&mut self) {
        tracing::info!("Dropping KvRouter - cancelling background tasks");
        self.cancellation_token.cancel();
    }
}