Unverified Commit 4833e29b authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat(kv-router): warn on duplicate store replays (#8473)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent e9db304c
......@@ -32,7 +32,9 @@ use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::{EventKind, KvIndexerMetrics, SyncIndexer, WorkerTask};
use super::{
EventKind, EventWarningKind, KvIndexerMetrics, PreBoundEventCounters, SyncIndexer, WorkerTask,
};
use crate::active_set::reconcile_active_workers;
use crate::cleanup::{self, CleanableNode, CleanupGuard, CleanupState};
use crate::protocols::*;
......@@ -309,6 +311,7 @@ impl ConcurrentRadixTree {
&self,
lookup: &mut FxHashMap<WorkerWithDpRank, WorkerLookup>,
event: RouterEvent,
counters: Option<&PreBoundEventCounters>,
) -> Result<(), KvCacheEventError> {
let (worker_id, kv_event) = (event.worker_id, event.event);
let (id, op) = (kv_event.event_id, kv_event.data);
......@@ -317,7 +320,7 @@ impl ConcurrentRadixTree {
let worker = WorkerWithDpRank::new(worker_id, kv_event.dp_rank);
match op {
KvCacheEventData::Stored(op) => self.apply_stored(lookup, worker, op, id),
KvCacheEventData::Stored(op) => self.apply_stored(lookup, worker, op, id, counters),
KvCacheEventData::Removed(op) => self.apply_removed(lookup, worker, op, id),
KvCacheEventData::Cleared => {
// Ensure the worker is tracked in lookup before clearing,
......@@ -340,6 +343,7 @@ impl ConcurrentRadixTree {
worker: WorkerWithDpRank,
op: KvCacheStoreData,
id: u64,
counters: Option<&PreBoundEventCounters>,
) -> Result<(), KvCacheEventError> {
// Ensure this worker has an entry in the outer map.
let worker_lookup = lookup.entry(worker).or_default();
......@@ -364,6 +368,7 @@ impl ConcurrentRadixTree {
};
let mut needs_worker_insert = false;
let mut duplicate_store = !op.blocks.is_empty();
let mut num_blocks_added = 0;
......@@ -379,8 +384,8 @@ impl ConcurrentRadixTree {
// Insert worker into this node if it was the child from the
// previous iteration (skip for the initial parent, which is
// not one of the blocks being stored).
if needs_worker_insert {
parent_guard.workers.insert(worker);
if needs_worker_insert && parent_guard.workers.insert(worker) {
duplicate_store = false;
}
needs_worker_insert = true;
......@@ -390,6 +395,7 @@ impl ConcurrentRadixTree {
{
let existing_guard = existing.read();
if existing_guard.block_hash != Some(block_data.block_hash) {
duplicate_store = false;
tracing::warn!(
expected = ?block_data.block_hash,
actual = ?existing_guard.block_hash,
......@@ -400,6 +406,7 @@ impl ConcurrentRadixTree {
existing.clone()
}
None => {
duplicate_store = false;
// Reuse from lookup or create new
let new_block = worker_lookup
.get(&block_data.block_hash)
......@@ -417,11 +424,13 @@ impl ConcurrentRadixTree {
};
// Update lookup
if worker_lookup
.insert(block_data.block_hash, child.clone())
.is_none()
{
num_blocks_added += 1;
match worker_lookup.insert(block_data.block_hash, child.clone()) {
Some(existing) if Arc::ptr_eq(&existing, &child) => {}
Some(_) => duplicate_store = false,
None => {
num_blocks_added += 1;
duplicate_store = false;
}
}
current = child;
......@@ -429,8 +438,8 @@ impl ConcurrentRadixTree {
// Insert worker into the last child (not yet handled since there is
// no subsequent iteration to pick it up).
if needs_worker_insert {
current.write().workers.insert(worker);
if needs_worker_insert && current.write().workers.insert(worker) {
duplicate_store = false;
}
match self.tree_sizes.get(&worker) {
......@@ -443,6 +452,10 @@ impl ConcurrentRadixTree {
}
}
if duplicate_store && let Some(counters) = counters {
counters.inc_warning(EventWarningKind::DuplicateStore);
}
Ok(())
}
......@@ -649,7 +662,7 @@ impl SyncIndexer for ConcurrentRadixTree {
match task {
WorkerTask::Event(event) => {
let kind = EventKind::of(&event.event.data);
let result = self.apply_event(&mut lookup, event);
let result = self.apply_event(&mut lookup, event, counters.as_ref());
if result.is_err() {
tracing::warn!("Failed to apply event: {:?}", result.as_ref().err());
}
......
......@@ -67,7 +67,9 @@ use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::{EventKind, KvIndexerMetrics, SyncIndexer, WorkerTask};
use super::{
EventKind, EventWarningKind, KvIndexerMetrics, PreBoundEventCounters, SyncIndexer, WorkerTask,
};
use crate::cleanup::{self, CleanableNode, CleanupGuard, CleanupState};
use crate::protocols::*;
......@@ -156,10 +158,13 @@ impl Node {
}
#[inline]
fn promote_to_full(&mut self, worker: WorkerWithDpRank) {
fn promote_to_full(&mut self, worker: WorkerWithDpRank) -> bool {
if !self.full_edge_workers.contains(&worker) {
self.worker_cutoffs.remove(&worker);
self.full_edge_workers.insert(worker);
true
} else {
false
}
}
......@@ -250,6 +255,11 @@ struct RemoveOutcome {
stale_hashes: Vec<ExternalSequenceBlockHash>,
}
struct StoreInsertOutcome {
num_blocks_added: usize,
duplicate_store: bool,
}
/// Thread-safe radix tree (compressed trie) for concurrent KV cache lookups.
pub struct ConcurrentRadixTreeCompressed {
/// The root of the radix tree. Has an empty edge and only contains children.
......@@ -614,13 +624,14 @@ impl ConcurrentRadixTreeCompressed {
&self,
lookup: &mut FxHashMap<WorkerWithDpRank, WorkerLookup>,
event: RouterEvent,
counters: Option<&PreBoundEventCounters>,
) -> Result<(), KvCacheEventError> {
let (worker_id, kv_event) = (event.worker_id, event.event);
let (id, op) = (kv_event.event_id, kv_event.data);
let worker = WorkerWithDpRank::new(worker_id, kv_event.dp_rank);
match op {
KvCacheEventData::Stored(op) => self.apply_stored(lookup, worker, op, id),
KvCacheEventData::Stored(op) => self.apply_stored(lookup, worker, op, id, counters),
KvCacheEventData::Removed(op) => self.apply_removed(lookup, worker, op, id),
KvCacheEventData::Cleared => {
lookup.entry(worker).or_default();
......@@ -643,6 +654,7 @@ impl ConcurrentRadixTreeCompressed {
worker: WorkerWithDpRank,
op: KvCacheStoreData,
id: u64,
counters: Option<&PreBoundEventCounters>,
) -> Result<(), KvCacheEventError> {
lookup.entry(worker).or_default();
......@@ -723,19 +735,24 @@ impl ConcurrentRadixTreeCompressed {
None => self.root.clone(),
};
let num_blocks_added =
self.insert_blocks_from(lookup, worker, &parent, op.parent_hash, &op.blocks);
let outcome = self.insert_blocks_from(lookup, worker, &parent, op.parent_hash, &op.blocks);
match self.tree_sizes.get(&worker) {
Some(size) => {
size.fetch_add(num_blocks_added, Ordering::Relaxed);
size.fetch_add(outcome.num_blocks_added, Ordering::Relaxed);
}
None => {
self.tree_sizes
.insert(worker, AtomicUsize::new(num_blocks_added));
.insert(worker, AtomicUsize::new(outcome.num_blocks_added));
}
}
if outcome.duplicate_store
&& let Some(counters) = counters
{
counters.inc_warning(EventWarningKind::DuplicateStore);
}
Ok(())
}
......@@ -746,10 +763,11 @@ impl ConcurrentRadixTreeCompressed {
parent: &SharedNode,
seed_hash: Option<ExternalSequenceBlockHash>,
blocks: &[KvCacheStoredBlockData],
) -> usize {
) -> StoreInsertOutcome {
let mut current_parent = parent.clone();
let mut remaining = blocks;
let mut num_blocks_added = 0usize;
let mut duplicate_store = !blocks.is_empty();
// Track the last ExternalSequenceBlockHash we matched to detect if
// `current_parent` was split by a concurrent thread between iterations.
// A split shortens `current_parent`'s edge and moves our last-matched
......@@ -811,11 +829,18 @@ impl ConcurrentRadixTreeCompressed {
let wl = lookup.get_mut(&worker).unwrap();
for b in remaining {
if wl.insert(b.block_hash, new_node.clone()).is_none() {
num_blocks_added += 1;
match wl.insert(b.block_hash, new_node.clone()) {
Some(existing) if Arc::ptr_eq(&existing, &new_node) => {}
Some(_) => {}
None => {
num_blocks_added += 1;
}
}
}
return num_blocks_added;
return StoreInsertOutcome {
num_blocks_added,
duplicate_store: false,
};
}
}
};
......@@ -830,6 +855,7 @@ impl ConcurrentRadixTreeCompressed {
break;
}
if edge_elem.1 != rem_elem.block_hash {
duplicate_store = false;
tracing::warn!(
expected = ?rem_elem.block_hash,
actual = ?edge_elem.1,
......@@ -882,13 +908,21 @@ impl ConcurrentRadixTreeCompressed {
let wl = lookup.get_mut(&worker).unwrap();
for b in &remaining[..match_len] {
if wl.insert(b.block_hash, child.clone()).is_none() {
num_blocks_added += 1;
match wl.insert(b.block_hash, child.clone()) {
Some(existing) if Arc::ptr_eq(&existing, &child) => {}
Some(_) => {}
None => {
num_blocks_added += 1;
}
}
}
for b in tail {
if wl.insert(b.block_hash, new_node.clone()).is_none() {
num_blocks_added += 1;
match wl.insert(b.block_hash, new_node.clone()) {
Some(existing) if Arc::ptr_eq(&existing, &new_node) => {}
Some(_) => {}
None => {
num_blocks_added += 1;
}
}
}
} else {
......@@ -897,22 +931,36 @@ impl ConcurrentRadixTreeCompressed {
let wl = lookup.get_mut(&worker).unwrap();
for b in &remaining[..match_len] {
if wl.insert(b.block_hash, child.clone()).is_none() {
num_blocks_added += 1;
match wl.insert(b.block_hash, child.clone()) {
Some(existing) if Arc::ptr_eq(&existing, &child) => {}
Some(_) => {}
None => {
num_blocks_added += 1;
}
}
}
}
return num_blocks_added;
return StoreInsertOutcome {
num_blocks_added,
duplicate_store: false,
};
}
// Full edge match: upgrade worker to full coverage if necessary.
child_guard.promote_to_full(worker);
if child_guard.promote_to_full(worker) {
duplicate_store = false;
}
drop(child_guard);
let wl = lookup.get_mut(&worker).unwrap();
for b in &remaining[..edge_len] {
if wl.insert(b.block_hash, child.clone()).is_none() {
num_blocks_added += 1;
match wl.insert(b.block_hash, child.clone()) {
Some(existing) if Arc::ptr_eq(&existing, &child) => {}
Some(_) => duplicate_store = false,
None => {
num_blocks_added += 1;
duplicate_store = false;
}
}
}
......@@ -922,7 +970,10 @@ impl ConcurrentRadixTreeCompressed {
}
}
num_blocks_added
StoreInsertOutcome {
num_blocks_added,
duplicate_store,
}
}
// ------------------------------------------------------------------
......@@ -1245,7 +1296,7 @@ impl SyncIndexer for ConcurrentRadixTreeCompressed {
match task {
WorkerTask::Event(event) => {
let kind = EventKind::of(&event.event.data);
let result = self.apply_event(&mut lookup, event);
let result = self.apply_event(&mut lookup, event, counters.as_ref());
if result.is_err() {
tracing::warn!("Failed to apply event: {:?}", result.as_ref().err());
}
......
......@@ -49,7 +49,7 @@ fn apply_event_with_prune_tracking(
let event_id = event.event.event_id;
let worker_id = event.worker_id;
let event_for_prune = prune_manager.is_some().then(|| event.clone());
let result = trie.apply_event(event);
let result = trie.apply_event_with_counters(event, Some(counters));
let result_is_ok = result.is_ok();
let tree_size = trie.current_size();
tracing::trace!(
......
......@@ -51,6 +51,20 @@ impl std::fmt::Display for EventKind {
}
}
/// Lightweight, `Copy` discriminant for KV event warnings.
#[derive(Debug, Clone, Copy)]
pub enum EventWarningKind {
DuplicateStore,
}
impl std::fmt::Display for EventWarningKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::DuplicateStore => f.write_str(METRIC_WARNING_DUPLICATE_STORE),
}
}
}
/// Metrics for the KV Indexer.
#[derive(Clone)]
#[cfg_attr(not(feature = "metrics"), derive(Default))]
......@@ -58,6 +72,9 @@ pub struct KvIndexerMetrics {
/// Counter of events applied.
#[cfg(feature = "metrics")]
pub kv_cache_events_applied: IntCounterVec,
/// Counter of suspicious-but-valid KV events.
#[cfg(feature = "metrics")]
pub kv_cache_event_warnings: IntCounterVec,
}
/// Metric status labels.
......@@ -71,20 +88,28 @@ pub const METRIC_EVENT_STORED: &str = "stored";
pub const METRIC_EVENT_REMOVED: &str = "removed";
pub const METRIC_EVENT_CLEARED: &str = "cleared";
/// Metric warning labels.
pub const METRIC_WARNING_DUPLICATE_STORE: &str = "duplicate_store";
/// Metric name for KV cache events applied counter.
#[cfg(feature = "metrics")]
#[cfg(all(feature = "metrics", feature = "runtime-protocols"))]
const KV_CACHE_EVENTS_APPLIED_SUFFIX: &str = "kv_cache_events_applied";
#[cfg(feature = "metrics")]
const KV_CACHE_EVENTS_APPLIED_NAME: &str = "dynamo_kvrouter_kv_cache_events_applied";
#[cfg(all(feature = "metrics", feature = "runtime-protocols"))]
const KV_CACHE_EVENT_WARNINGS_SUFFIX: &str = "kv_cache_event_warnings";
#[cfg(feature = "metrics")]
const KV_CACHE_EVENT_WARNINGS_NAME: &str = "dynamo_kvrouter_kv_cache_event_warnings";
#[cfg(all(feature = "metrics", feature = "runtime-protocols"))]
static KV_INDEXER_METRICS: OnceLock<Arc<KvIndexerMetrics>> = OnceLock::new();
impl KvIndexerMetrics {
#[cfg(feature = "metrics")]
fn new(kv_cache_events_applied: IntCounterVec) -> Self {
#[cfg(all(feature = "metrics", feature = "runtime-protocols"))]
fn new(kv_cache_events_applied: IntCounterVec, kv_cache_event_warnings: IntCounterVec) -> Self {
Self {
kv_cache_events_applied,
kv_cache_event_warnings,
}
}
......@@ -96,16 +121,24 @@ impl KvIndexerMetrics {
{
KV_INDEXER_METRICS
.get_or_init(|| {
match component.metrics().create_intcountervec(
KV_CACHE_EVENTS_APPLIED_SUFFIX,
"Total number of KV cache events applied to index",
&["event_type", "status"],
&[],
match (
component.metrics().create_intcountervec(
KV_CACHE_EVENTS_APPLIED_SUFFIX,
"Total number of KV cache events applied to index",
&["event_type", "status"],
&[],
),
component.metrics().create_intcountervec(
KV_CACHE_EVENT_WARNINGS_SUFFIX,
"Total number of suspicious KV cache events seen by the router indexer",
&["warning_kind"],
&[],
),
) {
Ok(kv_cache_events_applied) => {
Arc::new(Self::new(kv_cache_events_applied))
}
Err(e) => {
(Ok(kv_cache_events_applied), Ok(kv_cache_event_warnings)) => Arc::new(
Self::new(kv_cache_events_applied, kv_cache_event_warnings),
),
(Err(e), _) | (_, Err(e)) => {
tracing::warn!("Failed to create kv indexer metrics from component: {}. Using unregistered metrics as fallback.", e);
Arc::new(Self::new_unregistered())
}
......@@ -134,6 +167,14 @@ impl KvIndexerMetrics {
&["event_type", "status"],
)
.unwrap(),
kv_cache_event_warnings: IntCounterVec::new(
Opts::new(
KV_CACHE_EVENT_WARNINGS_NAME,
"Total number of suspicious KV cache events seen by the router indexer",
),
&["warning_kind"],
)
.unwrap(),
}
}
......@@ -172,6 +213,17 @@ impl KvIndexerMetrics {
let _ = (self, event_type, result);
}
pub fn increment_event_warning(&self, warning_kind: &'static str) {
#[cfg(feature = "metrics")]
{
self.kv_cache_event_warnings
.with_label_values(&[warning_kind])
.inc_by(1);
}
#[cfg(not(feature = "metrics"))]
let _ = (self, warning_kind);
}
/// Pre-resolve all `IntCounter` handles for the finite (event_type, status) label space.
/// Call this once per worker thread at startup, then use
/// [`PreBoundEventCounters::inc`] in the hot loop to avoid the
......@@ -211,6 +263,8 @@ pub struct PreBoundEventCounters {
cleared_block_not_found: prometheus::IntCounter,
#[cfg(feature = "metrics")]
cleared_invalid_block: prometheus::IntCounter,
#[cfg(feature = "metrics")]
duplicate_store_warning: prometheus::IntCounter,
}
impl PreBoundEventCounters {
......@@ -218,6 +272,7 @@ impl PreBoundEventCounters {
#[cfg(feature = "metrics")]
{
let cv = &metrics.kv_cache_events_applied;
let warnings = &metrics.kv_cache_event_warnings;
Self {
stored_ok: cv.with_label_values(&[METRIC_EVENT_STORED, METRIC_STATUS_OK]),
stored_parent_not_found: cv
......@@ -240,6 +295,8 @@ impl PreBoundEventCounters {
.with_label_values(&[METRIC_EVENT_CLEARED, METRIC_STATUS_BLOCK_NOT_FOUND]),
cleared_invalid_block: cv
.with_label_values(&[METRIC_EVENT_CLEARED, METRIC_STATUS_INVALID_BLOCK]),
duplicate_store_warning: warnings
.with_label_values(&[METRIC_WARNING_DUPLICATE_STORE]),
}
}
#[cfg(not(feature = "metrics"))]
......@@ -294,4 +351,16 @@ impl PreBoundEventCounters {
#[cfg(not(feature = "metrics"))]
let _ = (self, kind, result);
}
pub fn inc_warning(&self, kind: EventWarningKind) {
#[cfg(feature = "metrics")]
{
let counter = match kind {
EventWarningKind::DuplicateStore => &self.duplicate_store_warning,
};
counter.inc();
}
#[cfg(not(feature = "metrics"))]
let _ = (self, kind);
}
}
......@@ -21,11 +21,14 @@
//! `KvIndexerInterface` with sticky event routing and worker threads, wrap it
//! in a `ThreadPoolIndexer`.
use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::{EventKind, KvIndexerMetrics, SyncIndexer, WorkerTask};
use super::{
EventKind, EventWarningKind, KvIndexerMetrics, PreBoundEventCounters, SyncIndexer, WorkerTask,
};
use crate::active_set::reconcile_active_workers;
use crate::protocols::{
DpRank, ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheEventError,
......@@ -54,10 +57,10 @@ impl SeqEntry {
}
/// Insert a worker for a given seq_hash, upgrading to Multi if needed.
fn insert(&mut self, seq_hash: ExternalSequenceBlockHash, worker: WorkerWithDpRank) {
fn insert(&mut self, seq_hash: ExternalSequenceBlockHash, worker: WorkerWithDpRank) -> bool {
match self {
Self::Single(existing_hash, workers) if *existing_hash == seq_hash => {
workers.insert(worker);
workers.insert(worker)
}
Self::Single(existing_hash, existing_workers) => {
// Upgrade to Multi
......@@ -65,10 +68,9 @@ impl SeqEntry {
map.insert(*existing_hash, std::mem::take(existing_workers));
map.entry(seq_hash).or_default().insert(worker);
*self = Self::Multi(map);
true
}
Self::Multi(map) => {
map.entry(seq_hash).or_default().insert(worker);
}
Self::Multi(map) => map.entry(seq_hash).or_default().insert(worker),
}
}
......@@ -152,7 +154,7 @@ impl SyncIndexer for PositionalIndexer {
match task {
WorkerTask::Event(event) => {
let kind = EventKind::of(&event.event.data);
let result = self.apply_event(&mut worker_blocks, event);
let result = self.apply_event(&mut worker_blocks, event, counters.as_ref());
if result.is_err() {
tracing::warn!("Failed to apply event: {:?}", result.as_ref().err());
}
......@@ -201,6 +203,7 @@ impl PositionalIndexer {
&self,
worker_blocks: &mut FxHashMap<WorkerWithDpRank, LevelIndex>,
event: RouterEvent,
counters: Option<&PreBoundEventCounters>,
) -> Result<(), KvCacheEventError> {
let (worker_id, kv_event) = (event.worker_id, event.event);
let (id, op) = (kv_event.event_id, kv_event.data);
......@@ -215,7 +218,7 @@ impl PositionalIndexer {
match op {
KvCacheEventData::Stored(store_data) => {
self.store_blocks_impl(worker_blocks, worker, store_data, id)?;
self.store_blocks_impl(worker_blocks, worker, store_data, id, counters)?;
Ok(())
}
......@@ -236,6 +239,7 @@ impl PositionalIndexer {
worker: WorkerWithDpRank,
store_data: KvCacheStoreData,
event_id: u64,
counters: Option<&PreBoundEventCounters>,
) -> Result<(), KvCacheEventError> {
let KvCacheStoreData {
parent_hash,
......@@ -265,32 +269,51 @@ impl PositionalIndexer {
let worker_blocks_entry = worker_blocks.entry(worker).or_default();
let num_stored_blocks = blocks.len();
let mut num_blocks_added = 0usize;
let mut duplicate_store = !blocks.is_empty();
for (i, block_data) in blocks.into_iter().enumerate() {
let position = start_pos + i;
let local_hash = block_data.tokens_hash;
let seq_hash = block_data.block_hash;
self.index
.entry((position, local_hash))
.and_modify(|entry| entry.insert(seq_hash, worker))
.or_insert_with(|| SeqEntry::new(seq_hash, worker));
match self.index.entry((position, local_hash)) {
Entry::Occupied(mut entry) => {
if entry.get_mut().insert(seq_hash, worker) {
duplicate_store = false;
}
}
Entry::Vacant(entry) => {
entry.insert(SeqEntry::new(seq_hash, worker));
duplicate_store = false;
}
}
// Insert into worker_blocks: worker -> seq_hash -> (position, local_hash)
worker_blocks_entry.insert(seq_hash, (position, local_hash));
match worker_blocks_entry.insert(seq_hash, (position, local_hash)) {
Some(existing) if existing == (position, local_hash) => {}
Some(_) => duplicate_store = false,
None => {
num_blocks_added += 1;
duplicate_store = false;
}
}
}
match self.tree_sizes.get(&worker) {
Some(size) => {
size.fetch_add(num_stored_blocks, Ordering::Relaxed);
size.fetch_add(num_blocks_added, Ordering::Relaxed);
}
None => {
self.tree_sizes
.insert(worker, AtomicUsize::new(num_stored_blocks));
.insert(worker, AtomicUsize::new(num_blocks_added));
}
}
if duplicate_store && let Some(counters) = counters {
counters.inc_warning(EventWarningKind::DuplicateStore);
}
Ok(())
}
......
......@@ -23,6 +23,7 @@ use std::{
use rustc_hash::{FxHashMap, FxHashSet};
use super::{EventWarningKind, PreBoundEventCounters};
use crate::active_set::reconcile_active_workers;
use crate::protocols::*;
......@@ -313,6 +314,14 @@ impl RadixTree {
///
/// * `event` - The `RouterEvent` to apply.
pub fn apply_event(&mut self, event: RouterEvent) -> Result<(), KvCacheEventError> {
self.apply_event_with_counters(event, None)
}
pub(crate) fn apply_event_with_counters(
&mut self,
event: RouterEvent,
counters: Option<&PreBoundEventCounters>,
) -> Result<(), KvCacheEventError> {
let (worker_id, kv_event) = (event.worker_id, event.event);
let (id, op) = (kv_event.event_id, kv_event.data);
......@@ -345,6 +354,7 @@ impl RadixTree {
};
let mut needs_worker_insert = false;
let mut duplicate_store = !op.blocks.is_empty();
// In each iteration we lock the parent and insert the worker
// deferred from the previous iteration, avoiding a second
......@@ -352,8 +362,8 @@ impl RadixTree {
for block_data in op.blocks {
let mut parent_mut = current.borrow_mut();
if needs_worker_insert {
parent_mut.workers.insert(worker);
if needs_worker_insert && parent_mut.workers.insert(worker) {
duplicate_store = false;
}
needs_worker_insert = true;
......@@ -361,6 +371,7 @@ impl RadixTree {
Some(block) => {
// Verify our simplifying assumption: block_hash is uniform across workers
if block.borrow().block_hash != Some(block_data.block_hash) {
duplicate_store = false;
tracing::warn!(
expected = ?block_data.block_hash,
actual = ?block.borrow().block_hash,
......@@ -370,6 +381,7 @@ impl RadixTree {
block.clone()
}
None => {
duplicate_store = false;
let new_block = worker_lookup
.get(&block_data.block_hash)
.cloned()
......@@ -400,15 +412,22 @@ impl RadixTree {
return Err(KvCacheEventError::InvalidBlockSequence);
}
worker_lookup.insert(block_data.block_hash, child.clone());
match worker_lookup.insert(block_data.block_hash, child.clone()) {
Some(existing) if Rc::ptr_eq(&existing, &child) => {}
_ => duplicate_store = false,
}
drop(parent_mut);
current = child;
}
// Insert worker into the last child.
if needs_worker_insert {
current.borrow_mut().workers.insert(worker);
if needs_worker_insert && current.borrow_mut().workers.insert(worker) {
duplicate_store = false;
}
if duplicate_store && let Some(counters) = counters {
counters.inc_warning(EventWarningKind::DuplicateStore);
}
Ok(())
......
......@@ -230,29 +230,41 @@ fn tree_size_indexer_template(
}
fn make_indexer(variant: &str) -> Box<dyn KvIndexerInterface> {
let token = CancellationToken::new();
let metrics = Arc::new(KvIndexerMetrics::new_unregistered());
make_indexer_with_metrics(variant, metrics).0
}
fn make_indexer_with_metrics(
variant: &str,
metrics: Arc<KvIndexerMetrics>,
) -> (Box<dyn KvIndexerInterface>, Arc<KvIndexerMetrics>) {
let token = CancellationToken::new();
let kv_block_size = 32;
match variant {
"single" => Box::new(KvIndexer::new(token, kv_block_size, metrics)),
"flat" => Box::new(ThreadPoolIndexer::new(
let indexer: Box<dyn KvIndexerInterface> = match variant {
"single" => Box::new(KvIndexer::new(token, kv_block_size, metrics.clone())),
"flat" => Box::new(ThreadPoolIndexer::new_with_metrics(
PositionalIndexer::new(32),
4,
kv_block_size,
Some(metrics.clone()),
)),
"concurrent" => Box::new(ThreadPoolIndexer::new(
"concurrent" => Box::new(ThreadPoolIndexer::new_with_metrics(
ConcurrentRadixTree::new(),
4,
kv_block_size,
Some(metrics.clone()),
)),
"concurrent_compressed" => Box::new(ThreadPoolIndexer::new(
"concurrent_compressed" => Box::new(ThreadPoolIndexer::new_with_metrics(
ConcurrentRadixTreeCompressed::new(),
4,
kv_block_size,
Some(metrics.clone()),
)),
_ => panic!("Unknown variant: {}", variant),
}
};
(indexer, metrics)
}
/// Ensure queued indexer work is drained, then give a short settle window.
......@@ -309,10 +321,110 @@ async fn assert_exact_scores(
}
}
#[cfg(feature = "metrics")]
fn event_metric_value(
metrics: &KvIndexerMetrics,
event_type: &'static str,
status: &'static str,
) -> u64 {
metrics
.kv_cache_events_applied
.get_metric_with_label_values(&[event_type, status])
.unwrap()
.get()
}
#[cfg(feature = "metrics")]
fn warning_metric_value(metrics: &KvIndexerMetrics, warning_kind: &'static str) -> u64 {
metrics
.kv_cache_event_warnings
.get_metric_with_label_values(&[warning_kind])
.unwrap()
.get()
}
#[cfg(feature = "metrics")]
fn assert_no_event_errors(metrics: &KvIndexerMetrics) {
let invalid_count = [
(METRIC_EVENT_STORED, METRIC_STATUS_PARENT_NOT_FOUND),
(METRIC_EVENT_STORED, METRIC_STATUS_BLOCK_NOT_FOUND),
(METRIC_EVENT_STORED, METRIC_STATUS_INVALID_BLOCK),
(METRIC_EVENT_REMOVED, METRIC_STATUS_PARENT_NOT_FOUND),
(METRIC_EVENT_REMOVED, METRIC_STATUS_BLOCK_NOT_FOUND),
(METRIC_EVENT_REMOVED, METRIC_STATUS_INVALID_BLOCK),
]
.into_iter()
.map(|(event_type, status)| event_metric_value(metrics, event_type, status))
.sum::<u64>();
assert_eq!(
invalid_count, 0,
"router indexer reported invalid KV events"
);
}
#[cfg(feature = "metrics")]
fn assert_no_event_warnings(metrics: &KvIndexerMetrics) {
assert_eq!(
warning_metric_value(metrics, METRIC_WARNING_DUPLICATE_STORE),
0,
"router indexer reported suspicious KV events",
);
}
mod interface_tests {
use super::*;
use rstest_reuse::apply;
#[cfg(feature = "metrics")]
#[tokio::test]
#[apply(indexer_template)]
async fn test_duplicate_store_replay_warns_without_error(variant: &str) {
let metrics = Arc::new(KvIndexerMetrics::new_unregistered());
let (index, metrics) = make_indexer_with_metrics(variant, metrics);
let worker = WorkerWithDpRank::new(0, 0);
let event = make_store_event(0, &[1, 2, 3]);
index.apply_event(event.clone()).await;
flush_and_settle(index.as_ref()).await;
let first_snapshot = snapshot_tree(index.as_ref()).await;
index.apply_event(event).await;
flush_and_settle(index.as_ref()).await;
assert_eq!(
first_snapshot,
snapshot_tree(index.as_ref()).await,
"replaying the same store event should not change the tree structure"
);
assert_score(index.as_ref(), &[1, 2, 3], worker, 3).await;
assert_no_event_errors(metrics.as_ref());
assert_eq!(
warning_metric_value(metrics.as_ref(), METRIC_WARNING_DUPLICATE_STORE),
1
);
}
#[cfg(feature = "metrics")]
#[tokio::test]
#[apply(indexer_template)]
async fn test_continuation_store_does_not_warn(variant: &str) {
let metrics = Arc::new(KvIndexerMetrics::new_unregistered());
let (index, metrics) = make_indexer_with_metrics(variant, metrics);
let worker = WorkerWithDpRank::new(0, 0);
index.apply_event(make_store_event(0, &[1, 2, 3])).await;
flush_and_settle(index.as_ref()).await;
index
.apply_event(make_store_event_with_parent(0, &[1, 2, 3], &[4, 5]))
.await;
flush_and_settle(index.as_ref()).await;
assert_score(index.as_ref(), &[1, 2, 3, 4, 5], worker, 5).await;
assert_no_event_errors(metrics.as_ref());
assert_no_event_warnings(metrics.as_ref());
}
#[tokio::test]
#[apply(indexer_template)]
async fn test_store_and_find(variant: &str) {
......@@ -2142,6 +2254,16 @@ mod metrics_tests {
.get(),
1
);
metrics.increment_event_warning(METRIC_WARNING_DUPLICATE_STORE);
assert_eq!(
metrics
.kv_cache_event_warnings
.get_metric_with_label_values(&[METRIC_WARNING_DUPLICATE_STORE])
.unwrap()
.get(),
1
);
}
}
......
......@@ -660,6 +660,7 @@ mod router_events {
assert_eq!(prompt_hashes.len(), 6);
assert!(harness.ok_count(METRIC_EVENT_STORED) >= 2);
harness.assert_no_event_warnings();
harness.shutdown();
}
......@@ -679,6 +680,7 @@ mod router_events {
assert_eq!(full_hashes.len(), 6);
assert!(harness.ok_count(METRIC_EVENT_STORED) >= 2);
harness.assert_no_event_warnings();
harness.shutdown();
}
......@@ -823,6 +825,7 @@ mod router_events {
assert!(saw_remove);
harness.assert_no_event_errors();
harness.assert_no_event_warnings();
harness.shutdown();
}
......
......@@ -7,7 +7,7 @@ use anyhow::anyhow;
use dynamo_kv_router::indexer::{
KvIndexerInterface, KvIndexerMetrics, LocalKvIndexer, METRIC_EVENT_REMOVED,
METRIC_EVENT_STORED, METRIC_STATUS_BLOCK_NOT_FOUND, METRIC_STATUS_INVALID_BLOCK,
METRIC_STATUS_OK, METRIC_STATUS_PARENT_NOT_FOUND,
METRIC_STATUS_OK, METRIC_STATUS_PARENT_NOT_FOUND, METRIC_WARNING_DUPLICATE_STORE,
};
use dynamo_kv_router::protocols::{
KvCacheEvent, KvCacheEventData, LocalBlockHash, RouterEvent, WorkerId, WorkerWithDpRank,
......@@ -95,6 +95,24 @@ impl RouterIndexerHarness {
.sum()
}
pub(crate) fn warning_count(&self, warning_kind: &'static str) -> u64 {
warning_metric_value(&self.metrics, warning_kind)
}
pub(crate) fn warning_counts(&self) -> Vec<(&'static str, u64)> {
[METRIC_WARNING_DUPLICATE_STORE]
.into_iter()
.map(|warning_kind| (warning_kind, self.warning_count(warning_kind)))
.collect()
}
pub(crate) fn total_warning_count(&self) -> u64 {
self.warning_counts()
.into_iter()
.map(|(_, count)| count)
.sum()
}
pub(crate) fn spawn_forwarder(&self) -> (Arc<TestKvEventSink>, JoinHandle<()>) {
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<RouterEvent>();
let sink = Arc::new(TestKvEventSink {
......@@ -135,6 +153,26 @@ impl RouterIndexerHarness {
);
}
pub(crate) fn assert_no_event_warnings(&self) {
let breakdown = self
.warning_counts()
.into_iter()
.filter(|(_, count)| *count > 0)
.map(|(warning_kind, count)| format!("{warning_kind}={count}"))
.collect::<Vec<_>>()
.join(", ");
assert_eq!(
self.total_warning_count(),
0,
"router indexer reported suspicious KV events{}",
if breakdown.is_empty() {
String::new()
} else {
format!(": {breakdown}")
}
);
}
pub(crate) fn shutdown(&self) {
self.indexer.shutdown();
}
......@@ -166,6 +204,14 @@ pub(crate) fn metric_value(
.get()
}
pub(crate) fn warning_metric_value(metrics: &KvIndexerMetrics, warning_kind: &'static str) -> u64 {
metrics
.kv_cache_event_warnings
.get_metric_with_label_values(&[warning_kind])
.unwrap()
.get()
}
pub(crate) fn stored_hashes(events: &[RouterEvent]) -> Vec<LocalBlockHash> {
events
.iter()
......
......@@ -466,6 +466,7 @@ mod router_events {
assert!(saw_store);
assert!(harness.ok_count(METRIC_EVENT_STORED) > 0);
assert_eq!(core.kv_manager.num_active_blocks(), 0);
harness.assert_no_event_warnings();
harness.shutdown();
}
......@@ -514,6 +515,7 @@ mod router_events {
assert_eq!(core.state.waiting.front().copied(), Some(r2));
assert!(saw_remove);
assert!(harness.ok_count(METRIC_EVENT_REMOVED) > 0);
harness.assert_no_event_warnings();
harness.shutdown();
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment