"tests/fault_tolerance/vscode:/vscode.git/clone" did not exist on "e0fb52d1c4e7e8b06b9b836ba9a60e30c45c63f6"
pending.rs 17.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//! # Transfer Managers
//!
//! Transfer managers are responsible for multiple things:
//! - Before the transfer:
//!     - Rate-limiting the number of transfers that can be initiated concurrently. This is implemented through bounded channels.
//!         - Due to the nature of the [`super::OffloadManager`], we only apply this rate-limiting to offloads.
//! - During the transfer:
//!     - Initiating the transfer
//!     - Holding strong references to blocks being transferred.
//! - After the transfer:
//!     - Dropping these references once the transfer is complete.
//!     - Registering the blocks with the target pool.
//!     - Returning the registered blocks to the caller.
//!
//! This is implemented through the [`TransferManager`] trait, which takes a single [`PendingTransfer`]
//! and initiates the transfer.
//!
//! Since CUDA and NIXL transfers use completely different semantics, we implement two separate transfer managers.
//!
//! ## Workflow
36
37
//! 1. A transfer request is made by calling [`TransferManager::enqueue_transfer`]
//! 2. [`TransferManager::enqueue_transfer`] performs the transfer, and enqueues relevant data into a bounded channel.
38
39
40
//! 3. A worker thread (consuming this bounded channel and enforcing rate limiting) awaits the incoming transfers.
//! 4. After a transfer is complete, the worker thread registers the blocks with the target pool, and returns the registered blocks to the caller.

Ryan Olson's avatar
Ryan Olson committed
41
use nixl_sys::NixlDescriptor;
42
use std::marker::PhantomData;
43
use std::pin::Pin;
44
use std::sync::Arc;
Ryan Olson's avatar
Ryan Olson committed
45
use std::time::{Duration, Instant};
46
use tokio::runtime::Handle;
Ryan Olson's avatar
Ryan Olson committed
47
use tokio::sync::{mpsc, oneshot};
48
use tokio_util::sync::CancellationToken;
49

50
use crate::block_manager::block::{
Ryan Olson's avatar
Ryan Olson committed
51
52
53
54
    locality::LocalityProvider,
    transfer::{TransferContext, WriteTo, WriteToStrategy},
    BlockDataProvider, BlockDataProviderMut, BlockError, BlockMetadata, BlockState, ImmutableBlock,
    MutableBlock, ReadableBlock, WritableBlock,
55
};
Ryan Olson's avatar
Ryan Olson committed
56
57
use crate::block_manager::metrics::PoolMetrics;
use crate::block_manager::pool::{BlockPool, BlockPoolError};
58
59
use crate::block_manager::storage::{Local, Storage};

60
use anyhow::Result;
61
use async_trait::async_trait;
62
use futures::{stream::FuturesUnordered, StreamExt};
63

64
use super::BlockResult;
65

66
67
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;

Ryan Olson's avatar
Ryan Olson committed
68
69
const BLOCKS_BW_MIN_PUBLISH_INTERVAL_MS: u64 = 50;

70
/// Manage a set of pending transfers.
Ryan Olson's avatar
Ryan Olson committed
71
72
73
74
75
76
pub struct PendingTransfer<
    Source: Storage,
    Target: Storage,
    Locality: LocalityProvider,
    Metadata: BlockMetadata,
> {
77
    /// The block being copied from.
Ryan Olson's avatar
Ryan Olson committed
78
    sources: Vec<ImmutableBlock<Source, Locality, Metadata>>,
79
    /// The block being copied to.
Ryan Olson's avatar
Ryan Olson committed
80
    targets: Vec<MutableBlock<Target, Locality, Metadata>>,
81
    /// The oneshot sender that optionally returns the registered blocks once the transfer is complete.
Ryan Olson's avatar
Ryan Olson committed
82
    completion_indicator: Option<oneshot::Sender<BlockResult<Target, Locality, Metadata>>>,
83
    /// The target pool that will receive the registered block.
Ryan Olson's avatar
Ryan Olson committed
84
    target_pool: Arc<dyn BlockPool<Target, Locality, Metadata>>,
85
86
}

Ryan Olson's avatar
Ryan Olson committed
87
88
impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: BlockMetadata>
    PendingTransfer<Source, Target, Locality, Metadata>
89
90
{
    pub fn new(
Ryan Olson's avatar
Ryan Olson committed
91
92
93
94
        sources: Vec<ImmutableBlock<Source, Locality, Metadata>>,
        targets: Vec<MutableBlock<Target, Locality, Metadata>>,
        completion_indicator: Option<oneshot::Sender<BlockResult<Target, Locality, Metadata>>>,
        target_pool: Arc<dyn BlockPool<Target, Locality, Metadata>>,
95
    ) -> Self {
96
        assert_eq!(sources.len(), targets.len());
97
        Self {
98
99
100
            sources,
            targets,
            completion_indicator,
101
            target_pool,
102
103
104
        }
    }

Ryan Olson's avatar
Ryan Olson committed
105
    async fn handle_complete(self) -> Result<()> {
106
        let Self {
107
108
109
            sources,
            mut targets,
            target_pool,
110
            completion_indicator,
111
112
113
            ..
        } = self;

114
115
116
117
        for (source, target) in sources.iter().zip(targets.iter_mut()) {
            transfer_metadata(source, target)?;
        }

Ryan Olson's avatar
Ryan Olson committed
118
119
120
        let blocks = target_pool.register_blocks(targets).await?;

        tracing::debug!("Transfer complete. Registered {} blocks.", blocks.len());
121

122
        if let Some(completion_indicator) = completion_indicator {
123
124
125
            completion_indicator
                .send(Ok(blocks))
                .map_err(|_| BlockPoolError::ProgressEngineShutdown)?;
126
        }
127
128
129
130
131

        Ok(())
    }
}

Ryan Olson's avatar
Ryan Olson committed
132
133
134
135
136
137
138
139
/// Prepares `target` to mirror `source`: resets it, copies the source's metadata and
/// applies the source's token block.
///
/// # Errors
///
/// Returns `BlockError::InvalidState` (wrapped in `BlockPoolError::BlockError`) when the
/// source is not in the `Registered` state, and forwards any error from applying the
/// token block.
fn transfer_metadata<Source, Target, Locality, Metadata>(
    source: &ImmutableBlock<Source, Locality, Metadata>,
    target: &mut MutableBlock<Target, Locality, Metadata>,
) -> Result<()>
where
    Source: Storage,
    Target: Storage,
    Locality: LocalityProvider,
    Metadata: BlockMetadata,
{
    // Only registered blocks can be transferred. Upstream checks should guarantee this,
    // so the error arm is not expected to be hit.
    match source.state() {
        BlockState::Registered(reg_handle, _) => {
            // Start from the 'Reset' state.
            target.reset();
            // Carry the metadata over.
            target.update_metadata(source.metadata().clone());
            // Carry the tokens over.
            target.apply_token_block(reg_handle.token_block().clone())?;
            Ok(())
        }
        _ => Err(BlockPoolError::BlockError(BlockError::InvalidState(
            "Block is not registered.".to_string(),
        ))
        .into()),
    }
}

#[async_trait]
Ryan Olson's avatar
Ryan Olson committed
159
160
161
162
163
164
pub trait TransferManager<
    Source: Storage,
    Target: Storage,
    Locality: LocalityProvider,
    Metadata: BlockMetadata,
>: Send + Sync
165
166
{
    /// Begin a transfer. Blocks if the pending queue is full.
167
    async fn enqueue_transfer(
168
        &self,
Ryan Olson's avatar
Ryan Olson committed
169
        pending_transfer: PendingTransfer<Source, Target, Locality, Metadata>,
170
    ) -> Result<()>;
171
172
}

Ryan Olson's avatar
Ryan Olson committed
173
174
175
176
177
178
179
180
181
182
183
184
/// Finishes completed transfers and tracks how many blocks have been moved so that a
/// blocks-per-second figure can be published to the pool metrics.
struct TransferCompletionManager<Source, Target, Locality, Metadata>
where
    Source: Storage,
    Target: Storage,
    Locality: LocalityProvider,
    Metadata: BlockMetadata,
{
    /// Metrics sink holding the gauge that receives the throughput value.
    pool_metrics: Arc<PoolMetrics>,
    /// Name used to look up the gauge in `pool_metrics`.
    transfer_type: String,
    /// When the gauge was last updated; `None` until the first update.
    last_publish_time: Option<Instant>,
    /// Creation time of this manager; throughput is measured from here.
    transfer_start: Instant,
    /// Running total of blocks seen by `handle_complete`.
    num_blocks_transferred: usize,
    _phantom: PhantomData<(Source, Target, Locality, Metadata)>,
}

Ryan Olson's avatar
Ryan Olson committed
187
188
impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: BlockMetadata>
    TransferCompletionManager<Source, Target, Locality, Metadata>
189
{
Ryan Olson's avatar
Ryan Olson committed
190
191
192
193
194
195
196
197
198
199
    pub fn new(pool_metrics: Arc<PoolMetrics>, transfer_type: String) -> Self {
        Self {
            pool_metrics,
            transfer_type,
            last_publish_time: None,
            transfer_start: Instant::now(),
            num_blocks_transferred: 0,
            _phantom: PhantomData,
        }
    }
200

Ryan Olson's avatar
Ryan Olson committed
201
202
203
204
205
    pub async fn handle_complete(
        &mut self,
        pending_transfer: PendingTransfer<Source, Target, Locality, Metadata>,
    ) -> Result<()> {
        self.num_blocks_transferred += pending_transfer.sources.len();
206

Ryan Olson's avatar
Ryan Olson committed
207
208
209
        let should_publish = self.last_publish_time.is_none_or(|last_publish_time| {
            last_publish_time.elapsed() > Duration::from_millis(BLOCKS_BW_MIN_PUBLISH_INTERVAL_MS)
        });
210

Ryan Olson's avatar
Ryan Olson committed
211
212
213
214
        if should_publish {
            self.last_publish_time = Some(Instant::now());
            let duration = self.transfer_start.elapsed();
            let blocks_per_sec = self.num_blocks_transferred as f64 / duration.as_secs_f64();
215

Ryan Olson's avatar
Ryan Olson committed
216
217
218
219
220
221
222
223
224
225
226
227
228
            self.pool_metrics
                .gauge(self.transfer_type.as_str())
                .set(blocks_per_sec as i64);
        }

        match pending_transfer.handle_complete().await {
            Ok(_) => {}
            Err(e) => {
                // The only case where this can fail is if the progress engine is being shutdown.
                // This is not a problem, so we can just ignore it.
                tracing::warn!("Error handling transfer completion: {:?}", e);
            }
        }
229
230
231
232
233

        Ok(())
    }
}

Ryan Olson's avatar
Ryan Olson committed
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
/// Pinned, boxed future whose output is a [`PendingTransfer`].
///
/// `LocalTransferManager::enqueue_transfer` builds one per transfer: the future awaits the
/// transfer's completion notification and then yields the `PendingTransfer` so the worker
/// task can finalize it.
// NOTE(review): the `Sync` bound is presumably required so that the channel's send error
// can be converted into `anyhow::Error` — confirm.
type TransferFuture<Source, Target, Locality, Metadata> = Pin<
    Box<
        dyn std::future::Future<Output = PendingTransfer<Source, Target, Locality, Metadata>>
            + Send
            + Sync,
    >,
>;

pub struct LocalTransferManager<
    Source: Storage,
    Target: Storage,
    Locality: LocalityProvider,
    Metadata: BlockMetadata,
> {
    futures_tx: mpsc::Sender<TransferFuture<Source, Target, Locality, Metadata>>,
249
250
251
    transfer_ctx: Arc<TransferContext>,
}

Ryan Olson's avatar
Ryan Olson committed
252
253
254
impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: BlockMetadata>
    LocalTransferManager<Source, Target, Locality, Metadata>
{
255
256
257
258
259
    pub fn new(
        transfer_ctx: Arc<TransferContext>,
        max_concurrent_transfers: usize,
        runtime: &Handle,
        cancellation_token: CancellationToken,
Ryan Olson's avatar
Ryan Olson committed
260
261
        pool_metrics: Arc<PoolMetrics>,
        transfer_type: String,
262
    ) -> Result<Self> {
263
264
        let (futures_tx, mut futures_rx) = mpsc::channel(1);

Ryan Olson's avatar
Ryan Olson committed
265
266
267
        let mut completion_manager =
            TransferCompletionManager::new(pool_metrics.clone(), transfer_type.clone());

268
269
        CriticalTaskExecutionHandle::new_with_runtime(
            move |cancel_token| async move {
Ryan Olson's avatar
Ryan Olson committed
270
                let mut pending_transfers: FuturesUnordered<TransferFuture<Source, Target, Locality, Metadata>> = FuturesUnordered::new();
271
272
                loop {
                    tokio::select! {
273

274
275
276
                        _ = cancel_token.cancelled() => {
                            return Ok(());
                        }
277

278
279
280
                        Some(future) = futures_rx.recv() => {
                            // If we're at max size, block the worker thread on the next() call until we have capacity.
                            while pending_transfers.len() >= max_concurrent_transfers {
Ryan Olson's avatar
Ryan Olson committed
281
282
283
284
285
                                if let Some(pending_transfer) = pending_transfers.next().await {
                                    completion_manager.handle_complete(pending_transfer).await?;
                                } else {
                                    break;
                                }
286
                            }
Ryan Olson's avatar
Ryan Olson committed
287

288
289
                            pending_transfers.push(future);
                        }
Ryan Olson's avatar
Ryan Olson committed
290
291
                        Some(pending_transfer) = pending_transfers.next(), if !pending_transfers.is_empty() => {
                            completion_manager.handle_complete(pending_transfer).await?;
292
293
294
                        }
                    }
                }
295
296
            },
            cancellation_token.clone(),
Ryan Olson's avatar
Ryan Olson committed
297
            "Local Transfer Manager",
298
299
300
301
302
            runtime,
        )?
        .detach();

        Ok(Self {
303
304
            futures_tx,
            transfer_ctx,
305
        })
306
307
308
309
    }
}

#[async_trait]
Ryan Olson's avatar
Ryan Olson committed
310
311
impl<Source, Target, Locality, Metadata> TransferManager<Source, Target, Locality, Metadata>
    for LocalTransferManager<Source, Target, Locality, Metadata>
312
where
Ryan Olson's avatar
Ryan Olson committed
313
314
315
    Source: Storage + NixlDescriptor,
    Target: Storage + NixlDescriptor,
    Locality: LocalityProvider,
316
317
    Metadata: BlockMetadata,
    // Check that the source block is readable, local, and writable to the target block.
Ryan Olson's avatar
Ryan Olson committed
318
    ImmutableBlock<Source, Locality, Metadata>: ReadableBlock<StorageType = Source>
319
        + Local
Ryan Olson's avatar
Ryan Olson committed
320
        + WriteToStrategy<MutableBlock<Target, Locality, Metadata>>,
321
    // Check that the target block is writable.
Ryan Olson's avatar
Ryan Olson committed
322
323
324
325
    MutableBlock<Target, Locality, Metadata>: WritableBlock<StorageType = Target>,
    // Check that the source and target blocks have the same locality.
    ImmutableBlock<Source, Locality, Metadata>: BlockDataProvider<Locality = Locality>,
    MutableBlock<Target, Locality, Metadata>: BlockDataProviderMut<Locality = Locality>,
326
{
327
    async fn enqueue_transfer(
328
        &self,
Ryan Olson's avatar
Ryan Olson committed
329
        mut pending_transfer: PendingTransfer<Source, Target, Locality, Metadata>,
330
    ) -> Result<()> {
331
332
        let notify = pending_transfer
            .sources
Ryan Olson's avatar
Ryan Olson committed
333
            .write_to(&mut pending_transfer.targets, self.transfer_ctx.clone())?;
334
335

        let completion_future = async move {
336
            let _ = notify.await;
Ryan Olson's avatar
Ryan Olson committed
337
            pending_transfer
338
339
340
341
342
        };

        // Futures_(tx/rx) has a capacity of 1. If the queue worker has received another future and is awaiting next() due to a full `FuturesUnordered`,
        // this call will block until the worker has processed the prior future.
        self.futures_tx.send(Box::pin(completion_future)).await?;
343
344
345
346

        Ok(())
    }
}
347
348

/// A transfer manager that enforces a max batch size for transfers.
Ryan Olson's avatar
Ryan Olson committed
349
pub struct TransferBatcher<Source, Target, Locality, Metadata, Manager>
350
351
352
where
    Source: Storage,
    Target: Storage,
Ryan Olson's avatar
Ryan Olson committed
353
    Locality: LocalityProvider,
354
    Metadata: BlockMetadata,
Ryan Olson's avatar
Ryan Olson committed
355
    Manager: TransferManager<Source, Target, Locality, Metadata>,
356
357
358
{
    transfer_manager: Manager,
    max_transfer_batch_size: usize,
359
360
    runtime: Handle,
    cancellation_token: CancellationToken,
Ryan Olson's avatar
Ryan Olson committed
361
    _phantom: PhantomData<(Source, Target, Locality, Metadata)>,
362
363
}

Ryan Olson's avatar
Ryan Olson committed
364
365
impl<Source, Target, Locality, Metadata, Manager>
    TransferBatcher<Source, Target, Locality, Metadata, Manager>
366
367
368
where
    Source: Storage,
    Target: Storage,
Ryan Olson's avatar
Ryan Olson committed
369
370
371
    Locality: LocalityProvider + 'static,
    Metadata: BlockMetadata + 'static,
    Manager: TransferManager<Source, Target, Locality, Metadata> + 'static,
372
{
373
374
375
376
377
378
    pub fn new(
        transfer_manager: Manager,
        max_transfer_batch_size: usize,
        runtime: &Handle,
        cancellation_token: CancellationToken,
    ) -> Self {
379
380
381
        Self {
            transfer_manager,
            max_transfer_batch_size,
382
383
            runtime: runtime.clone(),
            cancellation_token,
384
385
386
387
388
389
            _phantom: PhantomData,
        }
    }
}

#[async_trait]
Ryan Olson's avatar
Ryan Olson committed
390
391
392
impl<Source, Target, Locality, Metadata, Manager>
    TransferManager<Source, Target, Locality, Metadata>
    for TransferBatcher<Source, Target, Locality, Metadata, Manager>
393
where
Ryan Olson's avatar
Ryan Olson committed
394
395
396
    Source: Storage + 'static,
    Target: Storage + 'static,
    Locality: LocalityProvider + 'static,
397
    Metadata: BlockMetadata,
Ryan Olson's avatar
Ryan Olson committed
398
    Manager: TransferManager<Source, Target, Locality, Metadata>,
399
400
401
{
    async fn enqueue_transfer(
        &self,
Ryan Olson's avatar
Ryan Olson committed
402
        pending_transfer: PendingTransfer<Source, Target, Locality, Metadata>,
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
    ) -> Result<()> {
        // If it's smaller than the max batch size, just enqueue it.
        if pending_transfer.sources.len() < self.max_transfer_batch_size {
            return self
                .transfer_manager
                .enqueue_transfer(pending_transfer)
                .await;
        }

        // Otherwise, we need to split the transfer into multiple smaller transfers.

        let PendingTransfer {
            mut sources,
            mut targets,
            completion_indicator,
            target_pool,
        } = pending_transfer;

        let mut indicators = Vec::new();

        while !sources.is_empty() {
            let sources = sources
                .drain(..std::cmp::min(self.max_transfer_batch_size, sources.len()))
                .collect();
            let targets = targets
                .drain(..std::cmp::min(self.max_transfer_batch_size, targets.len()))
                .collect();

            // If we have a completion indicator, we need to create a new one for each sub-transfer.
            let indicator = if completion_indicator.is_some() {
                let (batch_tx, batch_rx) = oneshot::channel();
                indicators.push(batch_rx);
                Some(batch_tx)
            } else {
                None
            };

            let request = PendingTransfer::new(sources, targets, indicator, target_pool.clone());
            // Enqueue our reduced transfer. This may block if the queue is full.
            self.transfer_manager.enqueue_transfer(request).await?;
        }

        if let Some(completion_indicator) = completion_indicator {
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
            CriticalTaskExecutionHandle::new_with_runtime(
                move |cancel_token| async move {
                    let mut results = Vec::new();

                    for indicator in indicators.into_iter() {
                        // Await each sub-transfer, and append the results to our final results.
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                return Ok(());
                            }

                            Ok(indicator) = indicator => {
                                let result = match indicator {
                                    Ok(result) => result,
                                    Err(e) => {
                                        tracing::error!("Error receiving transfer results: {:?}", e);
Ryan Olson's avatar
Ryan Olson committed
462
                                        let _ = completion_indicator.send(Err(e));
463
464
465
466
467
                                        return Ok(());
                                    }
                                };
                                results.extend(result);
                            }
468
                        }
469
470
471
                    }

                    // Send the final results to the top-level completion indicator.
Ryan Olson's avatar
Ryan Olson committed
472
                    let _ = completion_indicator.send(Ok(results));
473

474
475
476
477
478
479
                    Ok(())
                },
                self.cancellation_token.clone(),
                "Transfer Batcher",
                &self.runtime,
            )?.detach();
480
481
482
483
484
        }

        Ok(())
    }
}