//! Power-of-two choices load balancing policy

use super::{get_healthy_worker_indices, LoadBalancingPolicy};
use crate::core::Worker;
use crate::metrics::RouterMetrics;
use rand::Rng;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tracing::info;

/// Power-of-two choices policy
///
/// For each request, two distinct healthy workers are picked at random and
/// the request is routed to whichever of the two has the lower load (ties go
/// to the first pick). This provides good load distribution with minimal
/// coordination overhead.
#[derive(Debug)]
pub struct PowerOfTwoPolicy {
    /// Cached load information from external monitoring, keyed by worker URL.
    ///
    /// Replaced wholesale by `update_loads`. When an entry exists for a
    /// worker it takes precedence over that worker's own load counter.
    cached_loads: RwLock<HashMap<String, isize>>,
}

impl PowerOfTwoPolicy {
    /// Creates a policy whose load cache starts out empty.
    pub fn new() -> Self {
        let cached_loads = RwLock::new(HashMap::new());
        Self { cached_loads }
    }

    /// Returns the load figure used to compare `worker` against another
    /// candidate.
    ///
    /// A cached entry for the worker's URL wins when one is present. If there
    /// is no entry, or the cache lock cannot be read, the worker's own
    /// `load()` counter is used instead.
    fn get_worker_load(&self, worker: &dyn Worker) -> isize {
        let cached = self
            .cached_loads
            .read()
            .ok()
            .and_then(|loads| loads.get(worker.url()).copied());

        // The read guard is already released here, matching the original order.
        cached.unwrap_or_else(|| worker.load() as isize)
    }
}

impl LoadBalancingPolicy for PowerOfTwoPolicy {
    fn select_worker(
        &self,
44
        workers: &[Arc<dyn Worker>],
45
46
47
48
49
50
51
52
53
54
55
56
57
        _request_text: Option<&str>,
    ) -> Option<usize> {
        let healthy_indices = get_healthy_worker_indices(workers);

        if healthy_indices.is_empty() {
            return None;
        }

        if healthy_indices.len() == 1 {
            return Some(healthy_indices[0]);
        }

        // Select two random workers
58
59
60
        let mut rng = rand::rng();
        let idx1 = rng.random_range(0..healthy_indices.len());
        let mut idx2 = rng.random_range(0..healthy_indices.len());
61
62
63

        // Ensure we pick two different workers
        while idx2 == idx1 {
64
            idx2 = rng.random_range(0..healthy_indices.len());
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
        }

        let worker_idx1 = healthy_indices[idx1];
        let worker_idx2 = healthy_indices[idx2];

        // Compare loads and select the less loaded one
        let load1 = self.get_worker_load(workers[worker_idx1].as_ref());
        let load2 = self.get_worker_load(workers[worker_idx2].as_ref());

        // Log selection for debugging
        let selected_idx = if load1 <= load2 {
            worker_idx1
        } else {
            worker_idx2
        };

        info!(
            "Power-of-two selection: {}={} vs {}={} -> selected {}",
            workers[worker_idx1].url(),
            load1,
            workers[worker_idx2].url(),
            load2,
            workers[selected_idx].url()
        );

        // Increment processed counter
        workers[selected_idx].increment_processed();
92
        RouterMetrics::record_processed_request(workers[selected_idx].url());
93
        RouterMetrics::record_policy_decision(self.name(), workers[selected_idx].url());
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121

        Some(selected_idx)
    }

    fn name(&self) -> &'static str {
        "power_of_two"
    }

    fn update_loads(&self, loads: &HashMap<String, isize>) {
        if let Ok(mut cached) = self.cached_loads.write() {
            *cached = loads.clone();
        }
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

impl Default for PowerOfTwoPolicy {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{BasicWorkerBuilder, WorkerType};

    /// Number of selections for the statistical test. It is large enough that
    /// the expected ordering of the counts practically cannot flip by chance.
    const SELECTION_TRIALS: usize = 1_000;

    #[test]
    fn test_power_of_two_selection() {
        let policy = PowerOfTwoPolicy::new();
        let worker1 = BasicWorkerBuilder::new("http://w1:8000")
            .worker_type(WorkerType::Regular)
            .build();
        let worker2 = BasicWorkerBuilder::new("http://w2:8000")
            .worker_type(WorkerType::Regular)
            .build();
        let worker3 = BasicWorkerBuilder::new("http://w3:8000")
            .worker_type(WorkerType::Regular)
            .build();

        // Set different loads
        for _ in 0..10 {
            worker1.increment_load();
        }
        for _ in 0..5 {
            worker2.increment_load();
        }
        // worker3 has load 0

        let workers: Vec<Arc<dyn Worker>> =
            vec![Arc::new(worker1), Arc::new(worker2), Arc::new(worker3)];

        // Run multiple selections
        let mut selected_counts = [0; 3];
        for _ in 0..SELECTION_TRIALS {
            if let Some(idx) = policy.select_worker(&workers, None) {
                selected_counts[idx] += 1;
            }
        }

        // Worker with lowest load (worker3) should be selected most often
        assert!(selected_counts[2] > selected_counts[1]);
        assert!(selected_counts[1] > selected_counts[0]);
    }

    #[test]
    fn test_power_of_two_with_cached_loads() {
        let policy = PowerOfTwoPolicy::new();
        let workers: Vec<Arc<dyn Worker>> = vec![
            Arc::new(
                BasicWorkerBuilder::new("http://w1:8000")
                    .worker_type(WorkerType::Regular)
                    .build(),
            ),
            Arc::new(
                BasicWorkerBuilder::new("http://w2:8000")
                    .worker_type(WorkerType::Regular)
                    .build(),
            ),
        ];

        // Update cached loads
        let mut loads = HashMap::new();
        loads.insert("http://w1:8000".to_string(), 100);
        loads.insert("http://w2:8000".to_string(), 10);
        policy.update_loads(&loads);

        // Should prefer worker2 with lower cached load
        let mut w2_selected = 0;
        for _ in 0..50 {
            if let Some(idx) = policy.select_worker(&workers, None) {
                if idx == 1 {
                    w2_selected += 1;
                }
            }
        }

        // Worker2 should be selected significantly more often
        assert!(w2_selected > 35); // Should win most of the time
    }

    #[test]
    fn test_power_of_two_single_worker() {
        let policy = PowerOfTwoPolicy::new();
        let workers: Vec<Arc<dyn Worker>> = vec![Arc::new(
            BasicWorkerBuilder::new("http://w1:8000")
                .worker_type(WorkerType::Regular)
                .build(),
        )];

        // With single worker, should always select it
        assert_eq!(policy.select_worker(&workers, None), Some(0));
    }

    #[test]
    fn test_power_of_two_no_workers() {
        let policy = PowerOfTwoPolicy::new();
        let workers: Vec<Arc<dyn Worker>> = Vec::new();

        // With nothing to choose from, the policy must decline to pick
        assert_eq!(policy.select_worker(&workers, None), None);
    }
}