"deploy/cloud/vscode:/vscode.git/clone" did not exist on "cb6de94d8e9f884a7f83940c2dc2cbdbd267328b"
Unverified Commit 9ba5f828 authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

feat: adding request and scheduled sequence (#6933)


Signed-off-by: default avatarRyan Olson <rolson@nvidia.com>
parent e7bcf651
......@@ -3985,14 +3985,18 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"bincode 2.0.1",
"bytes",
"derive_builder",
"dynamo-tokens",
"futures",
"indexmap 2.13.0",
"lru 0.16.3",
"parking_lot",
"prometheus",
"proptest",
"rand 0.9.2",
"rand_chacha 0.9.0",
"rmp-serde",
"rstest 0.26.1",
"serde",
......@@ -4001,6 +4005,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tracing",
"tracing-subscriber",
"xxhash-rust",
]
......
......@@ -93,6 +93,7 @@ modelexpress-client = { version = "0.2.0" }
modelexpress-common = { version = "0.2.0" }
humantime = { version = "2.2.0" }
indexmap = { version = "2" }
libc = { version = "0.2" }
oneshot = { version = "0.1.13", features = ["std", "async"] }
ordered-float = "4"
......
......@@ -15,8 +15,10 @@ dynamo-tokens = { workspace = true }
anyhow = { workspace = true }
async-stream = "0.3"
bytes = "1.10"
bincode = { version = "2.0.1", features = ["serde", "derive"] }
derive_builder = { workspace = true }
futures = { workspace = true }
indexmap = { workspace = true }
lru = "0.16"
parking_lot = { workspace = true }
prometheus = { workspace = true }
......@@ -34,6 +36,9 @@ testing = []
[dev-dependencies]
proptest = "1.5.0"
rand = { workspace = true }
rand_chacha = "0.9"
rstest = "0.26"
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing-subscriber = { workspace = true }
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Composition layer wiring together [`BlockSequence`](crate::BlockSequence),
//! [`LogicalBlockAssignments`](crate::LogicalBlockAssignments), and
//! [`BlockManager`](crate::BlockManager) into higher-level request lifecycle types.
mod request;
mod scheduled;
pub use request::RequestSequence;
pub use scheduled::{
ApplyError, DecodeOutcome, NoopDelegate, SchedulableSequence, SchedulableSequenceBuilder,
ScheduleError, SequenceDelegate, SequenceEvent, SequenceState,
};
This diff is collapsed.
This diff is collapsed.
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
#![doc = include_str!("../README.md")]
//! Logical block lifecycle management for KVBM.
//!
//! This crate provides the core block lifecycle system:
//! - Type-safe state transitions (Reset -> Complete -> Registered)
//! - Block registry with deduplication and attachments
//! - Active/inactive/reset pool management
//! - Event pipeline for distributed coordination
//! - Block manager orchestration
pub mod blocks;
pub mod events;
pub mod integrations;
pub mod manager;
pub mod metrics;
pub mod pools;
pub mod pubsub;
pub mod registry;
pub mod sequence;
pub mod tinylfu;
#[cfg(any(test, feature = "testing"))]
pub mod testing;
use bincode::{Decode, Encode};
use serde::{Deserialize, Serialize};
// Re-export common types and traits
pub use blocks::{
BlockError, BlockMetadata, CompleteBlock, ImmutableBlock, MutableBlock, WeakBlock,
};
pub use integrations::{
ApplyError, DecodeOutcome, NoopDelegate, RequestSequence, SchedulableSequence,
SchedulableSequenceBuilder, ScheduleError, SequenceDelegate, SequenceEvent, SequenceState,
};
pub use manager::BlockManager;
pub use registry::BlockRegistry;
pub use sequence::{
BlockSequence, BlockSequenceError, ExternalBlockAssignments, LogicalBlockAssignmentError,
LogicalBlockAssignments, zip_assigned, zip_assigned_pending,
};
pub type BlockId = usize;
pub type SequenceHash = dynamo_tokens::PositionalLineageHash;
......@@ -34,3 +54,19 @@ impl KvbmSequenceHashProvider for dynamo_tokens::TokenBlock {
self.positional_lineage_hash()
}
}
/// Logical layout handle type encoding the layout ID.
///
/// KVBM manages G1, G2 and G3 layouts directly. G4 is managed by an external service.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Encode, Decode, Serialize, Deserialize)]
pub enum LogicalLayoutHandle {
/// Representation of GPU / Device Memory
G1,
/// Representation of CPU / Host Memory
G2,
/// Representation of Disk Storage
G3,
/// Representation of Blocks held in an external service
/// outside the control of the KVBM system.
G4,
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::ops::Range;
use crate::{BlockId, KvbmSequenceHashProvider, SequenceHash};
use dynamo_tokens::TokenBlock;
use super::super::store::BlockStore;
use crate::sequence::BlockSequenceError;
/// Per-tier block_id tracking with an offset into the sequence.
///
/// Maintains an ordered mapping of `BlockId` → `SequenceHash` for assigned blocks,
/// a staging area for blocks whose hashes have been computed but not yet committed,
/// plus a FIFO queue of block_ids waiting for assignment. Index `i` in the assigned
/// map corresponds to sequence position `offset + i`.
///
/// The three-phase lifecycle is:
/// - **Unassigned** — block_ids queued for assignment (no hash yet).
/// - **Staged** — block_ids paired with their `SequenceHash` but not yet committed.
/// - **Assigned** — committed `BlockId → SequenceHash` pairs in positional order.
///
/// Multiple `ExternalBlockAssignments` instances can operate on the same `&[TokenBlock]` at
/// different offsets (multi-tier).
pub struct ExternalBlockAssignments {
store: BlockStore<(), SequenceHash, SequenceHash>,
/// Starting position in the sequence. Assignments begin at this position.
offset: usize,
}
impl std::fmt::Debug for ExternalBlockAssignments {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExternalBlockAssignments")
.field("assigned_count", &self.store.assigned_count())
.field("staged_count", &self.store.staged_count())
.field("unassigned_count", &self.store.unassigned_count())
.field("offset", &self.offset)
.finish()
}
}
impl ExternalBlockAssignments {
/// Creates a new `ExternalBlockAssignments` starting at the given offset.
pub fn new(offset: usize) -> Self {
Self {
store: BlockStore::new(),
offset,
}
}
/// Returns the starting position in the sequence.
pub fn offset(&self) -> usize {
self.offset
}
/// Checks whether a block_id is known (assigned, staged, or unassigned).
pub fn contains(&self, block_id: &BlockId) -> bool {
self.store.contains(block_id)
}
/// Positional access: returns `(BlockId, SequenceHash)` at the given index
/// (relative to offset) in the assigned collection.
pub fn get_assigned(&self, index: usize) -> Option<(BlockId, SequenceHash)> {
self.store
.get_assigned(index)
.map(|(&id, &hash)| (id, hash))
}
/// Returns the number of assigned blocks.
pub fn assigned_count(&self) -> usize {
self.store.assigned_count()
}
/// Returns the number of staged blocks.
pub fn staged_count(&self) -> usize {
self.store.staged_count()
}
/// Returns the number of unassigned (pending) block_ids.
pub fn unassigned_count(&self) -> usize {
self.store.unassigned_count()
}
/// Positional access: returns `(BlockId, SequenceHash)` at the given index
/// (relative to the start of staged) in the staged collection.
pub fn get_staged(&self, index: usize) -> Option<(BlockId, SequenceHash)> {
self.store.get_staged(index).map(|(&id, &hash)| (id, hash))
}
/// Iterates over assigned blocks in positional order, yielding `(BlockId, SequenceHash)`.
pub fn assigned_iter(&self) -> impl Iterator<Item = (BlockId, SequenceHash)> + '_ {
self.store.assigned_iter().map(|(&id, &hash)| (id, hash))
}
/// Iterates over staged blocks in staging order, yielding `(BlockId, SequenceHash)`.
pub fn staged_iter(&self) -> impl Iterator<Item = (BlockId, SequenceHash)> + '_ {
self.store.staged_iter().map(|(&id, &hash)| (id, hash))
}
/// Iterates over unassigned block_ids in FIFO order.
pub fn unassigned_iter(&self) -> impl Iterator<Item = BlockId> + '_ {
self.store.unassigned_iter().map(|(&id, _)| id)
}
/// Clears all assigned, staged, and unassigned blocks, preserving the offset.
pub fn clear(&mut self) {
self.store.clear();
}
/// Takes all staged blocks, returning them as a `Vec`.
pub fn take_staged(&mut self) -> Vec<(BlockId, SequenceHash)> {
self.store.take_staged()
}
/// Returns the next sequence position to be assigned:
/// `offset + assigned_count + staged_count`.
pub fn next_position(&self) -> usize {
self.offset + self.store.assigned_count() + self.store.staged_count()
}
/// Absolute position range of assigned blocks: `offset..offset + assigned_count`.
pub fn assigned_positions(&self) -> Range<usize> {
self.offset..self.offset + self.store.assigned_count()
}
/// Absolute position range of staged blocks:
/// `offset + assigned_count .. offset + assigned_count + staged_count`.
pub fn staged_positions(&self) -> Range<usize> {
let start = self.offset + self.store.assigned_count();
start..start + self.store.staged_count()
}
/// Get the assigned `(BlockId, SequenceHash)` at an absolute sequence position.
///
/// Returns `None` if `abs_pos` is outside [`assigned_positions()`](Self::assigned_positions).
pub fn get_at_position(&self, abs_pos: usize) -> Option<(BlockId, SequenceHash)> {
let relative = abs_pos.checked_sub(self.offset)?;
self.get_assigned(relative)
}
/// Absolute position range that pending (unassigned) blocks will occupy once
/// flushed: `next_position()..next_position() + unassigned_count()`.
pub fn pending_positions(&self) -> Range<usize> {
let start = self.next_position();
start..start + self.store.unassigned_count()
}
/// Get the pending `BlockId` at an absolute sequence position (FIFO order).
///
/// Position `next_position()` maps to the first unassigned block,
/// `next_position() + 1` to the second, etc.
/// Returns `None` if `abs_pos` is outside [`pending_positions()`](Self::pending_positions).
pub fn get_pending_at_position(&self, abs_pos: usize) -> Option<BlockId> {
let start = self.next_position();
let relative = abs_pos.checked_sub(start)?;
self.store.get_unassigned(relative).map(|(&id, _)| id)
}
/// Add new block_ids to the unassigned queue.
///
/// `block_ids` is the **full, ordered** list of block IDs allocated to this
/// assignment set. Known IDs (already in assigned, staged, or unassigned) must
/// form a contiguous prefix and are silently skipped. New IDs are appended to
/// the unassigned FIFO queue.
///
/// This method does **not** assign blocks — call
/// [`assign_pending`](Self::assign_pending) to pair unassigned IDs with
/// available sequence blocks.
///
/// # Block ID rules
///
/// The list is partitioned into a **known prefix** and a **new suffix**:
///
/// - **Known prefix** — IDs already present in `assigned`, `staged`, or
/// `unassigned`. These are silently skipped. They must appear contiguously
/// at the front of the list; interleaving a known ID after an unknown one
/// is an [`OrderingViolation`](BlockSequenceError::OrderingViolation).
/// - **New suffix** — IDs not yet seen. These are appended (in order) to
/// the unassigned FIFO queue.
///
/// # Algorithm (two-phase, atomic)
///
/// 1. **Validate & collect** — iterate `block_ids`. Known IDs must form a
/// contiguous prefix (skip them). Unknown IDs are collected into a temp
/// buffer. If a known ID appears after an unknown one →
/// `OrderingViolation` error. No state is mutated until validation passes.
/// 2. **Commit** — push all new IDs to the unassigned queue.
pub fn extend_block_ids(
&mut self,
block_ids: impl IntoIterator<Item = BlockId>,
) -> Result<(), BlockSequenceError> {
// Phase 1: Validate & collect
let mut new_ids = Vec::new();
let mut new_id_set = indexmap::IndexSet::new();
let mut first_new_index: Option<usize> = None;
for (i, id) in block_ids.into_iter().enumerate() {
if self.contains(&id) {
// Known ID — must come before any new IDs
if let Some(first_new) = first_new_index {
return Err(BlockSequenceError::OrderingViolation {
known_id: id,
new_id: new_ids[0],
known_index: i,
first_new_index: first_new,
});
}
// Skip — already known
} else {
// Unknown ID — collect, rejecting internal duplicates
if !new_id_set.insert(id) {
return Err(BlockSequenceError::DuplicateBlockId { block_id: id });
}
if first_new_index.is_none() {
first_new_index = Some(i);
}
new_ids.push(id);
}
}
// Phase 2: Commit — no errors from here on
for id in new_ids {
self.store.insert_unassigned(id, ());
}
Ok(())
}
/// Inserts pre-matched `(BlockId, SequenceHash)` pairs directly into the
/// assigned collection.
///
/// This is the entry point for blocks whose hashes are already known (e.g.
/// cache hits). Two-phase atomic: collects all items, validates no duplicate
/// BlockIds across all three collections, then commits to assigned.
pub fn extend_assigned(
&mut self,
items: impl IntoIterator<Item = (BlockId, SequenceHash)>,
) -> Result<usize, BlockSequenceError> {
let items: Vec<(BlockId, SequenceHash)> = items.into_iter().collect();
if let Err(block_id) = self
.store
.validate_no_duplicates(items.iter().map(|(id, _)| *id), items.len())
{
return Err(BlockSequenceError::DuplicateBlockId { block_id });
}
let count = items.len();
for (id, hash) in items {
self.store.insert_assigned(id, hash);
}
Ok(count)
}
/// FIFO drain from unassigned into staged, pairing each block_id with the
/// sequence hash from the corresponding `TokenBlock`.
///
/// Staging starts at `sequence_blocks[self.next_position()]` and proceeds
/// forward, consuming one unassigned ID per available block. The loop stops
/// when either the unassigned queue is empty or there are no more sequence
/// blocks.
///
/// Returns the range of newly staged indices (relative to the start of the
/// staged collection before this call).
///
/// Each staged pair is validated: the position embedded in the block's
/// `kvbm_sequence_hash()` must equal the expected sequence index.
/// A mismatch returns [`BlockSequenceError::PositionMismatch`].
pub fn stage_pending(
&mut self,
sequence_blocks: &[TokenBlock],
) -> Result<Range<usize>, BlockSequenceError> {
let staged_start_idx = self.store.staged_count();
let start_pos = self.next_position();
// How many sequence blocks are available starting from our next position?
let available_blocks = sequence_blocks.len().saturating_sub(start_pos);
// How many can we stage? Min of available blocks and unassigned count.
let to_stage = available_blocks.min(self.store.unassigned_count());
// Phase 1: Validate all positions before mutating
for i in 0..to_stage {
let seq_pos = start_pos + i;
let block = &sequence_blocks[seq_pos];
let hash = block.kvbm_sequence_hash();
let actual_pos = hash.position();
if actual_pos != seq_pos as u64 {
let block_id = self.store.get_unassigned(i).map(|(&id, _)| id).unwrap();
return Err(BlockSequenceError::PositionMismatch {
expected: seq_pos,
actual: actual_pos,
block_id,
});
}
}
// Phase 2: Commit — no errors from here on
for i in 0..to_stage {
let seq_pos = start_pos + i;
let hash = sequence_blocks[seq_pos].kvbm_sequence_hash();
let (block_id, _) = self.store.shift_unassigned().unwrap();
self.store.insert_staged(block_id, hash);
}
let staged_end_idx = self.store.staged_count();
Ok(staged_start_idx..staged_end_idx)
}
/// Moves all staged blocks into assigned (infallible).
///
/// Returns the range of newly assigned indices (relative to the start of
/// the assigned collection before this call).
pub fn commit_staged(&mut self) -> Range<usize> {
let start_idx = self.store.assigned_count();
while let Some((block_id, hash)) = self.store.shift_staged() {
self.store.insert_assigned(block_id, hash);
}
let end_idx = self.store.assigned_count();
start_idx..end_idx
}
/// Drain the unassigned FIFO queue into assigned, pairing each block_id
/// with the next available `TokenBlock` starting at `next_position()`.
///
/// This is a convenience method equivalent to calling
/// [`stage_pending`](Self::stage_pending) followed by
/// [`commit_staged`](Self::commit_staged).
///
/// Returns the range of newly assigned indices (relative to offset).
/// An empty range means no new assignments were made.
pub fn assign_pending(
&mut self,
sequence_blocks: &[TokenBlock],
) -> Result<Range<usize>, BlockSequenceError> {
self.stage_pending(sequence_blocks)?;
Ok(self.commit_staged())
}
}
/// Zip two [`ExternalBlockAssignments`] over their overlapping assigned positions.
///
/// For each absolute position where **both** `a` and `b` have assigned blocks,
/// yields `(position, a_block_id, b_block_id)`.
///
/// Iteration order: ascending position.
pub fn zip_assigned(
a: &ExternalBlockAssignments,
b: &ExternalBlockAssignments,
) -> Vec<(usize, BlockId, BlockId)> {
let a_range = a.assigned_positions();
let b_range = b.assigned_positions();
let start = a_range.start.max(b_range.start);
let end = a_range.end.min(b_range.end);
let mut result = Vec::new();
for pos in start..end {
// Both lookups are guaranteed to succeed within the intersection range.
let (a_id, _) = a.get_at_position(pos).unwrap();
let (b_id, _) = b.get_at_position(pos).unwrap();
result.push((pos, a_id, b_id));
}
result
}
/// Zip `src` assigned positions with `dst` pending positions.
///
/// For each absolute position where `src` has an assigned block and `dst`
/// has a pending (unassigned) block, yields `(position, src_block_id, dst_block_id)`.
///
/// This is the onboard/offload planning primitive: the result tells you
/// which source blocks to transfer into which destination blocks.
pub fn zip_assigned_pending(
src: &ExternalBlockAssignments,
dst: &ExternalBlockAssignments,
) -> Vec<(usize, BlockId, BlockId)> {
let src_range = src.assigned_positions();
let dst_range = dst.pending_positions();
let start = src_range.start.max(dst_range.start);
let end = src_range.end.min(dst_range.end);
let mut result = Vec::new();
for pos in start..end {
// Both lookups are guaranteed to succeed within the intersection range.
let (src_id, _) = src.get_at_position(pos).unwrap();
let dst_id = dst.get_pending_at_position(pos).unwrap();
result.push((pos, src_id, dst_id));
}
result
}
This diff is collapsed.
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
mod external;
mod logical;
#[cfg(test)]
mod tests;
pub use external::{ExternalBlockAssignments, zip_assigned, zip_assigned_pending};
pub use logical::{LogicalBlockAssignmentError, LogicalBlockAssignments};
This diff is collapsed.
This diff is collapsed.
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Shared three-collection lifecycle store backing both
//! [`ExternalBlockAssignments`](super::assignments::ExternalBlockAssignments) and
//! [`LogicalBlockAssignments`](super::assignments::LogicalBlockAssignments).
use indexmap::IndexMap;
use crate::BlockId;
/// Generic three-phase lifecycle store keyed by [`BlockId`].
///
/// Manages three ordered `IndexMap` collections representing the lifecycle
/// phases: **unassigned** (`U`) → **staged** (`S`) → **assigned** (`A`).
///
/// Both [`ExternalBlockAssignments`](super::assignments::ExternalBlockAssignments) and
/// [`LogicalBlockAssignments`](super::assignments::LogicalBlockAssignments) compose this
/// type internally and add their own type-specific transition logic on top.
pub(crate) struct BlockStore<U, S, A> {
assigned: IndexMap<BlockId, A>,
staged: IndexMap<BlockId, S>,
unassigned: IndexMap<BlockId, U>,
}
impl<U, S, A> BlockStore<U, S, A> {
/// Creates an empty store.
pub fn new() -> Self {
Self {
assigned: IndexMap::new(),
staged: IndexMap::new(),
unassigned: IndexMap::new(),
}
}
// -- Counts ---------------------------------------------------------------
/// Returns the number of assigned entries.
pub fn assigned_count(&self) -> usize {
self.assigned.len()
}
/// Returns the number of staged entries.
pub fn staged_count(&self) -> usize {
self.staged.len()
}
/// Returns the number of unassigned entries.
pub fn unassigned_count(&self) -> usize {
self.unassigned.len()
}
// -- Queries --------------------------------------------------------------
/// Returns `true` if all three collections are empty.
pub fn is_empty(&self) -> bool {
self.assigned.is_empty() && self.staged.is_empty() && self.unassigned.is_empty()
}
/// Checks whether a `BlockId` is present in any of the three collections.
pub fn contains(&self, block_id: &BlockId) -> bool {
self.assigned.contains_key(block_id)
|| self.staged.contains_key(block_id)
|| self.unassigned.contains_key(block_id)
}
/// Checks whether a `BlockId` is present in the assigned collection.
#[allow(dead_code)]
pub fn contains_assigned(&self, block_id: &BlockId) -> bool {
self.assigned.contains_key(block_id)
}
/// Checks whether a `BlockId` is present in the staged collection.
#[allow(dead_code)]
pub fn contains_staged(&self, block_id: &BlockId) -> bool {
self.staged.contains_key(block_id)
}
/// Checks whether a `BlockId` is present in the unassigned collection.
#[allow(dead_code)]
pub fn contains_unassigned(&self, block_id: &BlockId) -> bool {
self.unassigned.contains_key(block_id)
}
// -- Index Access ---------------------------------------------------------
/// Returns the assigned entry at the given index (insertion order).
pub fn get_assigned(&self, index: usize) -> Option<(&BlockId, &A)> {
self.assigned.get_index(index)
}
/// Returns the staged entry at the given index (staging order).
pub fn get_staged(&self, index: usize) -> Option<(&BlockId, &S)> {
self.staged.get_index(index)
}
/// Returns the unassigned entry at the given index (FIFO order).
pub fn get_unassigned(&self, index: usize) -> Option<(&BlockId, &U)> {
self.unassigned.get_index(index)
}
// -- Iteration ------------------------------------------------------------
/// Iterates over assigned entries in positional order.
pub fn assigned_iter(&self) -> impl Iterator<Item = (&BlockId, &A)> {
self.assigned.iter()
}
/// Iterates over staged entries in staging order.
pub fn staged_iter(&self) -> impl Iterator<Item = (&BlockId, &S)> {
self.staged.iter()
}
/// Iterates over unassigned entries in FIFO order.
pub fn unassigned_iter(&self) -> impl Iterator<Item = (&BlockId, &U)> {
self.unassigned.iter()
}
// -- FIFO Pop -------------------------------------------------------------
/// Removes and returns the first unassigned entry (FIFO).
pub fn shift_unassigned(&mut self) -> Option<(BlockId, U)> {
self.unassigned.shift_remove_index(0)
}
/// Removes and returns the first staged entry (FIFO).
pub fn shift_staged(&mut self) -> Option<(BlockId, S)> {
self.staged.shift_remove_index(0)
}
// -- LIFO Pop -------------------------------------------------------------
/// Removes and returns the last unassigned entry (LIFO).
pub fn pop_unassigned(&mut self) -> Option<(BlockId, U)> {
self.unassigned.pop()
}
// -- Insert ---------------------------------------------------------------
/// Inserts into the assigned collection.
pub fn insert_assigned(&mut self, id: BlockId, val: A) {
self.assigned.insert(id, val);
}
/// Inserts into the staged collection.
pub fn insert_staged(&mut self, id: BlockId, val: S) {
self.staged.insert(id, val);
}
/// Inserts into the unassigned collection.
pub fn insert_unassigned(&mut self, id: BlockId, val: U) {
self.unassigned.insert(id, val);
}
// -- Bulk -----------------------------------------------------------------
/// Iterates over all block IDs across all three collections in lifecycle
/// order: assigned → staged → unassigned.
pub fn all_block_ids(&self) -> impl Iterator<Item = &BlockId> {
self.assigned
.keys()
.chain(self.staged.keys())
.chain(self.unassigned.keys())
}
/// Validates that none of the given `ids` collide with existing entries
/// or with each other.
///
/// Returns `Ok(())` if all IDs are unique, or `Err(id)` with the first
/// duplicate found.
pub fn validate_no_duplicates(
&self,
ids: impl Iterator<Item = BlockId>,
count_hint: usize,
) -> Result<(), BlockId> {
let mut seen = indexmap::IndexSet::with_capacity(count_hint);
for id in ids {
if self.contains(&id) || !seen.insert(id) {
return Err(id);
}
}
Ok(())
}
/// Clears all three collections.
pub fn clear(&mut self) {
self.assigned.clear();
self.staged.clear();
self.unassigned.clear();
}
/// Takes all assigned entries, returning them as a `Vec`.
pub fn take_assigned(&mut self) -> Vec<(BlockId, A)> {
std::mem::take(&mut self.assigned).into_iter().collect()
}
/// Takes all staged entries, returning them as a `Vec`.
pub fn take_staged(&mut self) -> Vec<(BlockId, S)> {
std::mem::take(&mut self.staged).into_iter().collect()
}
/// Takes all unassigned entries, returning them as a `Vec`.
pub fn take_unassigned(&mut self) -> Vec<(BlockId, U)> {
std::mem::take(&mut self.unassigned).into_iter().collect()
}
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment