use crate::tree::Tree;
use actix_web::http::header::{HeaderValue, CONTENT_TYPE};
use actix_web::{HttpRequest, HttpResponse};
use bytes::Bytes;
use futures_util::{StreamExt, TryStreamExt};
use log::{debug, error, info, warn};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::Duration;
use tokio;

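// Worker-selection policies supported by the router: round-robin, random, and
// cache-aware (approximate radix tree with a shortest-queue fallback).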
#[derive(Debug)]
pub enum Router {
    RoundRobin {
        worker_urls: Arc<RwLock<Vec<String>>>,
        current_index: AtomicUsize,
        timeout_secs: u64,
    },
    Random {
        worker_urls: Arc<RwLock<Vec<String>>>,
        timeout_secs: u64,
    },
    CacheAware {
        /*
            Cache-Aware Load Balancing Router

            This router combines two strategies to optimize both cache utilization and request distribution:

            1. Cache-Aware Routing (Approximate Tree)
            2. Load Balancing (Shortest Queue with Balance Thresholds)

            The router dynamically switches between these strategies based on load conditions:
            - Uses load balancing when the system is imbalanced
            - Uses cache-aware routing when the system is balanced

            A system is considered imbalanced if both conditions are met:
            1. (max - min) > abs_threshold
            2. max > rel_threshold * min
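
            Worked example (hypothetical numbers): with abs_threshold = 32 and
            rel_threshold = 1.5, running queues of {max: 50, min: 10} satisfy both
            conditions (50 - 10 = 40 > 32 and 50 > 1.5 * 10), so the router falls
            back to shortest-queue routing; queues of {max: 40, min: 30} do not.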

            Strategy Details:

            1. Cache-Aware Routing (Approximate Tree)
            -------------------------------------------
            This strategy maintains an approximate radix tree for each worker based on request history,
            eliminating the need for direct cache state queries. The tree stores raw text characters
            instead of token IDs to avoid tokenization overhead.

            Process:
            a. For each request, find the worker with the highest prefix match
            b. If match rate > cache_threshold:
            Route to the worker with highest match (likely has relevant data cached)
            c. If match rate ≤ cache_threshold:
            Route to the worker with smallest tree size (most available cache capacity)
            d. Background maintenance:
            Periodically evict least recently used leaf nodes to prevent memory overflow
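
            Example (hypothetical numbers): with cache_threshold = 0.5, a request
            whose first 60% of characters match worker A's tree is routed to A,
            while a request with only a 10% match goes to the worker whose tree
            is currently smallest.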

            2. Load Balancing (Shortest Queue)
            -------------------------------------------
            This strategy tracks pending request counts per worker and routes new requests
            to the least busy worker when the system is detected to be imbalanced.

            Configuration Parameters:
            ------------------------
            1. cache_threshold: (float, 0.0 to 1.0)
            Minimum prefix match ratio to use highest-match routing.
            Below this threshold, routes to worker with most available cache space.

            2. balance_abs_threshold: (integer)
            Absolute difference threshold for load imbalance detection.
            System is potentially imbalanced if (max_load - min_load) > abs_threshold

            3. balance_rel_threshold: (float)
            Relative ratio threshold for load imbalance detection.
            System is potentially imbalanced if max_load > min_load * rel_threshold
            Used in conjunction with abs_threshold to determine final imbalance state.

            4. eviction_interval_secs: (integer)
            Interval between LRU eviction cycles for the approximate trees.

            5. max_tree_size: (integer)
            Maximum nodes per tree. When exceeded, LRU leaf nodes are evicted
            during the next eviction cycle.
        */
        worker_urls: Arc<RwLock<Vec<String>>>,
        tree: Arc<Mutex<Tree>>,
        running_queue: Arc<Mutex<HashMap<String, usize>>>,
        processed_queue: Arc<Mutex<HashMap<String, usize>>>,
        cache_threshold: f32,
        balance_abs_threshold: usize,
        balance_rel_threshold: f32,
        timeout_secs: u64,
        _eviction_thread: Option<thread::JoinHandle<()>>,
    },
}

#[derive(Debug, Clone)]
pub enum PolicyConfig {
    RandomConfig {
        timeout_secs: u64,
    },
    RoundRobinConfig {
        timeout_secs: u64,
    },
    CacheAwareConfig {
        cache_threshold: f32,
        balance_abs_threshold: usize,
        balance_rel_threshold: f32,
        eviction_interval_secs: u64,
        max_tree_size: usize,
        timeout_secs: u64,
    },
}
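
// Illustrative construction (a sketch; the URLs and parameter values below are
// assumptions for demonstration, not defaults shipped with this router):
//
//     let router = Router::new(
//         vec![
//             "http://127.0.0.1:8000".to_string(),
//             "http://127.0.0.1:8001".to_string(),
//         ],
//         PolicyConfig::CacheAwareConfig {
//             cache_threshold: 0.5,
//             balance_abs_threshold: 32,
//             balance_rel_threshold: 1.5,
//             eviction_interval_secs: 60,
//             max_tree_size: 1 << 24,
//             timeout_secs: 600,
//         },
//     )?;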

impl Router {
    pub fn new(worker_urls: Vec<String>, policy_config: PolicyConfig) -> Result<Self, String> {
        // Get timeout from policy config
        let timeout_secs = match &policy_config {
            PolicyConfig::RandomConfig { timeout_secs } => *timeout_secs,
            PolicyConfig::RoundRobinConfig { timeout_secs } => *timeout_secs,
            PolicyConfig::CacheAwareConfig { timeout_secs, .. } => *timeout_secs,
        };

        // Wait until all workers are healthy
        Self::wait_for_healthy_workers(&worker_urls, timeout_secs, 10)?;

        // Create router based on policy...
        Ok(match policy_config {
            PolicyConfig::RandomConfig { timeout_secs } => Router::Random {
                worker_urls: Arc::new(RwLock::new(worker_urls)),
                timeout_secs,
            },
            PolicyConfig::RoundRobinConfig { timeout_secs } => Router::RoundRobin {
                worker_urls: Arc::new(RwLock::new(worker_urls)),
                current_index: std::sync::atomic::AtomicUsize::new(0),
                timeout_secs,
            },
            PolicyConfig::CacheAwareConfig {
                cache_threshold,
                balance_abs_threshold,
                balance_rel_threshold,
                eviction_interval_secs,
                max_tree_size,
                timeout_secs,
            } => {
                let mut running_queue = HashMap::new();
                for url in &worker_urls {
                    running_queue.insert(url.clone(), 0);
                }

                let mut processed_queue = HashMap::new();
                for url in &worker_urls {
                    processed_queue.insert(url.clone(), 0);
                }

                let tree = Arc::new(Mutex::new(Tree::new()));
                let running_queue = Arc::new(Mutex::new(running_queue));
                let processed_queue = Arc::new(Mutex::new(processed_queue));

                // Create background eviction thread
                let tree_clone = Arc::clone(&tree);
                let processed_queue_clone = Arc::clone(&processed_queue);
                let running_queue_clone = Arc::clone(&running_queue);
                let eviction_thread = thread::spawn(move || {
                    loop {
                        // Sleep for the specified interval
                        thread::sleep(Duration::from_secs(eviction_interval_secs));

                        let locked_tree_clone = tree_clone.lock().unwrap();
                        // Run eviction
                        locked_tree_clone.evict_tenant_by_size(max_tree_size);

                        // Print the process queue
                        let locked_processed_queue = processed_queue_clone.lock().unwrap();
                        info!("Processed Queue: {:?}", locked_processed_queue);

                        // Print the running queue
                        let locked_running_queue = running_queue_clone.lock().unwrap();
                        info!("Running Queue: {:?}", locked_running_queue);
                    }
                });

                for url in &worker_urls {
                    tree.lock().unwrap().insert(&"".to_string(), url);
                }

                Router::CacheAware {
                    worker_urls: Arc::new(RwLock::new(worker_urls)),
                    tree,
                    running_queue,
                    processed_queue,
                    cache_threshold,
                    balance_abs_threshold,
                    balance_rel_threshold,
                    timeout_secs,
                    _eviction_thread: Some(eviction_thread),
                }
            }
        })
    }

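    // Blocks until every worker's /health endpoint returns a success status,
    // polling every `interval_secs`; returns Err once `timeout_secs` elapses.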
    fn wait_for_healthy_workers(
        worker_urls: &[String],
        timeout_secs: u64,
        interval_secs: u64,
    ) -> Result<(), String> {
        let start_time = std::time::Instant::now();
        let sync_client = reqwest::blocking::Client::new();

        loop {
            if start_time.elapsed() > Duration::from_secs(timeout_secs) {
                error!(
                    "Timeout {}s waiting for workers to become healthy",
                    timeout_secs
                );
                return Err(format!(
                    "Timeout {}s waiting for workers to become healthy",
                    timeout_secs
                ));
            }

            let mut all_healthy = true;
            let mut unhealthy_workers = Vec::new();

            for url in worker_urls {
                match sync_client.get(&format!("{}/health", url)).send() {
                    Ok(res) => {
                        if !res.status().is_success() {
                            info!(
                                "Worker {} health check is pending with status: {}.",
                                url,
                                res.status()
                            );
                            all_healthy = false;
                            unhealthy_workers.push((url, format!("Status: {}", res.status())));
                        }
                    }
                    Err(e) => {
                        info!("Worker {} health check is pending with error: {}", url, e);
                        all_healthy = false;
                        unhealthy_workers.push((url, format!("Error: {}", e)));
                    }
                }
            }

            if all_healthy {
                info!("All workers are healthy");
                return Ok(());
            } else {
                info!("Unhealthy workers:");
                for (url, reason) in &unhealthy_workers {
                    info!("  {} - {}", url, reason);
                }
                thread::sleep(Duration::from_secs(interval_secs));
            }
        }
    }

    fn select_first_worker(&self) -> Result<String, String> {
        match self {
            Router::RoundRobin { worker_urls, .. }
            | Router::Random { worker_urls, .. }
            | Router::CacheAware { worker_urls, .. } => {
                if worker_urls.read().unwrap().is_empty() {
                    Err("No workers are available".to_string())
                } else {
                    Ok(worker_urls.read().unwrap()[0].clone())
                }
            }
        }
    }

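    // Proxies a GET to `{worker_url}{route}` and mirrors the worker's status
    // code and body back to the caller.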
    async fn send_request(
        &self,
        client: &reqwest::Client,
        worker_url: &str,
        route: &str,
    ) -> HttpResponse {
        match client.get(format!("{}{}", worker_url, route)).send().await {
            Ok(res) => {
                let status = actix_web::http::StatusCode::from_u16(res.status().as_u16())
                    .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR);

                match res.bytes().await {
                    Ok(body) => HttpResponse::build(status).body(body.to_vec()),
                    Err(e) => HttpResponse::InternalServerError()
                        .body(format!("Failed to read response body: {}", e)),
                }
            }
            Err(e) => HttpResponse::InternalServerError().body(format!(
                "Failed to send request to worker {}: {}",
                worker_url, e
            )),
        }
    }

    pub async fn route_to_first(&self, client: &reqwest::Client, route: &str) -> HttpResponse {
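        // Retry policy: up to MAX_REQUEST_RETRIES attempts against the same
        // worker and MAX_TOTAL_RETRIES attempts overall; a worker that exhausts
        // its per-worker budget is removed from the pool.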
        const MAX_REQUEST_RETRIES: u32 = 3;
        const MAX_TOTAL_RETRIES: u32 = 6;
        let mut total_retries = 0;

        while total_retries < MAX_TOTAL_RETRIES {
            match self.select_first_worker() {
                Ok(worker_url) => {
                    let mut request_retries = 0;

                    // Try the same worker multiple times
                    while request_retries < MAX_REQUEST_RETRIES {
                        if total_retries >= 1 {
                            info!("Retrying request after {} failed attempts", total_retries);
                        }

                        let response = self.send_request(client, &worker_url, route).await;

                        if response.status().is_success() {
                            return response;
                        }

                        warn!(
                            "Request to {} failed (attempt {}/{})",
                            worker_url,
                            request_retries + 1,
                            MAX_REQUEST_RETRIES
                        );

                        request_retries += 1;
                        total_retries += 1;

                        if request_retries == MAX_REQUEST_RETRIES {
                            warn!("Removing failed worker: {}", worker_url);
                            self.remove_worker(&worker_url);
                            break;
                        }
                    }
                }
                Err(e) => return HttpResponse::InternalServerError().body(e),
            }
        }

        HttpResponse::InternalServerError().body("All retry attempts failed")
    }

    fn get_text_from_request(&self, body: &Bytes, route: &str) -> String {
        // convert body to json
        let json = serde_json::from_slice::<serde_json::Value>(body).unwrap();

        if route == "generate" {
            // get the "text" field
            let text = json.get("text").and_then(|t| t.as_str()).unwrap_or("");
            return text.to_string();
        } else if route == "v1/chat/completions" {
            // get the messages field as raw text
            if let Some(messages) = json.get("messages") {
                // Convert messages back to a string, preserving all JSON formatting
                return serde_json::to_string(messages).unwrap_or_default();
            }
        } else if route == "v1/completions" {
            let prompt = json.get("prompt").and_then(|t| t.as_str()).unwrap_or("");
            return prompt.to_string();
        }

        return "".to_string();
    }

    // TODO: return Result<String, String> instead of panicking
    fn select_generate_worker(&self, body: &Bytes, route: &str) -> String {
        let text = self.get_text_from_request(&body, route);

        let worker_url = match self {
            Router::RoundRobin {
                worker_urls,
                current_index,
                ..
            } => {
                let idx = current_index
                    .fetch_update(
                        std::sync::atomic::Ordering::SeqCst,
                        std::sync::atomic::Ordering::SeqCst,
                        |x| Some((x + 1) % worker_urls.read().unwrap().len()),
                    )
                    .unwrap();
                worker_urls.read().unwrap()[idx].clone()
            }

            Router::Random { worker_urls, .. } => worker_urls.read().unwrap()
                [rand::random::<usize>() % worker_urls.read().unwrap().len()]
            .clone(),

            Router::CacheAware {
                worker_urls,
                tree,
                running_queue,
                processed_queue,
                cache_threshold,
                balance_abs_threshold,
                balance_rel_threshold,
                ..
            } => {
                // TODO: delay scheduling when the cache hit rate is high, since it may cause imbalance; prioritize low-hit-rate requests

                let tree = tree.lock().unwrap();
                let mut running_queue = running_queue.lock().unwrap();

                // Get current load statistics
                let max_load = *running_queue.values().max().unwrap_or(&0);
                let min_load = *running_queue.values().min().unwrap_or(&0);

                // Load is considered imbalanced if:
                // 1. (max - min) > abs_threshold AND
                // 2. max > rel_threshold * min
                let is_imbalanced = max_load.saturating_sub(min_load) > *balance_abs_threshold
                    && (max_load as f32) > (min_load as f32 * balance_rel_threshold);

                let selected_url = if is_imbalanced {
                    // Log load balancing trigger and current queue state
                    info!(
                        "Load balancing triggered due to workload imbalance:\n\
                        Max load: {}, Min load: {}\n\
                        Current running queue: {:?}",
                        max_load, min_load, running_queue
                    );

                    // Use shortest queue routing when load is imbalanced
                    running_queue
                        .iter()
                        .min_by_key(|(_url, &count)| count)
                        .map(|(url, _)| url.clone())
                        .unwrap_or_else(|| worker_urls.read().unwrap()[0].clone())
                } else {
                    // Use cache-aware routing when load is balanced
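                    // matched_rate is the share of the request text covered by
                    // the longest prefix found in any worker's approximate tree.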
                    let (matched_text, matched_worker) = tree.prefix_match(&text);
                    let matched_rate =
                        matched_text.chars().count() as f32 / text.chars().count() as f32;

                    if matched_rate > *cache_threshold {
                        matched_worker.to_string()
                    } else {
                        tree.get_smallest_tenant()
                    }
                };

                // Update queues and tree
                *running_queue.get_mut(&selected_url).unwrap() += 1;

                *processed_queue
                    .lock()
                    .unwrap()
                    .get_mut(&selected_url)
                    .unwrap() += 1;
                tree.insert(&text, &selected_url);

                selected_url
            }
        };

        worker_url
    }

    async fn send_generate_request(
        &self,
        client: &reqwest::Client,
        req: &HttpRequest,
        body: &Bytes,
        route: &str,
        worker_url: &str,
    ) -> HttpResponse {
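        // Requests with `"stream": true` in the body are proxied as a streaming
        // response; everything else is buffered and returned in one piece.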
        let is_stream = serde_json::from_slice::<serde_json::Value>(&body)
            .map(|v| v.get("stream").and_then(|s| s.as_bool()).unwrap_or(false))
            .unwrap_or(false);

        let res = match client
            .post(format!("{}{}", worker_url, route))
            .header(
                "Content-Type",
                req.headers()
                    .get("Content-Type")
                    .and_then(|h| h.to_str().ok())
                    .unwrap_or("application/json"),
            )
            .body(body.to_vec())
            .send()
            .await
        {
            Ok(res) => res,
            Err(_) => return HttpResponse::InternalServerError().finish(),
        };

        let status = actix_web::http::StatusCode::from_u16(res.status().as_u16())
            .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR);

        if !is_stream {
            // For non-streaming requests, get response first
            let response = match res.bytes().await {
                Ok(body) => HttpResponse::build(status).body(body.to_vec()),
                Err(e) => {
                    let error_msg = format!("Failed to get response body: {}", e);
                    HttpResponse::InternalServerError().body(error_msg)
                }
            };

            // Then decrement running queue counter if using CacheAware
            if let Router::CacheAware { running_queue, .. } = self {
                if let Ok(mut queue) = running_queue.lock() {
                    if let Some(count) = queue.get_mut(worker_url) {
                        *count = count.saturating_sub(1);
                    }
                }
            }

            response
        } else if let Router::CacheAware { running_queue, .. } = self {
            let running_queue = Arc::clone(running_queue);
            let worker_url = worker_url.to_string();

            HttpResponse::build(status)
                .insert_header((CONTENT_TYPE, HeaderValue::from_static("text/event-stream")))
                .streaming(
                    res.bytes_stream()
                        .map_err(|_| {
                            actix_web::error::ErrorInternalServerError("Failed to read stream")
                        })
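                        // Decrement this worker's running-queue counter once the
                        // SSE terminator "data: [DONE]" appears in the stream.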
                        .inspect(move |bytes| {
                            let bytes = bytes.as_ref().unwrap();
                            if bytes
                                .as_ref()
                                .windows(12)
                                .any(|window| window == b"data: [DONE]")
                            {
                                let mut locked_queue = running_queue.lock().unwrap();
                                let count = locked_queue.get_mut(&worker_url).unwrap();
                                *count = count.saturating_sub(1);
                                debug!("Streaming is done!!")
                            }
                        }),
                )
        } else {
            HttpResponse::build(status)
                .insert_header((CONTENT_TYPE, HeaderValue::from_static("text/event-stream")))
                .streaming(res.bytes_stream().map_err(|_| {
                    actix_web::error::ErrorInternalServerError("Failed to read stream")
                }))
        }
    }

    pub async fn route_generate_request(
        &self,
        client: &reqwest::Client,
        req: &HttpRequest,
        body: &Bytes,
        route: &str,
    ) -> HttpResponse {
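        // Same retry policy as route_to_first: retry the selected worker up to
        // MAX_REQUEST_RETRIES times, then remove it and pick another, bounded by
        // MAX_TOTAL_RETRIES overall.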
        const MAX_REQUEST_RETRIES: u32 = 3;
        const MAX_TOTAL_RETRIES: u32 = 6;
        let mut total_retries = 0;

        while total_retries < MAX_TOTAL_RETRIES {
            let worker_url = self.select_generate_worker(body, route);
            let mut request_retries = 0;

            // Try the same worker multiple times
            while request_retries < MAX_REQUEST_RETRIES {
                if total_retries >= 1 {
                    info!("Retrying request after {} failed attempts", total_retries);
                }
                let response = self
                    .send_generate_request(client, req, body, route, &worker_url)
                    .await;

                if response.status().is_success() {
                    return response;
                }

                warn!(
                    "Generate request to {} failed (attempt {}/{})",
                    worker_url,
                    request_retries + 1,
                    MAX_REQUEST_RETRIES
                );

                request_retries += 1;
                total_retries += 1;

                if request_retries == MAX_REQUEST_RETRIES {
                    warn!("Removing failed worker: {}", worker_url);
                    self.remove_worker(&worker_url);
                    break;
                }
            }
        }

        HttpResponse::InternalServerError().body("All retry attempts failed")
    }

    pub async fn add_worker(&self, worker_url: &str) -> Result<String, String> {
        let interval_secs = 10; // check every 10 seconds
        let timeout_secs = match self {
            Router::Random { timeout_secs, .. } => *timeout_secs,
            Router::RoundRobin { timeout_secs, .. } => *timeout_secs,
            Router::CacheAware { timeout_secs, .. } => *timeout_secs,
        };

        let start_time = std::time::Instant::now();
        let client = reqwest::Client::new();

        loop {
            if start_time.elapsed() > Duration::from_secs(timeout_secs) {
                error!(
                    "Timeout {}s waiting for worker {} to become healthy",
                    timeout_secs, worker_url
                );
                return Err(format!(
                    "Timeout {}s waiting for worker {} to become healthy",
                    timeout_secs, worker_url
                ));
            }

            match client.get(&format!("{}/health", worker_url)).send().await {
                Ok(res) => {
                    if res.status().is_success() {
                        match self {
                            Router::RoundRobin { worker_urls, .. }
                            | Router::Random { worker_urls, .. }
                            | Router::CacheAware { worker_urls, .. } => {
                                info!("Worker {} health check passed", worker_url);
                                let mut urls = worker_urls.write().unwrap();
                                if urls.contains(&worker_url.to_string()) {
                                    return Err(format!("Worker {} already exists", worker_url));
                                }
                                info!("Added worker: {}", worker_url);
                                urls.push(worker_url.to_string());
                            }
                        }

                        // If cache aware, initialize the queues for the new worker
                        if let Router::CacheAware {
                            running_queue,
                            processed_queue,
                            tree,
                            ..
                        } = self
                        {
                            // Add worker to running queue with initial count of 0
                            running_queue
                                .lock()
                                .unwrap()
                                .insert(worker_url.to_string(), 0);

                            // Add worker to processed queue with initial count of 0
                            processed_queue
                                .lock()
                                .unwrap()
                                .insert(worker_url.to_string(), 0);

                            // Add worker to tree
                            tree.lock().unwrap().insert(&"".to_string(), &worker_url);
                        }

                        return Ok(format!("Successfully added worker: {}", worker_url));
                    } else {
                        info!(
                            "Worker {} health check is pending with status: {}.",
                            worker_url,
                            res.status()
                        );
                        // if the url does not have http or https prefix, warn users
                        if !worker_url.starts_with("http://") && !worker_url.starts_with("https://")
                        {
                            warn!("The worker url {} does not have http or https prefix. Please add the prefix to the url.", worker_url);
                        }

                        tokio::time::sleep(Duration::from_secs(interval_secs)).await;
                        continue;
                    }
                }
                Err(e) => {
                    info!(
                        "Worker {} health check is pending with error: {}",
                        worker_url, e
                    );

                    // if the url does not have http or https prefix, warn users
                    if !worker_url.starts_with("http://") && !worker_url.starts_with("https://") {
                        warn!("The worker url {} does not have http or https prefix. Please add the prefix to the url.", worker_url);
                    }

                    tokio::time::sleep(Duration::from_secs(interval_secs)).await;
                    continue;
                }
            }
        }
    }

    pub fn remove_worker(&self, worker_url: &str) {
        match self {
            Router::RoundRobin { worker_urls, .. }
            | Router::Random { worker_urls, .. }
            | Router::CacheAware { worker_urls, .. } => {
                let mut urls = worker_urls.write().unwrap();
                if let Some(index) = urls.iter().position(|url| url == &worker_url) {
                    urls.remove(index);
                    info!("Removed worker: {}", worker_url);
                } else {
                    warn!("Worker {} not found, skipping removal", worker_url);
                    return;
                }
            }
        }

        // if cache aware, remove the worker from the tree
        if let Router::CacheAware {
            tree,
            running_queue,
            processed_queue,
            ..
        } = self
        {
            tree.lock().unwrap().remove_tenant(&worker_url);
            running_queue
                .lock()
                .unwrap()
                .remove(&worker_url.to_string());
            processed_queue
                .lock()
                .unwrap()
                .remove(&worker_url.to_string());
            info!(
                "Removed worker from tree and cleaned up queues: {}",
                worker_url
            );
        }
    }
}