protocol.rs 11.8 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! # Connector Protocol
//!
//! This module defines the messages used to communicate between the following components:
//! - Leader -> TransferEngine (block_manager::distributed)
//! - TransferEngine -> Scheduler
//! - Worker -> Scheduler
//!
//! ## Locality
//!
//! The TransferEngine, Scheduler and Worker are all guaranteed to be in the same process. `Scheduler`
//! is a per-worker scheduler and `TransferEngine` is also a per-worker component.
//!
//! ## Connector Operations
//!
//! There are two types of connector operations: load operations and store operations. The following must
//! be true:
//! - All loads must be initiated when the Slot is in the [`SlotState::Initialized`] state.
//! - While the slot is in the [`SlotState::OnboardStaged`] or the [`SlotState::Onboarding`] state,
//!   no active tokens can be scheduled, no stores can be issued.
//!   - Unknowns:
//!     - What happens on cancellation?
//! - To transition to the [`SlotState::Prefilling`] state, the slot must be in either the [`SlotState::Initialized`],
//!   [`SlotState::NotScheduled`], or [`SlotState::OnboardStaged`] state.
//!   - When in the [`SlotState::Prefilling`] state, store/save operations are allowed.
//!   - Store/Save operations are determined when processing the [`SchedulerOutput`].
//!   - If a store operation is issued, the following will happen:
//!     - Leader will trigger a message to the TransferEngine with the use StoreRequest and a ConnectorStoreRequest
//!     - The presence of the ConnectorStoreRequest will trigger the TransferEngine to request a SchedulerStoreRequest,
//!       this will block the transfer engine's store task from executing until released by the scheduler.
//!     - The Scheduler will not release the store task until the Worker has made sufficient progress, i.e. the data
//!       to be stored has been computed and is in device memory.
//!     - All leader slots are visited on each build metadata step, this allows for any leader initiated actions to be
//!       included in the metadata sent to the worker.
//!       - An operation must include: request_id, the iteration on which it was issued, the operation type, and a descriptor.
//!     - The Worker will pick up all operations from the leader's metadata and enqueue to the scheduler.
//!     - The Worker will issue notifications to the Scheduler at the start of each iteration and the completion of each
//!       layer in that iteration.
//!     - For an operation to be scheduled to run, the following must be true:
//!       - The TransferEngine must have registered the operation with the Scheduler.
//!       - The Worker must have registered the operation with the Scheduler.
//!       - Sufficient progress, either layer-wise or iteration-wise, must have been made.
//!     - For an operation to run, the following must be true:
//!       - The operation must be in the scheduled queue.
//!       - A concurrent token must be acquired.
//!     - A running operation will be monitored by a task awaiting a completion event.
//!       - When the completion event is received, the atomic completion counter will be incremented.
//!
//!
//! All transfer requests are triggered by the leader based on the details in the [`SchedulerOutput`].
//!
//! [`SchedulerOutput`] is transform

use super::scheduler::{SchedulingDecision, DISCONNECTED_WARNING};
use super::*;

use tokio::sync::oneshot;

/// Name of a layer, as carried by [`SchedulerRequirement::LayerNameComplete`].
pub type LayerName = String;
/// Index of a layer, as carried by [`SchedulerRequirement::LayerComplete`].
pub type LayerIndex = u32;
/// Iteration counter carried by every [`SchedulerRequirement`] variant.
pub type Iteration = u64;

/// Selects which kind of completion handle is produced for a transfer request.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum RequestType {
    /// If Scheduled, then the [`super::scheduler::TransferSchedulerClient`] will communicate with the scheduler
    /// to await a boxed [`ScheduledTransferCompletionHandle`].
    Scheduled,

    /// If Immediate, then the [`super::scheduler::TransferSchedulerClient`] will immediately return a
    /// [`ImmediateTransferCompletionHandle`].
    Immediate,
}

/// The two kinds of connector operation described in the module documentation.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum TransferType {
    /// A load operation.
    Load,
    /// A store operation.
    Store,
}

/// Progress condition that can be attached to a [`LeaderTransferRequest`].
///
/// Each variant names a point of progress (iteration-wise or layer-wise) that must be complete.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum SchedulerRequirement {
    /// The iteration with the provided counter must be complete.
    IterationComplete(Iteration),

    /// The layer with the provided name and iteration counter must be complete.
    LayerNameComplete(LayerName, Iteration),

    /// The layer index and iteration counter must be complete.
    LayerComplete(LayerIndex, Iteration),
}

/// Issued by the leader, received by the TransferEngine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeaderTransferRequest {
    /// Identifier of the request this transfer belongs to.
    pub request_id: String,
    /// Identifier of the operation; the scheduler pairs it with the [`WorkerTransferRequest`]
    /// carrying the same uuid.
    pub uuid: uuid::Uuid,
    /// Optional progress requirement for the operation; `None` means no requirement is attached.
    pub requirement: Option<SchedulerRequirement>,
    /// Whether the transfer is scheduled or immediate; see [`RequestType`].
    pub request_type: RequestType,
}

/// Messages carried on the `tokio::sync::mpsc` channel towards the scheduler.
///
/// [`ImmediateTransferCompletionHandle`] holds the sending side of such a channel and uses it to
/// deliver [`TransferToSchedulerMessage::ImmediateResult`].
pub enum TransferToSchedulerMessage {
    /// Request to schedule a transfer; carries the channel on which the task handle is returned.
    ScheduleRequest(TransferScheduleRequest),
    /// Outcome of a transfer that was executed immediately.
    ImmediateResult(ImmediateTransferResult),
}

/// Issued by the TransferEngine, received by the Scheduler.
/// Note: In order to be considered for scheduling, the [`TransferScheduleRequest`] and the [`WorkerTransferRequest`]
/// for the same operation (uuid) must be present on the scheduler.
pub struct TransferScheduleRequest {
    /// The leader's request that triggered this schedule request.
    pub leader_request: LeaderTransferRequest,
    /// One-shot channel on which the [`ScheduledTaskHandle`] for this operation is returned.
    pub response_tx: oneshot::Sender<ScheduledTaskHandle>,
}

/// Handle used to wait for the scheduling decision of a single operation.
///
/// Consumed by [`ScheduledTaskHandle::wait_for_decision`].
pub struct ScheduledTaskHandle {
    /// Receives the [`SchedulingDecision`] together with the one-shot sender that is later used
    /// to report the transfer's result.
    pub decision_rx: oneshot::Receiver<(SchedulingDecision, oneshot::Sender<anyhow::Result<()>>)>,
    /// Cancellation token observed while waiting for the decision.
    pub cancel_token: CancellationToken,
}

impl ScheduledTaskHandle {
    /// Waits for the scheduling decision or for cancellation, whichever happens first.
    ///
    /// # Returns
    ///
    /// - A boxed [`ScheduledTransferCompletionHandle`] carrying the decision and the completion
    ///   sender when a decision is received.
    /// - A boxed [`CancelledTransferCompletionHandle`] when the cancellation token fires, or when
    ///   the decision channel is closed because its sender was dropped without sending.
    pub async fn wait_for_decision(self) -> Box<dyn TransferCompletionHandle> {
        let Self {
            decision_rx,
            cancel_token,
        } = self;

        tokio::select! {
            outcome = decision_rx => match outcome {
                Ok((decision, completion_tx)) => {
                    Box::new(ScheduledTransferCompletionHandle::new(decision, completion_tx))
                }
                Err(_) => {
                    // The sender was dropped, so no decision can ever arrive. A refutable
                    // `Ok(..)` pattern would merely disable this branch and leave the task
                    // waiting on the cancellation token alone, potentially forever.
                    tracing::warn!(
                        "scheduling decision channel closed before a decision was sent; treating the transfer as cancelled"
                    );
                    Box::new(CancelledTransferCompletionHandle)
                }
            },
            _ = cancel_token.cancelled() => {
                Box::new(CancelledTransferCompletionHandle)
            }
        }
    }
}

/// Received by the Worker, forwarded to the Scheduler.
///
/// In order to be considered for scheduling, both the [`TransferScheduleRequest`] and the [`WorkerTransferRequest`]
/// must be present on the scheduler.
///
/// Note: No response is required. The Worker holds an atomic counter for each operation type. The expected count (local/non-atomic)
/// is incremented on receiving a request. The Worker knows all operations are complete when the shared atomic counter matches the
/// expected count.
///
/// Workers can not handle errors, they only deal with counters. All operations (which can be cancelled) must be completed for a Worker
/// to mark the request_id as complete.
///
/// Scheduler requirements are only provided by the leader initiated transfer request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerTransferRequest {
    /// Identifier of the request this transfer belongs to.
    pub request_id: String,
    /// Identifier of the operation; matches the uuid of the corresponding [`TransferScheduleRequest`].
    pub uuid: uuid::Uuid,
    /// Whether the operation is a load or a store.
    pub transfer_type: TransferType,
    /// Whether the transfer is scheduled or immediate; see [`RequestType`].
    pub request_type: RequestType,
}

/// Sent by Worker to Scheduler.
/// Combines [`WorkerTransferRequest`] and [`WorkerRequestState`] and issues a [`WorkerSchedulerRequest`]
///
/// This object has all the links to the worker to track completion and observe any cancellation signals.
pub struct WorkerSchedulerRequest {
    /// Identifier of the request this transfer belongs to.
    pub request_id: String,
    /// Identifier of the operation.
    pub uuid: uuid::Uuid,
    /// Whether the operation is a load or a store.
    pub transfer_type: TransferType,
    /// Token through which cancellation signals are observed.
    pub cancel_token: CancellationToken,
}

/// One-time use object returned from [`Scheduler::schedule_transfer`]
/// This object carries with it the [`SchedulingDecision`] and is used to mark the transfer as complete.
///
/// The implementations in this module that hold a completion channel
/// ([`ScheduledTransferCompletionHandle`] and [`ImmediateTransferCompletionHandle`]) panic on drop
/// if [`TransferCompletionHandle::mark_complete`] was never called.
#[async_trait::async_trait]
pub trait TransferCompletionHandle: Send {
    /// Returns the [`SchedulingDecision`] associated with this handle.
    fn scheduler_decision(&self) -> SchedulingDecision;
    /// Reports the outcome of the transfer.
    ///
    /// In the implementations in this module only the first call has an effect; later calls are
    /// no-ops.
    async fn mark_complete(&self, result: anyhow::Result<()>);
}

/// Completion handle for a transfer that went through the scheduler.
///
/// Must be marked complete via [`TransferCompletionHandle::mark_complete`] before it is dropped;
/// the `Drop` implementation panics otherwise.
pub struct ScheduledTransferCompletionHandle {
    /// The decision this handle was created with.
    scheduler_decision: SchedulingDecision,
    /// One-shot sender for the transfer result; `None` once the handle has been marked complete.
    completion_tx: Mutex<Option<oneshot::Sender<anyhow::Result<()>>>>,
}

impl ScheduledTransferCompletionHandle {
    /// Builds a handle from the scheduling decision and the one-shot sender on which the
    /// transfer result will be reported.
    pub(crate) fn new(
        scheduler_decision: SchedulingDecision,
        completion_tx: oneshot::Sender<anyhow::Result<()>>,
    ) -> Self {
        // Stored as `Some` behind a mutex so that `mark_complete(&self)` can take it exactly once.
        let completion_tx = Mutex::new(Some(completion_tx));
        Self {
            scheduler_decision,
            completion_tx,
        }
    }
}

#[async_trait::async_trait]
impl TransferCompletionHandle for ScheduledTransferCompletionHandle {
    /// Returns the decision this handle was created with.
    fn scheduler_decision(&self) -> SchedulingDecision {
        self.scheduler_decision
    }

    /// Sends `result` on the completion channel.
    ///
    /// Only the first call sends anything; once the sender has been taken, later calls do
    /// nothing. A failed send is logged as an error.
    async fn mark_complete(&self, result: anyhow::Result<()>) {
        // Take the sender out of its slot; the slot is `None` from here on.
        let pending_tx = self.completion_tx.lock().unwrap().take();

        if let Some(tx) = pending_tx {
            let delivered = tx.send(result).is_ok();
            if !delivered {
                tracing::error!(
                    "failed to send completion status; this could lead to silent data corruption"
                );
            }
        }
    }
}

impl Drop for ScheduledTransferCompletionHandle {
    /// Enforces the handle policy: the handle must have been marked complete before being dropped.
    ///
    /// # Panics
    ///
    /// Panics if the completion sender is still present, i.e. `mark_complete` was never called.
    fn drop(&mut self) {
        let already_marked = self.completion_tx.lock().unwrap().is_none();
        if already_marked {
            return;
        }

        // Dropping an unmarked handle is treated as a fundamental logic error: the transfer's
        // outcome was never reported, so the application state can no longer be trusted.
        panic!(concat!(
            "logic error: implementation failed to respect the [TransferCompletionHandle] policy; ",
            "handle dropped without being explicitly marked; this may lead to data corruption if ",
            "the handle was dropped while a transfer was still in progress; please report immediately.",
        ));
    }
}

/// Outcome of an immediate transfer, sent wrapped in
/// [`TransferToSchedulerMessage::ImmediateResult`].
pub struct ImmediateTransferResult {
    /// Identifier of the request the transfer belongs to.
    pub request_id: String,
    /// Identifier of the operation.
    pub uuid: uuid::Uuid,
    /// The result that was passed to `mark_complete`.
    pub status: anyhow::Result<()>,
}

/// Completion handle for an immediate transfer; its scheduling decision is always
/// [`SchedulingDecision::Execute`].
///
/// Must be marked complete via [`TransferCompletionHandle::mark_complete`] before it is dropped;
/// the `Drop` implementation panics otherwise.
pub struct ImmediateTransferCompletionHandle {
    /// Identifier of the request, copied into the [`ImmediateTransferResult`].
    request_id: String,
    /// Identifier of the operation, copied into the [`ImmediateTransferResult`].
    uuid: uuid::Uuid,
    /// Channel on which the result message is sent; `None` once the handle has been marked complete.
    completion_tx: Mutex<Option<tokio::sync::mpsc::Sender<TransferToSchedulerMessage>>>,
}

impl ImmediateTransferCompletionHandle {
    /// Builds a handle for the operation identified by `request_id` and `uuid`, reporting its
    /// result on `completion_tx`.
    pub(crate) fn new(
        request_id: String,
        uuid: uuid::Uuid,
        completion_tx: tokio::sync::mpsc::Sender<TransferToSchedulerMessage>,
    ) -> Self {
        // Stored as `Some` behind a mutex so that `mark_complete(&self)` can take it exactly once.
        let completion_tx = Mutex::new(Some(completion_tx));
        Self {
            request_id,
            uuid,
            completion_tx,
        }
    }
}

#[async_trait::async_trait]
impl TransferCompletionHandle for ImmediateTransferCompletionHandle {
    /// Immediate transfers always execute.
    fn scheduler_decision(&self) -> SchedulingDecision {
        SchedulingDecision::Execute
    }

    /// Sends an [`ImmediateTransferResult`] carrying `result` on the completion channel.
    ///
    /// Only the first call sends anything; once the sender has been taken, later calls do
    /// nothing. A failed send is logged as an error.
    async fn mark_complete(&self, result: anyhow::Result<()>) {
        // The guard is a temporary of this statement, so it is released before the `.await`
        // below; this keeps the future `Send`.
        let taken = self.completion_tx.lock().unwrap().take();
        let sender = match taken {
            Some(sender) => sender,
            None => return,
        };

        let message = TransferToSchedulerMessage::ImmediateResult(ImmediateTransferResult {
            request_id: self.request_id.clone(),
            uuid: self.uuid,
            status: result,
        });

        if sender.send(message).await.is_err() {
            tracing::error!(DISCONNECTED_WARNING);
        }
    }
}

impl Drop for ImmediateTransferCompletionHandle {
    /// Enforces the handle policy: the handle must have been marked complete before being dropped.
    ///
    /// # Panics
    ///
    /// Panics if the completion sender is still present, i.e. `mark_complete` was never called.
    fn drop(&mut self) {
        let already_marked = self.completion_tx.lock().unwrap().is_none();
        if already_marked {
            return;
        }

        // Dropping an unmarked handle is treated as a fundamental logic error: the transfer's
        // outcome was never reported, so the application state can no longer be trusted.
        panic!(concat!(
            "logic error: implementation failed to respect the [TransferCompletionHandle] policy; ",
            "handle dropped without being explicitly marked; this may lead to data corruption if ",
            "the handle was dropped while a transfer was still in progress; please report immediately.",
        ));
    }
}

/// Completion handle returned by [`ScheduledTaskHandle::wait_for_decision`] when the cancellation
/// token fires; it holds no state and its decision is always [`SchedulingDecision::Cancel`].
pub struct CancelledTransferCompletionHandle;

#[async_trait::async_trait]
impl TransferCompletionHandle for CancelledTransferCompletionHandle {
    /// A cancelled handle always reports [`SchedulingDecision::Cancel`].
    fn scheduler_decision(&self) -> SchedulingDecision {
        SchedulingDecision::Cancel
    }

    /// Accepts and discards the result; this handle has no channel to report it on.
    async fn mark_complete(&self, _result: anyhow::Result<()>) {
        // Do nothing
    }
}