Unverified Commit 5c18b961 authored by mukesh-hai's avatar mukesh-hai Committed by GitHub
Browse files

[Core][Metrics] expose waiting request breakdown via labeled metric (capacity/deferred) (#38435)


Signed-off-by: default avatarMukesh Baphna <mukesh@hippocraticai.com>
Signed-off-by: default avatarMark McLoughlin <markmc@redhat.com>
Co-authored-by: default avatarClaude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: default avatarMark McLoughlin <markmc@redhat.com>
parent f72b2097
......@@ -182,6 +182,7 @@ async def test_metrics_counts(
EXPECTED_METRICS_V1 = [
"vllm:num_requests_running",
"vllm:num_requests_waiting",
"vllm:num_requests_waiting_by_reason",
"vllm:kv_cache_usage_perc",
"vllm:prefix_cache_queries",
"vllm:prefix_cache_hits",
......
......@@ -1039,6 +1039,54 @@ def test_no_spec_tokens_scheduled_for_prefill_chunks():
assert len(output.scheduled_spec_decode_tokens[req.request_id]) == num_spec_tokens
def test_scheduler_stats_waiting_queues():
"""Test that scheduler stats correctly report waiting and skipped_waiting queues."""
# Create scheduler with limited capacity so we can have waiting requests
scheduler = create_scheduler(max_num_batched_tokens=100)
# Create requests: some will be scheduled, some will wait on capacity,
# and some will be blocked by constraints
all_requests = create_requests(num_requests=5, num_tokens=50)
# Add 3 requests - only 2 can be scheduled (2 * 50 = 100 tokens)
# The 3rd will remain in waiting queue (capacity constraint)
for request in all_requests[:3]:
scheduler.add_request(request)
# Manually add 2 more to skipped_waiting to simulate constraint-blocked
for request in all_requests[3:]:
request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
scheduler.skipped_waiting.add_request(request)
# Schedule - this will schedule 2 requests, leaving 1 in waiting
output = scheduler.schedule()
# Verify: 2 scheduled, 1 still waiting on capacity, 2 blocked by constraints
assert len(output.scheduled_new_reqs) == 2
assert len(scheduler.waiting) == 1
assert len(scheduler.skipped_waiting) == 2
# Call update_from_output() to get frontend-facing stat
scheduled_req_ids = list(output.num_scheduled_tokens.keys())
model_runner_output = ModelRunnerOutput(
req_ids=scheduled_req_ids,
req_id_to_index={req_id: i for i, req_id in enumerate(scheduled_req_ids)},
sampled_token_ids=[[1]] * len(scheduled_req_ids),
logprobs=None,
prompt_logprobs_dict={},
pooler_output=[],
)
engine_core_outputs = scheduler.update_from_output(output, model_runner_output)
assert engine_core_outputs and len(engine_core_outputs) > 0
stats = engine_core_outputs[0].scheduler_stats
assert stats is not None
# Verify stats match queue lengths after scheduling
assert stats.num_running_reqs == 2 # 2 were scheduled
assert stats.num_waiting_reqs == 1 # 1 waiting on capacity
assert stats.num_skipped_waiting_reqs == 2 # 2 blocked by constraints
def _assert_right_scheduler_output(
output: SchedulerOutput,
num_requests: int,
......
......@@ -1955,7 +1955,8 @@ class Scheduler(SchedulerInterface):
)
return SchedulerStats(
num_running_reqs=len(self.running),
num_waiting_reqs=len(self.waiting) + len(self.skipped_waiting),
num_waiting_reqs=len(self.waiting),
num_skipped_waiting_reqs=len(self.skipped_waiting),
kv_cache_usage=self.kv_cache_manager.usage,
encoder_cache_usage=self._get_encoder_cache_usage(),
prefix_cache_stats=prefix_cache_stats,
......
......@@ -32,6 +32,10 @@ from vllm.v1.spec_decode.metrics import SpecDecodingLogging, SpecDecodingProm
logger = init_logger(__name__)
# User-facing reason labels for waiting request breakdown
WAITING_REASON_CAPACITY = "capacity"
WAITING_REASON_DEFERRED = "deferred"
PerEngineStatLoggerFactory = Callable[[VllmConfig, int], "StatLoggerBase"]
AggregateStatLoggerFactory = type["AggregateStatLoggerBase"]
StatLoggerFactory = AggregateStatLoggerFactory | PerEngineStatLoggerFactory
......@@ -222,13 +226,21 @@ class LoggingStatLogger(StatLoggerBase):
"Running: %d reqs",
"Waiting: %d reqs",
]
total_waiting = (
self.last_scheduler_stats.num_waiting_reqs
+ self.last_scheduler_stats.num_skipped_waiting_reqs
)
log_args: list[int | float | str] = [
self.last_prompt_throughput,
self.last_generation_throughput,
self.last_scheduler_stats.num_running_reqs,
self.last_scheduler_stats.num_waiting_reqs,
total_waiting,
]
if self.last_scheduler_stats.num_skipped_waiting_reqs > 0:
log_parts.append("Deferred: %d reqs")
log_args.append(self.last_scheduler_stats.num_skipped_waiting_reqs)
if self.num_preemptions > 0:
log_parts.append("Preemptions: %d")
log_args.append(self.num_preemptions)
......@@ -328,6 +340,9 @@ class AggregatedLoggingStatLogger(LoggingStatLogger, AggregateStatLoggerBase):
self.last_scheduler_stats.num_running_reqs += (
last_scheduler_stats.num_running_reqs
)
self.last_scheduler_stats.num_skipped_waiting_reqs += (
last_scheduler_stats.num_skipped_waiting_reqs
)
self.last_scheduler_stats.kv_cache_usage += (
last_scheduler_stats.kv_cache_usage
)
......@@ -453,6 +468,28 @@ class PrometheusStatLogger(AggregateStatLoggerBase):
gauge_scheduler_waiting, per_engine_labelvalues
)
gauge_waiting_by_reason = self._gauge_cls(
name="vllm:num_requests_waiting_by_reason",
documentation=(
"Number of waiting requests by reason. "
"Reason labels: 'capacity' = waiting for scheduling capacity; "
"'deferred' = deferred by transient constraints "
"(LoRA budget, KV transfer, blocked status). "
"Sum of all reasons equals vllm:num_requests_waiting."
),
multiprocess_mode="mostrecent",
labelnames=labelnames + ["reason"],
)
self.gauge_waiting_by_reason: dict[str, dict[int, Gauge]] = {}
for waiting_reason in [WAITING_REASON_CAPACITY, WAITING_REASON_DEFERRED]:
per_engine_labelvalues_with_reason = {
idx: labelvalues + [waiting_reason]
for idx, labelvalues in per_engine_labelvalues.items()
}
self.gauge_waiting_by_reason[waiting_reason] = create_metric_per_engine(
gauge_waiting_by_reason, per_engine_labelvalues_with_reason
)
gauge_engine_sleep_state = self._gauge_cls(
name="vllm:engine_sleep_state",
documentation=(
......@@ -1030,8 +1067,16 @@ class PrometheusStatLogger(AggregateStatLoggerBase):
self.gauge_scheduler_running[engine_idx].set(
scheduler_stats.num_running_reqs
)
self.gauge_scheduler_waiting[engine_idx].set(
total_waiting = (
scheduler_stats.num_waiting_reqs
+ scheduler_stats.num_skipped_waiting_reqs
)
self.gauge_scheduler_waiting[engine_idx].set(total_waiting)
self.gauge_waiting_by_reason[WAITING_REASON_CAPACITY][engine_idx].set(
scheduler_stats.num_waiting_reqs
)
self.gauge_waiting_by_reason[WAITING_REASON_DEFERRED][engine_idx].set(
scheduler_stats.num_skipped_waiting_reqs
)
self.gauge_kv_cache_usage[engine_idx].set(scheduler_stats.kv_cache_usage)
......
......@@ -172,7 +172,9 @@ class SchedulerStats:
"""Stats associated with the scheduler."""
num_running_reqs: int = 0
num_waiting_reqs: int = 0
num_waiting_reqs: int = 0 # length of the "waiting" request queue
num_skipped_waiting_reqs: int = 0 # length of the "skipped waiting" queue
# These are used for internal DP load-balancing.
step_counter: int = 0
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment