Unverified Commit 832bf80a authored by Ziqi Fan's avatar Ziqi Fan Committed by GitHub
Browse files

feat: add NVTX annotation to KVBM (#6334)


Signed-off-by: Ziqi Fan <ziqif@nvidia.com>
parent 7df460b8
...@@ -427,6 +427,26 @@ maturin build --release --out /workspace/dist ...@@ -427,6 +427,26 @@ maturin build --release --out /workspace/dist
uv pip install --upgrade --force-reinstall --no-deps /workspace/dist/kvbm*.whl uv pip install --upgrade --force-reinstall --no-deps /workspace/dist/kvbm*.whl
``` ```
To use [Nsight Systems](https://developer.nvidia.com/nsight-systems) for performance analysis, follow the steps below (using vLLM as an example). KVBM has NVTX annotations on the top-level KV Connector APIs (search for `@nvtx_annotate`). If you need more annotations, add them and rebuild.
```bash
# build and run local-dev container, which contains nsys
python container/render.py --framework=vllm --target=local-dev --output-short-filename
docker build --build-arg USER_UID=$(id -u) --build-arg USER_GID=$(id -g) -f container/rendered.Dockerfile -t dynamo:latest-vllm-local-dev .
container/run.sh --image dynamo:latest-vllm-local-dev -it --mount-workspace --use-nixl-gds
# export nsys to PATH
# NOTE: adjust the version in the path below to match the Nsight Systems version installed in the container
export PATH=/opt/nvidia/nsight-systems/2025.5.1/bin:$PATH
# example usage of nsys: delay 30 seconds and then capture 60 seconds
python -m dynamo.frontend &
DYN_KVBM_CPU_CACHE_GB=10 \
nsys profile -o /tmp/kvbm-nsys --trace-fork-before-exec=true --cuda-graph-trace=node --delay 30 --duration 60 \
python -m dynamo.vllm --model Qwen/Qwen3-0.6B --connector kvbm
```
## See Also ## See Also
- [KVBM Overview](README.md) for a quick overview of KV Caching, KVBM and its architecture - [KVBM Overview](README.md) for a quick overview of KV Caching, KVBM and its architecture
......
...@@ -11,7 +11,7 @@ from kvbm.trtllm_integration.consolidator_config import is_truthy ...@@ -11,7 +11,7 @@ from kvbm.trtllm_integration.consolidator_config import is_truthy
from kvbm.trtllm_integration.rust import KvbmRequest from kvbm.trtllm_integration.rust import KvbmRequest
from kvbm.trtllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader from kvbm.trtllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader
from kvbm.trtllm_integration.rust import SchedulerOutput as RustSchedulerOutput from kvbm.trtllm_integration.rust import SchedulerOutput as RustSchedulerOutput
from kvbm.utils import is_dyn_runtime_enabled from kvbm.utils import is_dyn_runtime_enabled, nvtx_annotate
from tensorrt_llm._torch.pyexecutor.kv_cache_connector import ( from tensorrt_llm._torch.pyexecutor.kv_cache_connector import (
KvCacheConnectorScheduler, KvCacheConnectorScheduler,
SchedulerOutput, SchedulerOutput,
...@@ -107,6 +107,7 @@ class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler): ...@@ -107,6 +107,7 @@ class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler):
consolidator_output_endpoint=consolidator_output_ep, consolidator_output_endpoint=consolidator_output_ep,
) )
@nvtx_annotate(category="scheduler")
def build_connector_meta(self, scheduler_output: SchedulerOutput) -> bytes: def build_connector_meta(self, scheduler_output: SchedulerOutput) -> bytes:
""" """
Build the metadata for the worker. Build the metadata for the worker.
...@@ -154,6 +155,7 @@ class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler): ...@@ -154,6 +155,7 @@ class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler):
return self._connector.build_connector_metadata(output) return self._connector.build_connector_metadata(output)
@nvtx_annotate(category="scheduler")
def get_num_new_matched_tokens( def get_num_new_matched_tokens(
self, request: LlmRequest, num_computed_tokens: int self, request: LlmRequest, num_computed_tokens: int
) -> tuple[int, bool]: ) -> tuple[int, bool]:
...@@ -174,6 +176,7 @@ class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler): ...@@ -174,6 +176,7 @@ class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler):
num_computed_tokens, num_computed_tokens,
) )
@nvtx_annotate(category="scheduler")
def update_state_after_alloc(self, request: LlmRequest, block_ids: List[int]): def update_state_after_alloc(self, request: LlmRequest, block_ids: List[int]):
""" """
Called after get_num_new_matched_tokens is called to provide the block ids to the scheduler. Called after get_num_new_matched_tokens is called to provide the block ids to the scheduler.
...@@ -185,6 +188,7 @@ class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler): ...@@ -185,6 +188,7 @@ class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler):
str(request.request_id), block_ids, request.context_current_position str(request.request_id), block_ids, request.context_current_position
) )
@nvtx_annotate(category="scheduler")
def request_finished(self, request: LlmRequest, cache_block_ids: list[int]) -> bool: def request_finished(self, request: LlmRequest, cache_block_ids: list[int]) -> bool:
""" """
Called when a request is finished generating tokens. Called when a request is finished generating tokens.
......
...@@ -5,7 +5,7 @@ from typing import Optional ...@@ -5,7 +5,7 @@ from typing import Optional
import torch import torch
from kvbm.trtllm_integration.rust import KvConnectorWorker as RustKvConnectorWorker from kvbm.trtllm_integration.rust import KvConnectorWorker as RustKvConnectorWorker
from kvbm.utils import is_dyn_runtime_enabled from kvbm.utils import is_dyn_runtime_enabled, nvtx_annotate
from tensorrt_llm import logger from tensorrt_llm import logger
from tensorrt_llm._torch.pyexecutor.kv_cache_connector import KvCacheConnectorWorker from tensorrt_llm._torch.pyexecutor.kv_cache_connector import KvCacheConnectorWorker
from tensorrt_llm.llmapi.llm_args import TorchLlmArgs from tensorrt_llm.llmapi.llm_args import TorchLlmArgs
...@@ -50,6 +50,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker): ...@@ -50,6 +50,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
# Default to old way of processing offload # Default to old way of processing offload
self.use_forward_pass_callable = False self.use_forward_pass_callable = False
@nvtx_annotate(category="worker")
def register_forward_pass_callable(self) -> callable: def register_forward_pass_callable(self) -> callable:
""" """
Register a callable object which will be called at the Register a callable object which will be called at the
...@@ -58,6 +59,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker): ...@@ -58,6 +59,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
self.use_forward_pass_callable = True self.use_forward_pass_callable = True
return self._callable_object() return self._callable_object()
@nvtx_annotate(category="worker")
def register_kv_caches(self, kv_cache_tensor: torch.Tensor): def register_kv_caches(self, kv_cache_tensor: torch.Tensor):
""" """
Register the KV cache tensors to the worker. Register the KV cache tensors to the worker.
...@@ -94,6 +96,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker): ...@@ -94,6 +96,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
raw_event_handles, raw_event_handles,
) )
@nvtx_annotate(category="worker")
def bind_connector_meta(self, metadata: object): def bind_connector_meta(self, metadata: object):
"""Set the connector metadata from the scheduler. """Set the connector metadata from the scheduler.
...@@ -107,6 +110,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker): ...@@ -107,6 +110,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
super().bind_connector_meta(metadata) super().bind_connector_meta(metadata)
self._connector.bind_connector_meta(metadata) self._connector.bind_connector_meta(metadata)
@nvtx_annotate(category="worker")
def start_load_kv(self, stream: torch.cuda.Stream): def start_load_kv(self, stream: torch.cuda.Stream):
""" """
Begin loading the KV cache in preparation for the next forward pass. Begin loading the KV cache in preparation for the next forward pass.
...@@ -114,12 +118,14 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker): ...@@ -114,12 +118,14 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
""" """
self._connector.start_load_kv() self._connector.start_load_kv()
@nvtx_annotate(category="worker")
def wait_for_save(self, stream: torch.cuda.Stream): def wait_for_save(self, stream: torch.cuda.Stream):
""" """
Block until all synchronous saving operations are complete. Called at the end of the forward pass. Block until all synchronous saving operations are complete. Called at the end of the forward pass.
""" """
pass pass
@nvtx_annotate(category="worker")
def wait_for_layer_load(self, layer_idx: int, stream: torch.cuda.Stream): def wait_for_layer_load(self, layer_idx: int, stream: torch.cuda.Stream):
""" """
Wait for a layer to finish being loaded before proceeding with the forward pass on the layer. Wait for a layer to finish being loaded before proceeding with the forward pass on the layer.
...@@ -130,6 +136,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker): ...@@ -130,6 +136,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
""" """
pass pass
@nvtx_annotate(category="worker")
def save_kv_layer(self, layer_idx: int, stream: torch.cuda.Stream): def save_kv_layer(self, layer_idx: int, stream: torch.cuda.Stream):
""" """
Begin saving the KV cache for a layer. Begin saving the KV cache for a layer.
...@@ -142,6 +149,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker): ...@@ -142,6 +149,7 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
self.events[layer_idx].record(stream) self.events[layer_idx].record(stream)
self._connector.save_kv_layer(layer_idx) self._connector.save_kv_layer(layer_idx)
@nvtx_annotate(category="worker")
def get_finished( def get_finished(
self, finished_gen_req_ids: list[int], started_loading_req_ids: list[int] self, finished_gen_req_ids: list[int], started_loading_req_ids: list[int]
) -> tuple[list[int], list[int]]: ) -> tuple[list[int], list[int]]:
......
...@@ -4,6 +4,23 @@ ...@@ -4,6 +4,23 @@
import os import os
try:
    from nvtx import annotate  # type: ignore
except ImportError:

    class _NoOpAnnotation:
        """Inert stand-in for an ``nvtx.annotate`` instance.

        Supports the two usages this fallback needs to cover: applying it to
        a function as a decorator, and entering it as a context manager.
        Neither usage records anything.
        """

        def __call__(self, func):
            # Decorator usage: hand the function back untouched.
            return func

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            # Never suppress an exception raised inside the ``with`` body.
            return False

    def annotate(*args, **kwargs):
        """No-op replacement for ``nvtx.annotate`` when nvtx is not installed.

        Args:
            *args: Either a single callable (bare ``@annotate`` usage) or the
                positional annotation options, which are ignored.
            **kwargs: Annotation options (message, color, ...); ignored.

        Returns:
            The callable itself for bare ``@annotate`` usage; otherwise an
            inert object usable as a decorator (returns the function
            unchanged) or as a ``with`` context manager.
        """
        # A single callable argument and no kwargs means the bare
        # ``@annotate`` form (without parentheses).
        if len(args) == 1 and callable(args[0]) and not kwargs:
            return args[0]
        # Otherwise this is ``annotate(...)``: used as ``@annotate(...)`` or
        # ``with annotate(...):``.
        return _NoOpAnnotation()
def is_dyn_runtime_enabled() -> bool: def is_dyn_runtime_enabled() -> bool:
""" """
...@@ -18,3 +35,17 @@ def is_dyn_runtime_enabled() -> bool: ...@@ -18,3 +35,17 @@ def is_dyn_runtime_enabled() -> bool:
""" """
val = os.environ.get("DYN_RUNTIME_ENABLED_KVBM", "").strip().lower() val = os.environ.get("DYN_RUNTIME_ENABLED_KVBM", "").strip().lower()
return val in {"1", "true"} return val in {"1", "true"}
def nvtx_annotate(func=None, *, domain="kvbm", category=None):
    """Wrap a callable in an NVTX range labelled with its qualified name.

    Works both bare (``@nvtx_annotate``) and parameterized
    (``@nvtx_annotate(category="...")``).

    Args:
        func: Callable to wrap when the decorator is applied bare; ``None``
            when keyword options are supplied instead.
        domain: NVTX domain passed through to ``annotate``.
        category: NVTX category; passed through only when it is not ``None``.

    Returns:
        ``annotate(...)`` applied to ``func`` when ``func`` is given,
        otherwise a decorator awaiting the callable.
    """

    def _apply(target):
        options = {
            "message": target.__qualname__,
            "color": "green",
            "domain": domain,
        }
        if category is not None:
            options["category"] = category
        marker = annotate(**options)
        return marker(target)

    if func is None:
        # Parameterized form: hand back the decorator for Python to apply.
        return _apply
    return _apply(func)
...@@ -28,6 +28,7 @@ if TYPE_CHECKING: ...@@ -28,6 +28,7 @@ if TYPE_CHECKING:
# from kvbm.vllm_integration.kv_cache_utils import KvbmCacheBlocks # from kvbm.vllm_integration.kv_cache_utils import KvbmCacheBlocks
from kvbm.utils import nvtx_annotate
from kvbm.vllm_integration.connector_leader import KvConnectorLeader from kvbm.vllm_integration.connector_leader import KvConnectorLeader
from kvbm.vllm_integration.connector_worker import KvConnectorWorker from kvbm.vllm_integration.connector_worker import KvConnectorWorker
...@@ -68,6 +69,7 @@ class DynamoConnector(KVConnectorBase_V1): ...@@ -68,6 +69,7 @@ class DynamoConnector(KVConnectorBase_V1):
# Scheduler/Leader # Scheduler/Leader
@nvtx_annotate(category="scheduler")
def get_num_new_matched_tokens( def get_num_new_matched_tokens(
self, self,
request: "Request", request: "Request",
...@@ -75,17 +77,20 @@ class DynamoConnector(KVConnectorBase_V1): ...@@ -75,17 +77,20 @@ class DynamoConnector(KVConnectorBase_V1):
) -> tuple[int, bool]: ) -> tuple[int, bool]:
return self._scheduler.get_num_new_matched_tokens(request, num_computed_tokens) return self._scheduler.get_num_new_matched_tokens(request, num_computed_tokens)
@nvtx_annotate(category="scheduler")
def update_state_after_alloc( def update_state_after_alloc(
self, request: "Request", blocks: "KVCacheBlocks", num_external_tokens: int self, request: "Request", blocks: "KVCacheBlocks", num_external_tokens: int
): ):
self._scheduler.update_state_after_alloc(request, blocks, num_external_tokens) self._scheduler.update_state_after_alloc(request, blocks, num_external_tokens)
@nvtx_annotate(category="scheduler")
def build_connector_meta( def build_connector_meta(
self, scheduler_output: SchedulerOutput self, scheduler_output: SchedulerOutput
) -> KVConnectorMetadata: ) -> KVConnectorMetadata:
data = self._scheduler.build_connector_meta(scheduler_output) data = self._scheduler.build_connector_meta(scheduler_output)
return DynamoConnectorMetadata(data) return DynamoConnectorMetadata(data)
@nvtx_annotate(category="scheduler")
def request_finished( def request_finished(
self, self,
request: "Request", request: "Request",
...@@ -95,9 +100,11 @@ class DynamoConnector(KVConnectorBase_V1): ...@@ -95,9 +100,11 @@ class DynamoConnector(KVConnectorBase_V1):
# Worker # Worker
@nvtx_annotate(category="worker")
def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]): def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
self._worker.register_kv_caches(kv_caches) self._worker.register_kv_caches(kv_caches)
@nvtx_annotate(category="worker")
@override @override
def bind_connector_metadata( def bind_connector_metadata(
self, connector_metadata: DynamoConnectorMetadata self, connector_metadata: DynamoConnectorMetadata
...@@ -108,19 +115,23 @@ class DynamoConnector(KVConnectorBase_V1): ...@@ -108,19 +115,23 @@ class DynamoConnector(KVConnectorBase_V1):
assert isinstance(connector_metadata.metadata, bytes) assert isinstance(connector_metadata.metadata, bytes)
self._worker.bind_connector_metadata(connector_metadata.metadata) self._worker.bind_connector_metadata(connector_metadata.metadata)
@nvtx_annotate(category="worker")
@override @override
def clear_connector_metadata(self) -> None: def clear_connector_metadata(self) -> None:
super().clear_connector_metadata() super().clear_connector_metadata()
self._worker.clear_connector_metadata() self._worker.clear_connector_metadata()
@nvtx_annotate(category="worker")
@override @override
def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None: def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
self._worker.start_load_kv(forward_context, **kwargs) self._worker.start_load_kv(forward_context, **kwargs)
@nvtx_annotate(category="worker")
@override @override
def wait_for_layer_load(self, layer_name: str) -> None: def wait_for_layer_load(self, layer_name: str) -> None:
pass pass
@nvtx_annotate(category="worker")
@override @override
def save_kv_layer( def save_kv_layer(
self, self,
...@@ -131,6 +142,7 @@ class DynamoConnector(KVConnectorBase_V1): ...@@ -131,6 +142,7 @@ class DynamoConnector(KVConnectorBase_V1):
) -> None: ) -> None:
self._worker.save_kv_layer(layer_name, kv_layer, attn_metadata, **kwargs) self._worker.save_kv_layer(layer_name, kv_layer, attn_metadata, **kwargs)
@nvtx_annotate(category="worker")
@override @override
def wait_for_save(self): def wait_for_save(self):
pass pass
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment