pending.rs 16.8 KB
Newer Older
1
2
3
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//! # Transfer Managers
//!
//! Transfer managers are responsible for multiple things:
//! - Before the transfer:
//!     - Rate-limiting the number of transfers that can be initiated concurrently. This is implemented through bounded channels.
//!         - Due to the nature of the [`super::OffloadManager`], we only apply this rate-limiting to offloads.
//! - During the transfer:
//!     - Initiating the transfer
//!     - Holding strong references to blocks being transferred.
//! - After the transfer:
//!     - Dropping these references once the transfer is complete.
//!     - Registering the blocks with the target pool.
//!     - Returning the registered blocks to the caller.
//!
//! This is implemented through the [`TransferManager`] trait, which takes a single [`PendingTransfer`]
//! and initiates the transfer.
//!
//! Since CUDA and NIXL transfers use completely different semantics, we implement two separate transfer managers.
//!
//! ## Workflow
24
25
//! 1. A transfer request is made by calling [`TransferManager::enqueue_transfer`]
//! 2. [`TransferManager::enqueue_transfer`] performs the transfer, and enqueues relevant data into a bounded channel.
26
27
28
//! 3. A worker thread (consuming this bounded channel and enforcing rate limiting) awaits the incoming transfers.
//! 4. After a transfer is complete, the worker thread registers the blocks with the target pool, and returns the registered blocks to the caller.

Ryan Olson's avatar
Ryan Olson committed
29
use nixl_sys::NixlDescriptor;
30
use std::marker::PhantomData;
31
use std::pin::Pin;
32
use std::sync::Arc;
Ryan Olson's avatar
Ryan Olson committed
33
use std::time::{Duration, Instant};
34
use tokio::runtime::Handle;
Ryan Olson's avatar
Ryan Olson committed
35
use tokio::sync::{mpsc, oneshot};
36
use tokio_util::sync::CancellationToken;
37

38
use crate::block_manager::block::{
Ryan Olson's avatar
Ryan Olson committed
39
40
    BlockDataProvider, BlockDataProviderMut, BlockError, BlockMetadata, BlockState, ImmutableBlock,
    MutableBlock, ReadableBlock, WritableBlock,
41
42
    locality::LocalityProvider,
    transfer::{TransferContext, WriteTo, WriteToStrategy},
43
};
Ryan Olson's avatar
Ryan Olson committed
44
45
use crate::block_manager::metrics::PoolMetrics;
use crate::block_manager::pool::{BlockPool, BlockPoolError};
46
47
use crate::block_manager::storage::{Local, Storage};

48
use anyhow::Result;
49
use async_trait::async_trait;
50
use futures::{StreamExt, stream::FuturesUnordered};
51

52
use super::BlockResult;
53

54
55
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;

Ryan Olson's avatar
Ryan Olson committed
56
57
/// Minimum interval, in milliseconds, between two consecutive publications of the
/// blocks-per-second gauge by `TransferCompletionManager::handle_complete`.
const BLOCKS_BW_MIN_PUBLISH_INTERVAL_MS: u64 = 50;

58
/// Manage a set of pending transfers.
Ryan Olson's avatar
Ryan Olson committed
59
60
61
62
63
64
pub struct PendingTransfer<
    Source: Storage,
    Target: Storage,
    Locality: LocalityProvider,
    Metadata: BlockMetadata,
> {
65
    /// The block being copied from.
Ryan Olson's avatar
Ryan Olson committed
66
    sources: Vec<ImmutableBlock<Source, Locality, Metadata>>,
67
    /// The block being copied to.
Ryan Olson's avatar
Ryan Olson committed
68
    targets: Vec<MutableBlock<Target, Locality, Metadata>>,
69
    /// The oneshot sender that optionally returns the registered blocks once the transfer is complete.
Ryan Olson's avatar
Ryan Olson committed
70
    completion_indicator: Option<oneshot::Sender<BlockResult<Target, Locality, Metadata>>>,
71
    /// The target pool that will receive the registered block.
Ryan Olson's avatar
Ryan Olson committed
72
    target_pool: Arc<dyn BlockPool<Target, Locality, Metadata>>,
73
74
}

Ryan Olson's avatar
Ryan Olson committed
75
76
impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: BlockMetadata>
    PendingTransfer<Source, Target, Locality, Metadata>
77
78
{
    pub fn new(
Ryan Olson's avatar
Ryan Olson committed
79
80
81
82
        sources: Vec<ImmutableBlock<Source, Locality, Metadata>>,
        targets: Vec<MutableBlock<Target, Locality, Metadata>>,
        completion_indicator: Option<oneshot::Sender<BlockResult<Target, Locality, Metadata>>>,
        target_pool: Arc<dyn BlockPool<Target, Locality, Metadata>>,
83
    ) -> Self {
84
        assert_eq!(sources.len(), targets.len());
85
        Self {
86
87
88
            sources,
            targets,
            completion_indicator,
89
            target_pool,
90
91
92
        }
    }

Ryan Olson's avatar
Ryan Olson committed
93
    async fn handle_complete(self) -> Result<()> {
94
        let Self {
95
96
97
            sources,
            mut targets,
            target_pool,
98
            completion_indicator,
99
100
101
            ..
        } = self;

102
103
104
105
        for (source, target) in sources.iter().zip(targets.iter_mut()) {
            transfer_metadata(source, target)?;
        }

Ryan Olson's avatar
Ryan Olson committed
106
107
108
        let blocks = target_pool.register_blocks(targets).await?;

        tracing::debug!("Transfer complete. Registered {} blocks.", blocks.len());
109

110
        if let Some(completion_indicator) = completion_indicator {
111
112
113
            completion_indicator
                .send(Ok(blocks))
                .map_err(|_| BlockPoolError::ProgressEngineShutdown)?;
114
        }
115
116
117
118
119

        Ok(())
    }
}

Ryan Olson's avatar
Ryan Olson committed
120
121
122
123
124
125
126
127
/// Prepares `target` to mirror `source`: resets the target, then copies the source's
/// metadata and token block onto it.
///
/// # Errors
///
/// Returns `BlockError::InvalidState` (wrapped in `BlockPoolError::BlockError`) when
/// `source` is not in the `Registered` state, and propagates any failure from applying
/// the token block to `target`.
fn transfer_metadata<Source, Target, Locality, Metadata>(
    source: &ImmutableBlock<Source, Locality, Metadata>,
    target: &mut MutableBlock<Target, Locality, Metadata>,
) -> Result<()>
where
    Source: Storage,
    Target: Storage,
    Locality: LocalityProvider,
    Metadata: BlockMetadata,
{
    // Only registered blocks can be transferred. There are upstream checks for this,
    // so the fallback arm is not expected to be reached.
    match source.state() {
        BlockState::Registered(reg_handle, _) => {
            // Bring the block back to the 'Reset' state.
            target.reset();
            // Transfer metadata.
            target.update_metadata(source.metadata().clone());
            // Copy tokens.
            target.apply_token_block(reg_handle.token_block().clone())?;
            Ok(())
        }
        _ => Err(BlockPoolError::BlockError(BlockError::InvalidState(
            "Block is not registered.".to_string(),
        ))
        .into()),
    }
}

#[async_trait]
Ryan Olson's avatar
Ryan Olson committed
147
148
149
150
151
152
pub trait TransferManager<
    Source: Storage,
    Target: Storage,
    Locality: LocalityProvider,
    Metadata: BlockMetadata,
>: Send + Sync
153
154
{
    /// Begin a transfer. Blocks if the pending queue is full.
155
    async fn enqueue_transfer(
156
        &self,
Ryan Olson's avatar
Ryan Olson committed
157
        pending_transfer: PendingTransfer<Source, Target, Locality, Metadata>,
158
    ) -> Result<()>;
159
160
}

Ryan Olson's avatar
Ryan Olson committed
161
162
163
164
165
166
167
168
169
170
171
172
/// Finalizes completed transfers and publishes a blocks-per-second gauge for them.
struct TransferCompletionManager<Source, Target, Locality, Metadata>
where
    Source: Storage,
    Target: Storage,
    Locality: LocalityProvider,
    Metadata: BlockMetadata,
{
    /// Metrics handle on which the throughput gauge is set.
    pool_metrics: Arc<PoolMetrics>,
    /// Name of the gauge that receives the throughput value.
    transfer_type: String,
    /// When the gauge was last published; `None` until the first publication.
    last_publish_time: Option<Instant>,
    /// When this manager was created; throughput is measured from this instant.
    transfer_start: Instant,
    /// Running total of source blocks across all completed transfers.
    num_blocks_transferred: usize,
    _phantom: PhantomData<(Source, Target, Locality, Metadata)>,
}

Ryan Olson's avatar
Ryan Olson committed
175
176
impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: BlockMetadata>
    TransferCompletionManager<Source, Target, Locality, Metadata>
177
{
Ryan Olson's avatar
Ryan Olson committed
178
179
180
181
182
183
184
185
186
187
    pub fn new(pool_metrics: Arc<PoolMetrics>, transfer_type: String) -> Self {
        Self {
            pool_metrics,
            transfer_type,
            last_publish_time: None,
            transfer_start: Instant::now(),
            num_blocks_transferred: 0,
            _phantom: PhantomData,
        }
    }
188

Ryan Olson's avatar
Ryan Olson committed
189
190
191
192
193
    pub async fn handle_complete(
        &mut self,
        pending_transfer: PendingTransfer<Source, Target, Locality, Metadata>,
    ) -> Result<()> {
        self.num_blocks_transferred += pending_transfer.sources.len();
194

Ryan Olson's avatar
Ryan Olson committed
195
196
197
        let should_publish = self.last_publish_time.is_none_or(|last_publish_time| {
            last_publish_time.elapsed() > Duration::from_millis(BLOCKS_BW_MIN_PUBLISH_INTERVAL_MS)
        });
198

Ryan Olson's avatar
Ryan Olson committed
199
200
201
202
        if should_publish {
            self.last_publish_time = Some(Instant::now());
            let duration = self.transfer_start.elapsed();
            let blocks_per_sec = self.num_blocks_transferred as f64 / duration.as_secs_f64();
203

Ryan Olson's avatar
Ryan Olson committed
204
205
206
207
208
209
210
211
212
213
214
215
216
            self.pool_metrics
                .gauge(self.transfer_type.as_str())
                .set(blocks_per_sec as i64);
        }

        match pending_transfer.handle_complete().await {
            Ok(_) => {}
            Err(e) => {
                // The only case where this can fail is if the progress engine is being shutdown.
                // This is not a problem, so we can just ignore it.
                tracing::warn!("Error handling transfer completion: {:?}", e);
            }
        }
217
218
219
220
221

        Ok(())
    }
}

Ryan Olson's avatar
Ryan Olson committed
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
/// A boxed, pinned future that resolves to a [`PendingTransfer`].
///
/// [`LocalTransferManager`] sends these futures over its channel to the worker task,
/// which polls them and finalizes the yielded transfer.
type TransferFuture<Source, Target, Locality, Metadata> = Pin<
    Box<
        dyn std::future::Future<Output = PendingTransfer<Source, Target, Locality, Metadata>>
            + Send
            + Sync,
    >,
>;

pub struct LocalTransferManager<
    Source: Storage,
    Target: Storage,
    Locality: LocalityProvider,
    Metadata: BlockMetadata,
> {
    futures_tx: mpsc::Sender<TransferFuture<Source, Target, Locality, Metadata>>,
237
238
239
    transfer_ctx: Arc<TransferContext>,
}

Ryan Olson's avatar
Ryan Olson committed
240
241
242
impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: BlockMetadata>
    LocalTransferManager<Source, Target, Locality, Metadata>
{
243
244
245
246
247
    pub fn new(
        transfer_ctx: Arc<TransferContext>,
        max_concurrent_transfers: usize,
        runtime: &Handle,
        cancellation_token: CancellationToken,
Ryan Olson's avatar
Ryan Olson committed
248
249
        pool_metrics: Arc<PoolMetrics>,
        transfer_type: String,
250
    ) -> Result<Self> {
251
252
        let (futures_tx, mut futures_rx) = mpsc::channel(1);

Ryan Olson's avatar
Ryan Olson committed
253
254
255
        let mut completion_manager =
            TransferCompletionManager::new(pool_metrics.clone(), transfer_type.clone());

256
257
        CriticalTaskExecutionHandle::new_with_runtime(
            move |cancel_token| async move {
Ryan Olson's avatar
Ryan Olson committed
258
                let mut pending_transfers: FuturesUnordered<TransferFuture<Source, Target, Locality, Metadata>> = FuturesUnordered::new();
259
260
                loop {
                    tokio::select! {
261

262
263
264
                        _ = cancel_token.cancelled() => {
                            return Ok(());
                        }
265

266
267
268
                        Some(future) = futures_rx.recv() => {
                            // If we're at max size, block the worker thread on the next() call until we have capacity.
                            while pending_transfers.len() >= max_concurrent_transfers {
Ryan Olson's avatar
Ryan Olson committed
269
270
271
272
273
                                if let Some(pending_transfer) = pending_transfers.next().await {
                                    completion_manager.handle_complete(pending_transfer).await?;
                                } else {
                                    break;
                                }
274
                            }
Ryan Olson's avatar
Ryan Olson committed
275

276
277
                            pending_transfers.push(future);
                        }
Ryan Olson's avatar
Ryan Olson committed
278
279
                        Some(pending_transfer) = pending_transfers.next(), if !pending_transfers.is_empty() => {
                            completion_manager.handle_complete(pending_transfer).await?;
280
281
282
                        }
                    }
                }
283
284
            },
            cancellation_token.clone(),
Ryan Olson's avatar
Ryan Olson committed
285
            "Local Transfer Manager",
286
287
288
289
290
            runtime,
        )?
        .detach();

        Ok(Self {
291
292
            futures_tx,
            transfer_ctx,
293
        })
294
295
296
297
    }
}

#[async_trait]
Ryan Olson's avatar
Ryan Olson committed
298
299
impl<Source, Target, Locality, Metadata> TransferManager<Source, Target, Locality, Metadata>
    for LocalTransferManager<Source, Target, Locality, Metadata>
300
where
Ryan Olson's avatar
Ryan Olson committed
301
302
303
    Source: Storage + NixlDescriptor,
    Target: Storage + NixlDescriptor,
    Locality: LocalityProvider,
304
305
    Metadata: BlockMetadata,
    // Check that the source block is readable, local, and writable to the target block.
Ryan Olson's avatar
Ryan Olson committed
306
    ImmutableBlock<Source, Locality, Metadata>: ReadableBlock<StorageType = Source>
307
        + Local
Ryan Olson's avatar
Ryan Olson committed
308
        + WriteToStrategy<MutableBlock<Target, Locality, Metadata>>,
309
    // Check that the target block is writable.
Ryan Olson's avatar
Ryan Olson committed
310
311
312
313
    MutableBlock<Target, Locality, Metadata>: WritableBlock<StorageType = Target>,
    // Check that the source and target blocks have the same locality.
    ImmutableBlock<Source, Locality, Metadata>: BlockDataProvider<Locality = Locality>,
    MutableBlock<Target, Locality, Metadata>: BlockDataProviderMut<Locality = Locality>,
314
{
315
    async fn enqueue_transfer(
316
        &self,
Ryan Olson's avatar
Ryan Olson committed
317
        mut pending_transfer: PendingTransfer<Source, Target, Locality, Metadata>,
318
    ) -> Result<()> {
319
320
        let notify = pending_transfer
            .sources
Ryan Olson's avatar
Ryan Olson committed
321
            .write_to(&mut pending_transfer.targets, self.transfer_ctx.clone())?;
322
323

        let completion_future = async move {
324
            let _ = notify.await;
Ryan Olson's avatar
Ryan Olson committed
325
            pending_transfer
326
327
328
329
330
        };

        // Futures_(tx/rx) has a capacity of 1. If the queue worker has received another future and is awaiting next() due to a full `FuturesUnordered`,
        // this call will block until the worker has processed the prior future.
        self.futures_tx.send(Box::pin(completion_future)).await?;
331
332
333
334

        Ok(())
    }
}
335
336

/// A transfer manager that enforces a max batch size for transfers.
Ryan Olson's avatar
Ryan Olson committed
337
pub struct TransferBatcher<Source, Target, Locality, Metadata, Manager>
338
339
340
where
    Source: Storage,
    Target: Storage,
Ryan Olson's avatar
Ryan Olson committed
341
    Locality: LocalityProvider,
342
    Metadata: BlockMetadata,
Ryan Olson's avatar
Ryan Olson committed
343
    Manager: TransferManager<Source, Target, Locality, Metadata>,
344
345
346
{
    transfer_manager: Manager,
    max_transfer_batch_size: usize,
347
348
    runtime: Handle,
    cancellation_token: CancellationToken,
Ryan Olson's avatar
Ryan Olson committed
349
    _phantom: PhantomData<(Source, Target, Locality, Metadata)>,
350
351
}

Ryan Olson's avatar
Ryan Olson committed
352
353
impl<Source, Target, Locality, Metadata, Manager>
    TransferBatcher<Source, Target, Locality, Metadata, Manager>
354
355
356
where
    Source: Storage,
    Target: Storage,
Ryan Olson's avatar
Ryan Olson committed
357
358
359
    Locality: LocalityProvider + 'static,
    Metadata: BlockMetadata + 'static,
    Manager: TransferManager<Source, Target, Locality, Metadata> + 'static,
360
{
361
362
363
364
365
366
    pub fn new(
        transfer_manager: Manager,
        max_transfer_batch_size: usize,
        runtime: &Handle,
        cancellation_token: CancellationToken,
    ) -> Self {
367
368
369
        Self {
            transfer_manager,
            max_transfer_batch_size,
370
371
            runtime: runtime.clone(),
            cancellation_token,
372
373
374
375
376
377
            _phantom: PhantomData,
        }
    }
}

#[async_trait]
Ryan Olson's avatar
Ryan Olson committed
378
379
380
impl<Source, Target, Locality, Metadata, Manager>
    TransferManager<Source, Target, Locality, Metadata>
    for TransferBatcher<Source, Target, Locality, Metadata, Manager>
381
where
Ryan Olson's avatar
Ryan Olson committed
382
383
384
    Source: Storage + 'static,
    Target: Storage + 'static,
    Locality: LocalityProvider + 'static,
385
    Metadata: BlockMetadata,
Ryan Olson's avatar
Ryan Olson committed
386
    Manager: TransferManager<Source, Target, Locality, Metadata>,
387
388
389
{
    async fn enqueue_transfer(
        &self,
Ryan Olson's avatar
Ryan Olson committed
390
        pending_transfer: PendingTransfer<Source, Target, Locality, Metadata>,
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
    ) -> Result<()> {
        // If it's smaller than the max batch size, just enqueue it.
        if pending_transfer.sources.len() < self.max_transfer_batch_size {
            return self
                .transfer_manager
                .enqueue_transfer(pending_transfer)
                .await;
        }

        // Otherwise, we need to split the transfer into multiple smaller transfers.

        let PendingTransfer {
            mut sources,
            mut targets,
            completion_indicator,
            target_pool,
        } = pending_transfer;

        let mut indicators = Vec::new();

        while !sources.is_empty() {
            let sources = sources
                .drain(..std::cmp::min(self.max_transfer_batch_size, sources.len()))
                .collect();
            let targets = targets
                .drain(..std::cmp::min(self.max_transfer_batch_size, targets.len()))
                .collect();

            // If we have a completion indicator, we need to create a new one for each sub-transfer.
            let indicator = if completion_indicator.is_some() {
                let (batch_tx, batch_rx) = oneshot::channel();
                indicators.push(batch_rx);
                Some(batch_tx)
            } else {
                None
            };

            let request = PendingTransfer::new(sources, targets, indicator, target_pool.clone());
            // Enqueue our reduced transfer. This may block if the queue is full.
            self.transfer_manager.enqueue_transfer(request).await?;
        }

        if let Some(completion_indicator) = completion_indicator {
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
            CriticalTaskExecutionHandle::new_with_runtime(
                move |cancel_token| async move {
                    let mut results = Vec::new();

                    for indicator in indicators.into_iter() {
                        // Await each sub-transfer, and append the results to our final results.
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                return Ok(());
                            }

                            Ok(indicator) = indicator => {
                                let result = match indicator {
                                    Ok(result) => result,
                                    Err(e) => {
                                        tracing::error!("Error receiving transfer results: {:?}", e);
Ryan Olson's avatar
Ryan Olson committed
450
                                        let _ = completion_indicator.send(Err(e));
451
452
453
454
455
                                        return Ok(());
                                    }
                                };
                                results.extend(result);
                            }
456
                        }
457
458
459
                    }

                    // Send the final results to the top-level completion indicator.
Ryan Olson's avatar
Ryan Olson committed
460
                    let _ = completion_indicator.send(Ok(results));
461

462
463
464
465
466
467
                    Ok(())
                },
                self.cancellation_token.clone(),
                "Transfer Batcher",
                &self.runtime,
            )?.detach();
468
469
470
471
472
        }

        Ok(())
    }
}