Unverified Commit 4224e57d authored by Ziqi Fan's avatar Ziqi Fan Committed by GitHub
Browse files

feat: rm the old KVBM metrics | update G2 to G3 metrics collection (#3561)


Signed-off-by: default avatarZiqi Fan <ziqif@nvidia.com>
parent 55e458d8
...@@ -102,7 +102,7 @@ DYN_KVBM_METRICS=true \ ...@@ -102,7 +102,7 @@ DYN_KVBM_METRICS=true \
python -m dynamo.vllm \ python -m dynamo.vllm \
--model Qwen/Qwen3-0.6B \ --model Qwen/Qwen3-0.6B \
--enforce-eager \ --enforce-eager \
--connector kvbm & --connector kvbm
# optional if firewall blocks KVBM metrics ports to send prometheus metrics # optional if firewall blocks KVBM metrics ports to send prometheus metrics
sudo ufw allow 6880/tcp sudo ufw allow 6880/tcp
......
...@@ -13,6 +13,34 @@ use pyo3::PyResult; ...@@ -13,6 +13,34 @@ use pyo3::PyResult;
use std::time::Duration; use std::time::Duration;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
/// Creates a disk offload filter based on environment configuration.
/// Returns `Ok(None)` if the filter is disabled via `DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER`,
/// otherwise constructs a `FrequencyFilter` with standard parameters.
fn create_disk_offload_filter(
cancel_token: &CancellationToken,
runtime: &tokio::runtime::Handle,
) -> Result<Option<Arc<FrequencyFilter>>> {
// Check if disk offload filter is disabled via environment variable
let disable_filter = std::env::var("DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER")
.map(|v| v == "true" || v == "1")
.unwrap_or(false);
if disable_filter {
return Ok(None);
}
// TODO: These values seem plausible for most use cases, but we need to figure out a better way to configure them.
let frequency_filter = FrequencyFilter::new(
2,
Duration::from_secs(600),
1_000_000,
cancel_token.child_token(),
runtime.clone(),
)?;
Ok(Some(Arc::new(frequency_filter)))
}
mod controller; mod controller;
mod distributed; mod distributed;
...@@ -104,23 +132,11 @@ impl BlockManager { ...@@ -104,23 +132,11 @@ impl BlockManager {
.logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)); .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded));
if leader.num_disk_blocks() > 0 { if leader.num_disk_blocks() > 0 {
// Check if disk offload filter is disabled via environment variable if let Some(filter) =
let disable_filter = std::env::var("DYN_KVBM_DISABLE_DISK_OFFLOAD_FILTER") create_disk_offload_filter(&cancel_token, &rt.inner().runtime().primary())
.map(|v| v == "true" || v == "1") .map_err(to_pyerr)?
.unwrap_or(false); {
host_layout_config = host_layout_config.offload_filter(Some(filter));
if !disable_filter {
// TODO: These values seem plausible for most use cases, but we need to figure out a better way to configure them.
let frequency_filter = FrequencyFilter::new(
2,
Duration::from_secs(600),
1e6 as usize,
cancel_token.child_token(),
rt.inner().runtime().primary().clone(),
)
.map_err(to_pyerr)?;
host_layout_config =
host_layout_config.offload_filter(Some(Arc::new(frequency_filter)));
} }
} }
...@@ -316,12 +332,20 @@ impl BlockManagerBuilder { ...@@ -316,12 +332,20 @@ impl BlockManagerBuilder {
} }
if leader_inner.num_host_blocks() > 0 { if leader_inner.num_host_blocks() > 0 {
config = config.host_layout( let mut host_layout_config =
dynamo_llm::block_manager::KvManagerLayoutConfig::builder() dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
.num_blocks(leader_inner.num_host_blocks()) .num_blocks(leader_inner.num_host_blocks())
.logical(Some(BlockParallelismStrategy::LeaderWorkerSharded)) .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded));
.build()?,
); if leader_inner.num_disk_blocks() > 0 {
if let Some(filter) =
create_disk_offload_filter(&cancel_token, &drt.inner().runtime().primary())?
{
host_layout_config = host_layout_config.offload_filter(Some(filter));
}
}
config = config.host_layout(host_layout_config.build()?);
} }
if leader_inner.num_disk_blocks() > 0 { if leader_inner.num_disk_blocks() > 0 {
......
...@@ -1277,10 +1277,6 @@ async fn process_offload_request( ...@@ -1277,10 +1277,6 @@ async fn process_offload_request(
leader: &Arc<KvbmLeader>, leader: &Arc<KvbmLeader>,
kvbm_metrics: KvbmMetrics, kvbm_metrics: KvbmMetrics,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
kvbm_metrics
.offload_blocks_d2h
.inc_by(offload_req.block_ids.len() as u64);
let request_id = &offload_req.request_id; let request_id = &offload_req.request_id;
let operation_id = &offload_req.operation_id; let operation_id = &offload_req.operation_id;
...@@ -1367,6 +1363,10 @@ async fn process_offload_request( ...@@ -1367,6 +1363,10 @@ async fn process_offload_request(
"offload - stage 4 complete" "offload - stage 4 complete"
); );
kvbm_metrics
.offload_blocks_d2h
.inc_by(blocks_to_register.len() as u64);
// 5. Register the mutable blocks // 5. Register the mutable blocks
let immutable_blocks = block_manager let immutable_blocks = block_manager
.host() .host()
......
...@@ -15,7 +15,6 @@ pub mod connector; ...@@ -15,7 +15,6 @@ pub mod connector;
pub mod distributed; pub mod distributed;
pub mod events; pub mod events;
pub mod layout; pub mod layout;
pub mod metrics;
pub mod metrics_kvbm; pub mod metrics_kvbm;
pub mod offload; pub mod offload;
pub mod pool; pub mod pool;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use prometheus::{
IntCounterVec, IntGaugeVec, Opts, Registry,
core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
};
use std::sync::Arc;
pub struct BlockManagerMetrics {
gauges: IntGaugeVec,
counters: IntCounterVec,
}
impl BlockManagerMetrics {
pub fn new(metrics_registry: &Arc<Registry>) -> Result<Arc<Self>> {
let gauge_opts = Opts::new("gauges", "Gauges for the pools")
.namespace("dynamo")
.subsystem("kvbm");
let counter_opts = Opts::new("pools", "Counters for the pools")
.namespace("dynamo")
.subsystem("kvbm");
let gauges = register_int_gauge_vec_with_registry!(
gauge_opts,
&["pool", "metric_type"],
metrics_registry
)?;
let counters = register_int_counter_vec_with_registry!(
counter_opts,
&["pool", "metric_type"],
metrics_registry
)?;
Ok(Arc::new(Self { gauges, counters }))
}
pub fn pool(self: &Arc<Self>, group: &str) -> Arc<PoolMetrics> {
PoolMetrics::new(self, group)
}
}
pub struct PoolMetrics {
block_manager_metrics: Arc<BlockManagerMetrics>,
group: String,
}
impl PoolMetrics {
pub fn new(block_manager_metrics: &Arc<BlockManagerMetrics>, group: &str) -> Arc<Self> {
Arc::new(Self {
block_manager_metrics: block_manager_metrics.clone(),
group: group.to_string(),
})
}
pub fn gauge(&self, metric_type: &str) -> GenericGauge<AtomicI64> {
self.block_manager_metrics
.gauges
.with_label_values(&[&self.group, &metric_type.to_string()])
}
pub fn counter(&self, metric_type: &str) -> GenericCounter<AtomicU64> {
self.block_manager_metrics
.counters
.with_label_values(&[&self.group, &metric_type.to_string()])
}
}
...@@ -37,7 +37,6 @@ use super::block::{ ...@@ -37,7 +37,6 @@ use super::block::{
locality::LocalityProvider, locality::LocalityProvider,
transfer::{PoolConfig, TransferContext}, transfer::{PoolConfig, TransferContext},
}; };
use super::metrics::{BlockManagerMetrics, PoolMetrics};
use super::pool::{BlockPool, BlockPoolError}; use super::pool::{BlockPool, BlockPoolError};
use super::storage::{Cuda, Storage}; use super::storage::{Cuda, Storage};
use super::{DeviceStorage, DiskStorage, KvManagerModelConfig, PinnedStorage}; use super::{DeviceStorage, DiskStorage, KvManagerModelConfig, PinnedStorage};
...@@ -77,7 +76,6 @@ pub const MAX_TRANSFER_BATCH_SIZE: usize = 16; ...@@ -77,7 +76,6 @@ pub const MAX_TRANSFER_BATCH_SIZE: usize = 16;
pub struct OffloadManagerConfig { pub struct OffloadManagerConfig {
pub nixl_agent: Arc<Option<NixlAgent>>, pub nixl_agent: Arc<Option<NixlAgent>>,
pub async_rt_handle: Handle, pub async_rt_handle: Handle,
pub metrics: Arc<BlockManagerMetrics>,
pub cancellation_token: CancellationToken, pub cancellation_token: CancellationToken,
pub model_config: KvManagerModelConfig, pub model_config: KvManagerModelConfig,
/// Optional KVBM-level metrics for tracking offload/onboard operations /// Optional KVBM-level metrics for tracking offload/onboard operations
...@@ -103,9 +101,6 @@ pub struct OffloadManager<Locality: LocalityProvider, Metadata: BlockMetadata> { ...@@ -103,9 +101,6 @@ pub struct OffloadManager<Locality: LocalityProvider, Metadata: BlockMetadata> {
/// An incrementing counter for offloaded blocks. Within the same priority, blocks with lower tick values are processed first. /// An incrementing counter for offloaded blocks. Within the same priority, blocks with lower tick values are processed first.
tick: Arc<AtomicU64>, tick: Arc<AtomicU64>,
/// Optional KVBM-level metrics for tracking offload/onboard operations
kvbm_metrics: Option<crate::block_manager::metrics_kvbm::KvbmMetrics>,
} }
impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
...@@ -134,7 +129,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -134,7 +129,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
host_onboard_tx, host_onboard_tx,
disk_onboard_tx, disk_onboard_tx,
tick: Arc::new(AtomicU64::new(0)), tick: Arc::new(AtomicU64::new(0)),
kvbm_metrics: config.kvbm_metrics.clone(),
}); });
let cuda_ctx = Cuda::device_or_create(0)?; let cuda_ctx = Cuda::device_or_create(0)?;
...@@ -155,10 +149,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -155,10 +149,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
Some(pool_config), Some(pool_config),
)); ));
let device_metrics = config.metrics.pool("device");
let host_metrics = config.metrics.pool("host");
let disk_metrics = config.metrics.pool("disk");
// Device -> Host offload // Device -> Host offload
let device_to_host_task = OffloadManager::offload_worker( let device_to_host_task = OffloadManager::offload_worker(
this.device.clone(), this.device.clone(),
...@@ -170,15 +160,16 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -170,15 +160,16 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
MAX_CONCURRENT_TRANSFERS, MAX_CONCURRENT_TRANSFERS,
&config.async_rt_handle, &config.async_rt_handle,
config.cancellation_token.clone(), config.cancellation_token.clone(),
device_metrics.clone(),
"offload_bw".to_string(),
)?, )?,
MAX_TRANSFER_BATCH_SIZE, MAX_TRANSFER_BATCH_SIZE,
&config.async_rt_handle, &config.async_rt_handle,
config.cancellation_token.clone(), config.cancellation_token.clone(),
)), )),
filters.device.clone(), filters.device.clone(),
device_metrics.clone(), config
.kvbm_metrics
.as_ref()
.map(|m| m.offload_blocks_d2h.clone()),
config.cancellation_token.clone(), config.cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
...@@ -207,15 +198,16 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -207,15 +198,16 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
MAX_CONCURRENT_TRANSFERS, MAX_CONCURRENT_TRANSFERS,
&config.async_rt_handle, &config.async_rt_handle,
config.cancellation_token.clone(), config.cancellation_token.clone(),
host_metrics.clone(),
"offload_bw".to_string(),
)?, )?,
MAX_TRANSFER_BATCH_SIZE, MAX_TRANSFER_BATCH_SIZE,
&config.async_rt_handle, &config.async_rt_handle,
config.cancellation_token.clone(), config.cancellation_token.clone(),
)), )),
filters.host.clone(), filters.host.clone(),
host_metrics.clone(), config
.kvbm_metrics
.as_ref()
.map(|m| m.offload_blocks_h2d.clone()),
config.cancellation_token.clone(), config.cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
...@@ -237,14 +229,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -237,14 +229,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
MAX_CONCURRENT_TRANSFERS, MAX_CONCURRENT_TRANSFERS,
&config.async_rt_handle, &config.async_rt_handle,
config.cancellation_token.clone(), config.cancellation_token.clone(),
host_metrics.clone(),
"onboard_bw".to_string(),
)?, )?,
MAX_TRANSFER_BATCH_SIZE, MAX_TRANSFER_BATCH_SIZE,
&config.async_rt_handle, &config.async_rt_handle,
config.cancellation_token.clone(), config.cancellation_token.clone(),
)), )),
host_metrics.clone(),
config.cancellation_token.clone(), config.cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
...@@ -266,14 +255,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -266,14 +255,11 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
MAX_CONCURRENT_TRANSFERS, MAX_CONCURRENT_TRANSFERS,
&config.async_rt_handle, &config.async_rt_handle,
config.cancellation_token.clone(), config.cancellation_token.clone(),
disk_metrics.clone(),
"onboard_bw".to_string(),
)?, )?,
MAX_TRANSFER_BATCH_SIZE, MAX_TRANSFER_BATCH_SIZE,
&config.async_rt_handle, &config.async_rt_handle,
config.cancellation_token.clone(), config.cancellation_token.clone(),
)), )),
disk_metrics.clone(),
config.cancellation_token.clone(), config.cancellation_token.clone(),
); );
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
...@@ -293,7 +279,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -293,7 +279,7 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
mut offload_rx: mpsc::UnboundedReceiver<OffloadRequest<Source, Locality, Metadata>>, mut offload_rx: mpsc::UnboundedReceiver<OffloadRequest<Source, Locality, Metadata>>,
transfer_manager: Arc<dyn TransferManager<Source, Target, Locality, Metadata>>, transfer_manager: Arc<dyn TransferManager<Source, Target, Locality, Metadata>>,
offload_filter: Option<Arc<dyn OffloadFilter>>, offload_filter: Option<Arc<dyn OffloadFilter>>,
pool_metrics: Arc<PoolMetrics>, offload_metric: Option<prometheus::IntCounter>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
) -> Result<()> { ) -> Result<()> {
if source_pool.is_none() || target_pool.is_none() { if source_pool.is_none() || target_pool.is_none() {
...@@ -315,7 +301,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -315,7 +301,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
match offload_rx.try_recv() { match offload_rx.try_recv() {
Ok(request) => { Ok(request) => {
queue.insert(request); queue.insert(request);
pool_metrics.gauge("offload_queue_size").inc();
} }
Err(TryRecvError::Empty) => { Err(TryRecvError::Empty) => {
break; break;
...@@ -326,7 +311,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -326,7 +311,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
// If there is a request, process it. // If there is a request, process it.
if let Some(request) = queue.pop_first() { if let Some(request) = queue.pop_first() {
pool_metrics.gauge("offload_queue_size").dec();
// Try to upgrade the block to a strong reference. // Try to upgrade the block to a strong reference.
let block = match request.block.upgrade() { let block = match request.block.upgrade() {
Some(block) => Some(ImmutableBlock::new(block)), Some(block) => Some(ImmutableBlock::new(block)),
...@@ -368,11 +352,16 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -368,11 +352,16 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
}; };
if let Some(target_block) = target_block { if let Some(target_block) = target_block {
pool_metrics.counter("offload_processed").inc();
tracing::debug!( tracing::debug!(
"Offloading block with sequence hash {} to target pool.", "Offloading block with sequence hash {} to target pool.",
request.sequence_hash request.sequence_hash
); );
// Track the offload metric if available
if let Some(ref metric) = offload_metric {
metric.inc();
}
transfer_manager transfer_manager
.enqueue_transfer(PendingTransfer::new( .enqueue_transfer(PendingTransfer::new(
vec![block], vec![block],
...@@ -389,7 +378,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -389,7 +378,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
_ = cancellation_token.cancelled() => return Ok(()), _ = cancellation_token.cancelled() => return Ok(()),
Some(request) = offload_rx.recv() => { Some(request) = offload_rx.recv() => {
queue.insert(request); queue.insert(request);
pool_metrics.gauge("offload_queue_size").inc();
} }
} }
} }
...@@ -401,7 +389,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -401,7 +389,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
target_pool: Option<Arc<dyn BlockPool<Target, Locality, Metadata>>>, target_pool: Option<Arc<dyn BlockPool<Target, Locality, Metadata>>>,
mut onboard_rx: mpsc::UnboundedReceiver<OnboardRequest<Source, Target, Locality, Metadata>>, mut onboard_rx: mpsc::UnboundedReceiver<OnboardRequest<Source, Target, Locality, Metadata>>,
transfer_manager: Arc<dyn TransferManager<Source, Target, Locality, Metadata>>, transfer_manager: Arc<dyn TransferManager<Source, Target, Locality, Metadata>>,
pool_metrics: Arc<PoolMetrics>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
) -> Result<()> { ) -> Result<()> {
if source_pool.is_none() || target_pool.is_none() { if source_pool.is_none() || target_pool.is_none() {
...@@ -414,10 +401,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -414,10 +401,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
_ = cancellation_token.cancelled() => return Ok::<(), anyhow::Error>(()), _ = cancellation_token.cancelled() => return Ok::<(), anyhow::Error>(()),
Some(request) = onboard_rx.recv() => { Some(request) = onboard_rx.recv() => {
pool_metrics
.gauge("onboard_queue_size")
.set(onboard_rx.len() as i64);
// Try to allocate blocks on the device. // Try to allocate blocks on the device.
let target_blocks = if let Some(targets) = request.targets { let target_blocks = if let Some(targets) = request.targets {
targets targets
...@@ -431,10 +414,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -431,10 +414,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
} }
}; };
pool_metrics
.counter("onboard_processed")
.inc_by(request.blocks.len() as u64);
tracing::debug!("Onboarding {} blocks to target pool.", request.blocks.len()); tracing::debug!("Onboarding {} blocks to target pool.", request.blocks.len());
transfer_manager transfer_manager
...@@ -491,11 +470,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -491,11 +470,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
key, key,
}; };
// Track metrics if available
if let Some(ref kvbm_metrics) = self.kvbm_metrics {
kvbm_metrics.offload_blocks_d2h.inc();
}
self.device_offload_tx.send(request).unwrap(); self.device_offload_tx.send(request).unwrap();
} else if let Some(host_block) = } else if let Some(host_block) =
any_block.downcast_ref::<ImmutableBlock<PinnedStorage, Locality, Metadata>>() any_block.downcast_ref::<ImmutableBlock<PinnedStorage, Locality, Metadata>>()
...@@ -511,11 +485,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata> ...@@ -511,11 +485,6 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
key, key,
}; };
// Track metrics if available
if let Some(ref kvbm_metrics) = self.kvbm_metrics {
kvbm_metrics.offload_blocks_h2d.inc();
}
self.host_offload_tx.send(request).unwrap(); self.host_offload_tx.send(request).unwrap();
} }
...@@ -833,9 +802,9 @@ mod tests { ...@@ -833,9 +802,9 @@ mod tests {
let config = OffloadManagerConfig { let config = OffloadManagerConfig {
nixl_agent: agent_arc, nixl_agent: agent_arc,
async_rt_handle, async_rt_handle,
metrics: BlockManagerMetrics::new(&Arc::new(Registry::new()))?,
cancellation_token: CancellationToken::new(), cancellation_token: CancellationToken::new(),
model_config: minimal_config, model_config: minimal_config,
kvbm_metrics: None,
}; };
let manager = OffloadManager::new( let manager = OffloadManager::new(
......
...@@ -30,7 +30,6 @@ use nixl_sys::NixlDescriptor; ...@@ -30,7 +30,6 @@ use nixl_sys::NixlDescriptor;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
...@@ -41,7 +40,6 @@ use crate::block_manager::block::{ ...@@ -41,7 +40,6 @@ use crate::block_manager::block::{
locality::LocalityProvider, locality::LocalityProvider,
transfer::{TransferContext, WriteTo, WriteToStrategy}, transfer::{TransferContext, WriteTo, WriteToStrategy},
}; };
use crate::block_manager::metrics::PoolMetrics;
use crate::block_manager::pool::{BlockPool, BlockPoolError}; use crate::block_manager::pool::{BlockPool, BlockPoolError};
use crate::block_manager::storage::{Local, Storage}; use crate::block_manager::storage::{Local, Storage};
...@@ -53,8 +51,6 @@ use super::BlockResult; ...@@ -53,8 +51,6 @@ use super::BlockResult;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle; use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
const BLOCKS_BW_MIN_PUBLISH_INTERVAL_MS: u64 = 50;
/// Manage a set of pending transfers. /// Manage a set of pending transfers.
pub struct PendingTransfer< pub struct PendingTransfer<
Source: Storage, Source: Storage,
...@@ -164,10 +160,6 @@ struct TransferCompletionManager< ...@@ -164,10 +160,6 @@ struct TransferCompletionManager<
Locality: LocalityProvider, Locality: LocalityProvider,
Metadata: BlockMetadata, Metadata: BlockMetadata,
> { > {
pool_metrics: Arc<PoolMetrics>,
transfer_type: String,
last_publish_time: Option<Instant>,
transfer_start: Instant,
num_blocks_transferred: usize, num_blocks_transferred: usize,
_phantom: PhantomData<(Source, Target, Locality, Metadata)>, _phantom: PhantomData<(Source, Target, Locality, Metadata)>,
} }
...@@ -175,12 +167,8 @@ struct TransferCompletionManager< ...@@ -175,12 +167,8 @@ struct TransferCompletionManager<
impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: BlockMetadata> impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: BlockMetadata>
TransferCompletionManager<Source, Target, Locality, Metadata> TransferCompletionManager<Source, Target, Locality, Metadata>
{ {
pub fn new(pool_metrics: Arc<PoolMetrics>, transfer_type: String) -> Self { pub fn new() -> Self {
Self { Self {
pool_metrics,
transfer_type,
last_publish_time: None,
transfer_start: Instant::now(),
num_blocks_transferred: 0, num_blocks_transferred: 0,
_phantom: PhantomData, _phantom: PhantomData,
} }
...@@ -192,20 +180,6 @@ impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: Blo ...@@ -192,20 +180,6 @@ impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: Blo
) -> Result<()> { ) -> Result<()> {
self.num_blocks_transferred += pending_transfer.sources.len(); self.num_blocks_transferred += pending_transfer.sources.len();
let should_publish = self.last_publish_time.is_none_or(|last_publish_time| {
last_publish_time.elapsed() > Duration::from_millis(BLOCKS_BW_MIN_PUBLISH_INTERVAL_MS)
});
if should_publish {
self.last_publish_time = Some(Instant::now());
let duration = self.transfer_start.elapsed();
let blocks_per_sec = self.num_blocks_transferred as f64 / duration.as_secs_f64();
self.pool_metrics
.gauge(self.transfer_type.as_str())
.set(blocks_per_sec as i64);
}
match pending_transfer.handle_complete().await { match pending_transfer.handle_complete().await {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
...@@ -245,13 +219,10 @@ impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: Blo ...@@ -245,13 +219,10 @@ impl<Source: Storage, Target: Storage, Locality: LocalityProvider, Metadata: Blo
max_concurrent_transfers: usize, max_concurrent_transfers: usize,
runtime: &Handle, runtime: &Handle,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
pool_metrics: Arc<PoolMetrics>,
transfer_type: String,
) -> Result<Self> { ) -> Result<Self> {
let (futures_tx, mut futures_rx) = mpsc::channel(1); let (futures_tx, mut futures_rx) = mpsc::channel(1);
let mut completion_manager = let mut completion_manager = TransferCompletionManager::new();
TransferCompletionManager::new(pool_metrics.clone(), transfer_type.clone());
CriticalTaskExecutionHandle::new_with_runtime( CriticalTaskExecutionHandle::new_with_runtime(
move |cancel_token| async move { move |cancel_token| async move {
......
...@@ -15,7 +15,6 @@ use super::block::{ ...@@ -15,7 +15,6 @@ use super::block::{
private, registry::BlockRegistry, private, registry::BlockRegistry,
}; };
use super::events::{EventManager, NullEventManager}; use super::events::{EventManager, NullEventManager};
use super::metrics::{BlockManagerMetrics, PoolMetrics};
use super::storage::Storage; use super::storage::Storage;
use crate::block_manager::CacheLevel; use crate::block_manager::CacheLevel;
...@@ -23,7 +22,6 @@ use crate::block_manager::block::locality::LocalityProvider; ...@@ -23,7 +22,6 @@ use crate::block_manager::block::locality::LocalityProvider;
use crate::tokens::{SequenceHash, TokenBlock}; use crate::tokens::{SequenceHash, TokenBlock};
use async_trait::async_trait; use async_trait::async_trait;
use prometheus::Registry;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::{ use std::{
collections::{BTreeSet, HashMap, VecDeque}, collections::{BTreeSet, HashMap, VecDeque},
......
...@@ -72,11 +72,6 @@ pub struct ManagedBlockPoolArgs<S: Storage, L: LocalityProvider, M: BlockMetadat ...@@ -72,11 +72,6 @@ pub struct ManagedBlockPoolArgs<S: Storage, L: LocalityProvider, M: BlockMetadat
#[builder(default = "Handle::current()")] #[builder(default = "Handle::current()")]
async_runtime: Handle, async_runtime: Handle,
#[builder(
default = "BlockManagerMetrics::new(&Arc::new(Registry::new())).unwrap().pool(\"pool\")"
)]
pool_metrics: Arc<PoolMetrics>,
#[builder(default = "BlockRegistrationDuplicationSetting::Disabled")] #[builder(default = "BlockRegistrationDuplicationSetting::Disabled")]
default_duplication_setting: BlockRegistrationDuplicationSetting, default_duplication_setting: BlockRegistrationDuplicationSetting,
} }
...@@ -90,7 +85,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPoolArgsBuil ...@@ -90,7 +85,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPoolArgsBuil
blocks, blocks,
global_registry, global_registry,
async_runtime, async_runtime,
metrics,
default_duplication_setting, default_duplication_setting,
) = args.dissolve(); ) = args.dissolve();
...@@ -101,7 +95,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPoolArgsBuil ...@@ -101,7 +95,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPoolArgsBuil
blocks, blocks,
global_registry, global_registry,
async_runtime, async_runtime,
metrics,
default_duplication_setting, default_duplication_setting,
); );
...@@ -183,7 +176,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPool<S, L, M ...@@ -183,7 +176,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPool<S, L, M
blocks: Vec<Block<S, L, M>>, blocks: Vec<Block<S, L, M>>,
global_registry: GlobalRegistry, global_registry: GlobalRegistry,
async_runtime: Handle, async_runtime: Handle,
metrics: Arc<PoolMetrics>,
default_duplication_setting: BlockRegistrationDuplicationSetting, default_duplication_setting: BlockRegistrationDuplicationSetting,
) -> Self { ) -> Self {
let (pool, progress_engine) = Self::with_progress_engine( let (pool, progress_engine) = Self::with_progress_engine(
...@@ -192,7 +184,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPool<S, L, M ...@@ -192,7 +184,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPool<S, L, M
blocks, blocks,
global_registry, global_registry,
async_runtime, async_runtime,
metrics,
default_duplication_setting, default_duplication_setting,
); );
...@@ -237,7 +228,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPool<S, L, M ...@@ -237,7 +228,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPool<S, L, M
blocks: Vec<Block<S, L, M>>, blocks: Vec<Block<S, L, M>>,
global_registry: GlobalRegistry, global_registry: GlobalRegistry,
async_runtime: Handle, async_runtime: Handle,
metrics: Arc<PoolMetrics>,
default_duplication_setting: BlockRegistrationDuplicationSetting, default_duplication_setting: BlockRegistrationDuplicationSetting,
) -> (Self, ProgressEngine<S, L, M>) { ) -> (Self, ProgressEngine<S, L, M>) {
let (priority_tx, priority_rx) = tokio::sync::mpsc::unbounded_channel(); let (priority_tx, priority_rx) = tokio::sync::mpsc::unbounded_channel();
...@@ -251,7 +241,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPool<S, L, M ...@@ -251,7 +241,6 @@ impl<S: Storage, L: LocalityProvider, M: BlockMetadata> ManagedBlockPool<S, L, M
blocks, blocks,
global_registry, global_registry,
async_runtime, async_runtime,
metrics,
); );
let available_blocks_counter = progress_engine.available_blocks_counter.clone(); let available_blocks_counter = progress_engine.available_blocks_counter.clone();
...@@ -504,7 +493,6 @@ struct ProgressEngine<S: Storage, L: LocalityProvider, M: BlockMetadata> { ...@@ -504,7 +493,6 @@ struct ProgressEngine<S: Storage, L: LocalityProvider, M: BlockMetadata> {
cancel_token: CancellationToken, cancel_token: CancellationToken,
state: State<S, L, M>, state: State<S, L, M>,
return_rx: tokio::sync::mpsc::UnboundedReceiver<Block<S, L, M>>, return_rx: tokio::sync::mpsc::UnboundedReceiver<Block<S, L, M>>,
metrics: Arc<PoolMetrics>,
available_blocks_counter: Arc<AtomicU64>, available_blocks_counter: Arc<AtomicU64>,
total_blocks_counter: Arc<AtomicU64>, total_blocks_counter: Arc<AtomicU64>,
} }
...@@ -515,7 +503,6 @@ pub struct State<S: Storage, L: LocalityProvider, M: BlockMetadata> { ...@@ -515,7 +503,6 @@ pub struct State<S: Storage, L: LocalityProvider, M: BlockMetadata> {
registry: BlockRegistry, registry: BlockRegistry,
return_tx: tokio::sync::mpsc::UnboundedSender<Block<S, L, M>>, return_tx: tokio::sync::mpsc::UnboundedSender<Block<S, L, M>>,
event_manager: Arc<dyn EventManager>, event_manager: Arc<dyn EventManager>,
metrics: Arc<PoolMetrics>,
} }
impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> ProgressEngine<S, L, M> { impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> ProgressEngine<S, L, M> {
...@@ -528,16 +515,10 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> ProgressEngine ...@@ -528,16 +515,10 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> ProgressEngine
blocks: Vec<Block<S, L, M>>, blocks: Vec<Block<S, L, M>>,
global_registry: GlobalRegistry, global_registry: GlobalRegistry,
async_runtime: Handle, async_runtime: Handle,
metrics: Arc<PoolMetrics>,
) -> Self { ) -> Self {
let (return_tx, return_rx) = tokio::sync::mpsc::unbounded_channel(); let (return_tx, return_rx) = tokio::sync::mpsc::unbounded_channel();
let mut state = State::<S, L, M>::new( let mut state =
event_manager, State::<S, L, M>::new(event_manager, return_tx, global_registry, async_runtime);
return_tx,
global_registry,
async_runtime,
metrics.clone(),
);
let count = blocks.len(); let count = blocks.len();
...@@ -553,7 +534,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> ProgressEngine ...@@ -553,7 +534,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> ProgressEngine
cancel_token, cancel_token,
state, state,
return_rx, return_rx,
metrics,
available_blocks_counter, available_blocks_counter,
total_blocks_counter, total_blocks_counter,
} }
...@@ -564,17 +544,14 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> ProgressEngine ...@@ -564,17 +544,14 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> ProgressEngine
biased; biased;
Some(priority_req) = self.priority_rx.recv(), if !self.priority_rx.is_closed() => { Some(priority_req) = self.priority_rx.recv(), if !self.priority_rx.is_closed() => {
self.metrics.gauge("priority_request_queue_size").set(self.priority_rx.len() as i64);
self.state.handle_priority_request(priority_req, &mut self.return_rx).await; self.state.handle_priority_request(priority_req, &mut self.return_rx).await;
} }
Some(req) = self.ctrl_rx.recv(), if !self.ctrl_rx.is_closed() => { Some(req) = self.ctrl_rx.recv(), if !self.ctrl_rx.is_closed() => {
self.metrics.gauge("control_request_queue_size").set(self.ctrl_rx.len() as i64);
self.state.handle_control_request(req); self.state.handle_control_request(req);
} }
Some(block) = self.return_rx.recv() => { Some(block) = self.return_rx.recv() => {
self.metrics.gauge("return_block_queue_size").set(self.return_rx.len() as i64);
self.state.handle_return_block(block); self.state.handle_return_block(block);
} }
...@@ -612,7 +589,6 @@ mod tests { ...@@ -612,7 +589,6 @@ mod tests {
blocks, blocks,
global_registry, global_registry,
async_runtime, async_runtime,
metrics,
default_duplication_setting, default_duplication_setting,
) = args.dissolve(); ) = args.dissolve();
...@@ -622,7 +598,6 @@ mod tests { ...@@ -622,7 +598,6 @@ mod tests {
blocks, blocks,
global_registry, global_registry,
async_runtime, async_runtime,
metrics,
default_duplication_setting, default_duplication_setting,
); );
......
...@@ -17,7 +17,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M> ...@@ -17,7 +17,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M>
return_tx: tokio::sync::mpsc::UnboundedSender<Block<S, L, M>>, return_tx: tokio::sync::mpsc::UnboundedSender<Block<S, L, M>>,
global_registry: GlobalRegistry, global_registry: GlobalRegistry,
async_runtime: Handle, async_runtime: Handle,
metrics: Arc<PoolMetrics>,
) -> Self { ) -> Self {
Self { Self {
active: ActiveBlockPool::new(), active: ActiveBlockPool::new(),
...@@ -25,7 +24,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M> ...@@ -25,7 +24,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M>
registry: BlockRegistry::new(event_manager.clone(), global_registry, async_runtime), registry: BlockRegistry::new(event_manager.clone(), global_registry, async_runtime),
return_tx, return_tx,
event_manager, event_manager,
metrics,
} }
} }
...@@ -159,10 +157,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M> ...@@ -159,10 +157,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M>
} }
} }
self.metrics
.counter("blocks_allocated")
.inc_by(count as u64);
Ok(blocks) Ok(blocks)
} }
...@@ -271,10 +265,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M> ...@@ -271,10 +265,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M>
assert_eq!(immutable_blocks.len(), expected_len); assert_eq!(immutable_blocks.len(), expected_len);
self.metrics
.counter("blocks_registered")
.inc_by(immutable_blocks.len() as u64);
Ok(immutable_blocks) Ok(immutable_blocks)
} }
...@@ -320,13 +310,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M> ...@@ -320,13 +310,6 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M>
immutable_blocks.push(immutable); immutable_blocks.push(immutable);
} }
self.metrics
.counter("cache_hits")
.inc_by(immutable_blocks.len() as u64);
self.metrics
.counter("cache_misses")
.inc_by(sequence_hashes.len() as u64 - immutable_blocks.len() as u64);
immutable_blocks immutable_blocks
} }
......
...@@ -16,7 +16,6 @@ use super::{ ...@@ -16,7 +16,6 @@ use super::{
config::NixlOptions, config::NixlOptions,
events::{EventManager, NullEventManager}, events::{EventManager, NullEventManager},
locality::LogicalResources, locality::LogicalResources,
metrics::BlockManagerMetrics,
offload::{ offload::{
OffloadFilters, OffloadManager, OffloadManagerConfig, filter::OffloadFilter, OffloadFilters, OffloadManager, OffloadManagerConfig, filter::OffloadFilter,
request::BlockResult, request::BlockResult,
...@@ -43,9 +42,6 @@ pub(crate) struct Resources { ...@@ -43,9 +42,6 @@ pub(crate) struct Resources {
// event manager for block manager events // event manager for block manager events
pub event_manager: Arc<dyn EventManager>, pub event_manager: Arc<dyn EventManager>,
// metrics for the block manager
pub metrics: Arc<BlockManagerMetrics>,
// config for the block manager // config for the block manager
pub config: KvBlockManagerConfig, pub config: KvBlockManagerConfig,
} }
...@@ -155,7 +151,6 @@ impl<R: LogicalResources, Metadata: BlockMetadata> ...@@ -155,7 +151,6 @@ impl<R: LogicalResources, Metadata: BlockMetadata>
let offload_config = OffloadManagerConfig { let offload_config = OffloadManagerConfig {
nixl_agent: resources.nixl_agent.clone(), nixl_agent: resources.nixl_agent.clone(),
async_rt_handle: resources.async_rt_handle.clone(), async_rt_handle: resources.async_rt_handle.clone(),
metrics: resources.metrics.clone(),
cancellation_token: resources.cancellation_token.clone(), cancellation_token: resources.cancellation_token.clone(),
model_config, model_config,
kvbm_metrics: resources.config.kvbm_metrics.clone(), kvbm_metrics: resources.config.kvbm_metrics.clone(),
...@@ -278,7 +273,6 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> { ...@@ -278,7 +273,6 @@ impl<Metadata: BlockMetadata> KvBlockManagerState<locality::Local, Metadata> {
let offload_config = OffloadManagerConfig { let offload_config = OffloadManagerConfig {
nixl_agent: resources.nixl_agent.clone(), nixl_agent: resources.nixl_agent.clone(),
async_rt_handle: resources.async_rt_handle.clone(), async_rt_handle: resources.async_rt_handle.clone(),
metrics: resources.metrics.clone(),
cancellation_token: resources.cancellation_token.clone(), cancellation_token: resources.cancellation_token.clone(),
model_config, model_config,
kvbm_metrics: resources.config.kvbm_metrics.clone(), kvbm_metrics: resources.config.kvbm_metrics.clone(),
...@@ -521,7 +515,7 @@ impl<Locality: LocalityProvider, Metadata: BlockMetadata> std::fmt::Debug ...@@ -521,7 +515,7 @@ impl<Locality: LocalityProvider, Metadata: BlockMetadata> std::fmt::Debug
pub(crate) fn create_block_pool<S: Storage, L: LocalityProvider, M: BlockMetadata>( pub(crate) fn create_block_pool<S: Storage, L: LocalityProvider, M: BlockMetadata>(
factory: impl IntoBlocks<S, L>, factory: impl IntoBlocks<S, L>,
resources: &Resources, resources: &Resources,
pool_name: &str, _pool_name: &str,
) -> Result<( ) -> Result<(
Arc<dyn BlockPool<S, L, M>>, Arc<dyn BlockPool<S, L, M>>,
Vec<Block<S, L, M>>, Vec<Block<S, L, M>>,
...@@ -532,7 +526,6 @@ pub(crate) fn create_block_pool<S: Storage, L: LocalityProvider, M: BlockMetadat ...@@ -532,7 +526,6 @@ pub(crate) fn create_block_pool<S: Storage, L: LocalityProvider, M: BlockMetadat
.global_registry(resources.global_registry.clone()) .global_registry(resources.global_registry.clone())
.async_runtime(resources.async_rt_handle.clone()) .async_runtime(resources.async_rt_handle.clone())
.event_manager(resources.event_manager.clone()) .event_manager(resources.event_manager.clone())
.pool_metrics(resources.metrics.pool(pool_name))
.build()?; .build()?;
let offload_filter = factory.offload_filter(); let offload_filter = factory.offload_filter();
......
...@@ -18,8 +18,6 @@ impl Resources { ...@@ -18,8 +18,6 @@ impl Resources {
let global_registry = GlobalRegistry::default(); let global_registry = GlobalRegistry::default();
let metrics = BlockManagerMetrics::new(&config.runtime.metrics_registry)?;
let event_manager = config let event_manager = config
.event_manager .event_manager
.clone() .clone()
...@@ -75,7 +73,6 @@ impl Resources { ...@@ -75,7 +73,6 @@ impl Resources {
nixl_backends, nixl_backends,
global_registry, global_registry,
event_manager, event_manager,
metrics,
config, config,
}) })
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment