external.rs 15.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;

use crate::{BlockId, KvbmSequenceHashProvider, SequenceHash};
use dynamo_tokens::TokenBlock;

use super::super::store::BlockStore;
use crate::sequence::BlockSequenceError;

/// Per-tier block_id tracking with an offset into the sequence.
///
/// Maintains an ordered mapping of `BlockId` → `SequenceHash` for assigned blocks,
/// a staging area for blocks whose hashes have been computed but not yet committed,
/// plus a FIFO queue of block_ids waiting for assignment. Index `i` in the assigned
/// map corresponds to sequence position `offset + i`.
///
/// The three-phase lifecycle is:
/// - **Unassigned** — block_ids queued for assignment (no hash yet).
/// - **Staged** — block_ids paired with their `SequenceHash` but not yet committed.
/// - **Assigned** — committed `BlockId → SequenceHash` pairs in positional order.
///
/// Multiple `ExternalBlockAssignments` instances can operate on the same `&[TokenBlock]` at
/// different offsets (multi-tier).
pub struct ExternalBlockAssignments {
    store: BlockStore<(), SequenceHash, SequenceHash>,

    /// Starting position in the sequence. Assignments begin at this position.
    offset: usize,
}

impl std::fmt::Debug for ExternalBlockAssignments {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExternalBlockAssignments")
            .field("assigned_count", &self.store.assigned_count())
            .field("staged_count", &self.store.staged_count())
            .field("unassigned_count", &self.store.unassigned_count())
            .field("offset", &self.offset)
            .finish()
    }
}

impl ExternalBlockAssignments {
    /// Creates a new `ExternalBlockAssignments` starting at the given offset.
    pub fn new(offset: usize) -> Self {
        Self {
            store: BlockStore::new(),
            offset,
        }
    }

    /// Returns the starting position in the sequence.
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Checks whether a block_id is known (assigned, staged, or unassigned).
    pub fn contains(&self, block_id: &BlockId) -> bool {
        self.store.contains(block_id)
    }

    /// Positional access: returns `(BlockId, SequenceHash)` at the given index
    /// (relative to offset) in the assigned collection.
    pub fn get_assigned(&self, index: usize) -> Option<(BlockId, SequenceHash)> {
        self.store
            .get_assigned(index)
            .map(|(&id, &hash)| (id, hash))
    }

    /// Returns the number of assigned blocks.
    pub fn assigned_count(&self) -> usize {
        self.store.assigned_count()
    }

    /// Returns the number of staged blocks.
    pub fn staged_count(&self) -> usize {
        self.store.staged_count()
    }

    /// Returns the number of unassigned (pending) block_ids.
    pub fn unassigned_count(&self) -> usize {
        self.store.unassigned_count()
    }

    /// Positional access: returns `(BlockId, SequenceHash)` at the given index
    /// (relative to the start of staged) in the staged collection.
    pub fn get_staged(&self, index: usize) -> Option<(BlockId, SequenceHash)> {
        self.store.get_staged(index).map(|(&id, &hash)| (id, hash))
    }

    /// Iterates over assigned blocks in positional order, yielding `(BlockId, SequenceHash)`.
    pub fn assigned_iter(&self) -> impl Iterator<Item = (BlockId, SequenceHash)> + '_ {
        self.store.assigned_iter().map(|(&id, &hash)| (id, hash))
    }

    /// Iterates over staged blocks in staging order, yielding `(BlockId, SequenceHash)`.
    pub fn staged_iter(&self) -> impl Iterator<Item = (BlockId, SequenceHash)> + '_ {
        self.store.staged_iter().map(|(&id, &hash)| (id, hash))
    }

    /// Iterates over unassigned block_ids in FIFO order.
    pub fn unassigned_iter(&self) -> impl Iterator<Item = BlockId> + '_ {
        self.store.unassigned_iter().map(|(&id, _)| id)
    }

    /// Clears all assigned, staged, and unassigned blocks, preserving the offset.
    pub fn clear(&mut self) {
        self.store.clear();
    }

    /// Takes all staged blocks, returning them as a `Vec`.
    pub fn take_staged(&mut self) -> Vec<(BlockId, SequenceHash)> {
        self.store.take_staged()
    }

    /// Returns the next sequence position to be assigned:
    /// `offset + assigned_count + staged_count`.
    pub fn next_position(&self) -> usize {
        self.offset + self.store.assigned_count() + self.store.staged_count()
    }

    /// Absolute position range of assigned blocks: `offset..offset + assigned_count`.
    pub fn assigned_positions(&self) -> Range<usize> {
        self.offset..self.offset + self.store.assigned_count()
    }

    /// Absolute position range of staged blocks:
    /// `offset + assigned_count .. offset + assigned_count + staged_count`.
    pub fn staged_positions(&self) -> Range<usize> {
        let start = self.offset + self.store.assigned_count();
        start..start + self.store.staged_count()
    }

    /// Get the assigned `(BlockId, SequenceHash)` at an absolute sequence position.
    ///
    /// Returns `None` if `abs_pos` is outside [`assigned_positions()`](Self::assigned_positions).
    pub fn get_at_position(&self, abs_pos: usize) -> Option<(BlockId, SequenceHash)> {
        let relative = abs_pos.checked_sub(self.offset)?;
        self.get_assigned(relative)
    }

    /// Absolute position range that pending (unassigned) blocks will occupy once
    /// flushed: `next_position()..next_position() + unassigned_count()`.
    pub fn pending_positions(&self) -> Range<usize> {
        let start = self.next_position();
        start..start + self.store.unassigned_count()
    }

    /// Get the pending `BlockId` at an absolute sequence position (FIFO order).
    ///
    /// Position `next_position()` maps to the first unassigned block,
    /// `next_position() + 1` to the second, etc.
    /// Returns `None` if `abs_pos` is outside [`pending_positions()`](Self::pending_positions).
    pub fn get_pending_at_position(&self, abs_pos: usize) -> Option<BlockId> {
        let start = self.next_position();
        let relative = abs_pos.checked_sub(start)?;
        self.store.get_unassigned(relative).map(|(&id, _)| id)
    }

    /// Add new block_ids to the unassigned queue.
    ///
    /// `block_ids` is the **full, ordered** list of block IDs allocated to this
    /// assignment set. Known IDs (already in assigned, staged, or unassigned) must
    /// form a contiguous prefix and are silently skipped. New IDs are appended to
    /// the unassigned FIFO queue.
    ///
    /// This method does **not** assign blocks — call
    /// [`assign_pending`](Self::assign_pending) to pair unassigned IDs with
    /// available sequence blocks.
    ///
    /// # Block ID rules
    ///
    /// The list is partitioned into a **known prefix** and a **new suffix**:
    ///
    /// - **Known prefix** — IDs already present in `assigned`, `staged`, or
    ///   `unassigned`. These are silently skipped. They must appear contiguously
    ///   at the front of the list; interleaving a known ID after an unknown one
    ///   is an [`OrderingViolation`](BlockSequenceError::OrderingViolation).
    /// - **New suffix** — IDs not yet seen. These are appended (in order) to
    ///   the unassigned FIFO queue.
    ///
    /// # Algorithm (two-phase, atomic)
    ///
    /// 1. **Validate & collect** — iterate `block_ids`. Known IDs must form a
    ///    contiguous prefix (skip them). Unknown IDs are collected into a temp
    ///    buffer. If a known ID appears after an unknown one →
    ///    `OrderingViolation` error. No state is mutated until validation passes.
    /// 2. **Commit** — push all new IDs to the unassigned queue.
    pub fn extend_block_ids(
        &mut self,
        block_ids: impl IntoIterator<Item = BlockId>,
    ) -> Result<(), BlockSequenceError> {
        // Phase 1: Validate & collect
        let mut new_ids = Vec::new();
        let mut new_id_set = indexmap::IndexSet::new();
        let mut first_new_index: Option<usize> = None;

        for (i, id) in block_ids.into_iter().enumerate() {
            if self.contains(&id) {
                // Known ID — must come before any new IDs
                if let Some(first_new) = first_new_index {
                    return Err(BlockSequenceError::OrderingViolation {
                        known_id: id,
                        new_id: new_ids[0],
                        known_index: i,
                        first_new_index: first_new,
                    });
                }
                // Skip — already known
            } else {
                // Unknown ID — collect, rejecting internal duplicates
                if !new_id_set.insert(id) {
                    return Err(BlockSequenceError::DuplicateBlockId { block_id: id });
                }
                if first_new_index.is_none() {
                    first_new_index = Some(i);
                }
                new_ids.push(id);
            }
        }

        // Phase 2: Commit — no errors from here on
        for id in new_ids {
            self.store.insert_unassigned(id, ());
        }

        Ok(())
    }

    /// Inserts pre-matched `(BlockId, SequenceHash)` pairs directly into the
    /// assigned collection.
    ///
    /// This is the entry point for blocks whose hashes are already known (e.g.
    /// cache hits). Two-phase atomic: collects all items, validates no duplicate
    /// BlockIds across all three collections, then commits to assigned.
    pub fn extend_assigned(
        &mut self,
        items: impl IntoIterator<Item = (BlockId, SequenceHash)>,
    ) -> Result<usize, BlockSequenceError> {
        let items: Vec<(BlockId, SequenceHash)> = items.into_iter().collect();

        if let Err(block_id) = self
            .store
            .validate_no_duplicates(items.iter().map(|(id, _)| *id), items.len())
        {
            return Err(BlockSequenceError::DuplicateBlockId { block_id });
        }

        let count = items.len();
        for (id, hash) in items {
            self.store.insert_assigned(id, hash);
        }

        Ok(count)
    }

    /// FIFO drain from unassigned into staged, pairing each block_id with the
    /// sequence hash from the corresponding `TokenBlock`.
    ///
    /// Staging starts at `sequence_blocks[self.next_position()]` and proceeds
    /// forward, consuming one unassigned ID per available block. The loop stops
    /// when either the unassigned queue is empty or there are no more sequence
    /// blocks.
    ///
    /// Returns the range of newly staged indices (relative to the start of the
    /// staged collection before this call).
    ///
    /// Each staged pair is validated: the position embedded in the block's
    /// `kvbm_sequence_hash()` must equal the expected sequence index.
    /// A mismatch returns [`BlockSequenceError::PositionMismatch`].
    pub fn stage_pending(
        &mut self,
        sequence_blocks: &[TokenBlock],
    ) -> Result<Range<usize>, BlockSequenceError> {
        let staged_start_idx = self.store.staged_count();
        let start_pos = self.next_position();

        // How many sequence blocks are available starting from our next position?
        let available_blocks = sequence_blocks.len().saturating_sub(start_pos);

        // How many can we stage? Min of available blocks and unassigned count.
        let to_stage = available_blocks.min(self.store.unassigned_count());

        // Phase 1: Validate all positions before mutating
        for i in 0..to_stage {
            let seq_pos = start_pos + i;
            let block = &sequence_blocks[seq_pos];
            let hash = block.kvbm_sequence_hash();

            let actual_pos = hash.position();
            if actual_pos != seq_pos as u64 {
                let block_id = self.store.get_unassigned(i).map(|(&id, _)| id).unwrap();
                return Err(BlockSequenceError::PositionMismatch {
                    expected: seq_pos,
                    actual: actual_pos,
                    block_id,
                });
            }
        }

        // Phase 2: Commit — no errors from here on
        for i in 0..to_stage {
            let seq_pos = start_pos + i;
            let hash = sequence_blocks[seq_pos].kvbm_sequence_hash();
            let (block_id, _) = self.store.shift_unassigned().unwrap();
            self.store.insert_staged(block_id, hash);
        }

        let staged_end_idx = self.store.staged_count();
        Ok(staged_start_idx..staged_end_idx)
    }

    /// Moves all staged blocks into assigned (infallible).
    ///
    /// Returns the range of newly assigned indices (relative to the start of
    /// the assigned collection before this call).
    pub fn commit_staged(&mut self) -> Range<usize> {
        let start_idx = self.store.assigned_count();

        while let Some((block_id, hash)) = self.store.shift_staged() {
            self.store.insert_assigned(block_id, hash);
        }

        let end_idx = self.store.assigned_count();
        start_idx..end_idx
    }

    /// Drain the unassigned FIFO queue into assigned, pairing each block_id
    /// with the next available `TokenBlock` starting at `next_position()`.
    ///
    /// This is a convenience method equivalent to calling
    /// [`stage_pending`](Self::stage_pending) followed by
    /// [`commit_staged`](Self::commit_staged).
    ///
    /// Returns the range of newly assigned indices (relative to offset).
    /// An empty range means no new assignments were made.
    pub fn assign_pending(
        &mut self,
        sequence_blocks: &[TokenBlock],
    ) -> Result<Range<usize>, BlockSequenceError> {
        self.stage_pending(sequence_blocks)?;
        Ok(self.commit_staged())
    }
}

/// Zip two [`ExternalBlockAssignments`] over their overlapping assigned positions.
///
/// For each absolute position where **both** `a` and `b` have assigned blocks,
/// yields `(position, a_block_id, b_block_id)`.
///
/// Iteration order: ascending position.
pub fn zip_assigned(
    a: &ExternalBlockAssignments,
    b: &ExternalBlockAssignments,
) -> Vec<(usize, BlockId, BlockId)> {
    let a_range = a.assigned_positions();
    let b_range = b.assigned_positions();
    let start = a_range.start.max(b_range.start);
    let end = a_range.end.min(b_range.end);

    let mut result = Vec::new();
    for pos in start..end {
        // Both lookups are guaranteed to succeed within the intersection range.
        let (a_id, _) = a.get_at_position(pos).unwrap();
        let (b_id, _) = b.get_at_position(pos).unwrap();
        result.push((pos, a_id, b_id));
    }
    result
}

/// Zip `src` assigned positions with `dst` pending positions.
///
/// For each absolute position where `src` has an assigned block and `dst`
/// has a pending (unassigned) block, yields `(position, src_block_id, dst_block_id)`.
///
/// This is the onboard/offload planning primitive: the result tells you
/// which source blocks to transfer into which destination blocks.
pub fn zip_assigned_pending(
    src: &ExternalBlockAssignments,
    dst: &ExternalBlockAssignments,
) -> Vec<(usize, BlockId, BlockId)> {
    let src_range = src.assigned_positions();
    let dst_range = dst.pending_positions();
    let start = src_range.start.max(dst_range.start);
    let end = src_range.end.min(dst_range.end);

    let mut result = Vec::new();
    for pos in start..end {
        // Both lookups are guaranteed to succeed within the intersection range.
        let (src_id, _) = src.get_at_position(pos).unwrap();
        let dst_id = dst.get_pending_at_position(pos).unwrap();
        result.push((pos, src_id, dst_id));
    }
    result
}