"lib/runtime/vscode:/vscode.git/clone" did not exist on "4da078b877e806d41b9f0f325c3eda96fa23e039"
Unverified Commit cf83794a authored by Richard Huo's avatar Richard Huo Committed by GitHub
Browse files

feat: DIS-678 kvbm modularity: standalone metrics endpoint (#3433)


Signed-off-by: default avatarrichardhuo-nv <rihuo@nvidia.com>
parent 2d59861e
......@@ -118,8 +118,8 @@
"targets": [
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "dynamo_component_matched_tokens{dynamo_namespace=\"kvbm_connector_leader\"}",
"editorMode": "code",
"expr": "kvbm_matched_tokens",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
......@@ -227,8 +227,8 @@
"targets": [
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "dynamo_component_offload_requests{dynamo_namespace=\"kvbm_connector_leader\"}",
"editorMode": "code",
"expr": "kvbm_offload_requests",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
......@@ -323,8 +323,8 @@
"targets": [
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "dynamo_component_offload_blocks_d2h{dynamo_namespace=\"kvbm_connector_leader\"}",
"editorMode": "code",
"expr": "kvbm_offload_blocks_d2h",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
......@@ -336,102 +336,6 @@
"title": "Offload Blocks - Device to Host",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 18
},
"id": 1,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.0.1",
"targets": [
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "dynamo_component_save_kv_layer_requests{dynamo_namespace=\"kvbm_connector_worker\"}",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "Save KV Layer Requests",
"type": "timeseries"
},
{
"collapsed": false,
"gridPos": {
......@@ -528,8 +432,8 @@
"targets": [
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "dynamo_component_onboard_requests{dynamo_namespace=\"kvbm_connector_leader\"}",
"editorMode": "code",
"expr": "kvbm_onboard_requests",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
......@@ -624,8 +528,8 @@
"targets": [
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "dynamo_component_onboard_blocks_h2d{dynamo_namespace=\"kvbm_connector_leader\"}",
"editorMode": "code",
"expr": "kvbm_onboard_blocks_h2d",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
......@@ -720,8 +624,8 @@
"targets": [
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "dynamo_component_onboard_blocks_d2d{dynamo_namespace=\"kvbm_connector_leader\"}",
"editorMode": "code",
"expr": "kvbm_onboard_blocks_d2d",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
......@@ -750,4 +654,4 @@
"title": "KVBM Dashboard",
"uid": "3f679257-70a5-402c-92b4-05382337b548",
"version": 7
}
\ No newline at end of file
}
......@@ -59,16 +59,10 @@ scrape_configs:
- targets: ['host.docker.internal:9091'] # metrics aggregation service on host
# KVBM leader related metrics
- job_name: 'kvbm-leader-metrics'
- job_name: 'kvbm-metrics'
scrape_interval: 2s
static_configs:
- targets: ['host.docker.internal:6882']
# KVBM worker related metrics
- job_name: 'kvbm-worker-metrics'
scrape_interval: 2s
static_configs:
- targets: ['host.docker.internal:6881']
- targets: ['host.docker.internal:6880']
# Uncomment to see its own Prometheus metrics
# - job_name: 'prometheus'
......
......@@ -109,18 +109,51 @@ Follow below steps to enable metrics collection and view via Grafana dashboard:
# Start the basic services (etcd & natsd), along with Prometheus and Grafana
docker compose -f deploy/docker-compose.yml --profile metrics up -d
# set env var DYN_SYSTEM_ENABLED to true, DYN_SYSTEM_PORT to 6880, DYN_KVBM_SLEEP to 5, when launch via dynamo
# NOTE: Make sure port 6881 (for KVBM worker metrics) and port 6882 (for KVBM leader metrics) are available.
# NOTE: DYN_KVBM_SLEEP is needed to avoid metrics port conflict between KVBM leader and worker
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=6880 DYN_KVBM_SLEEP=5 \
# set env var DYN_KVBM_METRICS to true, when launch via dynamo
# Optionally set DYN_KVBM_METRICS_PORT to choose the /metrics port (default: 6880).
DYN_KVBM_METRICS=true \
python3 -m dynamo.trtllm \
--model-path deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--served-model-name deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--extra-engine-args /tmp/kvbm_llm_api_config.yaml &
# optional if firewall blocks KVBM metrics ports to send prometheus metrics
sudo ufw allow 6881/tcp
sudo ufw allow 6882/tcp
sudo ufw allow 6880/tcp
```
View grafana metrics via http://localhost:3001 (default login: dynamo/dynamo) and look for KVBM Dashboard
## Benchmark KVBM
Once the model is loaded ready, follow below steps to use LMBenchmark to benchmark KVBM performance:
```bash
git clone https://github.com/LMCache/LMBenchmark.git
# show case of running the synthetic multi-turn chat dataset.
# we are passing model, endpoint, output file prefix and qps to the sh script.
cd LMBenchmark/synthetic-multi-round-qa
./long_input_short_output_run.sh \
"deepseek-ai/DeepSeek-R1-Distill-Llama-8B" \
"http://localhost:8000" \
"benchmark_kvbm" \
1
# Average TTFT and other perf numbers would be in the output from above cmd
```
More details about how to use LMBenchmark could be found [here](https://github.com/LMCache/LMBenchmark).
`NOTE`: if metrics are enabled as mentioned in the above section, you can observe KV offloading, and KV onboarding in the grafana dashboard.
To compare, you can remove the `kv_connector_config` section from the LLM API config and run `trtllm-serve` with the updated config as the baseline.
```bash
cat > "/tmp/llm_api_config.yaml" <<EOF
backend: pytorch
cuda_graph_config: null
kv_cache_config:
enable_partial_reuse: false
free_gpu_memory_fraction: 0.80
EOF
# run trtllm-serve for the baseline for comparison
trtllm-serve deepseek-ai/DeepSeek-R1-Distill-Llama-8B --host localhost --port 8000 --backend pytorch --extra_llm_api_options /tmp/llm_api_config.yaml &
```
......@@ -77,23 +77,22 @@ Follow below steps to enable metrics collection and view via Grafana dashboard:
# Start the basic services (etcd & natsd), along with Prometheus and Grafana
docker compose -f deploy/docker-compose.yml --profile metrics up -d
# set env var DYN_SYSTEM_ENABLED to true, DYN_SYSTEM_PORT to 6880, DYN_KVBM_SLEEP to 5, when launch via dynamo
# NOTE: Make sure port 6881 (for KVBM worker metrics) and port 6882 (for KVBM leader metrics) are available.
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=6880 \
# set env var DYN_KVBM_METRICS to true, when launch via dynamo
# Optionally set DYN_KVBM_METRICS_PORT to choose the /metrics port (default: 6880).
DYN_KVBM_METRICS=true \
python -m dynamo.vllm \
--model deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--connector kvbm &
# optional if firewall blocks KVBM metrics ports to send prometheus metrics
sudo ufw allow 6881/tcp
sudo ufw allow 6882/tcp
sudo ufw allow 6880/tcp
```
View grafana metrics via http://localhost:3001 (default login: dynamo/dynamo) and look for KVBM Dashboard
## Benchmark KVBM
Once vllm serve is ready, follow below steps to use LMBenchmark to benchmark KVBM performance:
Once the model is loaded ready, follow below steps to use LMBenchmark to benchmark KVBM performance:
```bash
git clone https://github.com/LMCache/LMBenchmark.git
......
......@@ -5,7 +5,7 @@ pub mod recorder;
pub mod slot;
use super::*;
use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics;
use dynamo_llm::block_manager::metrics_kvbm::{KvbmMetrics, KvbmMetricsRegistry};
use dynamo_runtime::DistributedRuntime;
use slot::{ConnectorSlotManager, SlotError, SlotManager, SlotState};
......@@ -15,7 +15,6 @@ use crate::llm::block_manager::{
VllmBlockManager, distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest,
vllm::connector::leader::slot::VllmConnectorSlot,
};
use dynamo_runtime::metrics::prometheus_names::kvbm_connector;
use dynamo_llm::block_manager::{
BasicMetadata, DiskStorage, ImmutableBlock, PinnedStorage,
......@@ -103,11 +102,11 @@ impl KvConnectorLeader {
let drt = drt.inner().clone();
let handle: Handle = drt.runtime().primary();
let ns = drt
.namespace(kvbm_connector::KVBM_CONNECTOR_LEADER)
.unwrap();
let kvbm_metrics = KvbmMetrics::new(&ns);
let kvbm_metrics = KvbmMetrics::new(
&KvbmMetricsRegistry::default(),
kvbm_metrics_endpoint_enabled(),
parse_kvbm_metrics_port(),
);
let kvbm_metrics_clone = kvbm_metrics.clone();
let slot_manager_cell = Arc::new(OnceLock::new());
......@@ -615,3 +614,30 @@ impl PyKvConnectorLeader {
.map_err(to_pyerr)
}
}
pub fn kvbm_metrics_endpoint_enabled() -> bool {
std::env::var("DYN_KVBM_METRICS")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}
pub fn parse_kvbm_metrics_port() -> u16 {
match std::env::var("DYN_KVBM_METRICS_PORT") {
Ok(val) => match val.trim().parse::<u16>() {
Ok(port) => port,
Err(_) => {
tracing::warn!(
"[kvbm] Invalid DYN_KVBM_METRICS_PORT='{}', falling back to 6880",
val
);
6880
}
},
Err(_) => {
tracing::warn!(
"DYN_KVBM_METRICS_PORT not present or couldn’t be interpreted, falling back to 6880"
);
6880
}
}
}
......@@ -100,11 +100,11 @@ impl KvConnectorLeaderRecorder {
let drt = drt.inner().clone();
let handle: Handle = drt.runtime().primary();
let ns = drt
.namespace(kvbm_connector::KVBM_CONNECTOR_LEADER)
.unwrap();
let kvbm_metrics = KvbmMetrics::new(&ns);
let kvbm_metrics = KvbmMetrics::new(
&KvbmMetricsRegistry::default(),
kvbm_metrics_endpoint_enabled(),
parse_kvbm_metrics_port(),
);
let kvbm_metrics_clone = kvbm_metrics.clone();
let token = CancellationToken::new();
......
......@@ -8,10 +8,12 @@ use crate::llm::block_manager::BlockManagerBuilder;
use crate::llm::block_manager::vllm::connector::leader::slot::{
ConnectorSlotManager, SlotManager, SlotState,
};
use crate::llm::block_manager::vllm::connector::leader::{
kvbm_metrics_endpoint_enabled, parse_kvbm_metrics_port,
};
use crate::llm::block_manager::{distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest};
use anyhow;
use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics;
use dynamo_runtime::metrics::prometheus_names::kvbm_connector;
use dynamo_llm::block_manager::metrics_kvbm::{KvbmMetrics, KvbmMetricsRegistry};
use std::collections::HashSet;
use std::sync::{Arc, OnceLock};
use tokio::runtime::Handle;
......@@ -76,11 +78,12 @@ impl KvConnectorLeader {
let drt = drt.inner().clone();
let handle: Handle = drt.runtime().primary();
let ns = drt
.namespace(kvbm_connector::KVBM_CONNECTOR_LEADER)
.unwrap();
let kvbm_metrics = KvbmMetrics::new(
&KvbmMetricsRegistry::default(),
kvbm_metrics_endpoint_enabled(),
parse_kvbm_metrics_port(),
);
let kvbm_metrics = KvbmMetrics::new(&ns);
let kvbm_metrics_clone = kvbm_metrics.clone();
let slot_manager_cell = Arc::new(OnceLock::new());
......
......@@ -20,10 +20,8 @@ use crate::{
use anyhow;
use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig};
use dynamo_llm::block_manager::layout::LayoutType;
use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics;
use dynamo_llm::block_manager::storage::torch::TorchTensor;
use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::metrics::prometheus_names::kvbm_connector;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
pub trait Worker: Send + Sync {
......@@ -71,8 +69,6 @@ pub struct KvConnectorWorker {
/// cuda events created by the python side
layer_events: Vec<u64>,
kvbm_metrics: KvbmMetrics,
}
impl KvConnectorWorker {
......@@ -98,11 +94,6 @@ impl KvConnectorWorker {
trtllm_rank
);
let kvbm_metrics = KvbmMetrics::new(
&drt.namespace(kvbm_connector::KVBM_CONNECTOR_WORKER)
.unwrap(),
);
Ok(Self {
drt,
kvbm_worker: OnceLock::new(),
......@@ -116,7 +107,6 @@ impl KvConnectorWorker {
iteration: 0,
layers_complete: 0,
layer_events: Vec::new(),
kvbm_metrics,
})
}
}
......@@ -236,7 +226,6 @@ impl Worker for KvConnectorWorker {
self.connector.enqueue_request(operation);
}
}
self.kvbm_metrics.save_kv_layer_requests.inc();
Ok(())
}
......
......@@ -5,7 +5,6 @@ use dynamo_llm::block_manager::connector::protocol::TransferType;
use dynamo_llm::block_manager::connector::scheduler::{
Scheduler, TransferSchedulerClient, WorkerSchedulerClient,
};
use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics;
use std::collections::HashSet;
use std::sync::{Arc, OnceLock};
......@@ -16,7 +15,6 @@ use crate::{
DistributedRuntime as PyDistributedRuntime, llm::block_manager::distributed::VllmTensor,
to_pyerr,
};
use dynamo_runtime::metrics::prometheus_names::kvbm_connector;
use crate::llm::block_manager::distributed::PyLayoutType;
use anyhow;
......@@ -75,8 +73,6 @@ pub struct KvConnectorWorker {
/// cuda events created by the python side
layer_events: Vec<u64>,
kvbm_metrics: KvbmMetrics,
}
impl KvConnectorWorker {
......@@ -97,11 +93,6 @@ impl KvConnectorWorker {
)?
.detach();
let kvbm_metrics = KvbmMetrics::new(
&drt.namespace(kvbm_connector::KVBM_CONNECTOR_WORKER)
.unwrap(),
);
tracing::info!(
"KvConnectorWorker initialized with worker_id: {}",
vllm_worker_id
......@@ -120,7 +111,6 @@ impl KvConnectorWorker {
layers_complete: 0,
kv_cache_layers: Vec::new(),
layer_events: Vec::new(),
kvbm_metrics,
})
}
}
......@@ -324,7 +314,6 @@ impl Worker for KvConnectorWorker {
self.connector.enqueue_request(operation);
}
}
self.kvbm_metrics.save_kv_layer_requests.inc();
Ok(())
}
......
......@@ -17,17 +17,12 @@ from dynamo.llm.trtllm_integration.rust import (
KvConnectorLeader as RustKvConnectorLeader,
)
from dynamo.llm.trtllm_integration.rust import SchedulerOutput as RustSchedulerOutput
from dynamo.llm.utils import find_and_set_available_port_from_env, maybe_sleep
from dynamo.runtime import DistributedRuntime
class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler):
def __init__(self, llm_args: TorchLlmArgs):
super().__init__(llm_args)
# NOTE: this is needed in TRTLLM to avoid metrics port conflict with KVBM worker,
# since there is no startup order in TRTLLM and race condition is possible.
maybe_sleep()
find_and_set_available_port_from_env("DYN_SYSTEM_PORT")
self.drt = DistributedRuntime.detached()
mappings = self._llm_args.parallel_config.to_mapping()
......
......@@ -9,7 +9,6 @@ from tensorrt_llm.llmapi.llm_args import TorchLlmArgs
from dynamo.llm.trtllm_integration.rust import (
KvConnectorWorker as RustKvConnectorWorker,
)
from dynamo.llm.utils import find_and_set_available_port_from_env
from dynamo.runtime import DistributedRuntime
......@@ -17,7 +16,6 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
def __init__(self, llm_args: TorchLlmArgs):
super().__init__(llm_args)
find_and_set_available_port_from_env("DYN_SYSTEM_PORT")
self.drt = DistributedRuntime.detached()
mappings = self._llm_args.parallel_config.to_mapping()
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import os
import socket
import time
def maybe_sleep():
"""
Maybe sleep for the duration specified in the environment variable if it is set.
"""
sleep_duration = int(os.environ.get("DYN_KVBM_SLEEP", "0"))
if sleep_duration > 0:
print(f"Sleeping {sleep_duration} seconds to avoid metrics port conflict")
time.sleep(sleep_duration)
# TODO(keiven|ziqi): Auto port selection to be done in Rust
def find_and_set_available_port_from_env(env_var="DYN_SYSTEM_PORT"):
"""
Find an available port from the environment variable.
"""
port = int(os.environ.get(env_var, "0"))
if port == 0:
# No port specified, let system pick
pass
while True:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# Port is available
s.bind(("127.0.0.1", port))
s.close()
os.environ[env_var] = str(port)
print(f"Port {port} is available, setting env var {env_var} to {port}")
break
except OSError:
# Port is in use, try next
port += 1
s.close()
except Exception as e:
raise RuntimeError(f"Error finding available port: {e}")
......@@ -29,7 +29,6 @@ if TYPE_CHECKING:
# from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput
from dynamo.llm import KvbmLeader
from dynamo.llm.utils import find_and_set_available_port_from_env
from dynamo.llm.vllm_integration.rust import KvbmRequest
from dynamo.llm.vllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader
from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput
......@@ -54,7 +53,6 @@ class KvConnectorLeader:
def __init__(self, vllm_config: "VllmConfig", engine_id: str, **kwargs):
drt = kwargs.get("drt", None)
if drt is None:
find_and_set_available_port_from_env("DYN_SYSTEM_PORT")
self.drt = DistributedRuntime.detached()
else:
self.drt = drt
......
......@@ -28,7 +28,6 @@ if TYPE_CHECKING:
# KvConnectorWorker as RustKvConnectorWorker,
# )
from dynamo.llm.utils import find_and_set_available_port_from_env
from dynamo.llm.vllm_integration.rust import KvConnectorWorker as RustKvConnectorWorker
from dynamo.runtime import DistributedRuntime
......@@ -43,8 +42,6 @@ class KvConnectorWorker:
def __init__(self, vllm_config: "VllmConfig", engine_id: str, **kwargs):
drt = kwargs.get("drt", None)
if drt is None:
# this is needed to avoid metrics port conflict with KVBM leader side DRT if metrics is enabled
find_and_set_available_port_from_env("DYN_SYSTEM_PORT")
self.drt = DistributedRuntime.detached()
else:
self.drt = drt
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use dynamo_runtime::metrics::MetricsRegistry;
use prometheus::IntCounter;
use axum::Router;
use dynamo_runtime::metrics::prometheus_names::{
kvbm::{
MATCHED_TOKENS, OFFLOAD_BLOCKS_D2H, OFFLOAD_REQUESTS, ONBOARD_BLOCKS_D2D,
ONBOARD_BLOCKS_H2D, ONBOARD_REQUESTS,
},
sanitize_prometheus_name,
};
use prometheus::{IntCounter, Opts, Registry};
use std::{collections::HashMap, net::SocketAddr, sync::Arc, thread};
use tokio::{net::TcpListener, sync::Notify};
use crate::http::service::{RouteDoc, metrics::router};
#[derive(Clone, Debug)]
pub struct KvbmMetrics {
......@@ -21,60 +32,167 @@ pub struct KvbmMetrics {
// number of blocks onboarded from disk to device
pub onboard_blocks_d2d: IntCounter,
// number of save kv layer requests
pub save_kv_layer_requests: IntCounter,
// number of matched tokens from KVBM
pub matched_tokens: IntCounter,
shutdown_notify: Option<Arc<Notify>>,
}
impl KvbmMetrics {
pub fn new(mr: &dyn MetricsRegistry) -> Self {
/// Create raw metrics and (once per process) spawn an axum server exposing `/metrics` at metrics_port.
/// Non-blocking: the HTTP server runs on a background task.
pub fn new(mr: &KvbmMetricsRegistry, create_endpoint: bool, metrics_port: u16) -> Self {
// 1) register kvbm metrics
let offload_requests = mr
.create_intcounter("offload_requests", "The number of offload requests", &[])
.create_intcounter(OFFLOAD_REQUESTS, "The number of offload requests", &[])
.unwrap();
let offload_blocks_d2h = mr
.create_intcounter(
"offload_blocks_d2h",
OFFLOAD_BLOCKS_D2H,
"The number of offload blocks from device to host",
&[],
)
.unwrap();
let onboard_requests = mr
.create_intcounter("onboard_requests", "The number of onboard requests", &[])
.create_intcounter(ONBOARD_REQUESTS, "The number of onboard requests", &[])
.unwrap();
let onboard_blocks_h2d = mr
.create_intcounter(
"onboard_blocks_h2d",
ONBOARD_BLOCKS_H2D,
"The number of onboard blocks from host to device",
&[],
)
.unwrap();
let onboard_blocks_d2d = mr
.create_intcounter(
"onboard_blocks_d2d",
ONBOARD_BLOCKS_D2D,
"The number of onboard blocks from disk to device",
&[],
)
.unwrap();
let save_kv_layer_requests = mr
.create_intcounter(
"save_kv_layer_requests",
"The number of save kv layer requests",
&[],
)
.unwrap();
let matched_tokens = mr
.create_intcounter("matched_tokens", "The number of matched tokens", &[])
.create_intcounter(MATCHED_TOKENS, "The number of matched tokens", &[])
.unwrap();
// early return if no endpoint is needed
if !create_endpoint {
return Self {
offload_requests,
offload_blocks_d2h,
onboard_requests,
onboard_blocks_h2d,
onboard_blocks_d2d,
matched_tokens,
shutdown_notify: None,
};
}
// 2) start HTTP server in background with graceful shutdown via Notify
let registry = mr.inner(); // Arc<Registry>
let notify = Arc::new(Notify::new());
let notify_for_task = notify.clone();
let addr = SocketAddr::from(([0, 0, 0, 0], metrics_port));
let (_route_docs, app): (Vec<RouteDoc>, Router) = router(
(*registry).clone(), // take owned Registry (Clone) for router to wrap in Arc
None, // or Some("/metrics".to_string()) to override the path
);
let run_server = async move {
let listener = match TcpListener::bind(addr).await {
Ok(listener) => listener,
Err(err) => {
panic!("failed to bind metrics server to {addr}: {err}");
}
};
if let Err(err) = axum::serve(listener, app)
.with_graceful_shutdown(async move {
// wait for shutdown signal
notify_for_task.notified().await;
})
.await
{
tracing::error!("[kvbm] metrics server error: {err}");
}
};
// Spawn on existing runtime if present, otherwise start our own.
if tokio::runtime::Handle::try_current().is_ok() {
tokio::spawn(run_server);
} else {
thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("build tokio runtime");
rt.block_on(run_server);
});
}
Self {
offload_requests,
offload_blocks_d2h,
onboard_requests,
onboard_blocks_h2d,
onboard_blocks_d2d,
save_kv_layer_requests,
matched_tokens,
shutdown_notify: Some(notify),
}
}
}
impl Drop for KvbmMetrics {
fn drop(&mut self) {
if let Some(n) = &self.shutdown_notify {
// (all KvbmMetrics clones) + 1 (held by server task)
// strong_count == 2 means this is the last metrics instance
if Arc::strong_count(n) == 2 {
n.notify_waiters();
}
}
}
}
/// A raw, standalone Prometheus metrics registry implementation using the fixed prefix: `kvbm_`
#[derive(Debug, Clone)]
pub struct KvbmMetricsRegistry {
registry: Arc<Registry>,
prefix: String,
}
impl KvbmMetricsRegistry {
pub fn new() -> Self {
Self {
registry: Arc::new(Registry::new()),
prefix: "kvbm".to_string(),
}
}
pub fn create_intcounter(
&self,
name: &str,
description: &str,
labels: &[(&str, &str)],
) -> anyhow::Result<IntCounter> {
let metrics_name = sanitize_prometheus_name(&format!("{}_{}", self.prefix, name))?;
let const_labels: HashMap<String, String> = labels
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
let opts = Opts::new(metrics_name, description).const_labels(const_labels);
let c = IntCounter::with_opts(opts)?;
self.registry.register(Box::new(c.clone()))?;
Ok(c)
}
pub fn inner(&self) -> Arc<Registry> {
Arc::clone(&self.registry)
}
}
impl Default for KvbmMetricsRegistry {
fn default() -> Self {
Self::new()
}
}
......@@ -318,13 +318,25 @@ pub mod distributed_runtime {
pub const UPTIME_SECONDS: &str = "uptime_seconds";
}
/// KVBM connector
pub mod kvbm_connector {
/// KVBM connector leader
pub const KVBM_CONNECTOR_LEADER: &str = "kvbm_connector_leader";
/// KVBM
pub mod kvbm {
/// The number of offload requests
pub const OFFLOAD_REQUESTS: &str = "offload_requests";
/// KVBM connector worker
pub const KVBM_CONNECTOR_WORKER: &str = "kvbm_connector_worker";
/// The number of offload blocks from device to host
pub const OFFLOAD_BLOCKS_D2H: &str = "offload_blocks_d2h";
/// The number of onboard requests
pub const ONBOARD_REQUESTS: &str = "onboard_requests";
/// The number of onboard blocks from host to device
pub const ONBOARD_BLOCKS_H2D: &str = "onboard_blocks_h2d";
/// The number of onboard blocks from disk to device
pub const ONBOARD_BLOCKS_D2D: &str = "onboard_blocks_d2d";
/// The number of matched tokens
pub const MATCHED_TOKENS: &str = "matched_tokens";
}
/// KvStats metrics from LLM workers
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment