Unverified Commit c6d8f225 authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

feat(velo-events): trait-based event system for async coordination (#6315)


Signed-off-by: default avatarRyan Olson <rolson@nvidia.com>
parent 713c96d2
......@@ -3622,9 +3622,9 @@ dependencies = [
[[package]]
name = "js-sys"
version = "0.3.90"
version = "0.3.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14dc6f6450b3f6d4ed5b16327f38fed626d375a886159ca555bd7822c0c3a5a6"
checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c"
dependencies = [
"once_cell",
"wasm-bindgen",
......@@ -8563,6 +8563,22 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "velo-events"
version = "1.0.0"
dependencies = [
"anyhow",
"dashmap 6.1.0",
"futures",
"parking_lot",
"serde",
"tokio",
"tokio-util",
"tracing",
"uuid",
"xxhash-rust",
]
[[package]]
name = "version-compare"
version = "0.2.1"
......@@ -8646,9 +8662,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen"
version = "0.2.113"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60722a937f594b7fde9adb894d7c092fc1bb6612897c46368d18e7a20208eff2"
checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e"
dependencies = [
"cfg-if 1.0.4",
"once_cell",
......@@ -8659,9 +8675,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.63"
version = "0.4.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a89f4650b770e4521aa6573724e2aed4704372151bd0de9d16a3bbabb87441a"
checksum = "e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8"
dependencies = [
"cfg-if 1.0.4",
"futures-util",
......@@ -8673,9 +8689,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.113"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fac8c6395094b6b91c4af293f4c79371c163f9a6f56184d2c9a85f5a95f3950"
checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
......@@ -8683,9 +8699,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.113"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab3fabce6159dc20728033842636887e4877688ae94382766e00b180abac9d60"
checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3"
dependencies = [
"bumpalo",
"proc-macro2",
......@@ -8696,9 +8712,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.113"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de0e091bdb824da87dc01d967388880d017a0a9bc4f3bdc0d86ee9f9336e3bb5"
checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16"
dependencies = [
"unicode-ident",
]
......@@ -8752,9 +8768,9 @@ dependencies = [
[[package]]
name = "web-sys"
version = "0.3.90"
version = "0.3.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "705eceb4ce901230f8625bd1d665128056ccbe4b7408faa625eec1ba80f59a97"
checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9"
dependencies = [
"js-sys",
"wasm-bindgen",
......
......@@ -18,6 +18,7 @@ members = [
"lib/bindings/c",
"lib/bindings/python/codegen",
"lib/config",
"lib/velo-events",
]
resolver = "3"
......@@ -42,8 +43,14 @@ dynamo-mocker = { path = "lib/mocker", version = "1.0.0" }
dynamo-kv-router = { path = "lib/kv-router", version = "1.0.0", features = ["metrics"] }
dynamo-async-openai = { path = "lib/async-openai", version = "1.0.0", features = ["byot"] }
dynamo-parsers = { path = "lib/parsers", version = "1.0.0" }
# kvbm
kvbm-kernels = { path = "lib/kvbm-kernels", version = "1.0.0" }
kvbm-logical = { path = "lib/kvbm-logical", version = "1.0.0" }
# velo
velo-events = { path = "lib/velo-events", version = "1.0.0" }
# External dependencies
anyhow = { version = "1" }
async-nats = { version = "0.45.0", features = ["service"] }
......
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Build & Test
```bash
# Build
cargo build -p velo-events
# Run all tests
cargo test -p velo-events
# Run a single test
cargo test -p velo-events <test_name>
# Check (no codegen)
cargo check -p velo-events
```
## Architecture
`velo-events` is a generational event system for coordinating async awaiters with minimal overhead. Events can be triggered (success) or poisoned (error), and entries are recycled across generations.
### Core types (`event.rs`, `manager.rs`)
- **`Event`** — concrete RAII guard for a single event. Dropping without calling `trigger(self)` or `poison(self, ...)` auto-poisons the event. `into_handle(self)` disarms the guard and returns the bare handle. `trigger` and `poison` consume `self`, preventing double-completion at compile time.
- **`EventManager`** — concrete struct that manages a collection of events: `new_event`, `awaiter`, `poll`, `trigger`, `poison`, `merge_events`, `force_shutdown`. Create with `EventManager::local()` for local use or `EventManager::new(base, backend)` for distributed setups.
- **`EventBackend`** — public trait with 3 methods (`trigger`, `poison`, `awaiter`) that serves as the routing customization point. `EventSystemBase` implements this for the local path; distributed backends implement it to add network routing.
### Base implementation (`base/`)
- **`EventSystemBase`** — the core event storage, allocation, and recycling engine. Uses `DashMap` for concurrent event storage with a free-list for entry recycling. Implements `EventBackend` for local trigger/poison/awaiter routing. Constructors: `EventSystemBase::local()` (random system_id, local flag set) and `EventSystemBase::distributed(system_id)` (explicit id, no local flag). Public `_inner` methods (`trigger_inner`, `poison_inner`, `awaiter_inner`) allow distributed backends to delegate local operations.
### Handle encoding (`handle.rs`)
`EventHandle` packs identity into a single `u128`: `[system_id: 64][local_index: 32][generation: 32]`. Bit 31 of `local_index` distinguishes local (bit set) from distributed (bit clear) handles. Both local and distributed systems have unique non-zero `system_id` values. `EventSystemBase` validates that handles belong to the system that created them.
### Slot machinery (`slot/`)
Single-lock synchronization primitives. See [docs/slot-state-machine.md](docs/slot-state-machine.md)
for invariants. Any change to `slot/` must preserve all invariants (I1-I6)
and update the document.
Key types:
- **`EventEntry`** — per-index state machine with a single `ParkingMutex<EventState>` protecting generation tracking, waker registration, and poison history.
- **`EventAwaiter`** — `Future` impl that resolves to `Result<()>`. Supports both immediate (already-complete) and pending modes. Delegates poll to `EventEntry::poll_waiter`.
- **`CompletionKind`** — `Triggered` | `Poisoned(Arc<EventPoison>)`.
### Factory (`factory.rs`)
`DistributedEventFactory` creates an `EventManager` pre-configured with a `system_id` for distributed (Nova-managed) deployments.
## Key Design Decisions
- `Event` is an RAII guard by default — dropping without triggering auto-poisons. `into_handle()` is the explicit opt-out for manager-level operations. `Clone` is intentionally not implemented; each event is a unique ownership token.
- `EventManager` is a concrete `Clone` struct holding `Arc<EventSystemBase>` (lifecycle) + `Arc<dyn EventBackend>` (routing). `EventManager::local()` creates both from the same `EventSystemBase`. `EventManager::new(base, backend)` accepts a custom backend for distributed routing.
- `EventBackend` is the public routing trait (3 methods) that enables distributed routing without touching the core event lifecycle. Distributed backends call `EventSystemBase::trigger_inner` / `poison_inner` / `awaiter_inner` for local handles and route remote handles over the network.
- Slot entries track a `BTreeMap<Generation, PoisonArc>` for poison history, allowing past-generation poison queries.
- Generation overflow causes entry retirement and a new entry allocation (transparent retry loop in `new_event_with_backend`).
- `force_shutdown` poisons all pending events and rejects future allocations via an `AtomicBool` flag.
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
[package]
name = "velo-events"
version.workspace = true
edition.workspace = true
description.workspace = true
authors.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
keywords.workspace = true
[dependencies]
anyhow = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
parking_lot = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }
xxhash-rust = { workspace = true }
# velo-events
A generational event system for coordinating async tasks with [minimal overhead](https://drive.google.com/file/d/1s9M1I-dUbhqWLrMFB5ehPSM-qDQBGPZG).
Events can be created, awaited, merged into precondition graphs, and poisoned
on failure. The local implementation lives in this crate; a distributed event
system can be built on top via active messaging.
## Core concepts
| Operation | What it does |
|-----------|-------------|
| **Create** | `manager.new_event()` allocates a pending event and returns an `Event` — an RAII guard you can trigger or await. |
| **Await** | `manager.awaiter(handle)?.await` suspends the current task until the event completes (or is poisoned). |
| **Merge** | `manager.merge_events(vec![a, b, c])` creates a new event that completes only after **all** inputs complete — this is how you build precondition graphs. |
| **Poison** | Events can fail with a reason string. Dropping an `Event` without triggering it auto-poisons so events are never silently lost. |
## Usage
### Create, trigger, await
```rust,no_run
use velo_events::EventManager;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let manager = EventManager::local();
let event = manager.new_event()?;
let handle = event.handle();
// Spawn a task that waits for the event
let mgr = manager.clone();
let waiter = tokio::spawn(async move {
mgr.awaiter(handle)?.await
});
// Complete the event — consumes self, disarms the drop guard
event.trigger()?;
waiter.await??;
Ok(())
}
```
### RAII drop safety
`Event` is an RAII guard: dropping it without calling `trigger()` or `poison()`
automatically poisons the event so waiters are never silently abandoned. Both
`trigger` and `poison` consume `self`, preventing double-completion at compile
time.
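To opt out of auto-poisoning (e.g. when handing ownership to a manager-level
operation), call `into_handle()`, which disarms the guard and returns the bare
handle. The example below shows the default guard behaviour, where an early
return or panic drops the event and poisons it: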
```rust,no_run
use velo_events::EventManager;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let manager = EventManager::local();
let event = manager.new_event()?;
let handle = event.handle();
// If this function returns early or panics, the event
// drops and is automatically poisoned.
do_work()?;
event.trigger()?; // success — consumes the event
Ok(())
}
fn do_work() -> anyhow::Result<()> { Ok(()) }
```
### Merging events (precondition graphs)
`merge_events` lets you express "wait for all of these before proceeding":
```rust,no_run
use velo_events::EventManager;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let manager = EventManager::local();
let load_weights = manager.new_event()?;
let load_tokenizer = manager.new_event()?;
// merged event completes only after both inputs complete
let ready = manager.merge_events(vec![
load_weights.handle(),
load_tokenizer.handle(),
])?;
load_weights.trigger()?;
load_tokenizer.trigger()?;
manager.awaiter(ready)?.await?;
Ok(())
}
```
Because merged events are themselves events, you can merge merges to build
arbitrary DAGs of preconditions.
### Poison propagation
When an event is poisoned, all awaiters receive an error containing the
reason. Merged events accumulate poison reasons from their inputs:
```rust,no_run
use velo_events::{EventManager, EventPoison};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let manager = EventManager::local();
let a = manager.new_event()?;
let b = manager.new_event()?;
let merged = manager.merge_events(vec![a.handle(), b.handle()])?;
manager.poison(a.handle(), "a failed")?;
manager.poison(b.handle(), "b failed")?;
let err = manager.awaiter(merged)?.await.unwrap_err();
let poison = err.downcast::<EventPoison>()?;
assert!(poison.reason().contains("a failed"));
assert!(poison.reason().contains("b failed"));
Ok(())
}
```
### Application responsibility
In distributed systems, concurrent trigger/poison calls cannot be coordinated
through the type system alone. Application logic must carefully manage how
events are completed.
**Pattern: don't use trigger/poison as if/else on one event.** Poison reasons
are kept in a `BTreeMap` history per entry, so poison strings persist in memory.
Instead, create a separate event per outcome arm and use `tokio::select!` to
race them:
```rust,no_run
use velo_events::EventManager;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let manager = EventManager::local();
let success_event = manager.new_event()?;
let failure_event = manager.new_event()?;
let success_handle = success_event.handle();
let failure_handle = failure_event.handle();
// Producer decides which arm:
// success_event.trigger()? OR failure_event.trigger()?
// Consumer races:
let success_awaiter = manager.awaiter(success_handle)?;
let failure_awaiter = manager.awaiter(failure_handle)?;
tokio::select! {
ok = success_awaiter => { ok?; /* success path */ }
err = failure_awaiter => { err?; /* failure path */ }
}
Ok(())
}
```
## Distributed events
For distributed deployments, `EventBackend` and `EventSystemBase` are public
so you can implement custom routing. Create a base with an explicit system_id,
implement `EventBackend` to route local vs remote handles, and pass both to
`EventManager::new`:
```rust,no_run
use velo_events::{EventSystemBase, EventBackend, EventManager, EventHandle, EventAwaiter};
use anyhow::Result;
use std::sync::Arc;
struct MyDistributedBackend {
local: Arc<EventSystemBase>,
// router: MyRouter,
}
impl EventBackend for MyDistributedBackend {
fn trigger(&self, handle: EventHandle) -> Result<()> {
if handle.system_id() == self.local.system_id() {
self.local.trigger_inner(handle) // fast local path
} else {
todo!("route over network")
}
}
fn poison(&self, handle: EventHandle, reason: Arc<str>) -> Result<()> {
if handle.system_id() == self.local.system_id() {
self.local.poison_inner(handle, reason)
} else {
todo!("route over network")
}
}
fn awaiter(&self, handle: EventHandle) -> Result<EventAwaiter> {
if handle.system_id() == self.local.system_id() {
self.local.awaiter_inner(handle)
} else {
todo!("route over network")
}
}
}
let base = EventSystemBase::distributed(0x42);
let backend = Arc::new(MyDistributedBackend { local: base.clone() });
let manager = EventManager::new(base, backend);
// handles produced by this manager carry system_id = 0x42
```
For simpler cases where you just need handles stamped with a system_id (without
custom routing), `DistributedEventFactory` is a convenience wrapper:
```rust,no_run
use velo_events::DistributedEventFactory;
let factory = DistributedEventFactory::new(0x42.try_into().unwrap());
let manager = factory.event_manager();
// handles produced by this manager carry system_id = 0x42
```
# Slot State Machine Specification
## Overview
The slot module uses a single entry-level `ParkingMutex<EventState>` that holds
all per-entry state including waker registration. This design eliminates
stale-completion races by construction — there is no separate lock or atomic
guard whose ordering could allow stale results to leak across generations.
## State Variables
All fields are protected by a single `parking_lot::Mutex`:
```rust
struct EventState {
last_triggered: Generation, // highest completed generation
active_generation: Option<Generation>, // currently pending generation
wakers: Vec<Waker>, // registered waiter wakers
poisoned: BTreeMap<Generation, PoisonArc>, // poison history per generation
retired: bool, // permanently unusable
}
```
## Lifecycle Phases
```
          begin_generation()
  Idle ──────────────────> Active
   ^                          |
   |   finalize_completion()  |
   |                          v
   +──────────────────── Completing
```
All transitions happen under a single lock acquisition.
### Idle
- `active_generation = None`
- `wakers` is empty (drained by prior `finalize_completion`)
- Entry is available for reuse via the free list
### Active
- `active_generation = Some(gen)`
- Waiters may register wakers via `poll_waiter`
- Only one generation can be active at a time
### Completing
- `finalize_completion` sets `last_triggered`, clears `active_generation`,
stores poison (if applicable), drains wakers, then wakes them
- Transitions back to Idle
## Operations
### begin_generation
1. Acquire lock
2. Validate: no active generation, not retired, not overflowed
3. Compute `next = last_triggered + 1`
4. Drain stale wakers (`std::mem::take`)
5. Set `active_generation = Some(next)`
6. Release lock
7. Wake stale wakers (outside lock)
### finalize_completion(generation, completion)
1. Acquire lock
2. Validate: `active_generation == Some(generation)`
3. Set `last_triggered = generation`
4. Clear `active_generation`
5. Insert/remove from poison map
6. Drain wakers
7. Release lock
8. Wake all drained wakers (outside lock)
### register_local_waiter(generation)
1. Acquire lock
2. If `generation <= last_triggered`: return Ready or Poisoned
3. If `active_generation == Some(generation)`: return Pending
4. Otherwise: return InvalidGeneration error
### poll_waiter(observed_generation, cx)
1. Acquire lock
2. If `observed_generation <= last_triggered`: return completion result
3. If `active_generation.is_none()`: return "generation expired" error
4. Register waker with deduplication (`will_wake` check)
5. Return Pending
### try_to_poison(generation, poison)
1. Acquire lock
2. If `generation <= last_triggered`:
- If `poisoned.contains_key(generation)`: return `AlreadyPoisoned`
- Else: return `AlreadyCompleted` error
3. Validate: `active_generation == Some(generation)`
4. Set `last_triggered = generation`
5. Clear `active_generation`
6. Insert into poison map
7. Drain wakers
8. Release lock
9. Wake all drained wakers (outside lock)
10. Return `Poisoned`
This is equivalent to an atomic `status_for` + `finalize_completion(Poisoned)`,
eliminating the TOCTOU window when the two are called separately.
### retire
1. Acquire lock
2. Debug-assert: wakers list is empty (callers should ensure all waiters resolved before retirement)
3. Set `retired = true`, clear `active_generation`
4. Drain wakers (defensive, prevents silent hangs if invariant violated)
5. Release lock
6. Wake drained wakers (outside lock)
## Invariants
- **I1: Generation monotonicity** — `last_triggered` only increases. Each
`begin_generation` computes `last_triggered + 1`.
- **I2: Single completion per generation** — `active_generation` guard ensures
only one generation is active. `finalize_completion` validates
`active_generation == Some(generation)`.
- **I3: Completion visibility** — Waiter resolution is determined by
`observed_generation <= last_triggered` (success) plus the `poisoned`
BTreeMap (error). Both are set under the same lock that the waiter reads.
- **I4: No stale completion leakage** — There is no stored completion value
that could leak. Waiters resolve via generation comparison + poison map.
`begin_generation` unconditionally drains stale wakers.
- **I5: No lost wakeups** — `finalize_completion` sets `last_triggered` and
drains wakers in the same lock scope. Any waiter that registered before
the drain will be woken. Any waiter that polls after the drain will see
`observed_generation <= last_triggered` and resolve immediately.
- **I6: Stale waiter resolution** — Waiters from generation N check
`observed_generation <= last_triggered` on every poll. After generation N
completes, this check succeeds regardless of what generation is currently
active. `begin_generation` also flushes stale wakers defensively.
## Concurrency Rules
A single `parking_lot::Mutex` per entry serializes all state mutations. This
eliminates the need for:
- Atomic `waiter_count` (no conditional clearing)
- Atomic `completed` flag (redundant with lock-protected state)
- Separate slot-level `generation` counter (entry-level suffices)
- Manual waker deduplication across lock boundaries
The only concurrency pattern is: acquire lock, read/write state, release lock,
then wake drained wakers outside the lock (waker invocation only enqueues
tasks on the runtime, it does not poll them synchronously).
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Core event storage engine backed by a generational slot system.
pub(crate) mod system;
pub use system::EventSystemBase;
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use anyhow::{Result, anyhow, bail};
use dashmap::DashMap;
use parking_lot::Mutex as ParkingMutex;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use tokio_util::task::TaskTracker;
use tracing::{error, trace};
use crate::event::{Event, EventBackend};
use crate::handle::{EventHandle, LOCAL_FLAG};
use crate::slot::{
CompletionKind, EventAwaiter, EventEntry, EventKey, PoisonArc, PoisonOutcome, WaitRegistration,
};
use crate::status::{EventPoison, EventStatus};
/// Maximum counter value for local indices (31-bit counter space, ~2B entries).
///
/// `allocate_entry` only hands out a counter value while the current value is
/// strictly below this bound, so the bound itself is never issued as an index.
/// Keeping the counter within 31 bits leaves the top bit free for `LOCAL_FLAG`,
/// which `allocate_entry` ORs into the index for local systems.
const MAX_LOCAL_INDEX: u32 = (1u32 << 31) - 1;
/// Core event storage, allocation, and recycling engine.
///
/// Handles event storage, allocation, recycling, and generation tracking.
/// This is the implementation backing [`EventManager`](crate::EventManager).
/// Events created by an `EventSystemBase` are bound to that system. Passing
/// a handle from one system to another will return an error.
///
/// `EventSystemBase` also implements [`EventBackend`] for the local path,
/// so it can be used directly as both the base and the backend for local-only
/// setups. For distributed setups, implement [`EventBackend`] on your own type
/// and delegate local operations to the `_inner` methods on `EventSystemBase`.
pub struct EventSystemBase {
// Identity stamped into every handle minted here and compared against
// incoming handles by `validate_handle`.
system_id: u64,
// When true, `allocate_entry` ORs `LOCAL_FLAG` into each new index.
is_local: bool,
// Every entry ever allocated, keyed by `EventKey`; retired entries stay in
// the map (see `retire_entry`).
events: DashMap<EventKey, Arc<EventEntry>>,
// FIFO queue of completed, non-retired entries available for reuse.
free_lists: ParkingMutex<VecDeque<Arc<EventEntry>>>,
// Next never-used index counter; capped by `MAX_LOCAL_INDEX`.
next_local_index: AtomicU32,
// Tracks the background tasks spawned by `merge_events_with`.
tasks: TaskTracker,
// Set once by `force_shutdown_inner`; checked by `new_event_with_backend`.
shutdown: AtomicBool,
}
impl EventSystemBase {
/// Build a local event system identified by a randomly derived `system_id`.
///
/// The id is the `xxh3_64` hash of the bytes of a freshly generated v4 UUID,
/// so separate local systems are expected to receive distinct identities.
/// The system is created with the local flag enabled, which makes
/// `allocate_entry` set `LOCAL_FLAG` (bit 31) in the `local_index` of every
/// handle it produces.
///
/// Handles minted here must be triggered, awaited, poisoned, or polled
/// through this same instance; other systems reject them on validation.
pub fn local() -> Arc<Self> {
    let seed = uuid::Uuid::new_v4();
    let system_id = xxhash_rust::xxh3::xxh3_64(seed.as_bytes());
    Self::create(system_id, true)
}
/// Build an event system that uses the caller-supplied `system_id`.
///
/// The local flag is left disabled, so handles produced by this system do
/// **not** carry `LOCAL_FLAG` and can be told apart from local handles.
pub fn distributed(system_id: u64) -> Arc<Self> {
    let is_local = false;
    Self::create(system_id, is_local)
}
/// Shared constructor behind [`local`](Self::local) and
/// [`distributed`](Self::distributed).
///
/// Starts with an empty event map, an empty free list, the index counter at
/// zero, a fresh task tracker, and the shutdown flag cleared.
fn create(system_id: u64, is_local: bool) -> Arc<Self> {
    let system = Self {
        system_id,
        is_local,
        events: DashMap::new(),
        free_lists: ParkingMutex::new(VecDeque::new()),
        next_local_index: AtomicU32::new(0),
        tasks: TaskTracker::new(),
        shutdown: AtomicBool::new(false),
    };
    Arc::new(system)
}
/// The unique system identity stamped into every handle produced by this system.
///
/// This is the value supplied at construction time. Incoming handles are
/// compared against it before any trigger, poison, await, or poll is served.
pub fn system_id(&self) -> u64 {
self.system_id
}
// ── Ownership validation ─────────────────────────────────────────
fn validate_handle(&self, handle: EventHandle) -> Result<()> {
if handle.system_id() != self.system_id {
bail!(
"Handle {} belongs to system {:#x}, not this system {:#x}",
handle,
handle.system_id(),
self.system_id,
);
}
Ok(())
}
// ── Backend-aware event creation ─────────────────────────────────
/// Allocate a fresh pending event whose RAII guard completes through
/// `backend`.
///
/// Entries whose generation space is exhausted are retired and allocation is
/// retried with another entry, so callers never observe the overflow.
///
/// # Errors
///
/// Fails when the system is shutting down (checked both before allocation
/// and again after the generation starts), when no entry can be allocated,
/// or when starting the generation fails for any reason other than overflow.
pub(crate) fn new_event_with_backend(
    self: &Arc<Self>,
    backend: Arc<dyn EventBackend>,
) -> Result<Event> {
    if self.is_shutdown() {
        bail!("Event system shutdown in progress");
    }

    loop {
        let entry = self.allocate_entry()?;

        let generation = match entry.begin_generation() {
            Ok(generation) => generation,
            Err(crate::slot::entry::EventEntryError::GenerationOverflow { key }) => {
                trace!(
                    ?key,
                    "retiring event entry after exhausting generation space"
                );
                self.retire_entry(entry);
                continue;
            }
            Err(err) => {
                // Any other failure leaves the entry reusable.
                self.recycle_entry(entry);
                return Err(err.into());
            }
        };

        let handle = entry.key().handle(self.system_id, generation);

        // Shutdown may have started after the first check; poison the event
        // we just activated so it does not stay pending.
        if self.is_shutdown() {
            let poison = Arc::new(EventPoison::new(
                handle,
                "Event system shutdown in progress",
            ));
            let _ = self.poison_local_entry(entry, handle, poison);
            bail!("Event system shutdown in progress");
        }

        return Ok(Event::new(handle, backend));
    }
}
/// Merge events, using `backend` for the spawned task's completion routing.
pub(crate) fn merge_events_with(
self: &Arc<Self>,
inputs: Vec<EventHandle>,
backend: Arc<dyn EventBackend>,
) -> Result<EventHandle> {
if inputs.is_empty() {
bail!("Cannot merge empty event list");
}
for input in &inputs {
self.validate_handle(*input)?;
}
let merged = self.new_event_with_backend(backend.clone())?;
// Disarm the RAII guard — the spawned task owns completion via handle.
let handle = merged.into_handle();
let system = Arc::clone(self);
self.tasks.spawn(async move {
let mut failure_reasons: Option<Vec<Arc<str>>> = None;
for dependency in &inputs {
let wait_result = match backend.awaiter(*dependency) {
Ok(waiter) => waiter.await,
Err(err) => Err(err),
};
match wait_result {
Ok(()) => {}
Err(err) => {
let reason = match err.downcast::<EventPoison>() {
Ok(poison) => format!(
"Merge dependency {} poisoned: {}",
dependency,
poison.reason()
),
Err(other) => {
format!("Merge dependency {} failed: {}", dependency, other)
}
};
let reason_arc: Arc<str> = Arc::from(reason);
error!("{}", &*reason_arc);
failure_reasons
.get_or_insert_with(Vec::new)
.push(reason_arc);
}
}
}
let result = match failure_reasons {
None => backend.trigger(handle),
Some(reasons) => {
if reasons.len() == 1 {
backend.poison(handle, reasons[0].clone())
} else {
let mut message = String::from("Multiple merge dependencies failed:\n");
for (idx, reason) in reasons.iter().enumerate() {
if idx > 0 {
message.push('\n');
}
message.push_str(reason.as_ref());
}
backend.poison(handle, Arc::from(message))
}
}
};
if let Err(e) = result {
error!("Failed to complete merged event {}: {}", handle, e);
}
drop(system); // ensure system lives until the task completes
});
Ok(handle)
}
// ── Public inner methods (for distributed backends) ──────────────
/// Trigger a local event by handle. Validates that the handle belongs to this system.
///
/// Distributed backends should call this for handles that belong to the local system.
pub fn trigger_inner(&self, handle: EventHandle) -> Result<()> {
self.validate_handle(handle)?;
let entry = self
.events
.get(&EventKey::from_handle(handle))
.map(|guard| guard.clone())
.ok_or_else(|| anyhow!("Unknown event {}", handle))?;
self.trigger_local_entry(entry, handle)
}
/// Poison a local event by handle. Validates that the handle belongs to this system.
///
/// Distributed backends should call this for handles that belong to the local system.
pub fn poison_inner(&self, handle: EventHandle, reason: impl Into<Arc<str>>) -> Result<()> {
self.validate_handle(handle)?;
let reason: Arc<str> = reason.into();
let entry = self
.events
.get(&EventKey::from_handle(handle))
.map(|guard| guard.clone())
.ok_or_else(|| anyhow!("Unknown event {}", handle))?;
let poison = Arc::new(EventPoison::new(handle, reason));
self.poison_local_entry(entry, handle, poison)
}
/// Build a future that resolves once the given local event completes.
///
/// Intended for distributed backends to call on handles owned by this system.
///
/// # Errors
///
/// Fails when the handle belongs to another system or when the local wait
/// registration fails.
pub fn awaiter_inner(&self, handle: EventHandle) -> Result<EventAwaiter> {
    match self.validate_handle(handle) {
        Ok(()) => self.wait_local(handle),
        Err(foreign) => Err(foreign),
    }
}
/// Report the current status of a local event without waiting on it.
///
/// # Errors
///
/// Fails when the handle belongs to another system or when no entry exists
/// for it.
pub(crate) fn poll_inner(&self, handle: EventHandle) -> Result<EventStatus> {
    match self.validate_handle(handle) {
        Ok(()) => self.poll_local(handle),
        Err(foreign) => Err(foreign),
    }
}
pub(crate) fn force_shutdown_inner(&self, reason: impl Into<Arc<str>>) {
let was_shutdown = self.shutdown.swap(true, Ordering::SeqCst);
if was_shutdown {
return;
}
let reason: Arc<str> = reason.into();
let mut pending = Vec::new();
for entry in self.events.iter() {
if let Some(handle) = entry.value().active_handle(self.system_id) {
pending.push((entry.value().clone(), handle));
}
}
for (entry, handle) in pending {
let poison = Arc::new(EventPoison::new(handle, Arc::clone(&reason)));
if let Err(err) = self.poison_local_entry(entry, handle, poison) {
error!("force_shutdown: failed to poison {}: {}", handle, err);
}
}
self.free_lists.lock().clear();
}
// ── Low-level helpers ─────────────────────────────────────────────
/// Return the poison reason for a completed generation, if any.
///
/// Yields `None` when the handle belongs to another system, when no entry
/// exists for it, or when the entry reports no poison for that generation.
// Kept for diagnostics; currently has no in-crate caller on every build.
#[allow(dead_code)]
pub(crate) fn poison_reason(&self, handle: EventHandle) -> Option<Arc<str>> {
    // `EventKey` is derived from the local index alone, so a handle minted by
    // a different system can alias one of our entries. Never answer for it.
    if handle.system_id() != self.system_id {
        return None;
    }
    let entry = self.events.get(&EventKey::from_handle(handle))?;
    entry.poison_reason(handle.generation())
}
/// Complete `entry` as triggered for the generation carried by `handle`.
///
/// # Errors
///
/// Propagates any failure from completing the entry.
pub(crate) fn trigger_local_entry(
    &self,
    entry: Arc<EventEntry>,
    handle: EventHandle,
) -> Result<()> {
    let completion = CompletionKind::Triggered;
    self.complete_local_entry(entry, handle, completion)
}
pub(crate) fn poison_local_entry(
&self,
entry: Arc<EventEntry>,
handle: EventHandle,
poison: PoisonArc,
) -> Result<()> {
match entry
.try_to_poison(handle.generation(), poison)
.map_err(anyhow::Error::new)?
{
PoisonOutcome::Poisoned => {
self.recycle_entry(entry);
Ok(())
}
PoisonOutcome::AlreadyPoisoned => Ok(()),
}
}
fn complete_local_entry(
&self,
entry: Arc<EventEntry>,
handle: EventHandle,
completion: CompletionKind,
) -> Result<()> {
entry
.finalize_completion(handle.generation(), completion)
.map_err(anyhow::Error::new)?;
self.recycle_entry(entry);
Ok(())
}
fn wait_local(&self, handle: EventHandle) -> Result<EventAwaiter> {
let entry = self
.events
.get(&EventKey::from_handle(handle))
.map(|guard| guard.clone())
.ok_or_else(|| anyhow!("Unknown local event {}", handle))?;
match entry.register_local_waiter(handle.generation())? {
WaitRegistration::Ready => {
Ok(EventAwaiter::immediate(Arc::new(CompletionKind::Triggered)))
}
WaitRegistration::Poisoned(poison) => Ok(EventAwaiter::immediate(Arc::new(
CompletionKind::Poisoned(poison),
))),
WaitRegistration::Pending => Ok(EventAwaiter::pending(entry, handle.generation())),
}
}
fn poll_local(&self, handle: EventHandle) -> Result<EventStatus> {
let entry = self
.events
.get(&EventKey::from_handle(handle))
.map(|guard| guard.clone())
.ok_or_else(|| anyhow!("Unknown local event {}", handle))?;
Ok(entry.status_for(handle.generation()))
}
/// Obtain an entry for a new event.
///
/// A recycled entry from the free list is preferred. Otherwise the next
/// counter value is claimed atomically, tagged with `LOCAL_FLAG` when this
/// system is local, and a fresh entry is registered in `self.events`.
///
/// Fails once the counter has reached `MAX_LOCAL_INDEX`.
fn allocate_entry(self: &Arc<Self>) -> Result<Arc<EventEntry>> {
    if let Some(recycled) = self.try_reuse_entry() {
        return Ok(recycled);
    }

    // `fetch_update` yields the value *before* the increment, which becomes
    // the index of the new entry.
    let claimed = self.next_local_index.fetch_update(
        Ordering::AcqRel,
        Ordering::Acquire,
        |current| (current < MAX_LOCAL_INDEX).then_some(current + 1),
    );
    let counter = match claimed {
        Ok(previous) => previous,
        Err(_) => {
            return Err(anyhow!(
                "Local event index space exhausted ({} entries)",
                MAX_LOCAL_INDEX
            ));
        }
    };

    let origin_flag = if self.is_local { LOCAL_FLAG } else { 0 };
    let key = EventKey::new(counter | origin_flag);
    let entry = Arc::new(EventEntry::new(key));
    self.events.insert(key, Arc::clone(&entry));
    Ok(entry)
}
/// Take the entry at the front of the free list, if there is one.
fn try_reuse_entry(&self) -> Option<Arc<EventEntry>> {
    self.free_lists.lock().pop_front()
}
/// Return `entry` to the back of the free list so it can be reused.
///
/// Retired entries are never put back; they are simply dropped here.
fn recycle_entry(&self, entry: Arc<EventEntry>) {
    if !entry.is_retired() {
        self.free_lists.lock().push_back(entry);
    }
}
/// Mark an entry as permanently unusable but keep it in `self.events`.
///
/// Retired entries are intentionally **not** removed from the DashMap so that
/// callers holding stale handles to poisoned generations can still query
/// poison history via `poison_reason()` / `status_for()`. Removing the entry
/// would turn a diagnosable poison into an opaque "Unknown event" error.
///
/// Future optimisation: evict the full `EventEntry` from the DashMap and
/// migrate only the poisoned generation keys into a secondary
/// `HashSet<(EventKey, Generation)>` with a shared "entry retired" reason.
/// This trades per-generation `Arc<str>` detail for bounded memory on
/// long-running systems that exhaust many entries' generation spaces.
fn retire_entry(&self, entry: Arc<EventEntry>) {
entry.retire();
}
/// Read the shutdown flag with acquire ordering.
fn is_shutdown(&self) -> bool {
    self.shutdown.load(Ordering::Acquire)
}
}
// ── EventBackend impl ────────────────────────────────────────────────
/// Local backend: every routing operation is forwarded to the matching
/// `*_inner` method of [`EventSystemBase`].
impl EventBackend for EventSystemBase {
    fn trigger(&self, handle: EventHandle) -> Result<()> {
        Self::trigger_inner(self, handle)
    }

    fn poison(&self, handle: EventHandle, reason: Arc<str>) -> Result<()> {
        Self::poison_inner(self, handle, reason)
    }

    fn awaiter(&self, handle: EventHandle) -> Result<EventAwaiter> {
        Self::awaiter_inner(self, handle)
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Concrete [`Event`] RAII guard and [`EventBackend`] routing trait.
use anyhow::Result;
use std::sync::{Arc, LazyLock};
use crate::handle::EventHandle;
use crate::slot::EventAwaiter;
/// Poison reason shared by every event that is poisoned from `Drop`.
///
/// Built lazily on first use; each drop-poison clones the `Arc` instead of
/// allocating a new string.
static DROP_POISON_REASON: LazyLock<Arc<str>> =
    LazyLock::new(|| "event dropped without being triggered".into());
// ── Backend trait: routing customization point ──────────────────────
/// Routing layer for event completion operations.
///
/// Only three methods — just the operations that need local-vs-remote routing
/// in a distributed setup. [`EventSystemBase`](crate::EventSystemBase) implements
/// this for the local path; a distributed backend would add network routing.
///
/// Backends are held as `Arc<dyn EventBackend>` by [`Event`] and
/// [`EventManager`](crate::EventManager), hence the `Send + Sync` bound.
pub trait EventBackend: Send + Sync {
/// Mark the event as successfully completed, waking all waiters.
///
/// Returns an error if the backend cannot complete the event for `handle`.
fn trigger(&self, handle: EventHandle) -> Result<()>;
/// Poison the event with the given reason, waking all waiters with an error.
///
/// Returns an error if the backend cannot poison the event for `handle`.
fn poison(&self, handle: EventHandle, reason: Arc<str>) -> Result<()>;
/// Create a future that resolves when the event completes.
///
/// Returns an error if the backend cannot create an awaiter for `handle`.
fn awaiter(&self, handle: EventHandle) -> Result<EventAwaiter>;
}
// ── Concrete Event ─────────────────────────────────────────────────
/// A single event that can be triggered or poisoned exactly once.
///
/// `Event` is an RAII guard: dropping it without calling [`trigger`](Event::trigger)
/// or [`poison`](Event::poison) automatically poisons the event so waiters are
/// never silently abandoned. To opt out of drop-poisoning (e.g. when handing
/// ownership to a manager-level operation), call [`into_handle`](Event::into_handle).
///
/// `trigger` and `poison` consume `self`, preventing double-completion at
/// compile time.
pub struct Event {
// `Some` while the drop guard is armed. `trigger`, `poison`, `into_handle`
// and `Drop` move the state out, leaving `None` so the event is completed
// at most once through this guard.
inner: Option<EventInner>,
}
/// State carried by an armed [`Event`]: which event it owns and the backend
/// used to complete it.
struct EventInner {
// Handle identifying the event this guard is responsible for.
handle: EventHandle,
// Backend that `trigger`, `poison`, `awaiter`, and the drop guard call into.
backend: Arc<dyn EventBackend>,
}
impl Event {
    /// Wrap `handle` in an armed guard that completes through `backend`.
    pub(crate) fn new(handle: EventHandle, backend: Arc<dyn EventBackend>) -> Self {
        let inner = EventInner { handle, backend };
        Self { inner: Some(inner) }
    }

    /// Borrow the live state.
    ///
    /// Panics with "event already consumed" if the state has been taken.
    fn armed(&self) -> &EventInner {
        self.inner.as_ref().expect("event already consumed")
    }

    /// Move the state out, leaving `None` behind so `Drop` becomes a no-op.
    ///
    /// Panics with "event already consumed" if the state has been taken.
    fn take_inner(&mut self) -> EventInner {
        self.inner.take().expect("event already consumed")
    }

    /// The handle identifying this event.
    pub fn handle(&self) -> EventHandle {
        self.armed().handle
    }

    /// Complete the event successfully, waking all waiters.
    ///
    /// Consumes the event; the drop guard is disarmed before the backend is
    /// called, so a failed trigger does not fall back to drop-poisoning.
    pub fn trigger(mut self) -> Result<()> {
        let EventInner { handle, backend } = self.take_inner();
        backend.trigger(handle)
    }

    /// Poison the event with `reason`, waking all waiters with an error.
    ///
    /// Consumes the event; the drop guard is disarmed before the backend is
    /// called.
    pub fn poison(mut self, reason: impl Into<Arc<str>>) -> Result<()> {
        let EventInner { handle, backend } = self.take_inner();
        backend.poison(handle, reason.into())
    }

    /// Create a future that resolves once this event completes.
    pub fn awaiter(&self) -> Result<EventAwaiter> {
        let EventInner { handle, backend } = self.armed();
        backend.awaiter(*handle)
    }

    /// Disarm the drop guard and hand back the bare handle.
    ///
    /// After this call the event will **not** be auto-poisoned on drop.
    /// Use the returned handle with [`EventManager`](crate::EventManager)
    /// methods to complete the event manually.
    pub fn into_handle(mut self) -> EventHandle {
        let EventInner { handle, .. } = self.take_inner();
        handle
    }
}
impl Drop for Event {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
let _ = inner
.backend
.poison(inner.handle, Arc::clone(&*DROP_POISON_REASON));
}
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Factory for creating distributed event systems with a system identity.
use std::{num::NonZero, sync::Arc};
use crate::base::EventSystemBase;
use crate::manager::EventManager;
/// Factory that creates an [`EventManager`] pre-configured with a system_id.
///
/// Use this when events need globally-unique handles that embed a non-zero
/// system identifier (e.g. in a Nova-managed distributed system).
///
/// For purely local use, call [`EventManager::local()`] directly instead.
pub struct DistributedEventFactory {
// Identifier passed at construction and returned by `system_id()`.
system_id: u64,
// Event system shared by every manager handed out by `event_manager()`.
base: Arc<EventSystemBase>,
}
impl DistributedEventFactory {
    /// Build a factory, and the event system behind it, for `system_id`.
    pub fn new(system_id: NonZero<u64>) -> Self {
        let id = system_id.get();
        Self {
            system_id: id,
            base: EventSystemBase::distributed(id),
        }
    }

    /// The system identity this factory was created with.
    pub fn system_id(&self) -> u64 {
        self.system_id
    }

    /// Borrow the event system base that backs this factory.
    pub fn system(&self) -> &Arc<EventSystemBase> {
        &self.base
    }

    /// Create an [`EventManager`] that shares this factory's event system.
    ///
    /// The base currently doubles as the backend, so everything is handled
    /// locally; a future distributed backend will route remote handles over
    /// the network.
    pub fn event_manager(&self) -> EventManager {
        EventManager::new(Arc::clone(&self.base), Arc::clone(&self.base) as _)
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Unified event handle encoded in a single `u128` value.
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use crate::status::Generation;
// Widths, in bits, of the three fields packed into the `u128` (64 + 32 + 32 = 128).
const SYSTEM_BITS: u32 = 64;
const LOCAL_BITS: u32 = 32;
const GENERATION_BITS: u32 = 32;
// Bit offsets of the fields. The generation occupies the lowest bits and so
// needs no shift of its own.
const LOCAL_SHIFT: u32 = GENERATION_BITS;
const SYSTEM_SHIFT: u32 = LOCAL_SHIFT + LOCAL_BITS;
// Masks that select each field in place inside the raw `u128`.
const SYSTEM_MASK: u128 = ((1u128 << SYSTEM_BITS) - 1) << SYSTEM_SHIFT;
const LOCAL_MASK: u128 = ((1u128 << LOCAL_BITS) - 1) << LOCAL_SHIFT;
const GENERATION_MASK: u128 = (1u128 << GENERATION_BITS) - 1;
/// Bit 31 of `local_index` marks handles as local vs distributed.
pub(crate) const LOCAL_FLAG: u32 = 1 << 31;
/// Mask for the counter portion of `local_index` (strips the local flag bit).
pub(crate) const INDEX_COUNTER_MASK: u32 = LOCAL_FLAG - 1;
/// Public event handle encoded in a single u128 value.
///
/// Layout (MSB to LSB): `[system_id: 64 bits][local_index: 32 bits][generation: 32 bits]`
///
/// The `local_index` field uses bit 31 as a local/distributed flag:
/// - Bit 31 = 1: local event (created via `EventManager::local()`)
/// - Bit 31 = 0: distributed event (created via `DistributedEventFactory`)
///
/// Both local and distributed systems have unique non-zero `system_id` values.
/// Use `is_local()` / `is_distributed()` to check origin type.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct EventHandle(u128);
impl EventHandle {
/// Create a handle with an explicit system id.
pub(crate) fn new(system_id: u64, local_index: u32, generation: Generation) -> Self {
let raw = ((system_id as u128) << SYSTEM_SHIFT)
| ((local_index as u128) << LOCAL_SHIFT)
| (generation as u128);
Self(raw)
}
/// Reconstruct a handle from its raw u128 representation.
pub fn from_raw(raw: u128) -> Self {
Self(raw)
}
/// Return the raw u128 representation.
pub fn raw(&self) -> u128 {
self.0
}
/// Extract the system id (upper 64 bits).
pub fn system_id(&self) -> u64 {
((self.0 & SYSTEM_MASK) >> SYSTEM_SHIFT) as u64
}
/// Extract the local index (middle 32 bits), including the local flag bit.
pub fn local_index(&self) -> u32 {
((self.0 & LOCAL_MASK) >> LOCAL_SHIFT) as u32
}
/// Extract the generation counter (lower 32 bits).
pub fn generation(&self) -> Generation {
(self.0 & GENERATION_MASK) as Generation
}
/// Returns `true` when the handle was created by a local event system.
pub fn is_local(&self) -> bool {
(self.local_index() & LOCAL_FLAG) != 0
}
/// Returns `true` when the handle was created by a distributed event system.
pub fn is_distributed(&self) -> bool {
!self.is_local()
}
/// Extract the counter portion of the local index (strips the flag bit).
pub(crate) fn index_counter(&self) -> u32 {
self.local_index() & INDEX_COUNTER_MASK
}
/// Return a copy of this handle with a different generation.
pub fn with_generation(&self, generation: Generation) -> Self {
Self::new(self.system_id(), self.local_index(), generation)
}
}
/// Human-readable rendering: system id, index counter (flag bit stripped),
/// generation, and the origin (`local` or `distributed`).
impl Display for EventHandle {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        let origin = if self.is_local() {
            "local"
        } else {
            "distributed"
        };
        write!(
            f,
            "EventHandle {{ system={}, index={}, generation={}, {} }}",
            self.system_id(),
            self.index_counter(),
            self.generation(),
            origin
        )
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
#![doc = include_str!("../README.md")]
#![deny(missing_docs)]
// Core types
mod event;
mod manager;
// Public types
pub mod factory;
mod handle;
mod status;
// Core event storage engine
mod base;
// Internal synchronization (see docs/slot-state-machine.md)
pub(crate) mod slot;
// ── Re-exports ───────────────────────────────────────────────────────
pub use base::EventSystemBase;
pub use event::{Event, EventBackend};
pub use factory::DistributedEventFactory;
pub use handle::EventHandle;
pub use manager::EventManager;
pub use slot::EventAwaiter;
pub use status::{EventPoison, EventStatus, Generation};
/// Integration-style tests for the public API: event lifecycle, merging,
/// shutdown, handle encoding, cross-system validation, and regression tests
/// for generation reuse in the slot module.
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use tokio::task::yield_now;

    /// Every test uses a fresh, purely local manager unless stated otherwise.
    fn create_system() -> EventManager {
        EventManager::local()
    }

    // A waiter registered before the trigger is woken by it.
    #[tokio::test]
    async fn wait_resolves_after_trigger() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        let waiter = {
            let system = system.clone();
            tokio::spawn(async move { system.awaiter(handle)?.await })
        };
        // Give the spawned task a chance to register before triggering.
        yield_now().await;
        event.trigger()?;
        waiter.await??;
        Ok(())
    }

    // An awaiter created after the trigger resolves immediately.
    #[tokio::test]
    async fn wait_ready_if_triggered_first() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        event.trigger()?;
        system.awaiter(handle)?.await?;
        Ok(())
    }

    // Poisoning through the manager surfaces as an `EventPoison` error.
    #[tokio::test]
    async fn poison_is_visible() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        system.poison(handle, "boom")?;
        let err = system.awaiter(handle)?.await.unwrap_err();
        let poison = err.downcast::<EventPoison>().unwrap();
        assert_eq!(poison.reason(), "boom");
        Ok(())
    }

    // A completed entry is reused for the next event with generation + 1.
    #[tokio::test]
    async fn entry_reused_after_completion() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        let index = handle.local_index();
        let generation = handle.generation();
        event.trigger()?;
        system.awaiter(handle)?.await?;
        let next = system.new_event()?;
        let next_handle = next.handle();
        assert_eq!(next_handle.local_index(), index);
        assert_eq!(next_handle.generation(), generation + 1);
        Ok(())
    }

    // One trigger wakes every registered waiter.
    #[tokio::test]
    async fn multiple_waiters_wake() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        let mut waiters = Vec::new();
        for _ in 0..8 {
            let system_clone = system.clone();
            waiters.push(tokio::spawn(
                async move { system_clone.awaiter(handle)?.await },
            ));
        }
        yield_now().await;
        event.trigger()?;
        for waiter in waiters {
            waiter.await??;
        }
        Ok(())
    }

    // A merged event completes once all of its inputs have been triggered.
    #[tokio::test]
    async fn merge_triggers_after_dependencies() -> Result<()> {
        let system = create_system();
        let first = system.new_event()?;
        let second = system.new_event()?;
        let merged = system.merge_events(vec![first.handle(), second.handle()])?;
        first.trigger()?;
        second.trigger()?;
        system.awaiter(merged)?.await?;
        Ok(())
    }

    // A merged event reports the poison reasons of all poisoned inputs.
    #[tokio::test]
    async fn merge_poison_accumulates_reasons() -> Result<()> {
        let system = create_system();
        let first = system.new_event()?;
        let second = system.new_event()?;
        let merged = system.merge_events(vec![first.handle(), second.handle()])?;
        system.poison(first.handle(), "first failed")?;
        system.poison(second.handle(), "second failed")?;
        let err = system.awaiter(merged)?.await.unwrap_err();
        let poison = err.downcast::<EventPoison>().unwrap();
        assert!(poison.reason().contains("first failed"));
        assert!(poison.reason().contains("second failed"));
        Ok(())
    }

    // Shutdown poisons pending events with the shutdown reason.
    #[tokio::test]
    async fn force_shutdown_poison_pending() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        let waiter = {
            let system = system.clone();
            tokio::spawn(async move { system.awaiter(handle)?.await })
        };
        yield_now().await;
        system.force_shutdown("shutdown");
        let err = waiter.await.unwrap().unwrap_err();
        let poison = err.downcast::<EventPoison>().unwrap();
        assert_eq!(poison.reason(), "shutdown");
        Ok(())
    }

    // After shutdown no new events can be allocated, and events that were
    // pending at shutdown are poisoned.
    #[tokio::test]
    async fn new_event_fails_after_force_shutdown() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        system.force_shutdown("shutdown");
        let err = match system.new_event() {
            Ok(_) => panic!("expected shutdown to block new events"),
            Err(err) => err,
        };
        assert!(err.to_string().contains("shutdown"));
        let err = system.awaiter(event.handle())?.await.unwrap_err();
        assert!(err.downcast::<EventPoison>().is_ok());
        Ok(())
    }

    // Calling shutdown twice is harmless, even with an event still pending.
    #[tokio::test]
    async fn force_shutdown_is_idempotent() -> Result<()> {
        let system = create_system();
        // Bind the guard: `let _ = ...` would drop the `Event` at the end of
        // the statement, drop-poisoning it before shutdown ever runs.
        let _event = system.new_event()?;
        system.force_shutdown("shutdown");
        system.force_shutdown("shutdown");
        assert!(system.new_event().is_err());
        Ok(())
    }

    // ── Concrete manager tests ────────────────────────────────────────

    /// Drive an event through the manager using only its bare handle.
    fn exercise_manager(mgr: &EventManager) -> Result<()> {
        let event = mgr.new_event()?;
        let handle = event.into_handle();
        assert_eq!(mgr.poll(handle)?, EventStatus::Pending);
        mgr.trigger(handle)?;
        assert_eq!(mgr.poll(handle)?, EventStatus::Ready);
        Ok(())
    }

    #[tokio::test]
    async fn trait_exercise_manager() -> Result<()> {
        let system = create_system();
        exercise_manager(&system)
    }

    // Dropping an untriggered event poisons it with the drop reason.
    #[tokio::test]
    async fn trait_exercise_drop_poison() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        {
            let _event = event;
            // event drops here without trigger → poisons the event
        }
        let err = system.awaiter(handle)?.await.unwrap_err();
        let poison = err.downcast::<EventPoison>().unwrap();
        assert!(
            poison
                .reason()
                .contains("event dropped without being triggered")
        );
        Ok(())
    }

    #[tokio::test]
    async fn trait_exercise_trigger() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        event.trigger()?;
        system.awaiter(handle)?.await?;
        Ok(())
    }

    // ── DistributedEventFactory (factory.rs) ─────────────────────────

    // Handles from a factory-built manager carry the factory's system id and
    // are flagged as distributed.
    #[tokio::test]
    async fn distributed_factory_stamps_system_id() -> Result<()> {
        use crate::factory::DistributedEventFactory;
        let factory = DistributedEventFactory::new(0x42.try_into().unwrap());
        assert_eq!(factory.system_id(), 0x42);
        let mgr = factory.event_manager();
        let event = mgr.new_event()?;
        let handle = event.handle();
        assert_eq!(handle.system_id(), 0x42);
        assert!(handle.is_distributed());
        // system() returns the same underlying system
        assert!(std::sync::Arc::ptr_eq(factory.system(), mgr.base()));
        event.trigger()?;
        mgr.awaiter(handle)?.await?;
        Ok(())
    }

    // ── EventHandle accessors (handle.rs) ────────────────────────────

    #[test]
    fn handle_round_trip_raw() {
        let system = create_system();
        let event = system.new_event().unwrap();
        let handle = event.handle();
        let raw = handle.raw();
        let reconstructed = EventHandle::from_raw(raw);
        assert_eq!(handle, reconstructed);
    }

    #[test]
    fn handle_system_id_local() {
        let system = create_system();
        let event = system.new_event().unwrap();
        let handle = event.handle();
        assert_ne!(handle.system_id(), 0);
        assert!(handle.is_local());
    }

    // `with_generation` replaces only the generation field.
    #[test]
    fn handle_with_generation() {
        let system = create_system();
        let event = system.new_event().unwrap();
        let handle = event.handle();
        let new_handle = handle.with_generation(99);
        assert_eq!(new_handle.generation(), 99);
        assert_eq!(new_handle.local_index(), handle.local_index());
        assert_eq!(new_handle.system_id(), handle.system_id());
    }

    #[test]
    fn handle_display() {
        let system = create_system();
        let event = system.new_event().unwrap();
        let handle = event.handle();
        let display = format!("{}", handle);
        assert!(display.contains("EventHandle"));
        assert!(display.contains("system="));
        assert!(display.contains("index="));
        assert!(display.contains("generation="));
        assert!(display.contains("local"));
    }

    // ── Event poison / awaiter ──────────────────────────────────────

    #[tokio::test]
    async fn event_explicit_poison() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        event.poison("explicit")?;
        let err = system.awaiter(handle)?.await.unwrap_err();
        let poison = err.downcast::<EventPoison>().unwrap();
        assert_eq!(poison.reason(), "explicit");
        Ok(())
    }

    // An awaiter obtained from the guard itself resolves on trigger.
    #[tokio::test]
    async fn event_awaiter() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        let awaiter = event.awaiter()?;
        // Verify event.handle() still works before consuming
        assert_eq!(event.handle(), handle);
        event.trigger()?;
        awaiter.await?;
        Ok(())
    }

    // ── Event::poison (direct) ──────────────────────────────────────

    #[tokio::test]
    async fn event_poison_directly() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        event.poison("direct reason")?;
        let err = system.awaiter(handle)?.await.unwrap_err();
        let poison = err.downcast::<EventPoison>().unwrap();
        assert_eq!(poison.reason(), "direct reason");
        Ok(())
    }

    // ── EventPoison Display and accessors (status.rs) ────────────────

    #[tokio::test]
    async fn poison_display_and_reason_arc() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        system.poison(handle, "test reason")?;
        let err = system.awaiter(handle)?.await.unwrap_err();
        let poison = err.downcast::<EventPoison>().unwrap();
        // Display impl
        let display = format!("{}", poison);
        assert!(display.contains("poisoned"));
        assert!(display.contains("test reason"));
        // reason_arc accessor
        let arc = poison.reason_arc();
        assert_eq!(&**arc, "test reason");
        // handle accessor
        assert_eq!(poison.handle(), handle);
        // std::error::Error impl — no source
        assert!(std::error::Error::source(&poison).is_none());
        Ok(())
    }

    // ── System-level edge cases ──────────────────────────────────────

    #[tokio::test]
    async fn poison_reason_helper() -> Result<()> {
        let system = create_system();
        let event = system.new_event()?;
        let handle = event.handle();
        system.poison(handle, "oops")?;
        let reason = system.poison_reason(handle);
        assert!(reason.is_some());
        assert_eq!(&*reason.unwrap(), "oops");
        Ok(())
    }

    // ── Local vs distributed flag ────────────────────────────────────

    #[test]
    fn is_local_vs_distributed() {
        // Local system produces local handles
        let local = create_system();
        let event = local.new_event().unwrap();
        let handle = event.handle();
        assert!(handle.is_local());
        assert!(!handle.is_distributed());
        assert_ne!(handle.system_id(), 0);
        // Distributed factory produces distributed handles
        let factory = DistributedEventFactory::new(0x99.try_into().unwrap());
        let mgr = factory.event_manager();
        let event = mgr.new_event().unwrap();
        let handle = event.handle();
        assert!(handle.is_distributed());
        assert!(!handle.is_local());
        assert_eq!(handle.system_id(), 0x99);
    }

    // ── Cross-system validation tests ────────────────────────────────
    // Every operation must reject a handle that belongs to another system.

    #[tokio::test]
    async fn cross_system_awaiter_rejected() -> Result<()> {
        let system_a = create_system();
        let system_b = create_system();
        let event = system_a.new_event()?;
        let handle = event.handle();
        match system_b.awaiter(handle) {
            Ok(_) => panic!("expected error for cross-system awaiter"),
            Err(err) => assert!(err.to_string().contains("belongs to system")),
        }
        Ok(())
    }

    #[tokio::test]
    async fn cross_system_trigger_rejected() -> Result<()> {
        let system_a = create_system();
        let system_b = create_system();
        let event = system_a.new_event()?;
        let handle = event.handle();
        let err = system_b.trigger(handle).unwrap_err();
        assert!(err.to_string().contains("belongs to system"));
        Ok(())
    }

    #[tokio::test]
    async fn cross_system_poison_rejected() -> Result<()> {
        let system_a = create_system();
        let system_b = create_system();
        let event = system_a.new_event()?;
        let handle = event.handle();
        let err = system_b.poison(handle, "bad").unwrap_err();
        assert!(err.to_string().contains("belongs to system"));
        Ok(())
    }

    #[tokio::test]
    async fn cross_system_poll_rejected() -> Result<()> {
        let system_a = create_system();
        let system_b = create_system();
        let event = system_a.new_event()?;
        let handle = event.handle();
        let err = system_b.poll(handle).unwrap_err();
        assert!(err.to_string().contains("belongs to system"));
        Ok(())
    }

    #[tokio::test]
    async fn cross_system_merge_rejected() -> Result<()> {
        let system_a = create_system();
        let system_b = create_system();
        let event = system_a.new_event()?;
        let handle = event.handle();
        let err = system_b.merge_events(vec![handle]).unwrap_err();
        assert!(err.to_string().contains("belongs to system"));
        Ok(())
    }

    #[tokio::test]
    async fn cross_type_local_on_distributed_rejected() -> Result<()> {
        let local = create_system();
        let factory = DistributedEventFactory::new(0x10.try_into().unwrap());
        let distributed = factory.event_manager();
        let event = local.new_event()?;
        let handle = event.handle();
        let err = distributed.trigger(handle).unwrap_err();
        assert!(err.to_string().contains("belongs to system"));
        Ok(())
    }

    #[tokio::test]
    async fn cross_type_distributed_on_local_rejected() -> Result<()> {
        let local = create_system();
        let factory = DistributedEventFactory::new(0x20.try_into().unwrap());
        let distributed = factory.event_manager();
        let event = distributed.new_event()?;
        let handle = event.handle();
        let err = local.trigger(handle).unwrap_err();
        assert!(err.to_string().contains("belongs to system"));
        Ok(())
    }

    #[tokio::test]
    async fn cross_distributed_systems_rejected() -> Result<()> {
        let factory_a = DistributedEventFactory::new(0x30.try_into().unwrap());
        let factory_b = DistributedEventFactory::new(0x40.try_into().unwrap());
        let mgr_a = factory_a.event_manager();
        let mgr_b = factory_b.event_manager();
        let event = mgr_a.new_event()?;
        let handle = event.handle();
        let err = mgr_b.trigger(handle).unwrap_err();
        assert!(err.to_string().contains("belongs to system"));
        Ok(())
    }

    // ── slot regression tests ────────────────────────────────────────

    #[tokio::test]
    async fn race1_no_stale_completion_leakage() -> Result<()> {
        // Regression test for Race 1: stale completion visible to new-generation waiters.
        //
        // Scenario: waiter from gen N is still alive when gen N+1 starts.
        // In the old slot module, begin_generation would skip clearing completion
        // when waiter_count > 0, causing gen N+1 waiters to see gen N's result.
        // The slot design eliminates this structurally.
        let system = create_system();
        // Gen 1: create event and a waiter (keeps waiter alive across generation boundary)
        let event1 = system.new_event()?;
        let handle1 = event1.handle();
        let _waiter1 = system.awaiter(handle1)?;
        // Complete gen 1
        event1.trigger()?;
        // Gen 2: same entry reused from free list
        let event2 = system.new_event()?;
        let handle2 = event2.handle();
        assert_eq!(handle2.local_index(), handle1.local_index());
        assert_eq!(handle2.generation(), handle1.generation() + 1);
        // Create waiter for gen 2 — must be Pending, not stale Ready from gen 1
        let waiter2 = system.awaiter(handle2)?;
        let waker = futures::task::noop_waker();
        let mut cx = std::task::Context::from_waker(&waker);
        let mut waiter2 = waiter2;
        let poll = std::pin::Pin::new(&mut waiter2).poll(&mut cx);
        assert!(
            poll.is_pending(),
            "Gen N+1 waiter should be Pending, not resolved with stale completion"
        );
        // Complete gen 2 and verify it resolves
        event2.trigger()?;
        waiter2.await?;
        Ok(())
    }

    #[tokio::test]
    async fn stale_waiter_resolves_after_generation_transition() -> Result<()> {
        // Test that a waiter from gen N resolves correctly even after gen N+1 starts.
        let system = create_system();
        let event1 = system.new_event()?;
        let handle1 = event1.handle();
        // Create a waiter for gen 1
        let waiter1 = system.awaiter(handle1)?;
        // Complete gen 1
        event1.trigger()?;
        // Start gen 2 (same entry reused)
        let event2 = system.new_event()?;
        assert_eq!(event2.handle().local_index(), handle1.local_index());
        // Waiter from gen 1 should still resolve correctly
        waiter1.await?;
        Ok(())
    }

    #[tokio::test]
    async fn stale_waiter_with_poison_resolves_after_generation_transition() -> Result<()> {
        // Poisoned gen N waiter resolves correctly after gen N+1 begins.
        let system = create_system();
        let event1 = system.new_event()?;
        let handle1 = event1.handle();
        let waiter1 = system.awaiter(handle1)?;
        // Poison gen 1
        system.poison(handle1, "gen1 failed")?;
        // Start gen 2
        let _event2 = system.new_event()?;
        // Waiter from gen 1 should see the poison
        let err = waiter1.await.unwrap_err();
        let poison = err.downcast::<EventPoison>()?;
        assert_eq!(poison.reason(), "gen1 failed");
        Ok(())
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Concrete [`EventManager`] that ties lifecycle and routing together.
use anyhow::Result;
use std::sync::Arc;
use crate::base::EventSystemBase;
use crate::event::{Event, EventBackend};
use crate::handle::EventHandle;
use crate::slot::EventAwaiter;
use crate::status::EventStatus;
/// Manages a collection of events — creating, triggering, poisoning, and
/// merging them.
///
/// `EventManager` is `Clone` and `Send + Sync`, so it can be cheaply shared
/// across async tasks.
///
/// # Local vs distributed
///
/// [`EventManager::local()`] creates a purely local manager backed by
/// [`EventSystemBase`]. For distributed setups, construct a manager with
/// [`EventManager::new()`] providing a custom [`EventBackend`] that routes
/// remote handles over the network.
#[derive(Clone)]
pub struct EventManager {
// Lifecycle store: used for allocation, polling, merging, and shutdown.
base: Arc<EventSystemBase>,
// Routing layer: used for `trigger`, `poison`, and `awaiter`.
backend: Arc<dyn EventBackend>,
}
impl EventManager {
    /// Build a manager that handles everything locally.
    ///
    /// A single [`EventSystemBase`] serves as both the lifecycle store and
    /// the completion backend.
    pub fn local() -> Self {
        let base = EventSystemBase::local();
        let backend: Arc<dyn EventBackend> = base.clone();
        Self::new(base, backend)
    }

    /// Build a manager from a lifecycle store and a routing backend.
    ///
    /// Intended for distributed setups where trigger/poison/awaiter may be
    /// routed over the network.
    pub fn new(base: Arc<EventSystemBase>, backend: Arc<dyn EventBackend>) -> Self {
        Self { base, backend }
    }

    /// The system identity reported by the underlying base.
    pub fn system_id(&self) -> u64 {
        self.base.system_id()
    }

    /// Borrow the underlying event system base.
    pub fn base(&self) -> &Arc<EventSystemBase> {
        &self.base
    }

    /// Allocate a new pending event that completes through this manager's
    /// backend.
    pub fn new_event(&self) -> Result<Event> {
        let backend = Arc::clone(&self.backend);
        self.base.new_event_with_backend(backend)
    }

    /// Create a future that resolves when the event behind `handle` completes.
    pub fn awaiter(&self, handle: EventHandle) -> Result<EventAwaiter> {
        self.backend.awaiter(handle)
    }

    /// Check the status of `handle` without waiting.
    pub fn poll(&self, handle: EventHandle) -> Result<EventStatus> {
        self.base.poll_inner(handle)
    }

    /// Trigger the event identified by `handle`.
    pub fn trigger(&self, handle: EventHandle) -> Result<()> {
        self.backend.trigger(handle)
    }

    /// Poison the event identified by `handle` with `reason`.
    pub fn poison(&self, handle: EventHandle, reason: impl Into<Arc<str>>) -> Result<()> {
        let reason: Arc<str> = reason.into();
        self.backend.poison(handle, reason)
    }

    /// Create a new event that completes when **all** `inputs` complete.
    ///
    /// If any input is poisoned the merged event is poisoned with the
    /// accumulated reasons.
    pub fn merge_events(&self, inputs: Vec<EventHandle>) -> Result<EventHandle> {
        let backend = Arc::clone(&self.backend);
        self.base.merge_events_with(inputs, backend)
    }

    /// Poison every pending event and reject future allocations.
    pub fn force_shutdown(&self, reason: impl Into<Arc<str>>) {
        self.base.force_shutdown_inner(reason);
    }

    /// Return the poison reason for a completed generation, if any.
    // Only the crate's test module calls this helper, so non-test builds
    // would otherwise warn about dead code.
    #[allow(dead_code)]
    pub(crate) fn poison_reason(&self, handle: EventHandle) -> Option<Arc<str>> {
        self.base.poison_reason(handle)
    }
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use crate::status::EventPoison;
/// Shared, reference-counted poison record.
pub(crate) type PoisonArc = Arc<EventPoison>;
/// How an event generation finished.
#[derive(Clone, Debug)]
pub(crate) enum CompletionKind {
/// The event was triggered; `as_result` maps this to `Ok(())`.
Triggered,
/// The event was poisoned; `as_result` maps this to `Err` with a copy of the poison.
Poisoned(PoisonArc),
}
impl CompletionKind {
pub(crate) fn as_result(&self) -> Result<(), EventPoison> {
match self {
Self::Triggered => Ok(()),
Self::Poisoned(poison) => Err((**poison).clone()),
}
}
}
/// Result of registering a waiter for a specific generation.
pub(crate) enum WaitRegistration {
/// The generation already completed successfully; callers resolve immediately.
Ready,
/// The generation has not completed yet; callers must wait.
Pending,
/// The generation was already poisoned with the carried poison record.
Poisoned(PoisonArc),
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use anyhow::anyhow;
use parking_lot::Mutex as ParkingMutex;
use std::collections::BTreeMap;
use std::fmt::{self, Display, Formatter};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use super::completion::{CompletionKind, PoisonArc, WaitRegistration};
use crate::handle::EventHandle;
use crate::status::{EventStatus, Generation};
// Largest value a `Generation` can hold.
// NOTE(review): presumably the ceiling checked before advancing a generation — confirm in `begin_generation`.
const MAX_GENERATION: Generation = Generation::MAX;
// Width of the generation counter; used in the `GenerationOverflow` error message.
const GENERATION_BITS: u32 = 32;
/// Generation-independent key for an event entry: the handle's `local_index`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct EventKey {
// Full `local_index` of the handle, local flag bit included.
index: u32,
}
impl EventKey {
    /// Wrap a raw local index.
    pub(crate) fn new(index: u32) -> Self {
        Self { index }
    }

    /// Key of the entry that `handle` refers to (its `local_index`).
    pub(crate) fn from_handle(handle: EventHandle) -> Self {
        Self::new(handle.local_index())
    }

    /// Build the handle for this key under `system_id` at `generation`.
    pub(crate) fn handle(&self, system_id: u64, generation: Generation) -> EventHandle {
        let local_index = self.index;
        EventHandle::new(system_id, local_index, generation)
    }
}
/// Renders as `EventKey(index=N)`.
impl Display for EventKey {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self { index } = self;
        write!(f, "EventKey(index={})", index)
    }
}
/// Errors reported by [`EventEntry`] state transitions.
///
/// NOTE(review): the per-variant descriptions below are derived from the
/// `Display` messages; confirm the exact conditions against the methods that
/// return them.
#[derive(Debug)]
pub(crate) enum EventEntryError {
/// The entry already has an active generation.
ActiveGeneration {
key: EventKey,
active: Generation,
},
/// The entry has exhausted its generation space.
GenerationOverflow {
key: EventKey,
},
/// The requested generation is not valid for the entry; `active` carries
/// the entry's active generation, if it has one.
InvalidGeneration {
key: EventKey,
requested: Generation,
active: Option<Generation>,
},
/// The generation already completed successfully.
AlreadyCompleted {
key: EventKey,
generation: Generation,
},
}
/// Human-readable message for each entry error.
impl Display for EventEntryError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Self::ActiveGeneration { key, active } => {
                write!(f, "Event {} already has active generation {}", key, active)
            }
            Self::GenerationOverflow { key } => write!(
                f,
                "Event {} exhausted generation space ({} bits)",
                key, GENERATION_BITS
            ),
            // The message differs depending on whether a generation is active.
            Self::InvalidGeneration {
                key,
                requested,
                active: Some(current),
            } => write!(
                f,
                "Invalid generation {} for event {}; active generation {}",
                requested, key, current
            ),
            Self::InvalidGeneration {
                key,
                requested,
                active: None,
            } => write!(
                f,
                "Invalid generation {} for event {}; no active generation",
                requested, key
            ),
            Self::AlreadyCompleted { key, generation } => write!(
                f,
                "Event {} generation {} already completed successfully",
                key, generation
            ),
        }
    }
}
impl std::error::Error for EventEntryError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}
/// Result alias for entry operations that fail with [`EventEntryError`].
pub(crate) type EventEntryResult<T> = std::result::Result<T, EventEntryError>;
/// Outcome of an atomic try-to-poison operation.
///
/// Returned by [`EventEntry::try_to_poison`] on success; failures are reported
/// through [`EventEntryError`] instead.
#[derive(Debug)]
pub(crate) enum PoisonOutcome {
/// Successfully poisoned. Caller must recycle the entry.
Poisoned,
/// Already poisoned (idempotent success). No recycling needed.
AlreadyPoisoned,
}
/// Owner-side event entry reused across generations.
///
/// All state mutations are serialized through a single `ParkingMutex<EventState>`,
/// eliminating the stale-completion race present in the original `slot` module.
pub(crate) struct EventEntry {
/// Identity of this entry; never changes after construction.
key: EventKey,
/// Generation bookkeeping, poison history and registered wakers, all
/// guarded by one lock.
state: ParkingMutex<EventState>,
}
impl EventEntry {
/// Creates an entry for `key` whose state starts at `EventState::default()`.
pub(crate) fn new(key: EventKey) -> Self {
    let state = ParkingMutex::new(EventState::default());
    Self { key, state }
}
/// Returns the key this entry was created with.
pub(crate) fn key(&self) -> EventKey {
self.key
}
/// Starts the next generation and returns its number.
///
/// Wakers still registered from the previous generation are drained and
/// woken so their futures re-poll and resolve through the
/// `observed_generation <= last_triggered` check.
///
/// # Errors
///
/// * [`EventEntryError::ActiveGeneration`] if a generation is still pending.
/// * [`EventEntryError::GenerationOverflow`] if the entry is retired or the
///   generation counter has reached `MAX_GENERATION`.
pub(crate) fn begin_generation(&self) -> EventEntryResult<Generation> {
    let (next, leftover) = {
        let mut guard = self.state.lock();
        if let Some(active) = guard.active_generation {
            return Err(EventEntryError::ActiveGeneration {
                key: self.key,
                active,
            });
        }
        if guard.retired || guard.last_triggered == MAX_GENERATION {
            return Err(EventEntryError::GenerationOverflow { key: self.key });
        }
        let next = guard
            .last_triggered
            .checked_add(1)
            .expect("checked for overflow above");
        // Take the stale wakers before publishing the new generation; both
        // steps happen under the same lock acquisition.
        let leftover = std::mem::take(&mut guard.wakers);
        guard.active_generation = Some(next);
        (next, leftover)
    };
    // The lock is released here; waking outside it reduces contention.
    for waker in leftover {
        waker.wake();
    }
    Ok(next)
}
/// Reports the status of `generation` without blocking.
///
/// A generation above the last completed one is `Pending`. A completed
/// generation is `Poisoned` when it has an entry in the poison history and
/// `Ready` otherwise.
pub(crate) fn status_for(&self, generation: Generation) -> EventStatus {
    let state = self.state.lock();
    if generation > state.last_triggered {
        return EventStatus::Pending;
    }
    if state.poisoned.contains_key(&generation) {
        EventStatus::Poisoned
    } else {
        EventStatus::Ready
    }
}
/// Classifies a prospective waiter for `generation`.
///
/// This only inspects state; no waker is stored here.
///
/// * Completed generation: `Ready`, or `Poisoned` with a clone of the
///   recorded poison.
/// * The active generation: `Pending`.
///
/// # Errors
///
/// [`EventEntryError::InvalidGeneration`] when `generation` is neither
/// completed nor the active one; the error carries the active generation
/// (if any) observed under the lock.
pub(crate) fn register_local_waiter(
    &self,
    generation: Generation,
) -> EventEntryResult<WaitRegistration> {
    let state = self.state.lock();
    if generation <= state.last_triggered {
        let registration = state
            .poisoned
            .get(&generation)
            .cloned()
            .map_or(WaitRegistration::Ready, WaitRegistration::Poisoned);
        return Ok(registration);
    }
    let active = state.active_generation;
    if active == Some(generation) {
        Ok(WaitRegistration::Pending)
    } else {
        Err(EventEntryError::InvalidGeneration {
            key: self.key,
            requested: generation,
            active,
        })
    }
}
/// Completes the active generation with `completion`.
///
/// Under one lock acquisition this marks `generation` as the last completed
/// one, clears the active generation, records or clears its poison entry,
/// and drains the registered wakers. Doing all of it under the same lock is
/// what prevents the stale-completion race (Race 1) and the drop-then-signal
/// fragility (Race 2) of the original `slot` module. The drained wakers are
/// woken after the lock is released.
///
/// # Errors
///
/// [`EventEntryError::InvalidGeneration`] if `generation` is not the active
/// generation.
pub(crate) fn finalize_completion(
    &self,
    generation: Generation,
    completion: CompletionKind,
) -> EventEntryResult<()> {
    let waiters = {
        let mut state = self.state.lock();
        let active = state.active_generation;
        if active != Some(generation) {
            return Err(EventEntryError::InvalidGeneration {
                key: self.key,
                requested: generation,
                active,
            });
        }
        state.active_generation = None;
        state.last_triggered = generation;
        match &completion {
            CompletionKind::Triggered => {
                state.poisoned.remove(&generation);
            }
            CompletionKind::Poisoned(poison) => {
                state.poisoned.insert(generation, poison.clone());
            }
        }
        std::mem::take(&mut state.wakers)
    };
    // Lock released: notify every registered waiter.
    for waker in waiters {
        waker.wake();
    }
    Ok(())
}
/// Atomically attempts to poison `generation`.
///
/// The status check and the state transition happen under one lock
/// acquisition, which removes the TOCTOU window that exists when
/// `status_for` and `finalize_completion` are called separately.
///
/// Returns `Poisoned` when this call performed the transition and
/// `AlreadyPoisoned` when the generation had been poisoned before.
///
/// # Errors
///
/// * [`EventEntryError::AlreadyCompleted`] if the generation completed
///   without poison.
/// * [`EventEntryError::InvalidGeneration`] if the generation is neither
///   completed nor active.
pub(crate) fn try_to_poison(
    &self,
    generation: Generation,
    poison: PoisonArc,
) -> EventEntryResult<PoisonOutcome> {
    let waiters = {
        let mut state = self.state.lock();
        if generation <= state.last_triggered {
            // Already completed: idempotent success only if it was a poison.
            if state.poisoned.contains_key(&generation) {
                return Ok(PoisonOutcome::AlreadyPoisoned);
            }
            return Err(EventEntryError::AlreadyCompleted {
                key: self.key,
                generation,
            });
        }
        let active = state.active_generation;
        if active != Some(generation) {
            return Err(EventEntryError::InvalidGeneration {
                key: self.key,
                requested: generation,
                active,
            });
        }
        // Same mutations as `finalize_completion` with a poisoned completion.
        state.last_triggered = generation;
        state.active_generation = None;
        state.poisoned.insert(generation, poison);
        std::mem::take(&mut state.wakers)
    };
    for waker in waiters {
        waker.wake();
    }
    Ok(PoisonOutcome::Poisoned)
}
/// Poll for waiter resolution, called by [`super::waiter::EventAwaiter::poll`].
///
/// Checks the entry state under lock and either returns a result or
/// registers the provided waker for future notification.
pub(crate) fn poll_waiter(
&self,
observed_generation: Generation,
cx: &mut Context<'_>,
) -> Poll<anyhow::Result<()>> {
let mut state = self.state.lock();
// Check if our generation has completed.
if observed_generation <= state.last_triggered {
if let Some(poison) = state.poisoned.get(&observed_generation) {
return Poll::Ready(Err(anyhow::Error::new((**poison).clone())));
}
return Poll::Ready(Ok(()));
}
// Generation not yet completed — check if still active.
if state.active_generation.is_none() {
return Poll::Ready(Err(anyhow!("generation expired without completion")));
}
// Register waker with deduplication (critical for select! loops).
let waker = cx.waker();
if let Some(existing) = state.wakers.iter_mut().find(|w| w.will_wake(waker)) {
existing.clone_from(waker);
} else {
state.wakers.push(waker.clone());
}
Poll::Pending
}
/// Marks the entry as retired and clears any active generation.
///
/// Callers are expected to retire only entries without registered wakers;
/// debug builds assert this. Any wakers that are present anyway are drained
/// and woken after the lock is released.
pub(crate) fn retire(&self) {
    let drained = {
        let mut state = self.state.lock();
        debug_assert!(
            state.wakers.is_empty(),
            "retire() called with {} registered wakers on {:?}",
            state.wakers.len(),
            self.key,
        );
        state.retired = true;
        state.active_generation = None;
        std::mem::take(&mut state.wakers)
    };
    for waker in drained {
        waker.wake();
    }
}
/// Returns `true` once [`retire`](Self::retire) has marked this entry.
pub(crate) fn is_retired(&self) -> bool {
    self.state.lock().retired
}
/// Returns a handle for the currently active generation.
///
/// Yields `None` when the entry is retired or no generation is active. The
/// lock is released before the handle is built.
pub(crate) fn active_handle(&self, system_id: u64) -> Option<EventHandle> {
    let active = {
        let state = self.state.lock();
        if state.retired {
            None
        } else {
            state.active_generation
        }
    };
    active.map(|generation| self.key.handle(system_id, generation))
}
/// Returns the poison reason recorded for `generation`, or `None` if that
/// generation has no entry in the poison history.
///
/// The returned `Arc<str>` shares the allocation owned by the stored
/// [`EventPoison`](crate::status::EventPoison) (via `reason_arc`), so the
/// lookup only bumps a reference count while the entry lock is held instead
/// of copying the text into a new `String` and then a new `Arc`.
// NOTE(review): the only caller visible in this file is a unit test, which is
// presumably why the lint is silenced; confirm before removing the allow.
#[allow(dead_code)]
pub(crate) fn poison_reason(&self, generation: Generation) -> Option<Arc<str>> {
    let state = self.state.lock();
    state
        .poisoned
        .get(&generation)
        .map(|poison| Arc::clone(poison.reason_arc()))
}
}
/// Per-entry state protected by a single mutex.
///
/// All fields are read and written under the same lock, which structurally
/// prevents the races present in the original two-lock (`EventState` +
/// `SlotStateInner`) design.
struct EventState {
/// Highest generation that has completed (triggered or poisoned).
/// Starts at 0, so generation 0 always counts as completed.
last_triggered: Generation,
/// Currently pending generation, if any.
active_generation: Option<Generation>,
/// Registered wakers from pending `EventAwaiter` futures.
/// Drained whenever a generation begins or completes, and on retire.
wakers: Vec<Waker>,
/// Poison history keyed by generation.
poisoned: BTreeMap<Generation, PoisonArc>,
/// Permanently unusable (generation space exhausted).
/// Set by `retire`; once set, `begin_generation` fails and
/// `active_handle` returns `None`.
retired: bool,
}
impl Default for EventState {
    /// Fresh state: nothing active, generation 0 treated as completed, no
    /// poison history, not retired.
    fn default() -> Self {
        // Small up-front capacity for the waker list.
        let wakers = Vec::with_capacity(2);
        let poisoned = BTreeMap::new();
        Self {
            retired: false,
            active_generation: None,
            last_triggered: 0,
            poisoned,
            wakers,
        }
    }
}
// Unit tests for `EventEntry`: error messages, status queries, poison
// handling and waker bookkeeping.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Wake, Waker};
// Builds a fresh entry whose key has the given index.
fn make_entry(index: u32) -> EventEntry {
EventEntry::new(EventKey::new(index))
}
// Starting a generation while one is active must fail with ActiveGeneration.
#[test]
fn entry_error_active_generation() {
let entry = make_entry(0);
entry.begin_generation().unwrap(); // generation 1 now active
let err = entry.begin_generation().unwrap_err();
let msg = format!("{}", err);
assert!(msg.contains("already has active generation"));
}
// A retired entry reports GenerationOverflow when asked to begin again.
#[test]
fn entry_error_generation_overflow() {
let entry = make_entry(1);
entry.retire();
let err = entry.begin_generation().unwrap_err();
let msg = format!("{}", err);
assert!(msg.contains("exhausted generation space"));
}
// Asking about a generation other than the active one names the active one.
#[test]
fn entry_error_invalid_generation_waiter() {
let entry = make_entry(2);
let generation = entry.begin_generation().unwrap();
// Request a waiter for a generation that doesn't match
match entry.register_local_waiter(generation + 99) {
Err(err) => {
let msg = format!("{}", err);
assert!(msg.contains("Invalid generation"));
assert!(msg.contains("active generation"));
}
Ok(_) => panic!("expected InvalidGeneration error"),
}
}
// With nothing active, the error message says so explicitly.
#[test]
fn entry_error_invalid_generation_no_active() {
let entry = make_entry(3);
// No active generation at all
match entry.register_local_waiter(1) {
Err(err) => {
let msg = format!("{}", err);
assert!(msg.contains("Invalid generation"));
assert!(msg.contains("no active generation"));
}
Ok(_) => panic!("expected InvalidGeneration error"),
}
}
// The Display output of a key contains both the type name and the index.
#[test]
fn entry_key_display() {
let key = EventKey::new(42);
let display = format!("{}", key);
assert!(display.contains("EventKey"));
assert!(display.contains("42"));
}
// Retiring hides the active handle and flips `is_retired`.
#[test]
fn entry_active_handle_when_retired() {
let entry = make_entry(4);
entry.begin_generation().unwrap();
entry.retire();
assert!(entry.active_handle(0).is_none());
assert!(entry.is_retired());
}
// The active handle carries the generation returned by `begin_generation`.
#[test]
fn entry_active_handle_when_active() {
let entry = make_entry(5);
let generation = entry.begin_generation().unwrap();
let handle = entry.active_handle(0);
assert!(handle.is_some());
assert_eq!(handle.unwrap().generation(), generation);
}
// Entry errors never expose an underlying source error.
#[test]
fn entry_error_source() {
let entry = make_entry(6);
entry.begin_generation().unwrap();
let err = entry.begin_generation().unwrap_err();
assert!(std::error::Error::source(&err).is_none());
}
// Status moves from Pending to Ready on trigger; generation 0 is always Ready.
#[test]
fn entry_status_for_pending_and_ready() {
let entry = make_entry(7);
let generation = entry.begin_generation().unwrap();
assert_eq!(entry.status_for(generation), EventStatus::Pending);
// Trigger it
entry
.finalize_completion(generation, CompletionKind::Triggered)
.unwrap();
assert_eq!(entry.status_for(generation), EventStatus::Ready);
// Past generations are Ready
assert_eq!(entry.status_for(0), EventStatus::Ready);
}
// Completing with a poison makes the generation report Poisoned.
#[test]
fn entry_status_for_poisoned() {
let entry = make_entry(8);
let generation = entry.begin_generation().unwrap();
let handle = entry.key().handle(0, generation);
let poison = Arc::new(crate::status::EventPoison::new(handle, "test"));
entry
.finalize_completion(generation, CompletionKind::Poisoned(poison))
.unwrap();
assert_eq!(entry.status_for(generation), EventStatus::Poisoned);
}
// The stored poison reason can be read back for the poisoned generation.
#[test]
fn entry_poison_reason() {
let entry = make_entry(9);
let generation = entry.begin_generation().unwrap();
let handle = entry.key().handle(0, generation);
let poison = Arc::new(crate::status::EventPoison::new(handle, "oops"));
entry
.finalize_completion(generation, CompletionKind::Poisoned(poison))
.unwrap();
let reason = entry.poison_reason(generation);
assert_eq!(&*reason.unwrap(), "oops");
}
// Test waker that counts how many times it has been woken.
#[derive(Default)]
struct CountingWake {
count: AtomicUsize,
}
impl Wake for CountingWake {
fn wake(self: Arc<Self>) {
self.count.fetch_add(1, Ordering::SeqCst);
}
}
// Returns the shared counter together with a `Waker` backed by it.
fn counting_waker() -> (Arc<CountingWake>, Waker) {
let state = Arc::new(CountingWake::default());
let waker = Waker::from(Arc::clone(&state));
(state, waker)
}
// Polling twice with the same waker stores it once and wakes it once.
#[test]
fn poll_waiter_deduplicates_waker_registrations() {
let entry = make_entry(10);
let generation = entry.begin_generation().unwrap();
let (wake_state, waker) = counting_waker();
let mut cx = Context::from_waker(&waker);
assert!(entry.poll_waiter(generation, &mut cx).is_pending());
assert!(entry.poll_waiter(generation, &mut cx).is_pending());
{
let state = entry.state.lock();
assert_eq!(state.wakers.len(), 1, "waker should be deduplicated");
}
entry
.finalize_completion(generation, CompletionKind::Triggered)
.unwrap();
assert_eq!(wake_state.count.load(Ordering::SeqCst), 1);
}
// Two different wakers are both woken exactly once on completion.
#[test]
fn finalize_completion_wakes_all_distinct_waiters() {
let entry = make_entry(11);
let generation = entry.begin_generation().unwrap();
let (first_state, first_waker) = counting_waker();
let (second_state, second_waker) = counting_waker();
let mut first_cx = Context::from_waker(&first_waker);
let mut second_cx = Context::from_waker(&second_waker);
assert!(entry.poll_waiter(generation, &mut first_cx).is_pending());
assert!(entry.poll_waiter(generation, &mut second_cx).is_pending());
entry
.finalize_completion(generation, CompletionKind::Triggered)
.unwrap();
assert_eq!(first_state.count.load(Ordering::SeqCst), 1);
assert_eq!(second_state.count.load(Ordering::SeqCst), 1);
}
// A waker left over from a finished generation is woken when the next begins.
#[test]
fn begin_generation_flushes_stale_wakers() {
let entry = make_entry(12);
let generation = entry.begin_generation().unwrap();
entry
.finalize_completion(generation, CompletionKind::Triggered)
.unwrap();
let (wake_state, stale_waker) = counting_waker();
{
let mut state = entry.state.lock();
state.wakers.push(stale_waker);
}
let next_generation = entry.begin_generation().unwrap();
assert_eq!(next_generation, generation + 1);
assert_eq!(wake_state.count.load(Ordering::SeqCst), 1);
}
// Poisoning the active generation succeeds and is visible through status_for.
#[test]
fn try_to_poison_pending_succeeds() {
let entry = make_entry(14);
let generation = entry.begin_generation().unwrap();
let handle = entry.key().handle(0, generation);
let poison = Arc::new(crate::status::EventPoison::new(handle, "boom"));
match entry.try_to_poison(generation, poison).unwrap() {
PoisonOutcome::Poisoned => {}
PoisonOutcome::AlreadyPoisoned => panic!("expected Poisoned"),
}
assert_eq!(entry.status_for(generation), EventStatus::Poisoned);
}
// A second poison attempt on the same generation is an idempotent success.
#[test]
fn try_to_poison_already_poisoned_is_idempotent() {
let entry = make_entry(15);
let generation = entry.begin_generation().unwrap();
let handle = entry.key().handle(0, generation);
let poison = Arc::new(crate::status::EventPoison::new(handle, "first"));
match entry.try_to_poison(generation, poison).unwrap() {
PoisonOutcome::Poisoned => {}
PoisonOutcome::AlreadyPoisoned => panic!("expected Poisoned on first call"),
}
let poison2 = Arc::new(crate::status::EventPoison::new(handle, "second"));
match entry.try_to_poison(generation, poison2).unwrap() {
PoisonOutcome::AlreadyPoisoned => {}
PoisonOutcome::Poisoned => panic!("expected AlreadyPoisoned on second call"),
}
}
// Poisoning a generation that already triggered is rejected.
#[test]
fn try_to_poison_already_triggered_returns_error() {
let entry = make_entry(16);
let generation = entry.begin_generation().unwrap();
entry
.finalize_completion(generation, CompletionKind::Triggered)
.unwrap();
let handle = entry.key().handle(0, generation);
let poison = Arc::new(crate::status::EventPoison::new(handle, "too late"));
let err = entry.try_to_poison(generation, poison).unwrap_err();
let msg = format!("{}", err);
assert!(msg.contains("already completed successfully"), "got: {msg}");
}
// Poisoning a generation that is neither completed nor active is rejected.
#[test]
fn try_to_poison_invalid_generation() {
let entry = make_entry(17);
let _generation = entry.begin_generation().unwrap();
let handle = entry.key().handle(0, 999);
let poison = Arc::new(crate::status::EventPoison::new(handle, "wrong gen"));
let err = entry.try_to_poison(999, poison).unwrap_err();
let msg = format!("{}", err);
assert!(msg.contains("Invalid generation"), "got: {msg}");
}
// A successful poison wakes the waiters registered for that generation.
#[test]
fn try_to_poison_wakes_waiters() {
let entry = make_entry(18);
let generation = entry.begin_generation().unwrap();
let (wake_state, waker) = counting_waker();
let mut cx = Context::from_waker(&waker);
assert!(entry.poll_waiter(generation, &mut cx).is_pending());
let handle = entry.key().handle(0, generation);
let poison = Arc::new(crate::status::EventPoison::new(handle, "wake test"));
entry.try_to_poison(generation, poison).unwrap();
assert_eq!(wake_state.count.load(Ordering::SeqCst), 1);
}
// Retiring with a registered waker: panics in debug builds, wakes in release.
#[test]
fn retire_wakes_registered_wakers() {
let entry = make_entry(13);
let generation = entry.begin_generation().unwrap();
let (wake_state, waker) = counting_waker();
let mut cx = Context::from_waker(&waker);
// Register a waker by polling the pending generation.
assert!(entry.poll_waiter(generation, &mut cx).is_pending());
// Retire the entry — in debug builds the debug_assert fires (catching
// the invariant violation), in release builds the wakers are defensively
// drained and woken.
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
entry.retire();
}));
if cfg!(debug_assertions) {
assert!(
result.is_err(),
"debug_assert should fire when wakers are registered"
);
} else {
result.expect("retire() should not panic in release");
assert_eq!(wake_state.count.load(Ordering::SeqCst), 1);
assert!(entry.is_retired());
}
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Single-lock synchronization primitives for the event system.
//!
//! All per-entry state — generation tracking, completion status, and waker
//! registration — is consolidated under a single `parking_lot::Mutex`,
//! eliminating stale-completion races by construction.
//!
//! See `docs/slot-state-machine.md` for the formal state machine specification.
mod completion;
pub(crate) mod entry;
mod waiter;
pub(crate) use completion::{CompletionKind, PoisonArc, WaitRegistration};
pub(crate) use entry::{EventEntry, EventKey, PoisonOutcome};
pub use waiter::EventAwaiter;
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use super::completion::CompletionKind;
use super::entry::EventEntry;
use crate::status::Generation;
/// Future that waits for an event to complete.
///
/// This can be used in `tokio::select!` and polled multiple times efficiently.
/// Waker deduplication inside the entry lock prevents unbounded growth.
pub struct EventAwaiter {
/// Entry to poll; `None` for waiters built with `immediate`.
entry: Option<Arc<EventEntry>>,
/// Generation passed to `EventEntry::poll_waiter` on every poll.
observed_generation: Generation,
/// Pre-computed outcome; when set, `poll` returns it without touching `entry`.
immediate_result: Option<Arc<CompletionKind>>,
}
impl EventAwaiter {
    /// Builds a waiter that is already resolved: every poll returns `result`
    /// converted to a `Result`, and no entry is consulted.
    #[allow(private_interfaces)]
    pub(crate) fn immediate(result: Arc<CompletionKind>) -> Self {
        let immediate_result = Some(result);
        Self {
            entry: None,
            observed_generation: 0,
            immediate_result,
        }
    }

    /// Builds a waiter that polls `entry` until `generation` resolves.
    #[allow(private_interfaces)]
    pub(crate) fn pending(entry: Arc<EventEntry>, generation: Generation) -> Self {
        let entry = Some(entry);
        Self {
            entry,
            observed_generation: generation,
            immediate_result: None,
        }
    }
}
impl Future for EventAwaiter {
type Output = anyhow::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// Fast path: immediate result (already-completed event)
if let Some(result) = &this.immediate_result {
return Poll::Ready(result.as_ref().as_result().map_err(anyhow::Error::new));
}
let entry = this
.entry
.as_ref()
.expect("EventAwaiter with no entry or immediate_result");
entry.poll_waiter(this.observed_generation, cx)
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Event status types shared across local and distributed implementations.
use std::fmt::{self, Display, Formatter};
use std::sync::Arc;
use crate::handle::EventHandle;
/// Alias for event generation counters.
///
/// Generations are unsigned 32-bit values.
pub type Generation = u32;
/// Status returned from non-blocking event queries.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[allow(missing_docs)]
pub enum EventStatus {
/// The queried generation has not completed yet.
Pending,
/// The queried generation completed without poison.
Ready,
/// The queried generation completed with a poison record.
Poisoned,
}
/// Describes a poisoned event generation.
#[derive(Clone, Debug)]
pub struct EventPoison {
/// Handle supplied when the poison record was created.
handle: EventHandle,
/// Reason text, stored as `Arc<str>` so clones of the record share it.
reason: Arc<str>,
}
impl EventPoison {
    /// Builds a poison record for `handle`, converting `reason` into a shared string.
    pub fn new(handle: EventHandle, reason: impl Into<Arc<str>>) -> Self {
        let reason: Arc<str> = reason.into();
        Self { handle, reason }
    }

    /// Returns the handle this poison record was created for.
    pub fn handle(&self) -> EventHandle {
        self.handle
    }

    /// Returns the poison reason as a string slice.
    pub fn reason(&self) -> &str {
        &*self.reason
    }

    /// Returns the poison reason as its shared `Arc<str>`, which callers can
    /// clone without copying the text.
    pub fn reason_arc(&self) -> &Arc<str> {
        &self.reason
    }
}
impl Display for EventPoison {
    /// Formats the record as `Event <handle> poisoned: <reason>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self { handle, reason } = self;
        write!(f, "Event {} poisoned: {}", handle, reason)
    }
}

// Relies on the derived `Debug` and the `Display` impl above; the default
// `source()` (no underlying cause) is kept.
impl std::error::Error for EventPoison {}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment