Unverified Commit b39382ba authored by Ziqi Fan's avatar Ziqi Fan Committed by GitHub
Browse files

feat: add initial batch of KVBM metrics on match, offload and onboard (#2673)

parent 35055c6f
...@@ -80,6 +80,7 @@ pub struct KvConnectorLeader { ...@@ -80,6 +80,7 @@ pub struct KvConnectorLeader {
inflight_requests: HashSet<String>, inflight_requests: HashSet<String>,
onboarding_slots: HashSet<String>, onboarding_slots: HashSet<String>,
iteration_counter: u64, iteration_counter: u64,
kvbm_metrics: KvbmMetrics,
} }
impl KvConnectorLeader { impl KvConnectorLeader {
...@@ -114,12 +115,13 @@ impl KvConnectorLeader { ...@@ -114,12 +115,13 @@ impl KvConnectorLeader {
block_manager.clone(), block_manager.clone(),
leader, leader,
drt.clone(), drt.clone(),
kvbm_metrics, kvbm_metrics.clone(),
), ),
block_size, block_size,
inflight_requests: HashSet::new(), inflight_requests: HashSet::new(),
onboarding_slots: HashSet::new(), onboarding_slots: HashSet::new(),
iteration_counter: 0, iteration_counter: 0,
kvbm_metrics,
} }
} }
} }
...@@ -188,6 +190,9 @@ impl Leader for KvConnectorLeader { ...@@ -188,6 +190,9 @@ impl Leader for KvConnectorLeader {
"scheduling onboarding for {} external tokens", "scheduling onboarding for {} external tokens",
num_external_tokens num_external_tokens
); );
self.kvbm_metrics
.matched_tokens
.inc_by(num_external_tokens as u64);
Ok((num_external_tokens, true)) Ok((num_external_tokens, true))
} else { } else {
Ok((0, false)) Ok((0, false))
......
...@@ -124,12 +124,13 @@ impl KvConnectorLeaderRecorder { ...@@ -124,12 +124,13 @@ impl KvConnectorLeaderRecorder {
block_manager.clone(), block_manager.clone(),
leader, leader,
drt.clone(), drt.clone(),
kvbm_metrics, kvbm_metrics.clone(),
), ),
block_size, block_size,
inflight_requests: HashSet::new(), inflight_requests: HashSet::new(),
onboarding_slots: HashSet::new(), onboarding_slots: HashSet::new(),
iteration_counter: 0, iteration_counter: 0,
kvbm_metrics,
}; };
let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel(); let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();
......
...@@ -197,7 +197,7 @@ impl<R: RequestKey> ConnectorSlotManager<R> { ...@@ -197,7 +197,7 @@ impl<R: RequestKey> ConnectorSlotManager<R> {
let xfer_engine_task = CriticalTaskExecutionHandle::new_with_runtime( let xfer_engine_task = CriticalTaskExecutionHandle::new_with_runtime(
|cancellation_token| async move { |cancellation_token| async move {
xfer_engine xfer_engine
.execute(cancellation_token, drt_for_task, kvbm_metrics.clone()) .execute(cancellation_token, drt_for_task, kvbm_metrics)
.await .await
}, },
primary_token, primary_token,
...@@ -1042,6 +1042,9 @@ impl LocalTransferEngine { ...@@ -1042,6 +1042,9 @@ impl LocalTransferEngine {
let leader_offload = Arc::clone(&self.leader); let leader_offload = Arc::clone(&self.leader);
let leader_onboard = Arc::clone(&self.leader); let leader_onboard = Arc::clone(&self.leader);
let kvbm_metrics_onboard = kvbm_metrics.clone();
let kvbm_metrics_offload = kvbm_metrics.clone();
let onboard_task = CriticalTaskExecutionHandle::new_with_runtime( let onboard_task = CriticalTaskExecutionHandle::new_with_runtime(
|cancellation_token_onboard| async move { |cancellation_token_onboard| async move {
while let Some(req) = onboard_rx.recv().await { while let Some(req) = onboard_rx.recv().await {
...@@ -1049,7 +1052,10 @@ impl LocalTransferEngine { ...@@ -1049,7 +1052,10 @@ impl LocalTransferEngine {
tracing::debug!("LocalOnboardTask: received cancellation signal"); tracing::debug!("LocalOnboardTask: received cancellation signal");
break; break;
} }
if let Err(e) = process_onboard_request(req, &leader_onboard).await { if let Err(e) =
process_onboard_request(req, &leader_onboard, kvbm_metrics_onboard.clone())
.await
{
tracing::error!("LocalOnboardTask: error processing request: {:?}", e); tracing::error!("LocalOnboardTask: error processing request: {:?}", e);
} }
} }
...@@ -1071,7 +1077,7 @@ impl LocalTransferEngine { ...@@ -1071,7 +1077,7 @@ impl LocalTransferEngine {
req, req,
&block_manager_offload, &block_manager_offload,
&leader_offload, &leader_offload,
kvbm_metrics.clone(), kvbm_metrics_offload.clone(),
) )
.await .await
{ {
...@@ -1145,6 +1151,9 @@ async fn process_offload_request( ...@@ -1145,6 +1151,9 @@ async fn process_offload_request(
kvbm_metrics: KvbmMetrics, kvbm_metrics: KvbmMetrics,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
kvbm_metrics.offload_requests.inc(); kvbm_metrics.offload_requests.inc();
kvbm_metrics
.offload_blocks_d2h
.inc_by(offload_req.block_ids.len() as u64);
let request_id = &offload_req.request_id; let request_id = &offload_req.request_id;
let operation_id = &offload_req.operation_id; let operation_id = &offload_req.operation_id;
...@@ -1154,7 +1163,6 @@ async fn process_offload_request( ...@@ -1154,7 +1163,6 @@ async fn process_offload_request(
offload_req.block_ids.len() offload_req.block_ids.len()
); );
// TODO: Implement actual offload logic
// 1. Acquire mutable host blocks // 1. Acquire mutable host blocks
let host_blocks = block_manager let host_blocks = block_manager
.host() .host()
...@@ -1250,7 +1258,19 @@ async fn process_offload_request( ...@@ -1250,7 +1258,19 @@ async fn process_offload_request(
async fn process_onboard_request( async fn process_onboard_request(
onboard_req: LocalOnboardRequest, onboard_req: LocalOnboardRequest,
leader: &Arc<KvbmLeader>, leader: &Arc<KvbmLeader>,
kvbm_metrics: KvbmMetrics,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
kvbm_metrics.onboard_requests.inc();
if onboard_req.src_blocks.storage_pool() == BlockTransferPool::Host {
kvbm_metrics
.onboard_blocks_h2d
.inc_by(onboard_req.src_blocks.len() as u64);
} else if onboard_req.src_blocks.storage_pool() == BlockTransferPool::Disk {
kvbm_metrics
.onboard_blocks_d2d
.inc_by(onboard_req.src_blocks.len() as u64);
}
let request_id = &onboard_req.request_id; let request_id = &onboard_req.request_id;
let operation_id = &onboard_req.operation_id; let operation_id = &onboard_req.operation_id;
......
...@@ -265,7 +265,6 @@ impl Worker for KvConnectorWorker { ...@@ -265,7 +265,6 @@ impl Worker for KvConnectorWorker {
/// Trigger layer-wise completion signals. /// Trigger layer-wise completion signals.
/// Trigger block-wise completion signals afer last layer. /// Trigger block-wise completion signals afer last layer.
fn save_kv_layer(&mut self, _layer_name: String) -> anyhow::Result<()> { fn save_kv_layer(&mut self, _layer_name: String) -> anyhow::Result<()> {
self.kvbm_metrics.save_kv_layer_requests.inc();
self.layers_complete += 1; self.layers_complete += 1;
if self.layers_complete == self.kv_cache_layers.len() { if self.layers_complete == self.kv_cache_layers.len() {
let offloading_operations = std::mem::take(&mut self.offloading_operations); let offloading_operations = std::mem::take(&mut self.offloading_operations);
...@@ -278,6 +277,7 @@ impl Worker for KvConnectorWorker { ...@@ -278,6 +277,7 @@ impl Worker for KvConnectorWorker {
self.connector.enqueue_request(operation); self.connector.enqueue_request(operation);
} }
} }
self.kvbm_metrics.save_kv_layer_requests.inc();
Ok(()) Ok(())
} }
......
...@@ -6,8 +6,26 @@ use prometheus::IntCounter; ...@@ -6,8 +6,26 @@ use prometheus::IntCounter;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct KvbmMetrics { pub struct KvbmMetrics {
// number of offload requests
pub offload_requests: IntCounter, pub offload_requests: IntCounter,
// number of blocks offloaded from device to host
pub offload_blocks_d2h: IntCounter,
// number of onboard requests
pub onboard_requests: IntCounter,
// number of blocks onboarded from host to device
pub onboard_blocks_h2d: IntCounter,
// number of blocks onboarded from disk to device
pub onboard_blocks_d2d: IntCounter,
// number of save kv layer requests
pub save_kv_layer_requests: IntCounter, pub save_kv_layer_requests: IntCounter,
// number of matched tokens from KVBM
pub matched_tokens: IntCounter,
} }
impl KvbmMetrics { impl KvbmMetrics {
...@@ -15,6 +33,30 @@ impl KvbmMetrics { ...@@ -15,6 +33,30 @@ impl KvbmMetrics {
let offload_requests = mr let offload_requests = mr
.create_intcounter("offload_requests", "The number of offload requests", &[]) .create_intcounter("offload_requests", "The number of offload requests", &[])
.unwrap(); .unwrap();
let offload_blocks_d2h = mr
.create_intcounter(
"offload_blocks_d2h",
"The number of offload blocks from device to host",
&[],
)
.unwrap();
let onboard_requests = mr
.create_intcounter("onboard_requests", "The number of onboard requests", &[])
.unwrap();
let onboard_blocks_h2d = mr
.create_intcounter(
"onboard_blocks_h2d",
"The number of onboard blocks from host to device",
&[],
)
.unwrap();
let onboard_blocks_d2d = mr
.create_intcounter(
"onboard_blocks_d2d",
"The number of onboard blocks from disk to device",
&[],
)
.unwrap();
let save_kv_layer_requests = mr let save_kv_layer_requests = mr
.create_intcounter( .create_intcounter(
"save_kv_layer_requests", "save_kv_layer_requests",
...@@ -22,9 +64,17 @@ impl KvbmMetrics { ...@@ -22,9 +64,17 @@ impl KvbmMetrics {
&[], &[],
) )
.unwrap(); .unwrap();
let matched_tokens = mr
.create_intcounter("matched_tokens", "The number of matched tokens", &[])
.unwrap();
Self { Self {
offload_requests, offload_requests,
offload_blocks_d2h,
onboard_requests,
onboard_blocks_h2d,
onboard_blocks_d2d,
save_kv_layer_requests, save_kv_layer_requests,
matched_tokens,
} }
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment