# Offload Module

The offload module manages the asynchronous transfer of KV cache blocks between storage tiers. It provides a pipeline-based architecture for evaluating, batching, and executing block transfers, with cancellation supported up to the point of commitment.

## Overview

Offloading moves blocks from a source tier (e.g., GPU memory) to a destination tier (e.g., host memory, remote storage, or object storage). The pipeline ensures:

- **Policy-based filtering**: Only blocks meeting criteria are transferred
- **Batched execution**: Blocks are grouped for efficient transfer
- **Cancellation support**: Transfers can be cancelled at any point before commitment
- **Precondition synchronization**: Transfers wait for forward pass completion

## Pipeline Architecture

```text
┌─────────────────┐     ┌─────────────────────┐     ┌─────────────────────┐     ┌──────────────────┐
│ PolicyEvaluator │────►│ PreconditionAwaiter │────►│       Batcher       │────►│ TransferExecutor │
└─────────────────┘     └─────────────────────┘     └─────────────────────┘     └──────────────────┘
                                                             ▲                          ▲
                                                             │                          │
                                                    CancellableQueue          CancellableQueue
                                                             │                          │
                                                             └──────── CancelSweeper ───┘
```

### Stages

| Stage | Purpose |
|-------|---------|
| **PolicyEvaluator** | Filters blocks based on configured policies (frequency, presence, etc.) |
| **PreconditionAwaiter** | Waits for forward pass completion before proceeding |
| **Batcher** | Groups containers into batches based on total block count |
| **TransferExecutor** | Upgrades blocks (Weak → Strong) and executes the transfer |
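
To illustrate the Batcher row, here is a minimal sketch of grouping containers by total block count. It is not the module's implementation: `SizedContainer` and `batch_by_block_count` are hypothetical names, and the rule that a container is never split across batches is an assumption made for this example.

```rust
/// Hypothetical stand-in for a container; only its block count matters here.
#[derive(Debug, Clone, PartialEq)]
struct SizedContainer {
    block_count: usize,
}

/// Greedily packs containers into batches so that each batch holds at most
/// `max_blocks` blocks in total.
///
/// Assumption: containers are never split, so a container larger than
/// `max_blocks` ends up in a batch of its own.
fn batch_by_block_count(
    containers: Vec<SizedContainer>,
    max_blocks: usize,
) -> Vec<Vec<SizedContainer>> {
    let mut batches = Vec::new();
    let mut current: Vec<SizedContainer> = Vec::new();
    let mut current_blocks = 0;

    for container in containers {
        // Close the current batch if adding this container would overflow it.
        if !current.is_empty() && current_blocks + container.block_count > max_blocks {
            batches.push(std::mem::take(&mut current));
            current_blocks = 0;
        }
        current_blocks += container.block_count;
        current.push(container);
    }

    // Flush whatever is left as a final, possibly partial, batch.
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```

A real batcher would also flush on a timer; this sketch covers only the size-based grouping.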

## Container Data Model

The fundamental unit flowing through the pipeline is an **OffloadContainer**:

```rust,ignore
struct OffloadContainer<T: BlockMetadata> {
    /// The blocks to offload
    blocks: Vec<SourceBlock<T>>,
    /// Precondition event (forward pass completion)
    precondition: Option<EventHandle>,
    /// Cancellation token
    cancel_token: CancellationToken,
}
```

Containers are grouped into batches for efficient transfer:

```rust,ignore
struct OffloadBatch<T: BlockMetadata> {
    /// Multiple containers, each independently cancellable
    containers: Vec<OffloadContainer<T>>,
}
```

The properties below (P1 through P6) describe how containers behave as they move through the pipeline.


### P1: Container is the Unit of Cancellation

Individual blocks within a container are not independently cancellable. When a container is cancelled, all its blocks are cancelled together.

### P2: Token Travels with Container

Each container carries its own `CancellationToken`, cloned from the `TransferHandle` at enqueue time. The token travels with the container through all pipeline stages until upgrade.
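
The sharing described here can be sketched with a plain atomic flag. This is an illustration only: `Token`, `Handle`, `QueuedContainer`, and this `enqueue` function are hypothetical stand-ins, not the module's `CancellationToken`, `TransferHandle`, or enqueue API.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical token: clones share one underlying flag.
#[derive(Clone, Default)]
struct Token(Arc<AtomicBool>);

impl Token {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Hypothetical handle kept by the caller.
struct Handle {
    token: Token,
}

/// Hypothetical container that carries its own clone of the token.
struct QueuedContainer {
    blocks: Vec<u64>,
    cancel_token: Token,
}

/// Creates a handle and a container that share one cancellation flag.
fn enqueue(blocks: Vec<u64>) -> (Handle, QueuedContainer) {
    let token = Token::default();
    let container = QueuedContainer {
        blocks,
        // The clone travels with the container through later stages.
        cancel_token: token.clone(),
    };
    (Handle { token }, container)
}
```

Because both sides hold clones of the same flag, cancelling through the handle is visible to whichever stage currently holds the container.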

### P3: Upgrade is the Commitment Boundary

The upgrade step (Weak → Strong) is the point of no return:

- **Before upgrade**: Containers can be cancelled via sweep or token check
- **After upgrade**: The pipeline holds strong references to the blocks; cancellation no longer applies
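
As an analogy for this boundary, the sketch below uses the standard library's `Weak` and `Arc`. The module's own weak and strong block types are not shown in this document, so `Block` and `upgrade_all` are hypothetical, and the all-or-nothing behavior is an assumption of the example.

```rust
use std::sync::{Arc, Weak};

/// Hypothetical block payload, used only for illustration.
struct Block {
    id: u64,
}

/// Tries to turn every weak reference into a strong one.
///
/// Assumption: the upgrade is all-or-nothing. If any block has already been
/// released, `None` is returned and no strong references are kept.
fn upgrade_all(weak_blocks: &[Weak<Block>]) -> Option<Vec<Arc<Block>>> {
    // Collecting into `Option<Vec<_>>` stops at the first failed upgrade.
    weak_blocks.iter().map(Weak::upgrade).collect()
}
```

While the returned `Vec<Arc<Block>>` is alive, the blocks cannot be freed, which is why cancellation stops being meaningful after this step.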

### P4: Sweep Before Upgrade

The last cancellation check occurs immediately before upgrade. The `TransferExecutor` calls `batch.sweep_cancelled()` to remove cancelled containers before committing.
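
A sweep of this kind can be sketched as a `retain` over the batch's containers. The types below are simplified stand-ins, and the return value (the number of containers removed) is an assumption; the real `sweep_cancelled()` signature is not shown in this document.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Minimal stand-in for a cancellation token: a shared flag.
#[derive(Clone, Default)]
struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Simplified container: block ids plus the token that travels with them.
struct Container {
    blocks: Vec<u64>,
    cancel_token: CancelFlag,
}

/// Simplified batch of independently cancellable containers.
struct Batch {
    containers: Vec<Container>,
}

impl Batch {
    /// Removes every cancelled container and reports how many were dropped
    /// (the return value is an assumption made for this sketch).
    fn sweep_cancelled(&mut self) -> usize {
        let before = self.containers.len();
        self.containers.retain(|c| !c.cancel_token.is_cancelled());
        before - self.containers.len()
    }
}
```

Dropping a cancelled container drops all of its blocks at once, consistent with P1.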

### P5: Flat Map After Upgrade

After upgrade, all blocks from all containers are consolidated into a single `Vec<ImmutableBlock<T>>` for efficient batch transfer. Per-container identity is lost at this point.
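
The consolidation step amounts to a `flat_map` over the surviving containers. In this sketch, `UpgradedBlock` and `UpgradedContainer` are hypothetical stand-ins for `ImmutableBlock<T>` and an upgraded container.

```rust
/// Hypothetical stand-in for an upgraded block.
#[derive(Debug, PartialEq)]
struct UpgradedBlock(u64);

/// Hypothetical container that already holds upgraded blocks.
struct UpgradedContainer {
    blocks: Vec<UpgradedBlock>,
}

/// Consolidates the blocks of all containers into one list, in container
/// order. The result no longer records which container a block came from.
fn flatten_blocks(containers: Vec<UpgradedContainer>) -> Vec<UpgradedBlock> {
    containers.into_iter().flat_map(|c| c.blocks).collect()
}
```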

### P6: PreconditionAwaiter Uses Select

The precondition awaiter uses `select!` to wait on both the precondition event and the cancellation token, so a wait can be interrupted by cancellation. If the container is cancelled while waiting, it is dropped immediately.
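
The "whichever fires first wins" behavior can be approximated without an async runtime. The sketch below is not `select!`: it swaps in a single standard-library channel that carries both signals, and `Signal`, `Outcome`, and `await_precondition` are hypothetical names for this example.

```rust
use std::sync::mpsc::Receiver;

/// The two events the awaiter reacts to (hypothetical).
#[derive(Debug, PartialEq)]
enum Signal {
    PreconditionMet,
    Cancelled,
}

/// What happens to the container after the wait (hypothetical).
#[derive(Debug, PartialEq)]
enum Outcome<C> {
    Proceed(C),
    Dropped,
}

/// Blocks until the first signal arrives and decides the container's fate.
///
/// Both the event source and the canceller are assumed to hold a sender for
/// the same channel, so the first message received wins.
fn await_precondition<C>(container: C, signals: &Receiver<Signal>) -> Outcome<C> {
    match signals.recv() {
        Ok(Signal::PreconditionMet) => Outcome::Proceed(container),
        // Cancelled, or every sender is gone: the container is dropped here.
        Ok(Signal::Cancelled) | Err(_) => Outcome::Dropped,
    }
}
```

In an async setting the same decision would be made by racing two futures rather than reading one channel.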

## Configuration

Pipeline behavior is controlled via `PipelineConfig`:

| Option | Default | Description |
|--------|---------|-------------|
| `batch_config.max_batch_size` | 64 | Maximum blocks per batch |
| `batch_config.min_batch_size` | 8 | Minimum blocks before flush |
| `batch_config.flush_interval` | 10ms | Maximum time to wait before flushing a partial batch |
| `policy_timeout` | 100ms | Timeout for policy evaluation |
| `sweep_interval` | 10ms | Interval for cancel sweeper |
| `max_concurrent_transfers` | 1 | Concurrent transfer batches |
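
The table can be mirrored as a pair of structs with `Default` implementations. This is a sketch only: the field names and default values come from the table, but the field types (`usize`, `Duration`) and the struct layout are assumptions, and the real `PipelineConfig` may be constructed differently.

```rust
use std::time::Duration;

/// Batching options (field types are assumptions).
#[derive(Debug, Clone, PartialEq)]
struct BatchConfig {
    max_batch_size: usize,
    min_batch_size: usize,
    flush_interval: Duration,
}

/// Pipeline options (field types are assumptions).
#[derive(Debug, Clone, PartialEq)]
struct PipelineConfig {
    batch_config: BatchConfig,
    policy_timeout: Duration,
    sweep_interval: Duration,
    max_concurrent_transfers: usize,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            max_batch_size: 64,
            min_batch_size: 8,
            flush_interval: Duration::from_millis(10),
        }
    }
}

impl Default for PipelineConfig {
    fn default() -> Self {
        Self {
            batch_config: BatchConfig::default(),
            policy_timeout: Duration::from_millis(100),
            sweep_interval: Duration::from_millis(10),
            max_concurrent_transfers: 1,
        }
    }
}
```

With this shape, a single option can be overridden using struct update syntax, for example `PipelineConfig { max_concurrent_transfers: 2, ..PipelineConfig::default() }`.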

## Usage

### Enqueueing Blocks

```rust,ignore
let handle = pipeline.enqueue(source_blocks, precondition_event);

// Track progress
println!("Status: {:?}", handle.status());

// Wait for completion
let result = handle.wait().await?;
```

### Cancelling a Transfer

```rust,ignore
// Request cancellation and wait for confirmation
handle.cancel().await;
// All blocks are now released
```

## Related Documentation

- [offload-developer.md](offload-developer.md) - Implementation details and extension rules