Unverified Commit b658ba61 authored by Ziqi Fan's avatar Ziqi Fan Committed by GitHub
Browse files

feat: enable dynamo metrics on KVBM (#2626)

parent 5045f13b
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"description": "All KVBM related metrics",
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 4,
"links": [],
"panels": [
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"id": 1,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.0.1",
"targets": [
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "dynamo_component_save_kv_layer_requests{dynamo_namespace=\"kvbm_connector_worker\"}",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "KVBM Worker: save kv layer requests",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "P1809F7CD0C75ACF3"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
},
"id": 2,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "12.0.1",
"targets": [
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "dynamo_component_offload_requests{dynamo_namespace=\"kvbm_connector_leader\"}",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "KVBM Leader: offload requests",
"type": "timeseries"
}
],
"preload": false,
"refresh": "auto",
"schemaVersion": 41,
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-15m",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "KVBM Dashboard",
"uid": "3f679257-70a5-402c-92b4-05382337b548",
"version": 7
}
......@@ -58,6 +58,18 @@ scrape_configs:
# - targets: ['localhost:9091'] # metrics aggregation service on host
- targets: ['host.docker.internal:9091'] # metrics aggregation service on host
# KVBM leader related metrics
- job_name: 'kvbm-leader-metrics'
scrape_interval: 2s
static_configs:
- targets: ['host.docker.internal:6881']
# KVBM worker related metrics
- job_name: 'kvbm-worker-metrics'
scrape_interval: 2s
static_configs:
- targets: ['host.docker.internal:6880']
# Uncomment to see its own Prometheus metrics
# - job_name: 'prometheus'
# scrape_interval: 5s
......
......@@ -59,3 +59,21 @@ curl localhost:8000/v1/chat/completions -H "Content-Type: application/json"
"max_tokens": 30
}'
```
## Enable and View KVBM Metrics
Follow below steps to enable metrics collection and view via Grafana dashboard:
```bash
# Start the basic services (etcd & natsd), along with Prometheus and Grafana
docker compose -f deploy/docker-compose.yml --profile metrics up -d
# start vllm with DYN_SYSTEM_ENABLED set to true and DYN_SYSTEM_PORT port to 6880.
# NOTE: Make sure port 6880 (for KVBM worker metrics) and port 6881 (for KVBM leader metrics) are available.
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=6880 vllm serve --kv-transfer-config '{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "dynamo.llm.vllm_integration.connector"}' deepseek-ai/DeepSeek-R1-Distill-Llama-8B
# optional if firewall blocks KVBM metrics ports to send prometheus metrics
sudo ufw allow 6880/tcp
sudo ufw allow 6881/tcp
```
View grafana metrics via http://localhost:3001 (default login: dynamo/dynamo) and look for KVBM Dashboard
......@@ -1333,6 +1333,7 @@ dependencies = [
"either",
"futures",
"once_cell",
"prometheus",
"pyo3",
"pyo3-async-runtimes",
"pythonize",
......
......@@ -80,6 +80,7 @@ pythonize = "0.23"
dlpark = { version = "0.5", features = ["pyo3", "half"], optional = true }
cudarc = { version = "0.16.2", features = ["cuda-12020"], optional = true }
prometheus = "0.14.0"
[dev-dependencies]
......
......@@ -5,6 +5,7 @@ pub mod recorder;
pub mod slot;
use super::*;
use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics;
use dynamo_runtime::DistributedRuntime;
use slot::{ConnectorSlotManager, SlotError, SlotManager, SlotState};
......@@ -14,6 +15,7 @@ use crate::llm::block_manager::{
vllm::KvbmRequest, VllmBlockManager,
};
use crate::DistributedRuntime as PyDistributedRuntime;
use dynamo_runtime::metrics::prometheus_names::kvbm_connector;
use dynamo_llm::block_manager::{
block::{
......@@ -25,10 +27,7 @@ use dynamo_llm::block_manager::{
};
use dynamo_llm::tokens::{SaltHash, TokenBlockSequence, Tokens};
use std::{
collections::HashSet,
sync::{Arc, Mutex},
};
use std::{collections::HashSet, sync::Mutex};
use tokio;
use tokio::sync::mpsc;
......@@ -104,8 +103,19 @@ impl KvConnectorLeader {
// if we need a drt, get it from here
let drt = drt.inner().clone();
let ns = drt
.namespace(kvbm_connector::KVBM_CONNECTOR_LEADER)
.unwrap();
let kvbm_metrics = KvbmMetrics::new(&ns);
Self {
slot_manager: ConnectorSlotManager::new(block_manager.clone(), leader, drt.clone()),
slot_manager: ConnectorSlotManager::new(
block_manager.clone(),
leader,
drt.clone(),
kvbm_metrics,
),
block_size,
inflight_requests: HashSet::new(),
onboarding_slots: HashSet::new(),
......
......@@ -109,6 +109,10 @@ impl KvConnectorLeaderRecorder {
let output_path = "/tmp/records.jsonl";
tracing::info!("recording events to {}", output_path);
let ns = drt.namespace("kvbm_connector_leader").unwrap();
let kvbm_metrics = KvbmMetrics::new(&ns);
let recorder = drt
.runtime()
.primary()
......@@ -116,7 +120,12 @@ impl KvConnectorLeaderRecorder {
.unwrap();
let connector_leader = KvConnectorLeader {
slot_manager: ConnectorSlotManager::new(block_manager.clone(), leader, drt.clone()),
slot_manager: ConnectorSlotManager::new(
block_manager.clone(),
leader,
drt.clone(),
kvbm_metrics,
),
block_size,
inflight_requests: HashSet::new(),
onboarding_slots: HashSet::new(),
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::any::Any;
use std::{any::Any, sync::Arc};
use dynamo_llm::{
block_manager::{
......@@ -179,6 +179,7 @@ impl<R: RequestKey> ConnectorSlotManager<R> {
block_manager: VllmBlockManager,
leader: Arc<KvbmLeader>,
drt: DistributedRuntime,
kvbm_metrics: KvbmMetrics,
) -> Self {
tracing::debug!(
"creating slot manager with block size: {}",
......@@ -190,11 +191,14 @@ impl<R: RequestKey> ConnectorSlotManager<R> {
let mut xfer_engine = LocalTransferEngine::new(block_manager.clone(), leader, xfer_rx);
let primary_token = drt.primary_token();
let runtime_primary = drt.runtime().primary();
let drt_for_task = drt;
let xfer_engine_task = CriticalTaskExecutionHandle::new_with_runtime(
|cancellation_token| async move {
xfer_engine.execute(cancellation_token, drt_for_task).await
xfer_engine
.execute(cancellation_token, drt_for_task, kvbm_metrics.clone())
.await
},
primary_token,
"LocalTransferEngine",
......@@ -1027,6 +1031,7 @@ impl LocalTransferEngine {
&mut self,
cancellation_token: CancellationToken,
drt: DistributedRuntime,
kvbm_metrics: KvbmMetrics,
) -> anyhow::Result<()> {
let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel();
let (offload_tx, mut offload_rx) = mpsc::unbounded_channel();
......@@ -1062,8 +1067,13 @@ impl LocalTransferEngine {
tracing::debug!("LocalOffloadTask: received cancellation signal");
break;
}
if let Err(e) =
process_offload_request(req, &block_manager_offload, &leader_offload).await
if let Err(e) = process_offload_request(
req,
&block_manager_offload,
&leader_offload,
kvbm_metrics.clone(),
)
.await
{
tracing::error!("LocalOffloadTask: error processing request: {:?}", e);
}
......@@ -1132,7 +1142,10 @@ async fn process_offload_request(
offload_req: LocalOffloadRequest,
block_manager: &VllmBlockManager,
leader: &Arc<KvbmLeader>,
kvbm_metrics: KvbmMetrics,
) -> anyhow::Result<()> {
kvbm_metrics.offload_requests.inc();
let request_id = &offload_req.request_id;
let operation_id = &offload_req.operation_id;
......
......@@ -5,6 +5,7 @@ use dynamo_llm::block_manager::connector::protocol::TransferType;
use dynamo_llm::block_manager::connector::scheduler::{
Scheduler, TransferSchedulerClient, WorkerSchedulerClient,
};
use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics;
use std::collections::HashSet;
use std::sync::{Arc, OnceLock};
......@@ -15,6 +16,7 @@ use crate::{
llm::block_manager::distributed::VllmTensor, to_pyerr,
DistributedRuntime as PyDistributedRuntime,
};
use dynamo_runtime::metrics::prometheus_names::kvbm_connector;
use anyhow;
use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig};
......@@ -68,6 +70,8 @@ pub struct KvConnectorWorker {
/// cuda events created by the python side
layer_events: Vec<u64>,
kvbm_metrics: KvbmMetrics,
}
impl KvConnectorWorker {
......@@ -88,6 +92,11 @@ impl KvConnectorWorker {
)?
.detach();
let kvbm_metrics = KvbmMetrics::new(
&drt.namespace(kvbm_connector::KVBM_CONNECTOR_WORKER)
.unwrap(),
);
tracing::info!(
"KvConnectorWorker initialized with worker_id: {}",
vllm_worker_id
......@@ -106,6 +115,7 @@ impl KvConnectorWorker {
layers_complete: 0,
kv_cache_layers: Vec::new(),
layer_events: Vec::new(),
kvbm_metrics,
})
}
}
......@@ -255,6 +265,7 @@ impl Worker for KvConnectorWorker {
/// Trigger layer-wise completion signals.
/// Trigger block-wise completion signals afer last layer.
fn save_kv_layer(&mut self, _layer_name: String) -> anyhow::Result<()> {
self.kvbm_metrics.save_kv_layer_requests.inc();
self.layers_complete += 1;
if self.layers_complete == self.kv_cache_layers.len() {
let offloading_operations = std::mem::take(&mut self.offloading_operations);
......
......@@ -30,6 +30,9 @@ if TYPE_CHECKING:
# from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput
from dynamo.llm import BlockManager, KvbmLeader
from dynamo.llm.vllm_integration.kv_cache_utils import (
find_and_set_available_port_from_env,
)
from dynamo.llm.vllm_integration.rust import KvbmRequest
from dynamo.llm.vllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader
from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput
......@@ -54,6 +57,7 @@ class KvConnectorLeader:
def __init__(self, vllm_config: "VllmConfig", engine_id: str, **kwargs):
drt = kwargs.get("drt", None)
if drt is None:
find_and_set_available_port_from_env("DYN_SYSTEM_PORT")
self.drt = DistributedRuntime.detached()
else:
self.drt = drt
......
......@@ -28,6 +28,9 @@ if TYPE_CHECKING:
# KvConnectorWorker as RustKvConnectorWorker,
# )
from dynamo.llm.vllm_integration.kv_cache_utils import (
find_and_set_available_port_from_env,
)
from dynamo.llm.vllm_integration.rust import KvConnectorWorker as RustKvConnectorWorker
from dynamo.runtime import DistributedRuntime
......@@ -42,6 +45,8 @@ class KvConnectorWorker:
def __init__(self, vllm_config: "VllmConfig", engine_id: str, **kwargs):
drt = kwargs.get("drt", None)
if drt is None:
# this is needed to avoid metrics port conflict with KVBM leader side DRT if metrics is enabled
find_and_set_available_port_from_env("DYN_SYSTEM_PORT")
self.drt = DistributedRuntime.detached()
else:
self.drt = drt
......
......@@ -7,6 +7,8 @@ Implementation of vLLM protocols for KV cache utility objects.
from __future__ import annotations
import os
import socket
from typing import List
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
......@@ -86,3 +88,29 @@ def convert_kv_cache_blocks(blocks: KVCacheBlocks) -> BlockStates:
for block in blocks.blocks:
states.push_back(convert_kv_cache_block(block))
return states
# TODO(keiven|ziqi): Auto port selection to be done in Rust
def find_and_set_available_port_from_env(env_var="DYN_SYSTEM_PORT"):
"""
Find an available port from the environment variable.
"""
port = int(os.environ.get(env_var, "0"))
if port == 0:
# No port specified, let system pick
pass
while True:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# Port is available
s.bind(("127.0.0.1", port))
s.close()
os.environ[env_var] = str(port)
print(f"Port {port} is available, setting env var {env_var} to {port}")
break
except OSError:
# Port is in use, try next
port += 1
s.close()
except Exception as e:
raise RuntimeError(f"Error finding available port: {e}")
......@@ -28,6 +28,7 @@ pub mod distributed;
pub mod events;
pub mod layout;
pub mod metrics;
pub mod metrics_kvbm;
pub mod offload;
pub mod pool;
pub mod storage;
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use dynamo_runtime::metrics::MetricsRegistry;
use prometheus::IntCounter;
#[derive(Clone, Debug)]
pub struct KvbmMetrics {
pub offload_requests: IntCounter,
pub save_kv_layer_requests: IntCounter,
}
impl KvbmMetrics {
pub fn new(mr: &dyn MetricsRegistry) -> Self {
let offload_requests = mr
.create_intcounter("offload_requests", "The number of offload requests", &[])
.unwrap();
let save_kv_layer_requests = mr
.create_intcounter(
"save_kv_layer_requests",
"The number of save kv layer requests",
&[],
)
.unwrap();
Self {
offload_requests,
save_kv_layer_requests,
}
}
}
......@@ -132,3 +132,12 @@ pub mod work_handler {
/// Time spent processing requests by work handler (histogram)
pub const REQUEST_DURATION_SECONDS: &str = "request_duration_seconds";
}
/// KVBM connector
pub mod kvbm_connector {
/// KVBM connector leader
pub const KVBM_CONNECTOR_LEADER: &str = "kvbm_connector_leader";
/// KVBM connector worker
pub const KVBM_CONNECTOR_WORKER: &str = "kvbm_connector_worker";
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment