// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use anyhow::{Result, anyhow, bail}; use dashmap::DashMap; use parking_lot::Mutex as ParkingMutex; use std::collections::VecDeque; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use tokio_util::task::TaskTracker; use tracing::{error, trace}; use crate::event::{Event, EventBackend}; use crate::handle::{EventHandle, LOCAL_FLAG}; use crate::slot::{ CompletionKind, EventAwaiter, EventEntry, EventKey, PoisonArc, PoisonOutcome, WaitRegistration, }; use crate::status::{EventPoison, EventStatus}; /// Maximum counter value for local indices (31-bit counter space, ~2B entries). const MAX_LOCAL_INDEX: u32 = (1u32 << 31) - 1; /// Core event storage, allocation, and recycling engine. /// /// Handles event storage, allocation, recycling, and generation tracking. /// This is the implementation backing [`EventManager`](crate::EventManager). /// Events created by an `EventSystemBase` are bound to that system. Passing /// a handle from one system to another will return an error. /// /// `EventSystemBase` also implements [`EventBackend`] for the local path, /// so it can be used directly as both the base and the backend for local-only /// setups. For distributed setups, implement [`EventBackend`] on your own type /// and delegate local operations to the `_inner` methods on `EventSystemBase`. pub struct EventSystemBase { system_id: u64, is_local: bool, events: DashMap>, free_lists: ParkingMutex>>, next_local_index: AtomicU32, tasks: TaskTracker, shutdown: AtomicBool, } impl EventSystemBase { /// Create a new local event system with a random system_id. /// /// The system_id is derived from `xxh3_64(Uuid::new_v4())` to ensure /// each local system is uniquely identifiable. Handles produced by this /// system have bit 31 set in their `local_index` to mark them as local. /// /// Events created by this system can only be triggered, awaited, poisoned, /// or polled through this same system instance. pub fn local() -> Arc { let system_id = xxhash_rust::xxh3::xxh3_64(uuid::Uuid::new_v4().as_bytes()); Self::create(system_id, true) } /// Create a system pre-configured with a system_id for distributed use. /// /// Handles produced by this system do **not** have the local flag set, /// distinguishing them from local handles. pub fn distributed(system_id: u64) -> Arc { Self::create(system_id, false) } fn create(system_id: u64, is_local: bool) -> Arc { Arc::new(Self { system_id, is_local, events: DashMap::new(), free_lists: ParkingMutex::new(VecDeque::new()), next_local_index: AtomicU32::new(0), tasks: TaskTracker::new(), shutdown: AtomicBool::new(false), }) } /// The unique system identity stamped into every handle produced by this system. pub fn system_id(&self) -> u64 { self.system_id } // ── Ownership validation ───────────────────────────────────────── fn validate_handle(&self, handle: EventHandle) -> Result<()> { if handle.system_id() != self.system_id { bail!( "Handle {} belongs to system {:#x}, not this system {:#x}", handle, handle.system_id(), self.system_id, ); } Ok(()) } // ── Backend-aware event creation ───────────────────────────────── /// Allocate a new pending event, using `backend` for the RAII guard's /// completion routing. pub(crate) fn new_event_with_backend( self: &Arc, backend: Arc, ) -> Result { if self.is_shutdown() { bail!("Event system shutdown in progress"); } loop { let entry = self.allocate_entry()?; match entry.begin_generation() { Ok(generation) => { if self.is_shutdown() { let handle = entry.key().handle(self.system_id, generation); let poison = Arc::new(EventPoison::new( handle, "Event system shutdown in progress", )); let _ = self.poison_local_entry(entry, handle, poison); bail!("Event system shutdown in progress"); } let handle = entry.key().handle(self.system_id, generation); return Ok(Event::new(handle, backend)); } Err(crate::slot::entry::EventEntryError::GenerationOverflow { key }) => { trace!( ?key, "retiring event entry after exhausting generation space" ); self.retire_entry(entry); continue; } Err(err) => { self.recycle_entry(entry); return Err(err.into()); } } } } /// Merge events, using `backend` for the spawned task's completion routing. pub(crate) fn merge_events_with( self: &Arc, inputs: Vec, backend: Arc, ) -> Result { if inputs.is_empty() { bail!("Cannot merge empty event list"); } for input in &inputs { self.validate_handle(*input)?; } let merged = self.new_event_with_backend(backend.clone())?; // Disarm the RAII guard — the spawned task owns completion via handle. let handle = merged.into_handle(); let system = Arc::clone(self); self.tasks.spawn(async move { let mut failure_reasons: Option>> = None; for dependency in &inputs { let wait_result = match backend.awaiter(*dependency) { Ok(waiter) => waiter.await, Err(err) => Err(err), }; match wait_result { Ok(()) => {} Err(err) => { let reason = match err.downcast::() { Ok(poison) => format!( "Merge dependency {} poisoned: {}", dependency, poison.reason() ), Err(other) => { format!("Merge dependency {} failed: {}", dependency, other) } }; let reason_arc: Arc = Arc::from(reason); error!("{}", &*reason_arc); failure_reasons .get_or_insert_with(Vec::new) .push(reason_arc); } } } let result = match failure_reasons { None => backend.trigger(handle), Some(reasons) => { if reasons.len() == 1 { backend.poison(handle, reasons[0].clone()) } else { let mut message = String::from("Multiple merge dependencies failed:\n"); for (idx, reason) in reasons.iter().enumerate() { if idx > 0 { message.push('\n'); } message.push_str(reason.as_ref()); } backend.poison(handle, Arc::from(message)) } } }; if let Err(e) = result { error!("Failed to complete merged event {}: {}", handle, e); } drop(system); // ensure system lives until the task completes }); Ok(handle) } // ── Public inner methods (for distributed backends) ────────────── /// Trigger a local event by handle. Validates that the handle belongs to this system. /// /// Distributed backends should call this for handles that belong to the local system. pub fn trigger_inner(&self, handle: EventHandle) -> Result<()> { self.validate_handle(handle)?; let entry = self .events .get(&EventKey::from_handle(handle)) .map(|guard| guard.clone()) .ok_or_else(|| anyhow!("Unknown event {}", handle))?; self.trigger_local_entry(entry, handle) } /// Poison a local event by handle. Validates that the handle belongs to this system. /// /// Distributed backends should call this for handles that belong to the local system. pub fn poison_inner(&self, handle: EventHandle, reason: impl Into>) -> Result<()> { self.validate_handle(handle)?; let reason: Arc = reason.into(); let entry = self .events .get(&EventKey::from_handle(handle)) .map(|guard| guard.clone()) .ok_or_else(|| anyhow!("Unknown event {}", handle))?; let poison = Arc::new(EventPoison::new(handle, reason)); self.poison_local_entry(entry, handle, poison) } /// Create a future that resolves when the local event completes. /// Validates that the handle belongs to this system. /// /// Distributed backends should call this for handles that belong to the local system. pub fn awaiter_inner(&self, handle: EventHandle) -> Result { self.validate_handle(handle)?; self.wait_local(handle) } pub(crate) fn poll_inner(&self, handle: EventHandle) -> Result { self.validate_handle(handle)?; self.poll_local(handle) } pub(crate) fn force_shutdown_inner(&self, reason: impl Into>) { let was_shutdown = self.shutdown.swap(true, Ordering::SeqCst); if was_shutdown { return; } let reason: Arc = reason.into(); let mut pending = Vec::new(); for entry in self.events.iter() { if let Some(handle) = entry.value().active_handle(self.system_id) { pending.push((entry.value().clone(), handle)); } } for (entry, handle) in pending { let poison = Arc::new(EventPoison::new(handle, Arc::clone(&reason))); if let Err(err) = self.poison_local_entry(entry, handle, poison) { error!("force_shutdown: failed to poison {}: {}", handle, err); } } self.free_lists.lock().clear(); } // ── Low-level helpers ───────────────────────────────────────────── /// Return the poison reason for a completed generation, if any. #[allow(dead_code)] pub(crate) fn poison_reason(&self, handle: EventHandle) -> Option> { let entry = self.events.get(&EventKey::from_handle(handle))?; entry.poison_reason(handle.generation()) } pub(crate) fn trigger_local_entry( &self, entry: Arc, handle: EventHandle, ) -> Result<()> { self.complete_local_entry(entry, handle, CompletionKind::Triggered) } pub(crate) fn poison_local_entry( &self, entry: Arc, handle: EventHandle, poison: PoisonArc, ) -> Result<()> { match entry .try_to_poison(handle.generation(), poison) .map_err(anyhow::Error::new)? { PoisonOutcome::Poisoned => { self.recycle_entry(entry); Ok(()) } PoisonOutcome::AlreadyPoisoned => Ok(()), } } fn complete_local_entry( &self, entry: Arc, handle: EventHandle, completion: CompletionKind, ) -> Result<()> { entry .finalize_completion(handle.generation(), completion) .map_err(anyhow::Error::new)?; self.recycle_entry(entry); Ok(()) } fn wait_local(&self, handle: EventHandle) -> Result { let entry = self .events .get(&EventKey::from_handle(handle)) .map(|guard| guard.clone()) .ok_or_else(|| anyhow!("Unknown local event {}", handle))?; match entry.register_local_waiter(handle.generation())? { WaitRegistration::Ready => { Ok(EventAwaiter::immediate(Arc::new(CompletionKind::Triggered))) } WaitRegistration::Poisoned(poison) => Ok(EventAwaiter::immediate(Arc::new( CompletionKind::Poisoned(poison), ))), WaitRegistration::Pending => Ok(EventAwaiter::pending(entry, handle.generation())), } } fn poll_local(&self, handle: EventHandle) -> Result { let entry = self .events .get(&EventKey::from_handle(handle)) .map(|guard| guard.clone()) .ok_or_else(|| anyhow!("Unknown local event {}", handle))?; Ok(entry.status_for(handle.generation())) } fn allocate_entry(self: &Arc) -> Result> { if let Some(entry) = self.try_reuse_entry() { return Ok(entry); } let counter = self .next_local_index .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| { (current < MAX_LOCAL_INDEX).then_some(current + 1) }) .map_err(|_| { anyhow!( "Local event index space exhausted ({} entries)", MAX_LOCAL_INDEX ) })?; let local_index = if self.is_local { counter | LOCAL_FLAG } else { counter }; let key = EventKey::new(local_index); let entry = Arc::new(EventEntry::new(key)); self.events.insert(key, entry.clone()); Ok(entry) } fn try_reuse_entry(&self) -> Option> { let mut free_lists = self.free_lists.lock(); free_lists.pop_front() } fn recycle_entry(&self, entry: Arc) { if entry.is_retired() { return; } let mut free_lists = self.free_lists.lock(); free_lists.push_back(entry); } /// Mark an entry as permanently unusable but keep it in `self.events`. /// /// Retired entries are intentionally **not** removed from the DashMap so that /// callers holding stale handles to poisoned generations can still query /// poison history via `poison_reason()` / `status_for()`. Removing the entry /// would turn a diagnosable poison into an opaque "Unknown event" error. /// /// Future optimisation: evict the full `EventEntry` from the DashMap and /// migrate only the poisoned generation keys into a secondary /// `HashSet<(EventKey, Generation)>` with a shared "entry retired" reason. /// This trades per-generation `Arc` detail for bounded memory on /// long-running systems that exhaust many entries' generation spaces. fn retire_entry(&self, entry: Arc) { entry.retire(); } fn is_shutdown(&self) -> bool { self.shutdown.load(Ordering::Acquire) } } // ── EventBackend impl ──────────────────────────────────────────────── impl EventBackend for EventSystemBase { fn trigger(&self, handle: EventHandle) -> Result<()> { self.trigger_inner(handle) } fn poison(&self, handle: EventHandle, reason: Arc) -> Result<()> { self.poison_inner(handle, reason) } fn awaiter(&self, handle: EventHandle) -> Result { self.awaiter_inner(handle) } }