Unverified Commit 758df5af authored by Nicolò Lucchesi's avatar Nicolò Lucchesi Committed by GitHub
Browse files

[NIXL][Metrics] Track `nixl_num_kv_expired_reqs` metric in Prometheus (#32340)



Add a new metric to track the number of requests that had their KV blocks
expire. The scenario is particularly important to surface and track as it is a
vital indicator of the health of the deployment.

Currently we resort to tracking these failures through unstructured log
parsing (which is, among other things, dependent on the exact error string); current main:

> Releasing expired KV blocks for request cmpl-071d which were retrieved by 0 decode worker(s) within 0 seconds.
Signed-off-by: NickLucche <nlucches@redhat.com>
parent cdd03d25
...@@ -1930,6 +1930,7 @@ class NixlConnectorWorker: ...@@ -1930,6 +1930,7 @@ class NixlConnectorWorker:
if now < expires: if now < expires:
break break
count = self.consumer_notification_counts_by_req.pop(req_id, 0) count = self.consumer_notification_counts_by_req.pop(req_id, 0)
self.xfer_stats.record_kv_expired_req()
logger.warning( logger.warning(
"Releasing expired KV blocks for request %s which were " "Releasing expired KV blocks for request %s which were "
"retrieved by %d decode worker(s) within %d seconds.", "retrieved by %d decode worker(s) within %d seconds.",
...@@ -2499,13 +2500,14 @@ class NixlKVConnectorStats(KVConnectorStats): ...@@ -2499,13 +2500,14 @@ class NixlKVConnectorStats(KVConnectorStats):
def reset(self): def reset(self):
# Must be serializable # Must be serializable
self.data: dict[str, list[float]] = { self.data: dict[str, list[float | int]] = {
"transfer_duration": [], "transfer_duration": [],
"post_duration": [], "post_duration": [],
"bytes_transferred": [], "bytes_transferred": [],
"num_descriptors": [], "num_descriptors": [],
"num_failed_transfers": [], "num_failed_transfers": [],
"num_failed_notifications": [], "num_failed_notifications": [],
"num_kv_expired_reqs": [],
} }
def record_transfer(self, res: nixlXferTelemetry): def record_transfer(self, res: nixlXferTelemetry):
...@@ -2517,11 +2519,15 @@ class NixlKVConnectorStats(KVConnectorStats): ...@@ -2517,11 +2519,15 @@ class NixlKVConnectorStats(KVConnectorStats):
def record_failed_transfer(self): def record_failed_transfer(self):
"""Record a failed NIXL transfer operation.""" """Record a failed NIXL transfer operation."""
self.data["num_failed_transfers"].append(1.0) self.data["num_failed_transfers"].append(1)
def record_failed_notification(self): def record_failed_notification(self):
"""Record a failed NIXL notification (send_notif).""" """Record a failed NIXL notification (send_notif)."""
self.data["num_failed_notifications"].append(1.0) self.data["num_failed_notifications"].append(1)
def record_kv_expired_req(self):
"""Record a request that had its KV blocks expire."""
self.data["num_kv_expired_reqs"].append(1)
def clone_and_reset(self) -> "NixlKVConnectorStats": def clone_and_reset(self) -> "NixlKVConnectorStats":
old = copy.copy(self) old = copy.copy(self)
...@@ -2529,7 +2535,13 @@ class NixlKVConnectorStats(KVConnectorStats): ...@@ -2529,7 +2535,13 @@ class NixlKVConnectorStats(KVConnectorStats):
return old return old
def is_empty(self) -> bool: def is_empty(self) -> bool:
return self.num_successful_transfers == 0 # Do not discard metrics update that are entirely failures related.
return (
self.num_successful_transfers == 0
and len(self.data["num_failed_transfers"]) == 0
and len(self.data["num_failed_notifications"]) == 0
and len(self.data["num_kv_expired_reqs"]) == 0
)
def aggregate(self, other: KVConnectorStats) -> KVConnectorStats: def aggregate(self, other: KVConnectorStats) -> KVConnectorStats:
if not other.is_empty(): if not other.is_empty():
...@@ -2541,7 +2553,9 @@ class NixlKVConnectorStats(KVConnectorStats): ...@@ -2541,7 +2553,9 @@ class NixlKVConnectorStats(KVConnectorStats):
def reduce(self) -> dict[str, int | float]: def reduce(self) -> dict[str, int | float]:
# Compute compact representative stats suitable for CLI logging # Compute compact representative stats suitable for CLI logging
if self.is_empty(): if self.num_successful_transfers == 0:
# CLI logging only reports successful transfers stats. If all requests in
# the interval were unsuccessful, Prom will report failures stats instead.
return { return {
"Num successful transfers": 0, "Num successful transfers": 0,
"Avg xfer time (ms)": 0, "Avg xfer time (ms)": 0,
...@@ -2677,6 +2691,16 @@ class NixlPromMetrics(KVConnectorPromMetrics): ...@@ -2677,6 +2691,16 @@ class NixlPromMetrics(KVConnectorPromMetrics):
counter_nixl_num_failed_notifications counter_nixl_num_failed_notifications
) )
counter_nixl_num_kv_expired_reqs = self._counter_cls(
name="vllm:nixl_num_kv_expired_reqs",
documentation="Number of requests that had their KV expire. "
"NOTE: This metric is tracked on the P instance.",
labelnames=labelnames,
)
self.counter_nixl_num_kv_expired_reqs = self.make_per_engine(
counter_nixl_num_kv_expired_reqs
)
def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0): def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0):
for prom_obj, list_item_key in zip( for prom_obj, list_item_key in zip(
[ [
...@@ -2698,8 +2722,9 @@ class NixlPromMetrics(KVConnectorPromMetrics): ...@@ -2698,8 +2722,9 @@ class NixlPromMetrics(KVConnectorPromMetrics):
[ [
self.counter_nixl_num_failed_transfers, self.counter_nixl_num_failed_transfers,
self.counter_nixl_num_failed_notifications, self.counter_nixl_num_failed_notifications,
self.counter_nixl_num_kv_expired_reqs,
], ],
["num_failed_transfers", "num_failed_notifications"], ["num_failed_transfers", "num_failed_notifications", "num_kv_expired_reqs"],
): ):
for list_item in transfer_stats_data[counter_item_key]: for list_item in transfer_stats_data[counter_item_key]:
counter_obj[engine_idx].inc(list_item) counter_obj[engine_idx].inc(list_item)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment