"lib/bindings/python/vscode:/vscode.git/clone" did not exist on "f3aa1e01291aa1ab747409a273975bde7cf4e47c"
Unverified Commit a1aea900 authored by jthomson04's avatar jthomson04 Committed by GitHub
Browse files

feat: KVBM prometheus monitoring (#1211)

parent 312ee8e2
...@@ -25,6 +25,7 @@ mod state; ...@@ -25,6 +25,7 @@ mod state;
pub mod block; pub mod block;
pub mod events; pub mod events;
pub mod layout; pub mod layout;
pub mod metrics;
pub mod offload; pub mod offload;
pub mod pool; pub mod pool;
pub mod storage; pub mod storage;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
use super::events::EventManager; use super::events::EventManager;
use super::*; use super::*;
use prometheus::Registry;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum NixlOptions { pub enum NixlOptions {
...@@ -41,6 +42,9 @@ pub struct KvManagerRuntimeConfig { ...@@ -41,6 +42,9 @@ pub struct KvManagerRuntimeConfig {
#[builder(default)] #[builder(default)]
pub async_runtime: Option<Arc<tokio::runtime::Runtime>>, pub async_runtime: Option<Arc<tokio::runtime::Runtime>>,
#[builder(default = "Arc::new(Registry::new())")]
pub metrics_registry: Arc<Registry>,
} }
impl KvManagerRuntimeConfig { impl KvManagerRuntimeConfig {
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::Result;
use prometheus::{
core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounterVec,
IntGaugeVec, Opts, Registry,
};
use std::sync::Arc;
pub struct BlockManagerMetrics {
gauges: IntGaugeVec,
counters: IntCounterVec,
}
impl BlockManagerMetrics {
pub fn new(metrics_registry: &Arc<Registry>) -> Result<Arc<Self>> {
let gauge_opts = Opts::new("gauges", "Gauges for the pools")
.namespace("dynamo")
.subsystem("kvbm");
let counter_opts = Opts::new("pools", "Counters for the pools")
.namespace("dynamo")
.subsystem("kvbm");
let gauges = register_int_gauge_vec_with_registry!(
gauge_opts,
&["pool", "metric_type"],
metrics_registry
)?;
let counters = register_int_counter_vec_with_registry!(
counter_opts,
&["pool", "metric_type"],
metrics_registry
)?;
Ok(Arc::new(Self { gauges, counters }))
}
pub fn pool(self: &Arc<Self>, group: &str) -> Arc<PoolMetrics> {
PoolMetrics::new(self, group)
}
}
pub struct PoolMetrics {
block_manager_metrics: Arc<BlockManagerMetrics>,
group: String,
}
impl PoolMetrics {
pub fn new(block_manager_metrics: &Arc<BlockManagerMetrics>, group: &str) -> Arc<Self> {
Arc::new(Self {
block_manager_metrics: block_manager_metrics.clone(),
group: group.to_string(),
})
}
pub fn gauge(&self, metric_type: &str) -> GenericGauge<AtomicI64> {
self.block_manager_metrics
.gauges
.with_label_values(&[&self.group, &metric_type.to_string()])
}
pub fn counter(&self, metric_type: &str) -> GenericCounter<AtomicU64> {
self.block_manager_metrics
.counters
.with_label_values(&[&self.group, &metric_type.to_string()])
}
}
...@@ -42,9 +42,10 @@ ...@@ -42,9 +42,10 @@
//! The [`OffloadManager::onboard_worker`] is responsible for onboarding blocks. //! The [`OffloadManager::onboard_worker`] is responsible for onboarding blocks.
//! //!
//! The kind of offloads/onboards they perform is dictated by the source and target arguments //! The kind of offloads/onboards they perform is dictated by the source and target arguments
//! of the [`OffloadManager::offload`] and [`OffloadManager::onboard`] methods. //! of the [`OffloadManager::offload_worker`] and [`OffloadManager::onboard_worker`] methods.
use super::block::{BlockError, BlockMetadata, BlockState, ImmutableBlock, TransferContext}; use super::block::{BlockError, BlockMetadata, BlockState, ImmutableBlock, TransferContext};
use super::metrics::{BlockManagerMetrics, PoolMetrics};
use super::pool::BlockPoolError; use super::pool::BlockPoolError;
use super::storage::{Cuda, Storage}; use super::storage::{Cuda, Storage};
use super::{BlockPool, DeviceStorage, DiskStorage, PinnedStorage}; use super::{BlockPool, DeviceStorage, DiskStorage, PinnedStorage};
...@@ -101,6 +102,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -101,6 +102,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
device: Option<Arc<BlockPool<DeviceStorage, Metadata>>>, device: Option<Arc<BlockPool<DeviceStorage, Metadata>>>,
nixl_agent: Arc<Option<NixlAgent>>, nixl_agent: Arc<Option<NixlAgent>>,
async_rt_handle: Handle, async_rt_handle: Handle,
metrics: Arc<BlockManagerMetrics>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
) -> Result<Arc<Self>> { ) -> Result<Arc<Self>> {
let (device_offload_tx, device_offload_rx) = mpsc::unbounded_channel(); let (device_offload_tx, device_offload_rx) = mpsc::unbounded_channel();
...@@ -120,8 +122,6 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -120,8 +122,6 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
tick: Arc::new(Mutex::new(0)), tick: Arc::new(Mutex::new(0)),
}); });
let this_clone = this.clone();
let cuda_ctx = Cuda::device_or_create(0)?; let cuda_ctx = Cuda::device_or_create(0)?;
// We want cuda offloads to happen in parallel with host onboards, so we need to use a different stream. // We want cuda offloads to happen in parallel with host onboards, so we need to use a different stream.
...@@ -147,6 +147,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -147,6 +147,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
&async_rt_handle, &async_rt_handle,
cancellation_token.clone(), cancellation_token.clone(),
)), )),
metrics.pool("device"),
cancellation_token.clone(), cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
...@@ -179,6 +180,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -179,6 +180,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
&async_rt_handle, &async_rt_handle,
cancellation_token.clone(), cancellation_token.clone(),
)), )),
metrics.pool("host"),
cancellation_token.clone(), cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
...@@ -205,6 +207,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -205,6 +207,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
&async_rt_handle, &async_rt_handle,
cancellation_token.clone(), cancellation_token.clone(),
)), )),
metrics.pool("host"),
cancellation_token.clone(), cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
...@@ -231,6 +234,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -231,6 +234,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
&async_rt_handle, &async_rt_handle,
cancellation_token.clone(), cancellation_token.clone(),
)), )),
metrics.pool("disk"),
cancellation_token.clone(), cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
...@@ -241,7 +245,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -241,7 +245,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
)? )?
.detach(); .detach();
Ok(this_clone) Ok(this)
} }
async fn offload_worker<Source: Storage, Target: Storage>( async fn offload_worker<Source: Storage, Target: Storage>(
...@@ -249,6 +253,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -249,6 +253,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
target_pool: Option<Arc<BlockPool<Target, Metadata>>>, target_pool: Option<Arc<BlockPool<Target, Metadata>>>,
mut offload_rx: mpsc::UnboundedReceiver<OffloadRequest<Source, Metadata>>, mut offload_rx: mpsc::UnboundedReceiver<OffloadRequest<Source, Metadata>>,
transfer_manager: Arc<dyn TransferManager<Source, Target, Metadata>>, transfer_manager: Arc<dyn TransferManager<Source, Target, Metadata>>,
pool_metrics: Arc<PoolMetrics>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
) -> Result<()> { ) -> Result<()> {
if source_pool.is_none() || target_pool.is_none() { if source_pool.is_none() || target_pool.is_none() {
...@@ -270,6 +275,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -270,6 +275,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
match offload_rx.try_recv() { match offload_rx.try_recv() {
Ok(request) => { Ok(request) => {
queue.insert(request); queue.insert(request);
pool_metrics.gauge("offload_queue_size").inc();
} }
Err(TryRecvError::Empty) => { Err(TryRecvError::Empty) => {
break; break;
...@@ -280,6 +286,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -280,6 +286,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
// If there is a request, process it. // If there is a request, process it.
if let Some(request) = queue.pop_first() { if let Some(request) = queue.pop_first() {
pool_metrics.gauge("offload_queue_size").dec();
// Try to upgrade the block to a strong reference. // Try to upgrade the block to a strong reference.
let block = match request.block.upgrade() { let block = match request.block.upgrade() {
Some(block) => Some(block), Some(block) => Some(block),
...@@ -311,6 +318,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -311,6 +318,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
}; };
if let Some(target_block) = target_blocks.into_iter().next() { if let Some(target_block) = target_blocks.into_iter().next() {
pool_metrics.counter("offload_processed").inc();
transfer_manager transfer_manager
.enqueue_transfer(PendingTransfer::new( .enqueue_transfer(PendingTransfer::new(
vec![block], vec![block],
...@@ -327,6 +335,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -327,6 +335,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
_ = cancellation_token.cancelled() => return Ok(()), _ = cancellation_token.cancelled() => return Ok(()),
Some(request) = offload_rx.recv() => { Some(request) = offload_rx.recv() => {
queue.insert(request); queue.insert(request);
pool_metrics.gauge("offload_queue_size").inc();
} }
} }
} }
...@@ -338,6 +347,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -338,6 +347,7 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
target_pool: Option<Arc<BlockPool<Target, Metadata>>>, target_pool: Option<Arc<BlockPool<Target, Metadata>>>,
mut onboard_rx: mpsc::UnboundedReceiver<OnboardRequest<Source, Target, Metadata>>, mut onboard_rx: mpsc::UnboundedReceiver<OnboardRequest<Source, Target, Metadata>>,
transfer_manager: Arc<dyn TransferManager<Source, Target, Metadata>>, transfer_manager: Arc<dyn TransferManager<Source, Target, Metadata>>,
pool_metrics: Arc<PoolMetrics>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
) -> Result<()> { ) -> Result<()> {
if source_pool.is_none() || target_pool.is_none() { if source_pool.is_none() || target_pool.is_none() {
...@@ -349,6 +359,11 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -349,6 +359,11 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
tokio::select! { tokio::select! {
_ = cancellation_token.cancelled() => return Ok::<(), anyhow::Error>(()), _ = cancellation_token.cancelled() => return Ok::<(), anyhow::Error>(()),
Some(request) = onboard_rx.recv() => { Some(request) = onboard_rx.recv() => {
pool_metrics
.gauge("onboard_queue_size")
.set(onboard_rx.len() as i64);
// Try to allocate blocks on the device. // Try to allocate blocks on the device.
let target_blocks = match target_pool.allocate_blocks(request.blocks.len()).await { let target_blocks = match target_pool.allocate_blocks(request.blocks.len()).await {
Ok(blocks) => blocks, Ok(blocks) => blocks,
...@@ -358,6 +373,10 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> { ...@@ -358,6 +373,10 @@ impl<Metadata: BlockMetadata> OffloadManager<Metadata> {
} }
}; };
pool_metrics
.counter("onboard_processed")
.inc_by(request.blocks.len() as u64);
let sources = request let sources = request
.blocks .blocks
.iter() .iter()
...@@ -535,6 +554,7 @@ mod tests { ...@@ -535,6 +554,7 @@ mod tests {
use aligned_vec::avec; use aligned_vec::avec;
use cudarc::runtime::sys::{cudaMemcpy, cudaMemcpyKind, cudaMemset}; use cudarc::runtime::sys::{cudaMemcpy, cudaMemcpyKind, cudaMemset};
use prometheus::Registry;
use std::fs::File; use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write}; use std::io::{Read, Seek, SeekFrom, Write};
use std::mem::ManuallyDrop; use std::mem::ManuallyDrop;
...@@ -621,6 +641,7 @@ mod tests { ...@@ -621,6 +641,7 @@ mod tests {
device_pool.clone(), device_pool.clone(),
agent_arc, agent_arc,
async_rt_handle, async_rt_handle,
BlockManagerMetrics::new(&Arc::new(Registry::new()))?,
CancellationToken::new(), CancellationToken::new(),
)?; )?;
......
...@@ -73,10 +73,12 @@ use super::block::{ ...@@ -73,10 +73,12 @@ use super::block::{
GlobalRegistry, GlobalRegistry,
}; };
use super::events::{EventManager, NullEventManager}; use super::events::{EventManager, NullEventManager};
use super::metrics::{BlockManagerMetrics, PoolMetrics};
use super::storage::Storage; use super::storage::Storage;
use crate::tokens::{SequenceHash, TokenBlock}; use crate::tokens::{SequenceHash, TokenBlock};
use prometheus::Registry;
use std::{ use std::{
collections::{BTreeSet, HashMap, VecDeque}, collections::{BTreeSet, HashMap, VecDeque},
sync::{Arc, Weak}, sync::{Arc, Weak},
...@@ -124,12 +126,18 @@ pub struct BlockPoolArgs<S: Storage, M: BlockMetadata> { ...@@ -124,12 +126,18 @@ pub struct BlockPoolArgs<S: Storage, M: BlockMetadata> {
#[builder(default = "Handle::current()")] #[builder(default = "Handle::current()")]
async_runtime: Handle, async_runtime: Handle,
#[builder(
default = "BlockManagerMetrics::new(&Arc::new(Registry::new())).unwrap().pool(\"pool\")"
)]
pool_metrics: Arc<PoolMetrics>,
} }
impl<S: Storage, M: BlockMetadata> BlockPoolArgsBuilder<S, M> { impl<S: Storage, M: BlockMetadata> BlockPoolArgsBuilder<S, M> {
pub fn build(self) -> anyhow::Result<BlockPool<S, M>> { pub fn build(self) -> anyhow::Result<BlockPool<S, M>> {
let args = self.build_internal()?; let args = self.build_internal()?;
let (event_manager, cancel_token, blocks, global_registry, async_runtime) = args.dissolve(); let (event_manager, cancel_token, blocks, global_registry, async_runtime, metrics) =
args.dissolve();
tracing::info!("building block pool"); tracing::info!("building block pool");
let pool = BlockPool::new( let pool = BlockPool::new(
...@@ -138,6 +146,7 @@ impl<S: Storage, M: BlockMetadata> BlockPoolArgsBuilder<S, M> { ...@@ -138,6 +146,7 @@ impl<S: Storage, M: BlockMetadata> BlockPoolArgsBuilder<S, M> {
blocks, blocks,
global_registry, global_registry,
async_runtime, async_runtime,
metrics,
); );
Ok(pool) Ok(pool)
...@@ -216,6 +225,7 @@ impl<S: Storage, M: BlockMetadata> BlockPool<S, M> { ...@@ -216,6 +225,7 @@ impl<S: Storage, M: BlockMetadata> BlockPool<S, M> {
blocks: Vec<Block<S, M>>, blocks: Vec<Block<S, M>>,
global_registry: GlobalRegistry, global_registry: GlobalRegistry,
async_runtime: Handle, async_runtime: Handle,
metrics: Arc<PoolMetrics>,
) -> Self { ) -> Self {
let (pool, progress_engine) = Self::with_progress_engine( let (pool, progress_engine) = Self::with_progress_engine(
event_manager, event_manager,
...@@ -223,6 +233,7 @@ impl<S: Storage, M: BlockMetadata> BlockPool<S, M> { ...@@ -223,6 +233,7 @@ impl<S: Storage, M: BlockMetadata> BlockPool<S, M> {
blocks, blocks,
global_registry, global_registry,
async_runtime, async_runtime,
metrics,
); );
// pool.runtime.handle().spawn(async move { // pool.runtime.handle().spawn(async move {
...@@ -262,6 +273,7 @@ impl<S: Storage, M: BlockMetadata> BlockPool<S, M> { ...@@ -262,6 +273,7 @@ impl<S: Storage, M: BlockMetadata> BlockPool<S, M> {
blocks: Vec<Block<S, M>>, blocks: Vec<Block<S, M>>,
global_registry: GlobalRegistry, global_registry: GlobalRegistry,
async_runtime: Handle, async_runtime: Handle,
metrics: Arc<PoolMetrics>,
) -> (Self, ProgressEngine<S, M>) { ) -> (Self, ProgressEngine<S, M>) {
let (priority_tx, priority_rx) = tokio::sync::mpsc::unbounded_channel(); let (priority_tx, priority_rx) = tokio::sync::mpsc::unbounded_channel();
let (ctrl_tx, ctrl_rx) = tokio::sync::mpsc::unbounded_channel(); let (ctrl_tx, ctrl_rx) = tokio::sync::mpsc::unbounded_channel();
...@@ -274,6 +286,7 @@ impl<S: Storage, M: BlockMetadata> BlockPool<S, M> { ...@@ -274,6 +286,7 @@ impl<S: Storage, M: BlockMetadata> BlockPool<S, M> {
blocks, blocks,
global_registry, global_registry,
async_runtime, async_runtime,
metrics,
); );
( (
...@@ -474,6 +487,7 @@ struct State<S: Storage, M: BlockMetadata> { ...@@ -474,6 +487,7 @@ struct State<S: Storage, M: BlockMetadata> {
registry: BlockRegistry, registry: BlockRegistry,
return_tx: tokio::sync::mpsc::UnboundedSender<Block<S, M>>, return_tx: tokio::sync::mpsc::UnboundedSender<Block<S, M>>,
event_manager: Arc<dyn EventManager>, event_manager: Arc<dyn EventManager>,
metrics: Arc<PoolMetrics>,
} }
struct ProgressEngine<S: Storage, M: BlockMetadata> { struct ProgressEngine<S: Storage, M: BlockMetadata> {
...@@ -482,6 +496,7 @@ struct ProgressEngine<S: Storage, M: BlockMetadata> { ...@@ -482,6 +496,7 @@ struct ProgressEngine<S: Storage, M: BlockMetadata> {
cancel_token: CancellationToken, cancel_token: CancellationToken,
state: State<S, M>, state: State<S, M>,
return_rx: tokio::sync::mpsc::UnboundedReceiver<Block<S, M>>, return_rx: tokio::sync::mpsc::UnboundedReceiver<Block<S, M>>,
metrics: Arc<PoolMetrics>,
} }
#[cfg(test)] #[cfg(test)]
...@@ -498,7 +513,7 @@ mod tests { ...@@ -498,7 +513,7 @@ mod tests {
self, self,
) -> anyhow::Result<(BlockPool<S, M>, ProgressEngine<S, M>)> { ) -> anyhow::Result<(BlockPool<S, M>, ProgressEngine<S, M>)> {
let args = self.build_internal()?; let args = self.build_internal()?;
let (event_manager, cancel_token, blocks, global_registry, async_runtime) = let (event_manager, cancel_token, blocks, global_registry, async_runtime, metrics) =
args.dissolve(); args.dissolve();
let (pool, progress_engine) = BlockPool::with_progress_engine( let (pool, progress_engine) = BlockPool::with_progress_engine(
event_manager, event_manager,
...@@ -506,6 +521,7 @@ mod tests { ...@@ -506,6 +521,7 @@ mod tests {
blocks, blocks,
global_registry, global_registry,
async_runtime, async_runtime,
metrics,
); );
Ok((pool, progress_engine)) Ok((pool, progress_engine))
......
...@@ -26,6 +26,7 @@ impl<S: Storage, M: BlockMetadata> State<S, M> { ...@@ -26,6 +26,7 @@ impl<S: Storage, M: BlockMetadata> State<S, M> {
return_tx: tokio::sync::mpsc::UnboundedSender<Block<S, M>>, return_tx: tokio::sync::mpsc::UnboundedSender<Block<S, M>>,
global_registry: GlobalRegistry, global_registry: GlobalRegistry,
async_runtime: Handle, async_runtime: Handle,
metrics: Arc<PoolMetrics>,
) -> Self { ) -> Self {
Self { Self {
active: ActiveBlockPool::new(), active: ActiveBlockPool::new(),
...@@ -33,6 +34,7 @@ impl<S: Storage, M: BlockMetadata> State<S, M> { ...@@ -33,6 +34,7 @@ impl<S: Storage, M: BlockMetadata> State<S, M> {
registry: BlockRegistry::new(event_manager.clone(), global_registry, async_runtime), registry: BlockRegistry::new(event_manager.clone(), global_registry, async_runtime),
return_tx, return_tx,
event_manager, event_manager,
metrics,
} }
} }
...@@ -126,6 +128,10 @@ impl<S: Storage, M: BlockMetadata> State<S, M> { ...@@ -126,6 +128,10 @@ impl<S: Storage, M: BlockMetadata> State<S, M> {
} }
} }
self.metrics
.counter("blocks_allocated")
.inc_by(count as u64);
Ok(blocks) Ok(blocks)
} }
...@@ -195,6 +201,10 @@ impl<S: Storage, M: BlockMetadata> State<S, M> { ...@@ -195,6 +201,10 @@ impl<S: Storage, M: BlockMetadata> State<S, M> {
assert_eq!(immutable_blocks.len(), expected_len); assert_eq!(immutable_blocks.len(), expected_len);
self.metrics
.counter("blocks_registered")
.inc_by(immutable_blocks.len() as u64);
Ok(immutable_blocks) Ok(immutable_blocks)
} }
...@@ -204,9 +214,9 @@ impl<S: Storage, M: BlockMetadata> State<S, M> { ...@@ -204,9 +214,9 @@ impl<S: Storage, M: BlockMetadata> State<S, M> {
return_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Block<S, M>>, return_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Block<S, M>>,
) -> Vec<ImmutableBlock<S, M>> { ) -> Vec<ImmutableBlock<S, M>> {
let mut immutable_blocks = Vec::new(); let mut immutable_blocks = Vec::new();
for sequence_hash in sequence_hashes { for sequence_hash in &sequence_hashes {
if !self.registry.is_registered(sequence_hash) { if !self.registry.is_registered(*sequence_hash) {
return immutable_blocks; break;
} }
// the block is registered, so to get it from either the: // the block is registered, so to get it from either the:
...@@ -214,16 +224,17 @@ impl<S: Storage, M: BlockMetadata> State<S, M> { ...@@ -214,16 +224,17 @@ impl<S: Storage, M: BlockMetadata> State<S, M> {
// 2. inactive pool // 2. inactive pool
// 3. return channel // 3. return channel
if let Some(immutable) = self.active.match_sequence_hash(sequence_hash) { if let Some(immutable) = self.active.match_sequence_hash(*sequence_hash) {
immutable_blocks.push(immutable); immutable_blocks.push(immutable);
continue; continue;
} }
let raw_block = let raw_block =
if let Some(raw_block) = self.inactive.match_sequence_hash(sequence_hash) { if let Some(raw_block) = self.inactive.match_sequence_hash(*sequence_hash) {
raw_block raw_block
} else { } else {
self.wait_for_returned_block(sequence_hash, return_rx).await self.wait_for_returned_block(*sequence_hash, return_rx)
.await
}; };
// this assert allows us to skip the error checking on the active pool registration step // this assert allows us to skip the error checking on the active pool registration step
...@@ -239,6 +250,13 @@ impl<S: Storage, M: BlockMetadata> State<S, M> { ...@@ -239,6 +250,13 @@ impl<S: Storage, M: BlockMetadata> State<S, M> {
immutable_blocks.push(immutable); immutable_blocks.push(immutable);
} }
self.metrics
.counter("cache_hits")
.inc_by(immutable_blocks.len() as u64);
self.metrics
.counter("cache_misses")
.inc_by(sequence_hashes.len() as u64 - immutable_blocks.len() as u64);
immutable_blocks immutable_blocks
} }
...@@ -254,6 +272,7 @@ impl<S: Storage, M: BlockMetadata> State<S, M> { ...@@ -254,6 +272,7 @@ impl<S: Storage, M: BlockMetadata> State<S, M> {
} }
impl<S: Storage, M: BlockMetadata> ProgressEngine<S, M> { impl<S: Storage, M: BlockMetadata> ProgressEngine<S, M> {
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
event_manager: Arc<dyn EventManager>, event_manager: Arc<dyn EventManager>,
priority_rx: tokio::sync::mpsc::UnboundedReceiver<PriorityRequest<S, M>>, priority_rx: tokio::sync::mpsc::UnboundedReceiver<PriorityRequest<S, M>>,
...@@ -262,10 +281,16 @@ impl<S: Storage, M: BlockMetadata> ProgressEngine<S, M> { ...@@ -262,10 +281,16 @@ impl<S: Storage, M: BlockMetadata> ProgressEngine<S, M> {
blocks: Vec<Block<S, M>>, blocks: Vec<Block<S, M>>,
global_registry: GlobalRegistry, global_registry: GlobalRegistry,
async_runtime: Handle, async_runtime: Handle,
metrics: Arc<PoolMetrics>,
) -> Self { ) -> Self {
let (return_tx, return_rx) = tokio::sync::mpsc::unbounded_channel(); let (return_tx, return_rx) = tokio::sync::mpsc::unbounded_channel();
let mut state = let mut state = State::<S, M>::new(
State::<S, M>::new(event_manager, return_tx, global_registry, async_runtime); event_manager,
return_tx,
global_registry,
async_runtime,
metrics.clone(),
);
tracing::debug!(count = blocks.len(), "adding blocks to inactive pool"); tracing::debug!(count = blocks.len(), "adding blocks to inactive pool");
state.inactive.add_blocks(blocks); state.inactive.add_blocks(blocks);
...@@ -276,6 +301,7 @@ impl<S: Storage, M: BlockMetadata> ProgressEngine<S, M> { ...@@ -276,6 +301,7 @@ impl<S: Storage, M: BlockMetadata> ProgressEngine<S, M> {
cancel_token, cancel_token,
state, state,
return_rx, return_rx,
metrics,
} }
} }
...@@ -284,14 +310,17 @@ impl<S: Storage, M: BlockMetadata> ProgressEngine<S, M> { ...@@ -284,14 +310,17 @@ impl<S: Storage, M: BlockMetadata> ProgressEngine<S, M> {
biased; biased;
Some(priority_req) = self.priority_rx.recv(), if !self.priority_rx.is_closed() => { Some(priority_req) = self.priority_rx.recv(), if !self.priority_rx.is_closed() => {
self.metrics.gauge("priority_request_queue_size").set(self.priority_rx.len() as i64);
self.state.handle_priority_request(priority_req, &mut self.return_rx).await; self.state.handle_priority_request(priority_req, &mut self.return_rx).await;
} }
Some(req) = self.ctrl_rx.recv(), if !self.ctrl_rx.is_closed() => { Some(req) = self.ctrl_rx.recv(), if !self.ctrl_rx.is_closed() => {
self.metrics.gauge("control_request_queue_size").set(self.ctrl_rx.len() as i64);
self.state.handle_control_request(req); self.state.handle_control_request(req);
} }
Some(block) = self.return_rx.recv() => { Some(block) = self.return_rx.recv() => {
self.metrics.gauge("return_block_queue_size").set(self.return_rx.len() as i64);
self.state.handle_return_block(block); self.state.handle_return_block(block);
} }
......
...@@ -20,6 +20,7 @@ use super::{ ...@@ -20,6 +20,7 @@ use super::{
block::{Block, GlobalRegistry, ImmutableBlock}, block::{Block, GlobalRegistry, ImmutableBlock},
config::NixlOptions, config::NixlOptions,
events::{EventManager, NullEventManager}, events::{EventManager, NullEventManager},
metrics::{BlockManagerMetrics, PoolMetrics},
}; };
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Handle; use tokio::runtime::Handle;
...@@ -58,6 +59,9 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<Metadata> { ...@@ -58,6 +59,9 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<Metadata> {
let mut nixl_backends: HashMap<String, Arc<nixl_sys::Backend>> = HashMap::new(); let mut nixl_backends: HashMap<String, Arc<nixl_sys::Backend>> = HashMap::new();
let global_registry = GlobalRegistry::default(); let global_registry = GlobalRegistry::default();
let metrics = BlockManagerMetrics::new(&config.runtime.metrics_registry)?;
let event_manager = config let event_manager = config
.event_manager .event_manager
.clone() .clone()
...@@ -135,6 +139,7 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<Metadata> { ...@@ -135,6 +139,7 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<Metadata> {
worker_id, worker_id,
global_registry.clone(), global_registry.clone(),
async_rt_handle.clone(), async_rt_handle.clone(),
metrics.pool("disk"),
Some(event_manager.clone()), Some(event_manager.clone()),
)?; )?;
(Some(Arc::new(pool)), Some(blocks)) (Some(Arc::new(pool)), Some(blocks))
...@@ -158,6 +163,7 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<Metadata> { ...@@ -158,6 +163,7 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<Metadata> {
worker_id, worker_id,
global_registry.clone(), global_registry.clone(),
async_rt_handle.clone(), async_rt_handle.clone(),
metrics.pool("host"),
Some(event_manager.clone()), Some(event_manager.clone()),
)?; )?;
(Some(Arc::new(pool)), Some(blocks)) (Some(Arc::new(pool)), Some(blocks))
...@@ -180,6 +186,7 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<Metadata> { ...@@ -180,6 +186,7 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<Metadata> {
worker_id, worker_id,
global_registry.clone(), global_registry.clone(),
async_rt_handle.clone(), async_rt_handle.clone(),
metrics.pool("device"),
Some(event_manager.clone()), Some(event_manager.clone()),
)?; )?;
(Some(Arc::new(pool)), Some(blocks)) (Some(Arc::new(pool)), Some(blocks))
...@@ -200,6 +207,7 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<Metadata> { ...@@ -200,6 +207,7 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<Metadata> {
device_pool.clone(), device_pool.clone(),
nixl_agent.clone(), nixl_agent.clone(),
async_rt_handle, async_rt_handle,
metrics.clone(),
cancellation_token.clone(), cancellation_token.clone(),
)?; )?;
...@@ -475,7 +483,7 @@ fn create_layout<S: Storage + NixlRegisterableStorage>( ...@@ -475,7 +483,7 @@ fn create_layout<S: Storage + NixlRegisterableStorage>(
anyhow::bail!("failed to create layout"); anyhow::bail!("failed to create layout");
} }
#[expect(clippy::type_complexity)] #[expect(clippy::type_complexity, clippy::too_many_arguments)]
fn create_block_pool<S: Storage + NixlRegisterableStorage, M: BlockMetadata>( fn create_block_pool<S: Storage + NixlRegisterableStorage, M: BlockMetadata>(
layout: Arc<dyn NixlLayout<StorageType = S>>, layout: Arc<dyn NixlLayout<StorageType = S>>,
block_set_idx: usize, block_set_idx: usize,
...@@ -483,6 +491,7 @@ fn create_block_pool<S: Storage + NixlRegisterableStorage, M: BlockMetadata>( ...@@ -483,6 +491,7 @@ fn create_block_pool<S: Storage + NixlRegisterableStorage, M: BlockMetadata>(
worker_id: WorkerID, worker_id: WorkerID,
global_registry: GlobalRegistry, global_registry: GlobalRegistry,
async_runtime: Handle, async_runtime: Handle,
pool_metrics: Arc<PoolMetrics>,
event_manager: Option<Arc<dyn EventManager>>, event_manager: Option<Arc<dyn EventManager>>,
) -> Result<(BlockPool<S, M>, Vec<Block<S, M>>)> { ) -> Result<(BlockPool<S, M>, Vec<Block<S, M>>)> {
let blocks = block::layout_to_blocks::<_, M>(layout, block_set_idx, worker_id)?; let blocks = block::layout_to_blocks::<_, M>(layout, block_set_idx, worker_id)?;
...@@ -491,6 +500,7 @@ fn create_block_pool<S: Storage + NixlRegisterableStorage, M: BlockMetadata>( ...@@ -491,6 +500,7 @@ fn create_block_pool<S: Storage + NixlRegisterableStorage, M: BlockMetadata>(
.cancel_token(cancellation_token) .cancel_token(cancellation_token)
.global_registry(global_registry) .global_registry(global_registry)
.async_runtime(async_runtime) .async_runtime(async_runtime)
.pool_metrics(pool_metrics)
.event_manager(event_manager) .event_manager(event_manager)
.build()?; .build()?;
Ok((pool, blocks)) Ok((pool, blocks))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment