"...git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "9c1352eb5736d9e71d37959db44b6a641e898772"
Unverified Commit e21dcf6c authored by Ziqi Fan's avatar Ziqi Fan Committed by GitHub
Browse files

feat: enable KVBM emit metrics in Dynamo TRTLLM (#3254)


Signed-off-by: default avatarZiqi Fan <ziqif@nvidia.com>
parent 67e1f6ee
...@@ -62,13 +62,13 @@ scrape_configs: ...@@ -62,13 +62,13 @@ scrape_configs:
- job_name: 'kvbm-leader-metrics' - job_name: 'kvbm-leader-metrics'
scrape_interval: 2s scrape_interval: 2s
static_configs: static_configs:
- targets: ['host.docker.internal:6881'] - targets: ['host.docker.internal:6882']
# KVBM worker related metrics # KVBM worker related metrics
- job_name: 'kvbm-worker-metrics' - job_name: 'kvbm-worker-metrics'
scrape_interval: 2s scrape_interval: 2s
static_configs: static_configs:
- targets: ['host.docker.internal:6880'] - targets: ['host.docker.internal:6881']
# Uncomment to see its own Prometheus metrics # Uncomment to see its own Prometheus metrics
# - job_name: 'prometheus' # - job_name: 'prometheus'
......
...@@ -45,12 +45,12 @@ docker compose -f deploy/docker-compose.yml up -d ...@@ -45,12 +45,12 @@ docker compose -f deploy/docker-compose.yml up -d
./container/run.sh --framework trtllm -it --mount-workspace --use-nixl-gds ./container/run.sh --framework trtllm -it --mount-workspace --use-nixl-gds
# enable kv offloading to CPU memory # enable kv offloading to CPU memory
# 60 means 60GB of pinned CPU memory would be used # 4 means 4GB of pinned CPU memory would be used
export DYN_KVBM_CPU_CACHE_GB=60 export DYN_KVBM_CPU_CACHE_GB=4
# enable kv offloading to disk. Note: To enable disk cache offloading, you must first enable a CPU memory cache offloading. # enable kv offloading to disk. Note: To enable disk cache offloading, you must first enable a CPU memory cache offloading.
# 20 means 20GB of disk would be used # 8 means 8GB of disk would be used
export DYN_KVBM_DISK_CACHE_GB=20 export DYN_KVBM_DISK_CACHE_GB=8
# Allocating memory and disk storage can take some time. # Allocating memory and disk storage can take some time.
# We recommend setting a higher timeout for leader–worker initialization. # We recommend setting a higher timeout for leader–worker initialization.
...@@ -73,10 +73,10 @@ kv_connector_config: ...@@ -73,10 +73,10 @@ kv_connector_config:
connector_worker_class: DynamoKVBMConnectorWorker connector_worker_class: DynamoKVBMConnectorWorker
EOF EOF
# start dynamo frontend # [DYNAMO] start dynamo frontend
python3 -m dynamo.frontend --http-port 8000 & python3 -m dynamo.frontend --http-port 8000 &
# To serve an LLM model with dynamo # [DYNAMO] To serve an LLM model with dynamo
python3 -m dynamo.trtllm \ python3 -m dynamo.trtllm \
--model-path deepseek-ai/DeepSeek-R1-Distill-Llama-8B \ --model-path deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--served-model-name deepseek-ai/DeepSeek-R1-Distill-Llama-8B \ --served-model-name deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
...@@ -95,7 +95,32 @@ curl localhost:8000/v1/chat/completions -H "Content-Type: application/json" ...@@ -95,7 +95,32 @@ curl localhost:8000/v1/chat/completions -H "Content-Type: application/json"
"max_tokens": 30 "max_tokens": 30
}' }'
# Optionally, we could also serve an LLM with trtllm-serve to utilize the KVBM feature. ```
trtllm-serve deepseek-ai/DeepSeek-R1-Distill-Llama-8B --host localhost --port 8001 --backend pytorch --extra_llm_api_options /tmp/kvbm_llm_api_config.yaml
Alternatively, you can use "trtllm-serve" with KVBM by replacing the two [DYNAMO] commands above with the following:
```bash
trtllm-serve deepseek-ai/DeepSeek-R1-Distill-Llama-8B --host localhost --port 8000 --backend pytorch --extra_llm_api_options /tmp/kvbm_llm_api_config.yaml
``` ```
## Enable and View KVBM Metrics
Follow the steps below to enable metrics collection and view the metrics in the Grafana dashboard:
```bash
# Start the basic services (etcd & natsd), along with Prometheus and Grafana
docker compose -f deploy/docker-compose.yml --profile metrics up -d
# Set the env vars DYN_SYSTEM_ENABLED to true, DYN_SYSTEM_PORT to 6880, and DYN_KVBM_SLEEP to 5 when launching via dynamo
# NOTE: Make sure port 6881 (for KVBM worker metrics) and port 6882 (for KVBM leader metrics) are available.
# NOTE: DYN_KVBM_SLEEP is needed to avoid a metrics port conflict between the KVBM leader and the KVBM worker
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=6880 DYN_KVBM_SLEEP=5 \
python3 -m dynamo.trtllm \
--model-path deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--served-model-name deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--extra-engine-args /tmp/kvbm_llm_api_config.yaml &
# Optional: if a firewall blocks the KVBM metrics ports, open them so the Prometheus metrics can be sent
sudo ufw allow 6881/tcp
sudo ufw allow 6882/tcp
```
View the Grafana metrics at http://localhost:3001 (default login: dynamo/dynamo) and look for the KVBM Dashboard.
...@@ -77,13 +77,16 @@ Follow below steps to enable metrics collection and view via Grafana dashboard: ...@@ -77,13 +77,16 @@ Follow below steps to enable metrics collection and view via Grafana dashboard:
# Start the basic services (etcd & natsd), along with Prometheus and Grafana # Start the basic services (etcd & natsd), along with Prometheus and Grafana
docker compose -f deploy/docker-compose.yml --profile metrics up -d docker compose -f deploy/docker-compose.yml --profile metrics up -d
# start vllm with DYN_SYSTEM_ENABLED set to true and DYN_SYSTEM_PORT port to 6880. # set env var DYN_SYSTEM_ENABLED to true, DYN_SYSTEM_PORT to 6880, DYN_KVBM_SLEEP to 5, when launch via dynamo
# NOTE: Make sure port 6880 (for KVBM worker metrics) and port 6881 (for KVBM leader metrics) are available. # NOTE: Make sure port 6881 (for KVBM worker metrics) and port 6882 (for KVBM leader metrics) are available.
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=6880 vllm serve --kv-transfer-config '{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "dynamo.llm.vllm_integration.connector"}' deepseek-ai/DeepSeek-R1-Distill-Llama-8B DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=6880 \
python -m dynamo.vllm \
--model deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--connector kvbm &
# optional if firewall blocks KVBM metrics ports to send prometheus metrics # optional if firewall blocks KVBM metrics ports to send prometheus metrics
sudo ufw allow 6880/tcp
sudo ufw allow 6881/tcp sudo ufw allow 6881/tcp
sudo ufw allow 6882/tcp
``` ```
View grafana metrics via http://localhost:3001 (default login: dynamo/dynamo) and look for KVBM Dashboard View grafana metrics via http://localhost:3001 (default login: dynamo/dynamo) and look for KVBM Dashboard
......
...@@ -19,8 +19,10 @@ use crate::{ ...@@ -19,8 +19,10 @@ use crate::{
use anyhow; use anyhow;
use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig}; use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig};
use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics;
use dynamo_llm::block_manager::storage::torch::TorchTensor; use dynamo_llm::block_manager::storage::torch::TorchTensor;
use dynamo_runtime::DistributedRuntime; use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::metrics::prometheus_names::kvbm_connector;
use dynamo_runtime::utils::task::CriticalTaskExecutionHandle; use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
pub trait Worker: Send + Sync { pub trait Worker: Send + Sync {
...@@ -68,6 +70,8 @@ pub struct KvConnectorWorker { ...@@ -68,6 +70,8 @@ pub struct KvConnectorWorker {
/// cuda events created by the python side /// cuda events created by the python side
layer_events: Vec<u64>, layer_events: Vec<u64>,
kvbm_metrics: KvbmMetrics,
} }
impl KvConnectorWorker { impl KvConnectorWorker {
...@@ -93,6 +97,11 @@ impl KvConnectorWorker { ...@@ -93,6 +97,11 @@ impl KvConnectorWorker {
trtllm_rank trtllm_rank
); );
let kvbm_metrics = KvbmMetrics::new(
&drt.namespace(kvbm_connector::KVBM_CONNECTOR_WORKER)
.unwrap(),
);
Ok(Self { Ok(Self {
drt, drt,
kvbm_worker: OnceLock::new(), kvbm_worker: OnceLock::new(),
...@@ -106,6 +115,7 @@ impl KvConnectorWorker { ...@@ -106,6 +115,7 @@ impl KvConnectorWorker {
iteration: 0, iteration: 0,
layers_complete: 0, layers_complete: 0,
layer_events: Vec::new(), layer_events: Vec::new(),
kvbm_metrics,
}) })
} }
} }
...@@ -223,6 +233,7 @@ impl Worker for KvConnectorWorker { ...@@ -223,6 +233,7 @@ impl Worker for KvConnectorWorker {
self.connector.enqueue_request(operation); self.connector.enqueue_request(operation);
} }
} }
self.kvbm_metrics.save_kv_layer_requests.inc();
Ok(()) Ok(())
} }
......
...@@ -17,12 +17,17 @@ from dynamo.llm.trtllm_integration.rust import ( ...@@ -17,12 +17,17 @@ from dynamo.llm.trtllm_integration.rust import (
KvConnectorLeader as RustKvConnectorLeader, KvConnectorLeader as RustKvConnectorLeader,
) )
from dynamo.llm.trtllm_integration.rust import SchedulerOutput as RustSchedulerOutput from dynamo.llm.trtllm_integration.rust import SchedulerOutput as RustSchedulerOutput
from dynamo.llm.utils import find_and_set_available_port_from_env, maybe_sleep
from dynamo.runtime import DistributedRuntime from dynamo.runtime import DistributedRuntime
class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler): class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler):
def __init__(self, llm_args: TorchLlmArgs): def __init__(self, llm_args: TorchLlmArgs):
super().__init__(llm_args) super().__init__(llm_args)
# NOTE: this is needed in TRTLLM to avoid metrics port conflict with KVBM worker,
# since there is no startup order in TRTLLM and race condition is possible.
maybe_sleep()
find_and_set_available_port_from_env("DYN_SYSTEM_PORT")
self.drt = DistributedRuntime.detached() self.drt = DistributedRuntime.detached()
mappings = self._llm_args.parallel_config.to_mapping() mappings = self._llm_args.parallel_config.to_mapping()
......
...@@ -9,6 +9,7 @@ from tensorrt_llm.llmapi.llm_args import TorchLlmArgs ...@@ -9,6 +9,7 @@ from tensorrt_llm.llmapi.llm_args import TorchLlmArgs
from dynamo.llm.trtllm_integration.rust import ( from dynamo.llm.trtllm_integration.rust import (
KvConnectorWorker as RustKvConnectorWorker, KvConnectorWorker as RustKvConnectorWorker,
) )
from dynamo.llm.utils import find_and_set_available_port_from_env
from dynamo.runtime import DistributedRuntime from dynamo.runtime import DistributedRuntime
...@@ -16,6 +17,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker): ...@@ -16,6 +17,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
def __init__(self, llm_args: TorchLlmArgs): def __init__(self, llm_args: TorchLlmArgs):
super().__init__(llm_args) super().__init__(llm_args)
find_and_set_available_port_from_env("DYN_SYSTEM_PORT")
self.drt = DistributedRuntime.detached() self.drt = DistributedRuntime.detached()
mappings = self._llm_args.parallel_config.to_mapping() mappings = self._llm_args.parallel_config.to_mapping()
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import os
import socket
import time
def maybe_sleep():
    """
    Sleep for the number of seconds given by the DYN_KVBM_SLEEP env var.

    The variable is parsed as an integer number of seconds. If it is unset,
    empty, zero, or negative, the function returns immediately without
    sleeping.

    Raises:
        ValueError: if DYN_KVBM_SLEEP is set to a non-integer value.
    """
    raw_value = os.environ.get("DYN_KVBM_SLEEP", "").strip()
    if not raw_value:
        # Unset or empty (e.g. `DYN_KVBM_SLEEP=` in a shell): nothing to do.
        return

    try:
        sleep_duration = int(raw_value)
    except ValueError:
        # Name the variable so the misconfiguration is easy to track down;
        # the exception type stays ValueError as with a bare int() call.
        raise ValueError(
            f"DYN_KVBM_SLEEP must be an integer number of seconds, got {raw_value!r}"
        ) from None

    if sleep_duration > 0:
        print(f"Sleeping {sleep_duration} seconds to avoid metrics port conflict")
        time.sleep(sleep_duration)
# TODO(keiven|ziqi): Auto port selection to be done in Rust
def find_and_set_available_port_from_env(env_var="DYN_SYSTEM_PORT"):
    """
    Find a bindable TCP port starting at the one in `env_var` and store it back.

    The starting port is read from the environment variable `env_var`
    (0 when unset). Each candidate is probed by binding a throwaway socket
    on 127.0.0.1; if the bind fails with OSError, the next higher port is
    tried. The first port that binds is written back to `env_var` as a string.

    A starting port of 0 means "no port specified": binding port 0 lets the
    system pick, so the probe is expected to succeed right away and the
    variable is left as "0".

    The probe socket is closed before returning, so the port is only known
    to have been free at probe time.

    Args:
        env_var: name of the environment variable to read and update.

    Raises:
        ValueError: if the variable holds a non-integer value.
        RuntimeError: if the starting port is outside 0-65535 or no port in
            the remaining range can be bound.
    """
    max_port = 65535
    start_port = int(os.environ.get(env_var, "0"))
    if not 0 <= start_port <= max_port:
        raise RuntimeError(
            f"Error finding available port: {env_var}={start_port} "
            f"is outside 0-{max_port}"
        )

    for port in range(start_port, max_port + 1):
        # The context manager closes the probe socket on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("127.0.0.1", port))
            except OSError:
                # Port is in use (or otherwise not bindable), try next
                continue
        os.environ[env_var] = str(port)
        print(f"Port {port} is available, setting env var {env_var} to {port}")
        return

    raise RuntimeError(
        f"Error finding available port: no free port in {start_port}-{max_port}"
    )
...@@ -29,9 +29,7 @@ if TYPE_CHECKING: ...@@ -29,9 +29,7 @@ if TYPE_CHECKING:
# from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput # from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput
from dynamo.llm import KvbmLeader from dynamo.llm import KvbmLeader
from dynamo.llm.vllm_integration.kv_cache_utils import ( from dynamo.llm.utils import find_and_set_available_port_from_env
find_and_set_available_port_from_env,
)
from dynamo.llm.vllm_integration.rust import KvbmRequest from dynamo.llm.vllm_integration.rust import KvbmRequest
from dynamo.llm.vllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader from dynamo.llm.vllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader
from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput
......
...@@ -28,9 +28,7 @@ if TYPE_CHECKING: ...@@ -28,9 +28,7 @@ if TYPE_CHECKING:
# KvConnectorWorker as RustKvConnectorWorker, # KvConnectorWorker as RustKvConnectorWorker,
# ) # )
from dynamo.llm.vllm_integration.kv_cache_utils import ( from dynamo.llm.utils import find_and_set_available_port_from_env
find_and_set_available_port_from_env,
)
from dynamo.llm.vllm_integration.rust import KvConnectorWorker as RustKvConnectorWorker from dynamo.llm.vllm_integration.rust import KvConnectorWorker as RustKvConnectorWorker
from dynamo.runtime import DistributedRuntime from dynamo.runtime import DistributedRuntime
......
...@@ -7,8 +7,6 @@ Implementation of vLLM protocols for KV cache utility objects. ...@@ -7,8 +7,6 @@ Implementation of vLLM protocols for KV cache utility objects.
from __future__ import annotations from __future__ import annotations
import os
import socket
from typing import List from typing import List
from vllm.v1.core.kv_cache_manager import KVCacheBlocks from vllm.v1.core.kv_cache_manager import KVCacheBlocks
...@@ -88,29 +86,3 @@ def convert_kv_cache_blocks(blocks: KVCacheBlocks) -> BlockStates: ...@@ -88,29 +86,3 @@ def convert_kv_cache_blocks(blocks: KVCacheBlocks) -> BlockStates:
for block in blocks.blocks: for block in blocks.blocks:
states.push_back(convert_kv_cache_block(block)) states.push_back(convert_kv_cache_block(block))
return states return states
# TODO(keiven|ziqi): Auto port selection to be done in Rust
def find_and_set_available_port_from_env(env_var="DYN_SYSTEM_PORT"):
    """
    Find a bindable TCP port starting at the one in `env_var` and store it back.

    The starting port is read from the environment variable `env_var`
    (0 when unset). Each candidate is probed by binding a throwaway socket
    on 127.0.0.1; if the bind fails with OSError, the next higher port is
    tried. The first port that binds is written back to `env_var` as a string.

    A starting port of 0 means "no port specified": binding port 0 lets the
    system pick, so the probe is expected to succeed right away and the
    variable is left as "0".

    The probe socket is closed before returning, so the port is only known
    to have been free at probe time.

    Args:
        env_var: name of the environment variable to read and update.

    Raises:
        ValueError: if the variable holds a non-integer value.
        RuntimeError: if the starting port is outside 0-65535 or no port in
            the remaining range can be bound.
    """
    max_port = 65535
    start_port = int(os.environ.get(env_var, "0"))
    if not 0 <= start_port <= max_port:
        raise RuntimeError(
            f"Error finding available port: {env_var}={start_port} "
            f"is outside 0-{max_port}"
        )

    for port in range(start_port, max_port + 1):
        # The context manager closes the probe socket on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("127.0.0.1", port))
            except OSError:
                # Port is in use (or otherwise not bindable), try next
                continue
        os.environ[env_var] = str(port)
        print(f"Port {port} is available, setting env var {env_var} to {port}")
        return

    raise RuntimeError(
        f"Error finding available port: no free port in {start_port}-{max_port}"
    )
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment