"vscode:/vscode.git/clone" did not exist on "891c7338bf5f8258900997693244f38f7f2e9b75"
engine.rs 9.47 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
5
use std::collections::{BTreeMap, BTreeSet};

6
7
8
9
10
11
12
13
use anyhow::bail;

use super::super::events::SimulationWorkerStage;
use super::super::runtime_utils::WorkerCompletionPayload;
#[cfg(test)]
use super::super::state::OfflineWorkerSnapshot;
use super::super::state::OfflineWorkerState;
use super::{EngineEffects, EnginePassMode, ScheduledWorkerCompletion};
14
use crate::common::protocols::{DirectRequest, MockEngineArgs};
15
16
17
18
19
20
use crate::replay::TraceCollector;
use crate::scheduler::RouterEventVisibility;

pub(in crate::replay::offline) struct EngineComponent {
    stage: SimulationWorkerStage,
    pass_mode: EnginePassMode,
21
22
23
24
25
26
27
28
29
30
    /// Workers keyed by stable ID (monotonic, never reused).
    workers: BTreeMap<usize, OfflineWorkerState>,
    /// Counter for generating the next stable worker ID.
    next_id: usize,
    /// Workers marked for removal — skipped by round-robin, removed when drained.
    pending_removal: BTreeSet<usize>,
    /// Engine args used to construct new workers during scale-up.
    args: MockEngineArgs,
    /// Whether new workers should capture KV events (true when a router is present).
    capture_kv_events: bool,
31
32
33
34
35
36
37
38
}

impl EngineComponent {
    /// Build a component from an initial list of workers.
    ///
    /// The worker at position `i` of `workers` receives stable ID `i`, and the
    /// ID counter starts right after the last one. Scaling args start out as
    /// `MockEngineArgs::default()` with KV-event capture disabled; use
    /// `set_scaling_args` to change them before adding workers.
    pub(in crate::replay::offline) fn new(
        stage: SimulationWorkerStage,
        pass_mode: EnginePassMode,
        workers: Vec<OfflineWorkerState>,
    ) -> Self {
        let next_id = workers.len();
        // Pair each worker with its position so that position becomes its ID.
        let workers: BTreeMap<usize, OfflineWorkerState> = (0usize..).zip(workers).collect();

        Self {
            stage,
            pass_mode,
            workers,
            next_id,
            pending_removal: BTreeSet::new(),
            args: MockEngineArgs::default(),
            capture_kv_events: false,
        }
    }

    /// Store the configuration applied to workers created later by
    /// `add_worker`: the engine args to clone and whether KV events are
    /// captured. Existing workers are not touched.
    pub(in crate::replay::offline) fn set_scaling_args(
        &mut self,
        args: MockEngineArgs,
        capture_kv_events: bool,
    ) {
        (self.args, self.capture_kv_events) = (args, capture_kv_events);
    }

    /// Create one worker from the stored scaling args and register it.
    ///
    /// Returns the stable ID assigned to the new worker.
    pub(in crate::replay::offline) fn add_worker(&mut self) -> usize {
        let id = self.next_id;
        // Advance the counter first so the ID can never be handed out again.
        self.next_id = id + 1;
        self.workers.insert(
            id,
            OfflineWorkerState::new(id, self.args.clone(), self.capture_kv_events),
        );
        id
    }

    /// Flag `worker_id` as pending removal.
    ///
    /// The worker stays in the pool until `try_remove_drained` observes it as
    /// drained; from now on it is no longer reported by `active_worker_ids`.
    /// Marking the same ID twice has no additional effect.
    pub(in crate::replay::offline) fn mark_for_removal(&mut self, worker_id: usize) {
        let pending = &mut self.pending_removal;
        pending.insert(worker_id);
    }

    /// Drop every pending-removal worker that reports itself drained.
    ///
    /// Returns the IDs of the workers removed by this call, in ascending
    /// order. Pending IDs that no longer correspond to a worker are forgotten
    /// silently and are not part of the returned list.
    pub(in crate::replay::offline) fn try_remove_drained(&mut self) -> Vec<usize> {
        let mut drained = Vec::new();
        let mut orphaned = Vec::new();

        // First pass: classify each pending ID without mutating anything.
        for &id in &self.pending_removal {
            match self.workers.get(&id) {
                Some(worker) if worker.is_drained() => drained.push(id),
                // Still has work in progress: stays pending.
                Some(_) => {}
                // Worker already gone: only the stale marker is left.
                None => orphaned.push(id),
            }
        }

        // Second pass: apply the removals.
        for id in &drained {
            self.pending_removal.remove(id);
            self.workers.remove(id);
        }
        for id in &orphaned {
            self.pending_removal.remove(id);
        }

        drained
    }

    /// Apply a target worker count: add new workers or mark excess for removal.
    /// Returns `(added_ids, newly_marked_ids)` so the caller can update the
    /// router immediately. Newly marked workers should be removed from the
    /// router right away to prevent new requests from landing on them, even
    /// though the workers themselves remain in the engine until fully drained.
    pub(in crate::replay::offline) fn apply_target_count(
        &mut self,
        target: usize,
    ) -> (Vec<usize>, Vec<usize>) {
        let active_ids = self.active_worker_ids();
        let current = active_ids.len();
        let mut added = Vec::new();
        let mut newly_marked = Vec::new();

        if target > current {
            for _ in 0..(target - current) {
                added.push(self.add_worker());
            }
        } else if target < current {
            let excess = current - target;
            for &id in active_ids.iter().rev().take(excess) {
                self.mark_for_removal(id);
                newly_marked.push(id);
            }
122
        }
123
124
125
126
127
128
129
130
131
132
133
134
135

        // Clean up any workers that have already fully drained.
        self.try_remove_drained();
        (added, newly_marked)
    }

    /// Return stable IDs of all active (non-pending-removal) workers.
    pub(in crate::replay::offline) fn active_worker_ids(&self) -> Vec<usize> {
        self.workers
            .keys()
            .filter(|id| !self.pending_removal.contains(id))
            .copied()
            .collect()
136
137
138
139
    }

    pub(in crate::replay::offline) fn dispatch(
        &mut self,
140
        worker_id: usize,
141
142
        request: DirectRequest,
    ) -> anyhow::Result<()> {
143
144
145
146
147
        let worker = self
            .workers
            .get_mut(&worker_id)
            .ok_or_else(|| anyhow::anyhow!("offline replay selected unknown worker {worker_id}"))?;
        worker.receive_request(request);
148
149
150
151
152
153
154
155
        Ok(())
    }

    pub(in crate::replay::offline) fn drive_ready(
        &mut self,
        now_ms: f64,
        mut collector: Option<&mut TraceCollector>,
    ) -> anyhow::Result<EngineEffects> {
156
157
158
159
160
        // Collect worker IDs first to avoid borrow issues.
        let worker_ids: Vec<usize> = self.workers.keys().copied().collect();
        for worker_id in worker_ids {
            let worker = self.workers.get(&worker_id).unwrap();
            if !worker.is_ready() {
161
162
163
164
165
166
167
168
                continue;
            }

            let executed = match self.pass_mode {
                EnginePassMode::Visible => {
                    let Some(collector) = collector.as_deref_mut() else {
                        bail!("offline replay visible engine pass requires a collector");
                    };
169
170
171
172
                    self.workers
                        .get_mut(&worker_id)
                        .unwrap()
                        .execute_pass(collector, now_ms)
173
                }
174
175
176
177
178
                EnginePassMode::Hidden => self
                    .workers
                    .get_mut(&worker_id)
                    .unwrap()
                    .execute_hidden_pass(now_ms),
179
180
            };

181
182
183
184
            let mut effects = EngineEffects {
                admissions: executed.admissions,
                ..EngineEffects::default()
            };
185
186
187
            if let Some(fpm) = executed.fpm {
                effects.fpm_snapshots.push((worker_id, fpm));
            }
188
189
190
191
192
193
194
195
196
            let completion_kv_events =
                if executed.router_event_visibility == RouterEventVisibility::PassStart {
                    effects.pass_start_kv_events = executed.kv_events;
                    Vec::new()
                } else {
                    executed.kv_events
                };
            let payload = WorkerCompletionPayload {
                stage: self.stage,
197
                worker_idx: worker_id,
198
199
200
201
202
203
204
205
206
207
                completed_requests: executed.completed_requests,
                output_signals: executed.output_signals,
                kv_events: completion_kv_events,
            };

            if executed.end_ms == now_ms {
                effects.immediate_completions.push(payload);
                return Ok(effects);
            }

208
            self.workers.get_mut(&worker_id).unwrap().mark_busy();
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
            effects
                .scheduled_completions
                .push(ScheduledWorkerCompletion {
                    at_ms: executed.end_ms,
                    payload,
                });
            return Ok(effects);
        }

        Ok(EngineEffects::default())
    }

    pub(in crate::replay::offline) fn on_scheduled_completion(
        &mut self,
        payload: WorkerCompletionPayload,
    ) -> anyhow::Result<WorkerCompletionPayload> {
        if payload.stage != self.stage {
            bail!(
                "offline replay completion stage mismatch: expected {:?}, got {:?}",
                self.stage,
                payload.stage
            );
        }
232
233
234
235
236
237
238
239
240
241
242
243
244
245
        let worker = self.workers.get_mut(&payload.worker_idx).ok_or_else(|| {
            anyhow::anyhow!(
                "offline replay completion for unknown worker {}",
                payload.worker_idx
            )
        })?;
        worker.mark_idle();
        worker.mark_completed(payload.completed_requests);
        // Eagerly clean up drained workers that are pending removal so they
        // don't linger indefinitely when no further scaling events trigger
        // apply_target_count.
        if self.pending_removal.contains(&payload.worker_idx) {
            self.try_remove_drained();
        }
246
247
248
249
        Ok(payload)
    }

    /// Total of `in_flight()` over every worker in the pool, including
    /// workers that are pending removal.
    pub(in crate::replay::offline) fn in_flight(&self) -> usize {
        let mut total: usize = 0;
        for worker in self.workers.values() {
            total += worker.in_flight();
        }
        total
    }

    pub(in crate::replay::offline) fn is_drained(&self) -> bool {
257
        self.workers.values().all(OfflineWorkerState::is_drained)
258
259
260
261
262
263
264
265
266
    }

    pub(in crate::replay::offline) fn worker_count(&self) -> usize {
        self.workers.len()
    }

    /// Test-only helper: one debug snapshot per worker, ordered by worker ID.
    #[cfg(test)]
    pub(crate) fn debug_snapshots(&self) -> Vec<OfflineWorkerSnapshot> {
        let mut snapshots = Vec::with_capacity(self.workers.len());
        for worker in self.workers.values() {
            snapshots.push(worker.debug_snapshot());
        }
        snapshots
    }
}