kv_router.rs 21.3 KB
Newer Older
1
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
// SPDX-License-Identifier: Apache-2.0

4
use std::sync::Arc;
5
use std::time::Instant;
6

7
use anyhow::Result;
8
use dynamo_kv_router::{
9
    PrefillLoadEstimator,
10
    config::{KvRouterConfig, RouterConfigOverride, min_initial_workers_from_env},
11
    indexer::KvRouterError,
12
13
    protocols::KV_EVENT_SUBJECT,
    protocols::{
14
15
16
        BlockExtraInfo, BlockHashOptions, DpRank, LocalBlockHash, PrefillLoadHint, RouterEvent,
        RouterRequest, RouterResponse, TokensWithHashes, WorkerId, WorkerWithDpRank,
        compute_block_hash_for_seq,
17
18
    },
};
19
use dynamo_runtime::{
20
    component::{Client, Endpoint},
21
    discovery::DiscoveryQuery,
22
    pipeline::{
23
24
        AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn,
        async_trait,
25
    },
26
    protocols::EndpointId,
27
    protocols::annotated::Annotated,
28
    traits::DistributedRuntimeProvider,
29
};
30
use futures::stream;
31
use tracing::Instrument;
32
use validator::Validate;
33

34
35
36
37
38
39
40
// Re-export from dynamo-kv-router crate
pub use dynamo_kv_router::approx;
pub use dynamo_kv_router::protocols;
pub use dynamo_kv_router::scheduling;
pub use dynamo_kv_router::selector;

pub mod agent_controller;
41
pub mod indexer;
42
pub mod metrics;
43
pub mod prefill_router;
44
pub mod publisher;
45
pub mod push_router;
46
pub mod scheduler;
47
pub mod sequence;
48
pub mod sticky_sessions;
49

50
pub use agent_controller::AgentController;
51
pub use indexer::{Indexer, ServedIndexerHandle, ServedIndexerMode, ensure_served_indexer_service};
52
pub use prefill_router::PrefillRouter;
53
pub use push_router::{DirectRoutingRouter, KvPushRouter};
54
pub use sticky_sessions::StickySessionRouter;
55

56
use crate::{
57
    discovery::RuntimeConfigWatch,
58
    kv_router::{
59
        scheduler::{DefaultWorkerSelector, KvScheduler, PotentialLoad},
60
        sequence::{SequenceError, SequenceRequest},
61
    },
62
    local_model::runtime_config::ModelRuntimeConfig,
63
64
};

65
66
use std::collections::HashSet;

67
68
// [gluo TODO] shouldn't need to be public
// this should be discovered from the component
69
70
71
72
73
74
75
76
77
78

/// Endpoint name used for pull-based metric scraping.
pub const KV_METRICS_ENDPOINT: &str = "load_metrics";

/// Subject used for push-based metric publishing.
pub const KV_METRICS_SUBJECT: &str = "kv_metrics";

/// Subject for prefill events exchanged between router instances.
pub const PREFILL_SUBJECT: &str = "prefill_events";
/// Subject for active-sequence events exchanged between router instances.
pub const ACTIVE_SEQUENCES_SUBJECT: &str = "active_sequences_events";

/// Bucket holding radix tree snapshots.
pub const RADIX_STATE_BUCKET: &str = "radix-bucket";
/// File (key) name of the radix tree snapshot inside [`RADIX_STATE_BUCKET`].
pub const RADIX_STATE_FILE: &str = "radix-state";

/// Number of most recent events retained in a worker's buffer for worker-local
/// KV indexer queries.
pub const WORKER_KV_INDEXER_BUFFER_SIZE: usize = 1024;

87
88
89
90
91
92
/// Returns the name of the worker KV indexer query endpoint for a single `dp_rank`.
///
/// Each dp_rank owns its own `LocalKvIndexer` and query endpoint, which keeps event
/// ordering monotonic per dp_rank. The name is `worker_kv_indexer_query_dp<dp_rank>`.
pub fn worker_kv_indexer_query_endpoint(dp_rank: DpRank) -> String {
    const ENDPOINT_PREFIX: &str = "worker_kv_indexer_query_dp";
    format!("{ENDPOINT_PREFIX}{dp_rank}")
}

93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/// Emits a DEBUG-level `[ROUTING_INPUT]` event describing the local block hashes
/// computed for a request.
///
/// The hash list is only materialised when DEBUG is enabled, so the call is cheap
/// otherwise. A missing `request_id` is logged as an empty string.
fn log_routing_input_hashes(
    request_id: Option<&str>,
    block_size: u32,
    tokens: &[u32],
    local_hashes: &[LocalBlockHash],
) {
    if tracing::enabled!(tracing::Level::DEBUG) {
        let raw_hashes = local_hashes
            .iter()
            .map(|block_hash| block_hash.0)
            .collect::<Vec<u64>>();

        tracing::debug!(
            request_id = request_id.unwrap_or_default(),
            isl_tokens = tokens.len(),
            block_size,
            num_blocks = local_hashes.len(),
            local_hashes = ?raw_hashes,
            "[ROUTING_INPUT] request local hashes"
        );
    }
}

115
/// Endpoint name under which KV routers register themselves for discovery.
pub const KV_ROUTER_ENDPOINT: &str = "router-discovery";
117
118

/// Creates an EndpointId for the KV router in the given namespace.
119
pub fn router_endpoint_id(namespace: String, component: String) -> EndpointId {
120
121
    EndpointId {
        namespace,
122
        component,
123
124
125
126
127
        name: KV_ROUTER_ENDPOINT.to_string(),
    }
}

/// Creates a DiscoveryQuery for the KV router in the given namespace.
128
pub fn router_discovery_query(namespace: String, component: String) -> DiscoveryQuery {
129
130
    DiscoveryQuery::Endpoint {
        namespace,
131
        component,
132
133
134
135
        endpoint: KV_ROUTER_ENDPOINT.to_string(),
    }
}

136
137
/// A KvRouter only decides which worker you should use. It doesn't send you there.
/// TODO: Rename this to indicate it only selects a worker, it does not route.
138
139
140
141
pub struct KvRouter<Sel = DefaultWorkerSelector>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig>,
{
142
    indexer: Indexer,
143
    scheduler: KvScheduler<Sel>,
144
    workers_with_configs: RuntimeConfigWatch,
145
    block_size: u32,
146
    kv_router_config: KvRouterConfig,
147
    prefill_load_estimator: Option<Arc<dyn PrefillLoadEstimator>>,
Yan Ru Pei's avatar
Yan Ru Pei committed
148
    cancellation_token: tokio_util::sync::CancellationToken,
149
    client: Client,
150
    is_eagle: bool,
151
    _served_indexer_handle: Option<ServedIndexerHandle>,
152
153
}

154
155
156
157
impl<Sel> KvRouter<Sel>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig> + Send + Sync + 'static,
{
158
    #[allow(clippy::too_many_arguments)]
159
    pub async fn new(
160
161
        endpoint: Endpoint,
        client: Client,
162
        workers_with_configs: RuntimeConfigWatch,
163
        block_size: u32,
164
        selector: Sel,
165
        kv_router_config: Option<KvRouterConfig>,
166
        prefill_load_estimator: Option<Arc<dyn PrefillLoadEstimator>>,
167
        worker_type: &'static str,
168
        model_name: Option<String>,
169
        is_eagle: bool,
170
    ) -> Result<Self> {
171
        let kv_router_config = kv_router_config.unwrap_or_default();
172
        kv_router_config.validate()?;
173
        let component = endpoint.component();
174
        let cancellation_token = component.drt().primary_token();
175
        let min_initial_workers = min_initial_workers_from_env()?;
176

177
178
179
180
181
182
183
        let indexer = Indexer::new(
            component,
            &kv_router_config,
            block_size,
            model_name.as_deref(),
        )
        .await?;
184

185
186
187
188
        if min_initial_workers > 0 && !kv_router_config.skip_initial_worker_wait {
            let mut startup_watch = workers_with_configs.clone();
            let _ = startup_watch
                .wait_for(|m| m.len() >= min_initial_workers)
189
190
                .await
                .map_err(|_| {
191
192
                    anyhow::anyhow!(
                        "runtime config watch closed before {} workers appeared",
193
                        min_initial_workers
194
                    )
195
196
                })?;
        }
197

198
        let scheduler = KvScheduler::start(
199
            component.clone(),
200
            block_size,
201
            workers_with_configs.clone(),
202
            selector,
203
            &kv_router_config,
204
            prefill_load_estimator.clone(),
205
            worker_type,
206
207
        )
        .await?;
208

209
210
        // Start KV event subscription if needed — skip when using a remote indexer.
        if kv_router_config.use_remote_indexer {
211
212
            tracing::info!("Skipping KV event subscription (using remote indexer)");
        } else if kv_router_config.should_subscribe_to_kv_events() {
213
            indexer::start_subscriber(component.clone(), &kv_router_config, indexer.clone())
214
                .await?;
215
        } else {
216
            tracing::info!(
217
218
219
                "Skipping KV event subscription (use_kv_events={}, overlap_score_weight={})",
                kv_router_config.use_kv_events,
                kv_router_config.overlap_score_weight,
220
            );
221
        }
222

223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
        let served_indexer_handle = if kv_router_config.serve_indexer {
            let model_name = model_name.clone().ok_or_else(|| {
                anyhow::anyhow!("model_name is required when serve_indexer is configured")
            })?;
            Some(
                ensure_served_indexer_service(
                    component.clone(),
                    ServedIndexerMode::from_use_kv_events(kv_router_config.use_kv_events),
                    model_name,
                    indexer.clone(),
                )
                .await?,
            )
        } else {
            None
        };

240
        tracing::info!("KV Routing initialized");
241
        Ok(Self {
242
            indexer,
243
            scheduler,
244
            workers_with_configs,
245
            block_size,
246
            kv_router_config,
247
            prefill_load_estimator,
Yan Ru Pei's avatar
Yan Ru Pei committed
248
            cancellation_token,
249
            client,
250
            is_eagle,
251
            _served_indexer_handle: served_indexer_handle,
252
        })
253
254
    }

255
256
257
258
259
    /// Get a reference to the client used by this KvRouter
    pub fn client(&self) -> &Client {
        &self.client
    }

260
261
262
263
264
265
266
267
    pub fn indexer(&self) -> &Indexer {
        &self.indexer
    }

    pub fn kv_router_config(&self) -> &KvRouterConfig {
        &self.kv_router_config
    }

268
269
270
271
    pub fn is_eagle(&self) -> bool {
        self.is_eagle
    }

272
273
    pub async fn record_routing_decision(
        &self,
274
        mut tokens_with_hashes: TokensWithHashes,
275
276
277
278
279
280
281
        worker: WorkerWithDpRank,
    ) -> Result<(), KvRouterError> {
        self.indexer
            .process_routing_decision_for_request(&mut tokens_with_hashes, worker)
            .await
    }

282
    /// Give these tokens, find the worker with the best match in it's KV cache.
Yan Ru Pei's avatar
Yan Ru Pei committed
283
    /// Returns the best worker (with dp_rank) and overlap amount in number of blocks.
284
285
286
    /// Now also takes optional context_id for request tracking.
    ///
    /// When `allowed_worker_ids` is Some, only workers in that set are considered for selection.
287
    #[allow(clippy::too_many_arguments)]
Yan Ru Pei's avatar
Yan Ru Pei committed
288
    pub async fn find_best_match(
289
        &self,
Yan Ru Pei's avatar
Yan Ru Pei committed
290
        context_id: Option<&str>,
291
        tokens: &[u32],
292
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
293
        router_config_override: Option<&RouterConfigOverride>,
294
        update_states: bool,
295
        lora_name: Option<String>,
296
        priority_jump: f64,
297
        expected_output_tokens: Option<u32>,
298
        allowed_worker_ids: Option<HashSet<WorkerId>>,
Yan Ru Pei's avatar
Yan Ru Pei committed
299
    ) -> anyhow::Result<(WorkerWithDpRank, u32)> {
300
301
        let start = Instant::now();

Yan Ru Pei's avatar
Yan Ru Pei committed
302
        if update_states && context_id.is_none() {
303
            anyhow::bail!("context_id must be provided when update_states is true");
Yan Ru Pei's avatar
Yan Ru Pei committed
304
305
        }

306
        let isl_tokens = tokens.len();
307
308
309
310
311
312
313
314
        let hash_options = BlockHashOptions {
            block_mm_infos,
            lora_name: lora_name.as_deref(),
            is_eagle: Some(self.is_eagle),
        };

        let block_hashes = tracing::info_span!("kv_router.compute_block_hashes")
            .in_scope(|| compute_block_hash_for_seq(tokens, self.block_size, hash_options));
315
        log_routing_input_hashes(context_id, self.block_size, tokens, &block_hashes);
316
317
318
319
        let hash_elapsed = start.elapsed();
        // Compute seq_hashes only if scheduler needs it for active blocks tracking
        let maybe_seq_hashes = tracing::info_span!("kv_router.compute_seq_hashes").in_scope(|| {
            self.kv_router_config.compute_seq_hashes_for_tracking(
320
321
                tokens,
                self.block_size,
322
323
324
                router_config_override,
                hash_options,
                Some(&block_hashes),
325
326
            )
        });
327
        let seq_hash_elapsed = start.elapsed();
328

329
        let overlap_scores = self
330
331
332
333
            .indexer
            .find_matches(block_hashes)
            .instrument(tracing::info_span!("kv_router.find_matches"))
            .await?;
334
        let find_matches_elapsed = start.elapsed();
335

336
        let response = self
337
            .scheduler
338
            .schedule(
Yan Ru Pei's avatar
Yan Ru Pei committed
339
                context_id.map(|s| s.to_string()),
340
                isl_tokens,
341
                maybe_seq_hashes,
342
                overlap_scores,
343
                router_config_override,
344
                update_states,
345
                lora_name,
346
                priority_jump,
347
                expected_output_tokens,
348
                allowed_worker_ids,
349
            )
350
            .instrument(tracing::info_span!("kv_router.schedule"))
351
            .await?;
352
353
        let total_elapsed = start.elapsed();

354
355
356
357
        if let Some(m) = metrics::RoutingOverheadMetrics::get() {
            m.observe(
                hash_elapsed,
                seq_hash_elapsed,
358
                find_matches_elapsed,
359
360
361
                total_elapsed,
            );
        }
362

363
        #[cfg(feature = "bench")]
364
365
366
        tracing::info!(
            isl_tokens,
            hash_us = hash_elapsed.as_micros() as u64,
367
368
369
            seq_hash_us = (seq_hash_elapsed - hash_elapsed).as_micros() as u64,
            find_matches_us = (find_matches_elapsed - seq_hash_elapsed).as_micros() as u64,
            schedule_us = (total_elapsed - find_matches_elapsed).as_micros() as u64,
370
371
372
            total_us = total_elapsed.as_micros() as u64,
            "find_best_match completed"
        );
373

374
        Ok((response.best_worker, response.overlap_blocks))
375
376
    }

377
378
379
380
381
    /// Register externally-provided workers in the slot tracker.
    pub fn register_workers(&self, worker_ids: &HashSet<WorkerId>) {
        self.scheduler.register_workers(worker_ids);
    }

382
    #[allow(clippy::too_many_arguments)]
383
384
385
386
    pub async fn add_request(
        &self,
        request_id: String,
        tokens: &[u32],
387
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
388
        overlap_blocks: u32,
389
        expected_output_tokens: Option<u32>,
Yan Ru Pei's avatar
Yan Ru Pei committed
390
        worker: WorkerWithDpRank,
391
        lora_name: Option<String>,
392
        router_config_override: Option<&RouterConfigOverride>,
393
394
    ) {
        let isl_tokens = tokens.len();
395
396
397
398
399
        let hash_options = BlockHashOptions {
            block_mm_infos,
            lora_name: lora_name.as_deref(),
            is_eagle: Some(self.is_eagle),
        };
400

401
402
403
404
        let maybe_seq_hashes = self.kv_router_config.compute_seq_hashes_for_tracking(
            tokens,
            self.block_size,
            router_config_override,
405
406
            hash_options,
            None,
407
        );
408
409
410
        let track_prefill_tokens = self
            .kv_router_config
            .track_prefill_tokens(router_config_override);
411
412
        let prefill_load_hint =
            self.prefill_load_hint_for(isl_tokens, overlap_blocks, track_prefill_tokens);
413

414
415
        if let Err(e) = self
            .scheduler
416
417
418
419
420
            .add_request(SequenceRequest {
                request_id: request_id.clone(),
                token_sequence: maybe_seq_hashes,
                isl: isl_tokens,
                overlap: overlap_blocks,
421
                track_prefill_tokens,
422
                expected_output_tokens,
423
                prefill_load_hint,
Yan Ru Pei's avatar
Yan Ru Pei committed
424
                worker,
425
                lora_name,
426
            })
427
428
429
430
            .await
        {
            tracing::warn!("Failed to add request {request_id}: {e}");
        }
431
432
    }

433
    pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<(), SequenceError> {
434
        self.scheduler.mark_prefill_completed(request_id).await
435
436
    }

437
    pub async fn free(&self, request_id: &str) -> Result<(), SequenceError> {
438
        self.scheduler.free(request_id).await
439
    }
440

441
442
443
444
445
    /// Number of requests currently parked in the scheduler queue.
    pub fn pending_count(&self) -> usize {
        self.scheduler.pending_count()
    }

446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
    fn prefill_load_hint_for(
        &self,
        isl_tokens: usize,
        overlap_blocks: u32,
        track_prefill_tokens: bool,
    ) -> Option<PrefillLoadHint> {
        if !track_prefill_tokens {
            return None;
        }

        let prefix = (overlap_blocks as usize) * (self.block_size as usize);
        let effective_isl = isl_tokens.saturating_sub(prefix);
        if effective_isl == 0 {
            return None;
        }

        let Some(estimator) = &self.prefill_load_estimator else {
            return None;
        };

        match estimator.predict_prefill_duration(1, effective_isl, prefix) {
            Ok(expected_prefill_duration) => Some(PrefillLoadHint {
                initial_effective_prefill_tokens: effective_isl,
                expected_prefill_duration: Some(expected_prefill_duration),
            }),
            Err(error) => {
                tracing::warn!(
                    effective_isl,
                    prefix,
                    "failed to predict prefill duration for direct add_request path: {error}"
                );
                None
            }
        }
    }

482
483
484
485
486
487
    /// Get the worker type for this router ("prefill" or "decode").
    /// Used for Prometheus metric labeling.
    pub fn worker_type(&self) -> &'static str {
        self.scheduler.worker_type()
    }

488
489
490
491
492
493
494
    /// Return the worker's unique global DP rank when it owns exactly one rank.
    pub fn unique_dp_rank_for_worker(&self, worker_id: WorkerId) -> Option<u32> {
        let configs = self.workers_with_configs.borrow();
        let config = configs.get(&worker_id)?;
        (config.data_parallel_size == 1).then_some(config.data_parallel_start_rank)
    }

495
    pub fn add_output_block(
496
497
498
499
        &self,
        request_id: &str,
        decay_fraction: Option<f64>,
    ) -> Result<(), SequenceError> {
500
        self.scheduler.add_output_block(request_id, decay_fraction)
501
502
    }

503
    pub fn block_size(&self) -> u32 {
504
505
        self.block_size
    }
506

507
508
509
510
511
    /// Compute the overlap blocks for a given token sequence and worker.
    /// This queries the indexer to find how many blocks are already cached.
    pub async fn get_overlap_blocks(
        &self,
        tokens: &[u32],
512
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
513
        worker: WorkerWithDpRank,
514
        lora_name: Option<&str>,
515
    ) -> Result<u32, KvRouterError> {
516
517
518
519
520
521
522
523
524
        let block_hashes = compute_block_hash_for_seq(
            tokens,
            self.block_size,
            BlockHashOptions {
                block_mm_infos,
                lora_name,
                is_eagle: Some(self.is_eagle),
            },
        );
525
        log_routing_input_hashes(None, self.block_size, tokens, &block_hashes);
526
527
528
529
        let overlap_scores = self.indexer.find_matches(block_hashes).await?;
        Ok(overlap_scores.scores.get(&worker).copied().unwrap_or(0))
    }

530
    /// Get potential prefill and decode loads for all workers
531
532
533
534
    pub async fn get_potential_loads(
        &self,
        tokens: &[u32],
        router_config_override: Option<&RouterConfigOverride>,
535
        block_mm_infos: Option<&[Option<BlockExtraInfo>]>,
536
        lora_name: Option<&str>,
537
    ) -> Result<Vec<PotentialLoad>> {
538
        let isl_tokens = tokens.len();
539
540
541
542
543
544
        let hash_options = BlockHashOptions {
            block_mm_infos,
            lora_name,
            is_eagle: Some(self.is_eagle),
        };
        let block_hashes = compute_block_hash_for_seq(tokens, self.block_size, hash_options);
545

546
547
548
549
        let maybe_seq_hashes = self.kv_router_config.compute_seq_hashes_for_tracking(
            tokens,
            self.block_size,
            router_config_override,
550
551
            hash_options,
            Some(&block_hashes),
552
        );
553
554
555
        let track_prefill_tokens = self
            .kv_router_config
            .track_prefill_tokens(router_config_override);
556
557
        let overlap_scores = self.indexer.find_matches(block_hashes).await?;

558
559
560
561
562
563
        Ok(self.scheduler.get_potential_loads(
            maybe_seq_hashes,
            isl_tokens,
            overlap_scores,
            track_prefill_tokens,
        ))
564
565
    }

566
567
568
569
    /// Dump all events from the indexer
    pub async fn dump_events(&self) -> Result<Vec<RouterEvent>, KvRouterError> {
        self.indexer.dump_events().await
    }
570
571
}

Michael Feil's avatar
Michael Feil committed
572
573
// NOTE: KVRouter works like a PushRouter,
// but without the reverse proxy functionality, but based on contract of 3 request types
574
#[async_trait]
575
576
577
578
579
impl<Sel> AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Error>
    for KvRouter<Sel>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig> + Send + Sync + 'static,
{
580
581
582
583
584
    async fn generate(
        &self,
        request: SingleIn<RouterRequest>,
    ) -> Result<ManyOut<Annotated<RouterResponse>>> {
        let (request, ctx) = request.into_parts();
Michael Feil's avatar
Michael Feil committed
585
586
587
        let context_id = ctx.context().id().to_string();
        // Handle different request types
        let response = match request {
588
589
590
591
            RouterRequest::New {
                tokens,
                block_mm_infos,
            } => {
Yan Ru Pei's avatar
Yan Ru Pei committed
592
                let (best_worker, overlap_blocks) = self
593
594
595
596
597
598
599
600
                    .find_best_match(
                        Some(&context_id),
                        &tokens,
                        block_mm_infos.as_deref(),
                        None,
                        true,
                        None,
                        0.0,
601
                        None,
602
                        None,
603
                    )
Michael Feil's avatar
Michael Feil committed
604
605
606
                    .await?;

                RouterResponse::New {
Yan Ru Pei's avatar
Yan Ru Pei committed
607
608
                    worker_id: best_worker.worker_id,
                    dp_rank: best_worker.dp_rank,
Michael Feil's avatar
Michael Feil committed
609
610
611
                    overlap_blocks,
                }
            }
612
613
614
            RouterRequest::MarkPrefill => RouterResponse::PrefillMarked {
                success: self.mark_prefill_completed(&context_id).await.is_ok(),
            },
615
616
617
618
619
620
621
622
623
            RouterRequest::MarkFree { request_id } => {
                let request_id = match request_id.as_deref() {
                    Some(request_id) if !request_id.trim().is_empty() => request_id,
                    _ => &context_id,
                };
                RouterResponse::FreeMarked {
                    success: self.free(request_id).await.is_ok(),
                }
            }
Michael Feil's avatar
Michael Feil committed
624
        };
625
626
627
628
629
630

        let response = Annotated::from_data(response);
        let stream = stream::iter(vec![response]);
        Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
    }
}
631

632
633
634
635
impl<Sel> Drop for KvRouter<Sel>
where
    Sel: dynamo_kv_router::selector::WorkerSelector<ModelRuntimeConfig>,
{
Yan Ru Pei's avatar
Yan Ru Pei committed
636
637
638
639
640
    fn drop(&mut self) {
        tracing::info!("Dropping KvRouter - cancelling background tasks");
        self.cancellation_token.cancel();
    }
}