// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 //! # Connector Protocol //! //! This module defines the messages used to communicate between the following components: //! - Leader -> TransferEngine (block_manager::distributed) //! - TransferEngine -> Scheduler //! - Worker -> Scheduler //! //! ## Locality //! //! The TransferEngine, Scheduler and Worker are all guaranteed to be in the same process. `Scheduler` //! is a per-worker scheduler and `TransferEngine` is also a per-worker component. //! //! ## Connector Operations //! //! There a two types of connector operations: load operations and store operations. The following must //! be true: //! - All loads must be initiated when the Slot is in the [`SlotState::Initialized`] state. //! - While the slot is in the [`SlotState::OnboardStaged`] or the [`SlotState::Onboarding`] state, //! no active tokens can be scheduled, no stores can be issued. //! - Uknowns: //! - What happens on cancellation? //! - To transition to the [`SlotState::Prefilling`] state, the slot must be in either the [`SlotState::Initialized`] //! [`SlotState::NotScheduled`], or [`SlotState::OnboardStaged`] state. //! - When in the [`SlotState::Prefilling`] state, store/save operations are allowed. //! - Store/Save operations are determined when processing the [`SchedulerOutput`]. //! - If a store operation is issued, the following will happen: //! - Leader will trigger a message to the TransferEngine with the use StoreRequest and a ConnectorStoreRequest //! - The presence of the ConnectorStoreRequest will trigger the TransferEngine to request a SchedulerStoreRequest, //! this will block the transfer engine's store task from executing until released by the scheduler. //! - The Scheduler will not release the store task until the Worker has made sufficient progress, i.e. the data is //! to be stored has been computed and in device memory. //! - All leader slots are visited on each build metadata step, this allows for any leader initiated actions to be //! included in the metadata sent to the worker. //! - An operation must include: request_id, the iteration on which it was issued, the operation type, and a descriptor. //! - The Worker will pick up all operations from the leader's metadata and enqueue to the scheduler. //! - The Worker will issue notifications to the Scheduler at the start of each iteration and the completion of each //! layer in that iteration. //! - For an operation to be scheduled to run, the following must be true: //! - The TransferEngine must have registered the operation with the Scheduler. //! - The Worker must have registered the operation with the Scheduler. //! - Sufficient progress, either layer-wise or iteration-wise, must have been made. //! - For an operation to run, the following must be true: //! - The operation must be in the scheduled queue. //! - A concurrent token must be acquired. //! - A running operation will be monitored by a task awaiting a completion event. //! - When the completion event is received, the atomic completion counter will be incremented. //! //! //! All transfer requests are triggered by the leader based on the details in the [`SchedulerOutput`]. //! //! [`SchedulerOutput`] is transform use super::scheduler::{SchedulingDecision, DISCONNECTED_WARNING}; use super::*; use tokio::sync::oneshot; pub type LayerName = String; pub type LayerIndex = u32; pub type Iteration = u64; #[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] pub enum RequestType { /// If Scheduled, then the [`super::scheduler::TransferSchedulerClient`] will commuicate with the scheudler /// to await a boxed [`ScheduledTransferCompletionHandle`]. Scheduled, /// If Immediate, then the [`super::scheduler::TransferSchedulerClient`] will immediately return a /// [`ImmediateTransferCompletionHandle`]. Immediate, } #[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] pub enum TransferType { Load, Store, } #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum SchedulerRequirement { IterationComplete(Iteration), /// The layer with the provided name and iteration counter must be complete. LayerNameComplete(LayerName, Iteration), /// The layer index and iteration counter must be complete. LayerComplete(LayerIndex, Iteration), } /// Issued by the leader, received by the TransferEngine. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LeaderTransferRequest { pub request_id: String, pub uuid: uuid::Uuid, pub requirement: Option, pub request_type: RequestType, } pub enum TransferToSchedulerMessage { ScheduleRequest(TransferScheduleRequest), ImmediateResult(ImmediateTransferResult), } /// Issued by the TransferEngine, received by the Scheduler. /// Note: In order to be considered for scheduling, the [`TransferScheduleRequest`] and the [`WorkerTransferRequest`] /// for the same operation (uuid) must be present on the scheduler. pub struct TransferScheduleRequest { pub leader_request: LeaderTransferRequest, pub response_tx: oneshot::Sender, } pub struct ScheduledTaskHandle { pub decision_rx: oneshot::Receiver<(SchedulingDecision, oneshot::Sender>)>, pub cancel_token: CancellationToken, } impl ScheduledTaskHandle { pub async fn wait_for_decision(self) -> Box { tokio::select! { Ok((decision, completion_tx)) = self.decision_rx => { Box::new(ScheduledTransferCompletionHandle::new(decision, completion_tx)) } _ = self.cancel_token.cancelled() => { Box::new(CancelledTransferCompletionHandle) } } } } /// Recived by the Worker, forward to the Scheduler. /// /// In ordered to be considered for scheduling, both the [`TransferScheduleRequest`] and the [`WorkerTransferRequest`] /// must be present on the scheduler. /// /// Note: No response is required. The Worker holds an atomic counter for each oepration type. The expected count (local/non-atomic) /// is incremented on receiving a request. The Worker knows all operations are complete when the shared atomic counter matches the /// expected count. /// /// Workers can not handle errors, they only deal with counters. All operations (which can be cancelled) must completed for a Worker /// to mark the request_id as complete. /// /// Scheduler requirements are only provided by the leader initiated transfer request. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkerTransferRequest { pub request_id: String, pub uuid: uuid::Uuid, pub transfer_type: TransferType, pub request_type: RequestType, } /// Sent by Worker to Scheduler. /// Combines [`WorkerTransferRequest`] and [`WorkerRequestState`] and issues a [`WorkerSchedulerRequest`] /// /// This object has all the links to the worker to track completion and observe any cancellation signals. pub struct WorkerSchedulerRequest { pub request_id: String, pub uuid: uuid::Uuid, pub transfer_type: TransferType, pub cancel_token: CancellationToken, } /// One-time use object returned from [`Scheduler::schedule_transfer`] /// This object carries with it the [`SchedulingDecision`] and is used to mark the transfer as complete. #[async_trait::async_trait] pub trait TransferCompletionHandle: Send { fn scheduler_decision(&self) -> SchedulingDecision; async fn mark_complete(&self, result: anyhow::Result<()>); } pub struct ScheduledTransferCompletionHandle { scheduler_decision: SchedulingDecision, completion_tx: Mutex>>>, } impl ScheduledTransferCompletionHandle { pub(crate) fn new( scheduler_decision: SchedulingDecision, completion_tx: oneshot::Sender>, ) -> Self { Self { scheduler_decision, completion_tx: Mutex::new(Some(completion_tx)), } } } #[async_trait::async_trait] impl TransferCompletionHandle for ScheduledTransferCompletionHandle { fn scheduler_decision(&self) -> SchedulingDecision { self.scheduler_decision } async fn mark_complete(&self, result: anyhow::Result<()>) { if let Some(completion_tx) = self.completion_tx.lock().unwrap().take() { if completion_tx.send(result).is_err() { tracing::error!( "failed to send completion status; this could lead to silent data corruption" ); } } } } impl Drop for ScheduledTransferCompletionHandle { fn drop(&mut self) { if self.completion_tx.lock().unwrap().is_some() { // This is a fundamental logic error. The results of the application are undefined. // We must abort. panic!(concat!( "logic error: implementation failed to respect the [TransferCompletionHandle] policy; ", "handle dropped without being explicitly marked; this may lead to data corruption if ", "the handle was dropped while a transfer was still in progress; please report immediately.", )); } } } pub struct ImmediateTransferResult { pub request_id: String, pub uuid: uuid::Uuid, pub status: anyhow::Result<()>, } pub struct ImmediateTransferCompletionHandle { request_id: String, uuid: uuid::Uuid, completion_tx: Mutex>>, } impl ImmediateTransferCompletionHandle { pub(crate) fn new( request_id: String, uuid: uuid::Uuid, completion_tx: tokio::sync::mpsc::Sender, ) -> Self { Self { request_id, uuid, completion_tx: Mutex::new(Some(completion_tx)), } } } #[async_trait::async_trait] impl TransferCompletionHandle for ImmediateTransferCompletionHandle { fn scheduler_decision(&self) -> SchedulingDecision { SchedulingDecision::Execute } async fn mark_complete(&self, result: anyhow::Result<()>) { // To ensure the future is Send, avoid holding the MutexGuard across .await. let completion_tx = { let mut guard = self.completion_tx.lock().unwrap(); guard.take() }; if let Some(completion_tx) = completion_tx { if completion_tx .send(TransferToSchedulerMessage::ImmediateResult( ImmediateTransferResult { request_id: self.request_id.clone(), uuid: self.uuid, status: result, }, )) .await .is_err() { tracing::error!(DISCONNECTED_WARNING); } } } } impl Drop for ImmediateTransferCompletionHandle { fn drop(&mut self) { if self.completion_tx.lock().unwrap().is_some() { // This is a fundamental logic error. The results of the application are undefined. // We must abort. panic!(concat!( "logic error: implementation failed to respect the [TransferCompletionHandle] policy; ", "handle dropped without being explicitly marked; this may lead to data corruption if ", "the handle was dropped while a transfer was still in progress; please report immediately.", )); } } } pub struct CancelledTransferCompletionHandle; #[async_trait::async_trait] impl TransferCompletionHandle for CancelledTransferCompletionHandle { fn scheduler_decision(&self) -> SchedulingDecision { SchedulingDecision::Cancel } async fn mark_complete(&self, _result: anyhow::Result<()>) { // Do nothing } }