// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;
5
use std::time::Instant;
6

7
use anyhow::Result;
8
use dynamo_kv_router::{
9
    PrefillLoadEstimator,
10
    config::{KvRouterConfig, RouterConfigOverride, min_initial_workers_from_env},
11
    indexer::KvRouterError,
12
13
    protocols::KV_EVENT_SUBJECT,
    protocols::{
14
15
16
        BlockExtraInfo, BlockHashOptions, DpRank, LocalBlockHash, PrefillLoadHint, RouterEvent,
        RouterRequest, RouterResponse, TokensWithHashes, WorkerId, WorkerWithDpRank,
        compute_block_hash_for_seq,
17
18
    },
};
19
use dynamo_runtime::{
20
    component::{Client, Endpoint},
21
    discovery::DiscoveryQuery,
22
    pipeline::{
23
24
        AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn,
        async_trait,
25
    },
26
    protocols::EndpointId,
27
    protocols::annotated::Annotated,
28
    traits::DistributedRuntimeProvider,
29
};
30
use futures::stream;
31
use tracing::Instrument;
32
use validator::Validate;
33

34
35
36
37
38
39
40
// Re-export from dynamo-kv-router crate
pub use dynamo_kv_router::approx;
pub use dynamo_kv_router::protocols;
pub use dynamo_kv_router::scheduling;
pub use dynamo_kv_router::selector;

pub mod agent_controller;
41
pub mod indexer;
42
pub mod metrics;
43
pub mod prefill_router;
44
pub mod publisher;
45
pub mod push_router;
46
pub mod scheduler;
47
pub mod sequence;
48
pub mod sticky_sessions;
49

50
pub use agent_controller::AgentController;
51
pub use indexer::{Indexer, ServedIndexerHandle, ServedIndexerMode, ensure_served_indexer_service};
52
pub use prefill_router::PrefillRouter;
53
pub use push_router::{DirectRoutingRouter, KvPushRouter};
54
pub use sticky_sessions::StickySessionRouter;
55

56
use crate::{
57
    discovery::RuntimeConfigWatch,
58
    kv_router::{
59
        scheduler::{DefaultWorkerSelector, KvScheduler, PotentialLoad},
60
        sequence::{SequenceError, SequenceRequest},
61
    },
62
    local_model::runtime_config::ModelRuntimeConfig,
63
64
};

65
66
use std::collections::HashSet;

// [gluo TODO] shouldn't need to be public
// this should be discovered from the component

/// Endpoint name used for pull-based scraping of worker load metrics.
pub const KV_METRICS_ENDPOINT: &str = "load_metrics";

/// Subject used for push-based publishing of KV metrics.
pub const KV_METRICS_SUBJECT: &str = "kv_metrics";

/// Subject used for inter-router communication of prefill events.
pub const PREFILL_SUBJECT: &str = "prefill_events";
/// Subject used for inter-router communication of active-sequence events.
pub const ACTIVE_SEQUENCES_SUBJECT: &str = "active_sequences_events";

/// Bucket name used for radix tree snapshot storage.
pub const RADIX_STATE_BUCKET: &str = "radix-bucket";
/// File name used for radix tree snapshot storage.
pub const RADIX_STATE_FILE: &str = "radix-state";

/// Number of most recent KV events kept in the worker buffer that backs worker-local
/// KV indexer queries.
pub const WORKER_KV_INDEXER_BUFFER_SIZE: usize = 1024;

/// Generates a dp_rank-specific endpoint name for the worker KV indexer query service.
/// Each dp_rank has its own LocalKvIndexer and query endpoint to ensure per-dp_rank monotonicity.
pub fn worker_kv_indexer_query_endpoint(dp_rank: DpRank) -> String {
    format!("worker_kv_indexer_query_dp{dp_rank}")
}

93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/// Emits a DEBUG-level `[ROUTING_INPUT]` event describing the local block hashes computed
/// for a routing request.
///
/// Does nothing unless DEBUG is enabled, so the hash-id vector is only built when it will be
/// logged. A missing `request_id` is logged as an empty string.
fn log_routing_input_hashes(
    request_id: Option<&str>,
    block_size: u32,
    tokens: &[u32],
    local_hashes: &[LocalBlockHash],
) {
    if tracing::enabled!(tracing::Level::DEBUG) {
        let hash_values = local_hashes
            .iter()
            .map(|local_hash| local_hash.0)
            .collect::<Vec<u64>>();

        tracing::debug!(
            request_id = request_id.unwrap_or(""),
            isl_tokens = tokens.len(),
            block_size,
            num_blocks = local_hashes.len(),
            local_hashes = ?hash_values,
            "[ROUTING_INPUT] request local hashes"
        );
    }
}

/// Endpoint name used for KV router discovery registration.
pub const KV_ROUTER_ENDPOINT: &str = "router-discovery";

/// Creates an EndpointId for the KV router in the given namespace.
119
pub fn router_endpoint_id(namespace: String, component: String) -> EndpointId {
120
121
    EndpointId {
        namespace,
122
        component,
123
124
125
126
127
        name: KV_ROUTER_ENDPOINT.to_string(),
    }
}

/// Creates a DiscoveryQuery for the KV router in the given namespace.
128
pub fn router_discovery_query(namespace: String, component: String) -> DiscoveryQuery {
129
130
    DiscoveryQuery::Endpoint {
        namespace,
131
        component,
132
133
134
135
        endpoint: KV_ROUTER_ENDPOINT.to_string(),
    }
}

136
137
/// A KvRouter only decides which worker you should use. It doesn't send you there.
/// TODO: Rename this to indicate it only selects a worker, it does not route.
138
139
140
141
pub struct KvRouter<Sel = DefaultWorkerSelector>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig>,
{
142
    indexer: Indexer,
143
    scheduler: KvScheduler<Sel>,
144
    workers_with_configs: RuntimeConfigWatch,
145
    block_size: u32,
146
    kv_router_config: KvRouterConfig,
147
    prefill_load_estimator: Option<Arc<dyn PrefillLoadEstimator>>,
Yan Ru Pei's avatar
Yan Ru Pei committed
148
    cancellation_token: tokio_util::sync::CancellationToken,
149
    client: Client,
150
    is_eagle: bool,
151
    _served_indexer_handle: Option<ServedIndexerHandle>,
152
153
}

154
155
156
157
impl<Sel> KvRouter<Sel>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig> + Send + Sync + 'static,
{
158
    #[allow(clippy::too_many_arguments)]
159
    pub async fn new(
160
161
        endpoint: Endpoint,
        client: Client,
162
        workers_with_configs: RuntimeConfigWatch,
163
        block_size: u32,
164
        selector: Sel,
165
        kv_router_config: Option<KvRouterConfig>,
166
        prefill_load_estimator: Option<Arc<dyn PrefillLoadEstimator>>,
167
        worker_type: &'static str,
168
        model_name: Option<String>,
169
        is_eagle: bool,
170
    ) -> Result<Self> {
171
        let kv_router_config = kv_router_config.unwrap_or_default();
172
        kv_router_config.validate()?;
173
        let component = endpoint.component();
174
        let cancellation_token = component.drt().primary_token();
175
        let min_initial_workers = min_initial_workers_from_env()?;
176

177
178
179
180
181
182
183
        let indexer = Indexer::new(
            component,
            &kv_router_config,
            block_size,
            model_name.as_deref(),
        )
        .await?;
184

185
186
187
188
        if min_initial_workers > 0 && !kv_router_config.skip_initial_worker_wait {
            let mut startup_watch = workers_with_configs.clone();
            let _ = startup_watch
                .wait_for(|m| m.len() >= min_initial_workers)
189
190
                .await
                .map_err(|_| {
191
192
                    anyhow::anyhow!(
                        "runtime config watch closed before {} workers appeared",
193
                        min_initial_workers
194
                    )
195
196
                })?;
        }
197

198
        let scheduler = KvScheduler::start(
199
            component.clone(),
200
            block_size,
201
            workers_with_configs.clone(),
202
            selector,
203
            &kv_router_config,
204
            prefill_load_estimator.clone(),
205
            worker_type,
206
207
        )
        .await?;
208

209
210
        // Start KV event subscription if needed — skip when using a remote indexer.
        if kv_router_config.use_remote_indexer {
211
212
            tracing::info!("Skipping KV event subscription (using remote indexer)");
        } else if kv_router_config.should_subscribe_to_kv_events() {
213
            indexer::start_subscriber(component.clone(), &kv_router_config, indexer.clone())
214
                .await?;
215
        } else {
216
            tracing::info!(
217
218
219
                "Skipping KV event subscription (use_kv_events={}, overlap_score_weight={})",
                kv_router_config.use_kv_events,
                kv_router_config.overlap_score_weight,
220
            );
221
        }
222

223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
        let served_indexer_handle = if kv_router_config.serve_indexer {
            let model_name = model_name.clone().ok_or_else(|| {
                anyhow::anyhow!("model_name is required when serve_indexer is configured")
            })?;
            Some(
                ensure_served_indexer_service(
                    component.clone(),
                    ServedIndexerMode::from_use_kv_events(kv_router_config.use_kv_events),
                    model_name,
                    indexer.clone(),
                )
                .await?,
            )
        } else {
            None
        };

240
        tracing::info!("KV Routing initialized");
241
        Ok(Self {
242
            indexer,
243
            scheduler,
244
            workers_with_configs,
245
            block_size,
246
            kv_router_config,
247
            prefill_load_estimator,
Yan Ru Pei's avatar
Yan Ru Pei committed
248
            cancellation_token,
249
            client,
250
            is_eagle,
251
            _served_indexer_handle: served_indexer_handle,
252
        })
253
254
    }

255
256
257
258
259
    /// Get a reference to the client used by this KvRouter
    pub fn client(&self) -> &Client {
        &self.client
    }

260
261
262
263
264
265
266
267
    pub fn indexer(&self) -> &Indexer {
        &self.indexer
    }

    pub fn kv_router_config(&self) -> &KvRouterConfig {
        &self.kv_router_config
    }

268
269
270
271
    pub fn is_eagle(&self) -> bool {
        self.is_eagle
    }

272
273
    pub async fn record_routing_decision(
        &self,
274
        mut tokens_with_hashes: TokensWithHashes,
275
276
277
278
279
280
281
        worker: WorkerWithDpRank,
    ) -> Result<(), KvRouterError> {
        self.indexer
            .process_routing_decision_for_request(&mut tokens_with_hashes, worker)
            .await
    }

282
    /// Give these tokens, find the worker with the best match in it's KV cache.
Yan Ru Pei's avatar
Yan Ru Pei committed
283
    /// Returns the best worker (with dp_rank) and overlap amount in number of blocks.
284
285
    /// Now also takes optional context_id for request tracking.
    ///
286
287
288
    /// When `pinned_worker` is Some, scheduling and queueing are constrained to
    /// that exact worker/rank.
    ///
289
    /// When `allowed_worker_ids` is Some, only workers in that set are considered for selection.
290
    #[allow(clippy::too_many_arguments)]
Yan Ru Pei's avatar
Yan Ru Pei committed
291
    pub async fn find_best_match(
292
        &self,
Yan Ru Pei's avatar
Yan Ru Pei committed
293
        context_id: Option<&str>,
294
        tokens: &[u32],
295
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
296
        router_config_override: Option<&RouterConfigOverride>,
297
        update_states: bool,
298
        lora_name: Option<String>,
299
        priority_jump: f64,
300
        expected_output_tokens: Option<u32>,
301
        pinned_worker: Option<WorkerWithDpRank>,
302
        allowed_worker_ids: Option<HashSet<WorkerId>>,
Yan Ru Pei's avatar
Yan Ru Pei committed
303
    ) -> anyhow::Result<(WorkerWithDpRank, u32)> {
304
305
        let start = Instant::now();

Yan Ru Pei's avatar
Yan Ru Pei committed
306
        if update_states && context_id.is_none() {
307
            anyhow::bail!("context_id must be provided when update_states is true");
Yan Ru Pei's avatar
Yan Ru Pei committed
308
309
        }

310
        let isl_tokens = tokens.len();
311
312
313
314
315
316
317
318
        let hash_options = BlockHashOptions {
            block_mm_infos,
            lora_name: lora_name.as_deref(),
            is_eagle: Some(self.is_eagle),
        };

        let block_hashes = tracing::info_span!("kv_router.compute_block_hashes")
            .in_scope(|| compute_block_hash_for_seq(tokens, self.block_size, hash_options));
319
        log_routing_input_hashes(context_id, self.block_size, tokens, &block_hashes);
320
321
322
323
        let hash_elapsed = start.elapsed();
        // Compute seq_hashes only if scheduler needs it for active blocks tracking
        let maybe_seq_hashes = tracing::info_span!("kv_router.compute_seq_hashes").in_scope(|| {
            self.kv_router_config.compute_seq_hashes_for_tracking(
324
325
                tokens,
                self.block_size,
326
327
328
                router_config_override,
                hash_options,
                Some(&block_hashes),
329
330
            )
        });
331
        let seq_hash_elapsed = start.elapsed();
332

333
        let overlap_scores = self
334
335
336
337
            .indexer
            .find_matches(block_hashes)
            .instrument(tracing::info_span!("kv_router.find_matches"))
            .await?;
338
        let find_matches_elapsed = start.elapsed();
339

340
        let response = self
341
            .scheduler
342
            .schedule(
Yan Ru Pei's avatar
Yan Ru Pei committed
343
                context_id.map(|s| s.to_string()),
344
                isl_tokens,
345
                maybe_seq_hashes,
346
                overlap_scores,
347
                router_config_override,
348
                update_states,
349
                lora_name,
350
                priority_jump,
351
                expected_output_tokens,
352
                pinned_worker,
353
                allowed_worker_ids,
354
            )
355
            .instrument(tracing::info_span!("kv_router.schedule"))
356
            .await?;
357
358
        let total_elapsed = start.elapsed();

359
360
361
362
        if let Some(m) = metrics::RoutingOverheadMetrics::get() {
            m.observe(
                hash_elapsed,
                seq_hash_elapsed,
363
                find_matches_elapsed,
364
365
366
                total_elapsed,
            );
        }
367

368
        #[cfg(feature = "bench")]
369
370
371
        tracing::info!(
            isl_tokens,
            hash_us = hash_elapsed.as_micros() as u64,
372
373
374
            seq_hash_us = (seq_hash_elapsed - hash_elapsed).as_micros() as u64,
            find_matches_us = (find_matches_elapsed - seq_hash_elapsed).as_micros() as u64,
            schedule_us = (total_elapsed - find_matches_elapsed).as_micros() as u64,
375
376
377
            total_us = total_elapsed.as_micros() as u64,
            "find_best_match completed"
        );
378

379
        Ok((response.best_worker, response.overlap_blocks))
380
381
    }

382
383
384
385
386
    /// Register externally-provided workers in the slot tracker.
    pub fn register_workers(&self, worker_ids: &HashSet<WorkerId>) {
        self.scheduler.register_workers(worker_ids);
    }

387
    #[allow(clippy::too_many_arguments)]
388
389
390
391
    pub async fn add_request(
        &self,
        request_id: String,
        tokens: &[u32],
392
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
393
        overlap_blocks: u32,
394
        expected_output_tokens: Option<u32>,
Yan Ru Pei's avatar
Yan Ru Pei committed
395
        worker: WorkerWithDpRank,
396
        lora_name: Option<String>,
397
        router_config_override: Option<&RouterConfigOverride>,
398
399
    ) {
        let isl_tokens = tokens.len();
400
401
402
403
404
        let hash_options = BlockHashOptions {
            block_mm_infos,
            lora_name: lora_name.as_deref(),
            is_eagle: Some(self.is_eagle),
        };
405

406
407
408
409
        let maybe_seq_hashes = self.kv_router_config.compute_seq_hashes_for_tracking(
            tokens,
            self.block_size,
            router_config_override,
410
411
            hash_options,
            None,
412
        );
413
414
415
        let track_prefill_tokens = self
            .kv_router_config
            .track_prefill_tokens(router_config_override);
416
417
        let prefill_load_hint =
            self.prefill_load_hint_for(isl_tokens, overlap_blocks, track_prefill_tokens);
418

419
420
        if let Err(e) = self
            .scheduler
421
422
423
424
425
            .add_request(SequenceRequest {
                request_id: request_id.clone(),
                token_sequence: maybe_seq_hashes,
                isl: isl_tokens,
                overlap: overlap_blocks,
426
                track_prefill_tokens,
427
                expected_output_tokens,
428
                prefill_load_hint,
Yan Ru Pei's avatar
Yan Ru Pei committed
429
                worker,
430
                lora_name,
431
            })
432
433
434
435
            .await
        {
            tracing::warn!("Failed to add request {request_id}: {e}");
        }
436
437
    }

438
    pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<(), SequenceError> {
439
        self.scheduler.mark_prefill_completed(request_id).await
440
441
    }

442
    pub async fn free(&self, request_id: &str) -> Result<(), SequenceError> {
443
        self.scheduler.free(request_id).await
444
    }
445

446
447
448
449
450
    /// Number of requests currently parked in the scheduler queue.
    pub fn pending_count(&self) -> usize {
        self.scheduler.pending_count()
    }

451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
    fn prefill_load_hint_for(
        &self,
        isl_tokens: usize,
        overlap_blocks: u32,
        track_prefill_tokens: bool,
    ) -> Option<PrefillLoadHint> {
        if !track_prefill_tokens {
            return None;
        }

        let prefix = (overlap_blocks as usize) * (self.block_size as usize);
        let effective_isl = isl_tokens.saturating_sub(prefix);
        if effective_isl == 0 {
            return None;
        }

        let Some(estimator) = &self.prefill_load_estimator else {
            return None;
        };

        match estimator.predict_prefill_duration(1, effective_isl, prefix) {
            Ok(expected_prefill_duration) => Some(PrefillLoadHint {
                initial_effective_prefill_tokens: effective_isl,
                expected_prefill_duration: Some(expected_prefill_duration),
            }),
            Err(error) => {
                tracing::warn!(
                    effective_isl,
                    prefix,
                    "failed to predict prefill duration for direct add_request path: {error}"
                );
                None
            }
        }
    }

487
488
489
490
491
492
    /// Get the worker type for this router ("prefill" or "decode").
    /// Used for Prometheus metric labeling.
    pub fn worker_type(&self) -> &'static str {
        self.scheduler.worker_type()
    }

493
494
495
496
497
498
499
    /// Return the worker's unique global DP rank when it owns exactly one rank.
    pub fn unique_dp_rank_for_worker(&self, worker_id: WorkerId) -> Option<u32> {
        let configs = self.workers_with_configs.borrow();
        let config = configs.get(&worker_id)?;
        (config.data_parallel_size == 1).then_some(config.data_parallel_start_rank)
    }

500
    pub fn add_output_block(
501
502
503
504
        &self,
        request_id: &str,
        decay_fraction: Option<f64>,
    ) -> Result<(), SequenceError> {
505
        self.scheduler.add_output_block(request_id, decay_fraction)
506
507
    }

508
    pub fn block_size(&self) -> u32 {
509
510
        self.block_size
    }
511

512
513
514
515
516
    /// Compute the overlap blocks for a given token sequence and worker.
    /// This queries the indexer to find how many blocks are already cached.
    pub async fn get_overlap_blocks(
        &self,
        tokens: &[u32],
517
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
518
        worker: WorkerWithDpRank,
519
        lora_name: Option<&str>,
520
    ) -> Result<u32, KvRouterError> {
521
522
523
524
525
526
527
528
529
        let block_hashes = compute_block_hash_for_seq(
            tokens,
            self.block_size,
            BlockHashOptions {
                block_mm_infos,
                lora_name,
                is_eagle: Some(self.is_eagle),
            },
        );
530
        log_routing_input_hashes(None, self.block_size, tokens, &block_hashes);
531
532
533
534
        let overlap_scores = self.indexer.find_matches(block_hashes).await?;
        Ok(overlap_scores.scores.get(&worker).copied().unwrap_or(0))
    }

535
    /// Get potential prefill and decode loads for all workers
536
537
538
539
    pub async fn get_potential_loads(
        &self,
        tokens: &[u32],
        router_config_override: Option<&RouterConfigOverride>,
540
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
541
        lora_name: Option<&str>,
542
    ) -> Result<Vec<PotentialLoad>> {
543
        let isl_tokens = tokens.len();
544
545
546
547
548
549
        let hash_options = BlockHashOptions {
            block_mm_infos,
            lora_name,
            is_eagle: Some(self.is_eagle),
        };
        let block_hashes = compute_block_hash_for_seq(tokens, self.block_size, hash_options);
550

551
552
553
554
        let maybe_seq_hashes = self.kv_router_config.compute_seq_hashes_for_tracking(
            tokens,
            self.block_size,
            router_config_override,
555
556
            hash_options,
            Some(&block_hashes),
557
        );
558
559
560
        let track_prefill_tokens = self
            .kv_router_config
            .track_prefill_tokens(router_config_override);
561
562
        let overlap_scores = self.indexer.find_matches(block_hashes).await?;

563
564
565
566
567
568
        Ok(self.scheduler.get_potential_loads(
            maybe_seq_hashes,
            isl_tokens,
            overlap_scores,
            track_prefill_tokens,
        ))
569
570
    }

571
572
573
574
    /// Dump all events from the indexer
    pub async fn dump_events(&self) -> Result<Vec<RouterEvent>, KvRouterError> {
        self.indexer.dump_events().await
    }
575
576
}

Michael Feil's avatar
Michael Feil committed
577
578
// NOTE: KVRouter works like a PushRouter,
// but without the reverse proxy functionality, but based on contract of 3 request types
579
#[async_trait]
580
581
582
583
584
impl<Sel> AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Error>
    for KvRouter<Sel>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig> + Send + Sync + 'static,
{
585
586
587
588
589
    async fn generate(
        &self,
        request: SingleIn<RouterRequest>,
    ) -> Result<ManyOut<Annotated<RouterResponse>>> {
        let (request, ctx) = request.into_parts();
Michael Feil's avatar
Michael Feil committed
590
591
592
        let context_id = ctx.context().id().to_string();
        // Handle different request types
        let response = match request {
593
594
595
596
            RouterRequest::New {
                tokens,
                block_mm_infos,
            } => {
Yan Ru Pei's avatar
Yan Ru Pei committed
597
                let (best_worker, overlap_blocks) = self
598
599
600
601
602
603
604
605
                    .find_best_match(
                        Some(&context_id),
                        &tokens,
                        block_mm_infos.as_deref(),
                        None,
                        true,
                        None,
                        0.0,
606
                        None,
607
                        None,
608
                        None,
609
                    )
Michael Feil's avatar
Michael Feil committed
610
611
612
                    .await?;

                RouterResponse::New {
Yan Ru Pei's avatar
Yan Ru Pei committed
613
614
                    worker_id: best_worker.worker_id,
                    dp_rank: best_worker.dp_rank,
Michael Feil's avatar
Michael Feil committed
615
616
617
                    overlap_blocks,
                }
            }
618
619
620
            RouterRequest::MarkPrefill => RouterResponse::PrefillMarked {
                success: self.mark_prefill_completed(&context_id).await.is_ok(),
            },
621
622
623
624
625
626
627
628
629
            RouterRequest::MarkFree { request_id } => {
                let request_id = match request_id.as_deref() {
                    Some(request_id) if !request_id.trim().is_empty() => request_id,
                    _ => &context_id,
                };
                RouterResponse::FreeMarked {
                    success: self.free(request_id).await.is_ok(),
                }
            }
Michael Feil's avatar
Michael Feil committed
630
        };
631
632
633
634
635
636

        let response = Annotated::from_data(response);
        let stream = stream::iter(vec![response]);
        Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
    }
}
637

638
639
640
641
impl<Sel> Drop for KvRouter<Sel>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig>,
{
Yan Ru Pei's avatar
Yan Ru Pei committed
642
643
644
645
646
    fn drop(&mut self) {
        tracing::info!("Dropping KvRouter - cancelling background tasks");
        self.cancellation_token.cancel();
    }
}