engine.rs 25.4 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Main offload engine coordinating pipelines.
//!
//! The `OffloadEngine` is a standalone component that manages block offloading
//! between storage tiers (G1→G2, G2→G3, G2→G4).
//!
//! # Example
//! ```ignore
//! let engine = OffloadEngine::builder(leader.clone())
//!     .with_registry(registry.clone())
//!     .with_g1_to_g2_pipeline(
//!         PipelineBuilder::<G1, G2>::new()
//!             .policy(Arc::new(PresenceFilter::new(registry.clone())))
//!             .batch_size(32)
//!             .auto_chain(true)
//!             .build()
//!     )
//!     .with_g2_to_g3_pipeline(
//!         PipelineBuilder::<G2, G3>::new()
//!             .policy(Arc::new(PresenceAndLFUFilter::with_default_threshold(registry.clone())))
//!             .batch_size(64)
//!             .build()
//!     )
//!     .build()?;
//!
//! let handle = engine.enqueue_g2_to_g3(blocks);
//! handle.wait().await?;
//! ```

use std::sync::Arc;

use anyhow::Result;
use dashmap::DashMap;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::leader::InstanceLeader;
use crate::object::ObjectBlockOps;
use crate::worker::RemoteDescriptor;
use crate::{BlockId, G1, G2, G3, SequenceHash};
use kvbm_common::LogicalLayoutHandle;
use kvbm_logical::blocks::{BlockMetadata, BlockRegistry, WeakBlock};
use kvbm_logical::manager::BlockManager;
use kvbm_physical::transfer::{PhysicalLayout, TransferOptions};

use super::handle::{TransferHandle, TransferId, TransferState};
use super::pipeline::{
    ChainOutput, ChainOutputRx, ObjectPipeline, ObjectPipelineConfig, Pipeline, PipelineConfig,
    PipelineInput,
};
use super::queue::CancellableQueue;
use super::source::SourceBlocks;

/// Central coordinator for offload pipelines.
///
/// The engine manages multiple pipelines (G1→G2, G2→G3, G2→G4) and provides
/// a unified interface for enqueueing blocks for offload.
///
/// # Storage Tier Model
///
/// - G1→G2: `BlockManager<G2>` destination (host memory)
/// - G2→G3: `BlockManager<G3>` destination (disk/NVMe)
/// - G2→G4: ObjectBlockOps destination (object storage like S3)
///
/// # Distributed G2→G4 Offloading
///
/// For distributed setups where the leader doesn't have physical layouts (only workers do),
/// use `with_enable_remote_g4(true)` instead of `with_g2_to_g4_pipeline()`. This enables
/// remote G4 offloading where workers execute object storage uploads via their local
/// `ObjectBlockOps` implementations.
#[allow(dead_code)]
pub struct OffloadEngine {
    /// Reference to the instance leader for transfers
    leader: Arc<InstanceLeader>,
    /// Block registry for policy evaluation
    registry: Arc<BlockRegistry>,
    /// G1→G2 pipeline (BlockManager destination)
    g1_to_g2: Option<Pipeline<G1, G2>>,
    /// G2→G3 pipeline (BlockManager destination)
    g2_to_g3: Option<Pipeline<G2, G3>>,
    /// G2→G4 pipeline (Object storage destination) - for local mode only
    g2_to_g4: Option<ObjectPipeline<G2>>,
    /// Active transfer tracking
    transfers: Arc<DashMap<TransferId, Arc<std::sync::Mutex<TransferState>>>>,
    /// Chain router task handle (routes G1→G2 output to downstream pipelines)
    _chain_router_handle: Option<JoinHandle<()>>,
    /// Remote G4 offload task handle (for distributed mode)
    _remote_g4_offload_handle: Option<JoinHandle<()>>,
}

impl OffloadEngine {
    /// Create a new builder for the offload engine.
    pub fn builder(leader: Arc<InstanceLeader>) -> OffloadEngineBuilder {
        OffloadEngineBuilder::new(leader)
    }

    /// Enqueue blocks for G1→G2 offload.
    ///
    /// Returns a `TransferHandle` for tracking progress and cancellation.
    pub fn enqueue_g1_to_g2(&self, blocks: impl Into<SourceBlocks<G1>>) -> Result<TransferHandle> {
        let pipeline = self
            .g1_to_g2
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("G1→G2 pipeline not configured"))?;

        self.enqueue_to_pipeline(pipeline, blocks.into())
    }

    /// Enqueue blocks for G1→G2 offload with a precondition event.
    ///
    /// The precondition event must be satisfied before the batch is processed
    /// by the transfer executor. This enables coordination with worker forward passes.
    ///
    /// Returns a `TransferHandle` for tracking progress and cancellation.
    pub fn enqueue_g1_to_g2_with_precondition(
        &self,
        blocks: impl Into<SourceBlocks<G1>>,
        precondition: Option<velo::EventHandle>,
    ) -> Result<TransferHandle> {
        let pipeline = self
            .g1_to_g2
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("G1→G2 pipeline not configured"))?;

        self.enqueue_to_pipeline_with_precondition(pipeline, blocks.into(), precondition)
    }

    /// Enqueue blocks for G2→G3 offload.
    ///
    /// Returns a `TransferHandle` for tracking progress and cancellation.
    pub fn enqueue_g2_to_g3(&self, blocks: impl Into<SourceBlocks<G2>>) -> Result<TransferHandle> {
        let pipeline = self
            .g2_to_g3
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("G2→G3 pipeline not configured"))?;

        self.enqueue_to_pipeline(pipeline, blocks.into())
    }

    /// Enqueue blocks for G2→G4 offload (object storage).
    ///
    /// Returns a `TransferHandle` for tracking progress and cancellation.
    pub fn enqueue_g2_to_g4(&self, blocks: impl Into<SourceBlocks<G2>>) -> Result<TransferHandle> {
        let pipeline = self
            .g2_to_g4
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("G2→G4 pipeline not configured"))?;

        self.enqueue_to_object_pipeline(pipeline, blocks.into())
    }

    /// Create transfer state, store it, and return the components needed for enqueueing.
    fn create_transfer<T: BlockMetadata>(
        &self,
        source: &SourceBlocks<T>,
    ) -> (
        TransferId,
        Arc<std::sync::Mutex<TransferState>>,
        TransferHandle,
    ) {
        let input_block_ids = self.extract_block_ids(source);
        let transfer_id = TransferId::new();
        let (state, handle) = TransferState::new(transfer_id, input_block_ids);
        let state = Arc::new(std::sync::Mutex::new(state));
        self.transfers.insert(transfer_id, state.clone());
        (transfer_id, state, handle)
    }

    /// Internal: enqueue to a specific pipeline.
    fn enqueue_to_pipeline<Src: BlockMetadata, Dst: BlockMetadata>(
        &self,
        pipeline: &Pipeline<Src, Dst>,
        source: SourceBlocks<Src>,
    ) -> Result<TransferHandle> {
        let (transfer_id, state, handle) = self.create_transfer(&source);
        if !pipeline.enqueue(transfer_id, source, state) {
            tracing::warn!("Transfer {} was cancelled before enqueueing", transfer_id);
        }
        Ok(handle)
    }

    /// Internal: enqueue to a specific pipeline with a precondition.
    fn enqueue_to_pipeline_with_precondition<Src: BlockMetadata, Dst: BlockMetadata>(
        &self,
        pipeline: &Pipeline<Src, Dst>,
        source: SourceBlocks<Src>,
        precondition: Option<velo::EventHandle>,
    ) -> Result<TransferHandle> {
        let (transfer_id, state, handle) = self.create_transfer(&source);
        state.lock().unwrap().precondition = precondition;
        if !pipeline.enqueue(transfer_id, source, state) {
            tracing::warn!("Transfer {} was cancelled before enqueueing", transfer_id);
        }
        Ok(handle)
    }

    /// Internal: enqueue to an object pipeline (G2→G4).
    fn enqueue_to_object_pipeline(
        &self,
        pipeline: &ObjectPipeline<G2>,
        source: SourceBlocks<G2>,
    ) -> Result<TransferHandle> {
        let (transfer_id, state, handle) = self.create_transfer(&source);
        if !pipeline.enqueue(transfer_id, source, state) {
            tracing::warn!("Transfer {} was cancelled before enqueueing", transfer_id);
        }
        Ok(handle)
    }

    /// Extract block IDs from source blocks.
    ///
    /// For External/Strong blocks, returns the known block IDs.
    /// For Weak blocks, returns empty vec (IDs determined at upgrade time).
    fn extract_block_ids<T: BlockMetadata>(&self, source: &SourceBlocks<T>) -> Vec<BlockId> {
        match source {
            SourceBlocks::External(blocks) => blocks.iter().map(|b| b.block_id).collect(),
            SourceBlocks::Strong(blocks) => blocks.iter().map(|b| b.block_id()).collect(),
            SourceBlocks::Weak(_) => Vec::new(), // IDs not available without upgrade
        }
    }

    /// Release a completed transfer's resources.
    ///
    /// This is optional - transfers are automatically cleaned up,
    /// but call this to release resources earlier.
    pub fn release_transfer(&self, transfer_id: TransferId) {
        self.transfers.remove(&transfer_id);
    }

    /// Get the number of active transfers.
    pub fn active_transfer_count(&self) -> usize {
        self.transfers.len()
    }

    /// Check if G1→G2 pipeline is configured.
    pub fn has_g1_to_g2(&self) -> bool {
        self.g1_to_g2.is_some()
    }

    /// Check if G2→G3 pipeline is configured.
    pub fn has_g2_to_g3(&self) -> bool {
        self.g2_to_g3.is_some()
    }

    /// Check if G2→G4 pipeline is configured.
    pub fn has_g2_to_g4(&self) -> bool {
        self.g2_to_g4.is_some()
    }
}

/// Builder for OffloadEngine.
pub struct OffloadEngineBuilder {
    leader: Arc<InstanceLeader>,
    registry: Option<Arc<BlockRegistry>>,
    g1_manager: Option<Arc<BlockManager<G1>>>,
    g2_manager: Option<Arc<BlockManager<G2>>>,
    g3_manager: Option<Arc<BlockManager<G3>>>,
    /// Object storage operations for G4 (replaces `BlockManager<G4>`)
    object_ops: Option<Arc<dyn ObjectBlockOps>>,
    /// G2 physical layout for object transfers (needed by ObjectTransferExecutor)
    g2_physical_layout: Option<PhysicalLayout>,
    g1_to_g2_config: Option<PipelineConfig<G1, G2>>,
    g2_to_g3_config: Option<PipelineConfig<G2, G3>>,
    /// G2→G4 uses ObjectPipelineConfig (no destination BlockManager)
    g2_to_g4_config: Option<ObjectPipelineConfig<G2>>,
    /// Optional runtime handle override (defaults to leader.runtime())
    runtime: Option<tokio::runtime::Handle>,
    /// Enable remote G4 offloading via workers' ObjectBlockOps (for distributed mode)
    enable_remote_g4: bool,
}

impl OffloadEngineBuilder {
    /// Create a new builder with the given instance leader.
    pub fn new(leader: Arc<InstanceLeader>) -> Self {
        Self {
            leader,
            registry: None,
            g1_manager: None,
            g2_manager: None,
            g3_manager: None,
            object_ops: None,
            g2_physical_layout: None,
            g1_to_g2_config: None,
            g2_to_g3_config: None,
            g2_to_g4_config: None,
            runtime: None,
            enable_remote_g4: false,
        }
    }

    /// Set an explicit runtime handle for spawning pipeline tasks.
    ///
    /// If not set, defaults to `leader.runtime()`. Use this when you need
    /// pipeline tasks to run on a specific runtime (e.g., in tests).
    pub fn with_runtime(mut self, runtime: tokio::runtime::Handle) -> Self {
        self.runtime = Some(runtime);
        self
    }

    /// Set the block registry.
    pub fn with_registry(mut self, registry: Arc<BlockRegistry>) -> Self {
        self.registry = Some(registry);
        self
    }

    /// Set the G1 block manager.
    pub fn with_g1_manager(mut self, manager: Arc<BlockManager<G1>>) -> Self {
        self.g1_manager = Some(manager);
        self
    }

    /// Set the G2 block manager.
    pub fn with_g2_manager(mut self, manager: Arc<BlockManager<G2>>) -> Self {
        self.g2_manager = Some(manager);
        self
    }

    /// Set the G3 block manager.
    pub fn with_g3_manager(mut self, manager: Arc<BlockManager<G3>>) -> Self {
        self.g3_manager = Some(manager);
        self
    }

    /// Set object storage operations for G4.
    ///
    /// G4 is object storage (S3, MinIO, etc.) and uses `ObjectBlockOps`
    /// instead of a `BlockManager`. This replaces `with_g4_manager`.
    pub fn with_object_ops(mut self, object_ops: Arc<dyn ObjectBlockOps>) -> Self {
        self.object_ops = Some(object_ops);
        self
    }

    /// Set the G2 physical layout for object transfers.
    ///
    /// Required when using G2→G4 pipeline. The ObjectTransferExecutor needs
    /// the physical layout to read block data for upload to object storage.
    pub fn with_g2_physical_layout(mut self, layout: PhysicalLayout) -> Self {
        self.g2_physical_layout = Some(layout);
        self
    }

    /// Configure G1→G2 pipeline.
    pub fn with_g1_to_g2_pipeline(mut self, config: PipelineConfig<G1, G2>) -> Self {
        self.g1_to_g2_config = Some(config);
        self
    }

    /// Configure G2→G3 pipeline.
    pub fn with_g2_to_g3_pipeline(mut self, config: PipelineConfig<G2, G3>) -> Self {
        self.g2_to_g3_config = Some(config);
        self
    }

    /// Configure G2→G4 pipeline (object storage).
    ///
    /// Uses `ObjectPipelineConfig` instead of `PipelineConfig` since G4
    /// is object storage, not a BlockManager destination.
    ///
    /// For distributed setups where the leader doesn't have physical layouts,
    /// use `with_enable_remote_g4(true)` instead.
    pub fn with_g2_to_g4_pipeline(mut self, config: ObjectPipelineConfig<G2>) -> Self {
        self.g2_to_g4_config = Some(config);
        self
    }

    /// Enable remote G4 offloading via workers' ObjectBlockOps.
    ///
    /// In distributed setups, the leader doesn't have physical layouts (only workers do).
    /// This enables G2→G4 offloading where:
    /// 1. G1→G2 chain output is routed to a remote offload task
    /// 2. The task calls workers' ObjectBlockOps::put_blocks() via RPC
    /// 3. Workers upload blocks from their local G2 to object storage
    /// 4. Per-block results are returned and logged
    ///
    /// This is mutually exclusive with `with_g2_to_g4_pipeline()` - use one or the other.
    pub fn with_enable_remote_g4(mut self, enable: bool) -> Self {
        self.enable_remote_g4 = enable;
        self
    }

    /// Build the offload engine.
    pub fn build(self) -> Result<OffloadEngine> {
        let registry = self
            .registry
            .ok_or_else(|| anyhow::anyhow!("Block registry required"))?;

        // Get the runtime handle for spawning background tasks
        // Use explicit override if provided, otherwise get from leader
        let runtime = self.runtime.unwrap_or_else(|| self.leader.runtime());

        // Build G1→G2 pipeline if configured
        // Note: G1 is externally owned (vLLM GPU cache), so no G1 manager needed.
        // Pipeline works with ExternalBlock<G1> which contains block_id + sequence_hash.
        let mut g1_to_g2 = if let Some(config) = self.g1_to_g2_config {
            let g2_manager = self
                .g2_manager
                .clone()
                .ok_or_else(|| anyhow::anyhow!("G2 manager required for G1→G2 pipeline"))?;

            Some(Pipeline::new(
                config,
                registry.clone(),
                g2_manager,
                self.leader.clone(),
                LogicalLayoutHandle::G1,
                LogicalLayoutHandle::G2,
                runtime.clone(),
            ))
        } else {
            None
        };

        // Build G2→G3 pipeline if configured
        let g2_to_g3 = if let Some(config) = self.g2_to_g3_config {
            let g3_manager = self
                .g3_manager
                .ok_or_else(|| anyhow::anyhow!("G3 manager required for G2→G3 pipeline"))?;

            Some(Pipeline::new(
                config,
                registry.clone(),
                g3_manager,
                self.leader.clone(),
                LogicalLayoutHandle::G2,
                LogicalLayoutHandle::G3,
                runtime.clone(),
            ))
        } else {
            None
        };

        // Build G2→G4 pipeline if configured (object storage destination)
        // Note: For distributed mode, use enable_remote_g4 instead
        let g2_to_g4 = if let Some(config) = self.g2_to_g4_config {
            let object_ops = self
                .object_ops
                .ok_or_else(|| anyhow::anyhow!("ObjectBlockOps required for G2→G4 pipeline"))?;

            // ObjectPipeline takes LogicalLayoutHandle - the ObjectBlockOps implementation
            // resolves this to a physical layout internally
            Some(ObjectPipeline::new(
                config,
                object_ops,
                LogicalLayoutHandle::G2,
                self.leader.clone(),
                runtime.clone(),
            ))
        } else {
            None
        };

        // Create channel for remote G4 offload if enabled
        let (remote_g4_tx, remote_g4_rx) = if self.enable_remote_g4 {
            let (tx, rx) = mpsc::channel::<RemoteG4OffloadRequest>(64);
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

        // Wire up auto-chaining from G1→G2 to downstream G2→G3/G2→G4 pipelines
        let chain_router_handle = if let Some(ref mut g1_to_g2_pipeline) = g1_to_g2 {
            if g1_to_g2_pipeline.auto_chain() {
                if let Some(chain_rx) = g1_to_g2_pipeline.take_chain_rx() {
                    // Get references to downstream pipeline queues
                    let g2_to_g3_queue = g2_to_g3.as_ref().map(|p| p.eval_queue.clone());
                    let g2_to_g4_queue = g2_to_g4.as_ref().map(|p| p.eval_queue.clone());

                    // Check if we have any downstream target (local pipelines or remote G4)
                    let has_g2_to_g4_local = g2_to_g4_queue.is_some();
                    let has_g2_to_g4_remote = remote_g4_tx.is_some();

                    // Only spawn if there's at least one downstream target
                    if g2_to_g3_queue.is_some() || has_g2_to_g4_local || has_g2_to_g4_remote {
                        tracing::debug!(
                            has_g2_to_g3 = g2_to_g3_queue.is_some(),
                            has_g2_to_g4_local,
                            has_g2_to_g4_remote,
                            "Spawning chain router for G1→G2 auto-chaining"
                        );
                        Some(runtime.spawn(chain_router_task(
                            chain_rx,
                            g2_to_g3_queue,
                            g2_to_g4_queue,
                            remote_g4_tx,
                        )))
                    } else {
                        tracing::debug!(
                            "G1→G2 auto_chain enabled but no downstream pipelines configured"
                        );
                        None
                    }
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        // Spawn remote G4 offload task if enabled
        let remote_g4_offload_handle = if let Some(rx) = remote_g4_rx {
            tracing::info!("Enabling remote G4 offload via workers' ObjectBlockOps");
            Some(runtime.spawn(remote_g4_offload_task(rx, self.leader.clone())))
        } else {
            None
        };

        Ok(OffloadEngine {
            leader: self.leader,
            registry,
            g1_to_g2,
            g2_to_g3,
            g2_to_g4,
            transfers: Arc::new(DashMap::new()),
            _chain_router_handle: chain_router_handle,
            _remote_g4_offload_handle: remote_g4_offload_handle,
        })
    }
}

/// Request for remote G4 offload (distributed mode).
///
/// Contains the information needed to call workers' ObjectBlockOps::put_blocks().
struct RemoteG4OffloadRequest {
    /// Transfer ID for tracking
    transfer_id: TransferId,
    /// Sequence hashes (keys for object storage)
    keys: Vec<SequenceHash>,
    /// Block IDs in G2 layout
    block_ids: Vec<BlockId>,
}

/// Routes chain output from G1→G2 to downstream G2→G3/G2→G4 pipelines.
///
/// Blocks are converted to WeakBlocks for best-effort offloading - if they're
/// evicted before the downstream pipeline processes them, that's acceptable.
/// This enables graceful degradation under memory pressure.
async fn chain_router_task(
    mut chain_rx: ChainOutputRx<G2>,
    g2_to_g3_queue: Option<Arc<CancellableQueue<PipelineInput<G2>>>>,
    g2_to_g4_queue: Option<Arc<CancellableQueue<PipelineInput<G2>>>>,
    remote_g4_tx: Option<mpsc::Sender<RemoteG4OffloadRequest>>,
) {
    while let Some(output) = chain_rx.recv().await {
        let ChainOutput {
            transfer_id,
            blocks,
            state,
        } = output;

        if blocks.is_empty() {
            continue;
        }

        // Convert strong blocks to weak blocks for best-effort downstream processing
        // This allows blocks to be evicted if memory pressure requires it
        let weak_blocks: Vec<WeakBlock<G2>> =
            blocks.iter().map(|block| block.downgrade()).collect();

        // Extract sequence hashes and block IDs for remote G4 offload before dropping
        let remote_g4_data: Option<(Vec<SequenceHash>, Vec<BlockId>)> = if remote_g4_tx.is_some() {
            Some((
                blocks.iter().map(|b| b.sequence_hash()).collect(),
                blocks.iter().map(|b| b.block_id()).collect(),
            ))
        } else {
            None
        };

        // Drop strong references - blocks can now be evicted if needed
        drop(blocks);

        tracing::debug!(
            %transfer_id,
            num_blocks = weak_blocks.len(),
            "Routing chain output to downstream pipelines as WeakBlocks"
        );

        // Enqueue to G2→G3 if available
        if let Some(ref queue) = g2_to_g3_queue {
            let input = PipelineInput {
                transfer_id,
                source: SourceBlocks::Weak(weak_blocks.clone()),
                state: state.clone(),
            };
            if !queue.push(transfer_id, input) {
                tracing::debug!(%transfer_id, "G2→G3 chain enqueue skipped (cancelled)");
            }
        }

        // Enqueue to local G2→G4 pipeline if available
        if let Some(ref queue) = g2_to_g4_queue {
            let input = PipelineInput {
                transfer_id,
                source: SourceBlocks::Weak(weak_blocks.clone()),
                state: state.clone(),
            };
            if !queue.push(transfer_id, input) {
                tracing::debug!(%transfer_id, "G2→G4 chain enqueue skipped (cancelled)");
            }
        }

        // Send to remote G4 offload if enabled (distributed mode)
        if let (Some(tx), Some((keys, block_ids))) = (&remote_g4_tx, remote_g4_data) {
            let request = RemoteG4OffloadRequest {
                transfer_id,
                keys,
                block_ids,
            };
            if tx.send(request).await.is_err() {
                tracing::debug!(%transfer_id, "Remote G4 offload channel closed");
            }
        }
    }

    tracing::debug!("Chain router task shutting down");
}

/// Task that processes remote G4 offload requests.
///
/// In distributed mode, this task receives requests from the chain router
/// and calls workers' ObjectBlockOps to upload blocks to object storage.
/// Uses execute_remote_offload with RemoteDescriptor::Object to coordinate
/// workers uploading their local G2 data to S3.
async fn remote_g4_offload_task(
    mut rx: mpsc::Receiver<RemoteG4OffloadRequest>,
    leader: Arc<InstanceLeader>,
) {
    tracing::info!("Remote G4 offload task started");

    while let Some(request) = rx.recv().await {
        let num_blocks = request.keys.len();
        tracing::debug!(
            %request.transfer_id,
            num_blocks,
            "Processing remote G4 offload request"
        );

        // Use the leader's execute_remote_offload with RemoteDescriptor::Object
        // This coordinates all workers to upload from their local G2 to object storage
        let result = leader.execute_remote_offload(
            LogicalLayoutHandle::G2, // Source is G2 (host memory)
            RemoteDescriptor::Object {
                keys: request.keys.clone(),
            },
            request.block_ids.clone(),
            TransferOptions::default(),
        );

        match result {
            Ok(notification) => {
                // Wait for all workers to complete
                match notification.await {
                    Ok(()) => {
                        tracing::info!(
                            %request.transfer_id,
                            num_blocks,
                            "Remote G4 offload completed successfully"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            %request.transfer_id,
                            num_blocks,
                            error = %e,
                            "Remote G4 offload failed"
                        );
                    }
                }
            }
            Err(e) => {
                tracing::warn!(
                    %request.transfer_id,
                    num_blocks,
                    error = %e,
                    "Failed to initiate remote G4 offload"
                );
            }
        }
    }

    tracing::info!("Remote G4 offload task shutting down");
}

#[cfg(test)]
mod tests {
    use super::*;

    // Note: Full tests require complex infrastructure setup (InstanceLeader, BlockManagers, etc.)
    // Basic API tests here.

    #[test]
    fn test_transfer_id_generation() {
        let id1 = TransferId::new();
        let id2 = TransferId::new();
        assert_ne!(id1, id2);
    }
}