Unverified Commit b5c6529e authored by SCDESPERTATE's avatar SCDESPERTATE Committed by GitHub
Browse files

[PD] Improve disaggregation metrics output: update the metrics to keep...

[PD] Improve disaggregation metrics output: update the metrics to keep reflecting real stats (#7317)
parent ca4b86c5
...@@ -334,6 +334,8 @@ class DecodePreallocQueue: ...@@ -334,6 +334,8 @@ class DecodePreallocQueue:
error_message, error_message,
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
) )
if self.scheduler.enable_metrics:
self.scheduler.metrics_collector.increment_bootstrap_failed_reqs()
else: else:
raise ValueError(f"Unexpected poll case: {poll}") raise ValueError(f"Unexpected poll case: {poll}")
...@@ -595,6 +597,8 @@ class DecodeTransferQueue: ...@@ -595,6 +597,8 @@ class DecodeTransferQueue:
# unlock the kv cache or it will have memory leak # unlock the kv cache or it will have memory leak
self.tree_cache.cache_finished_req(decode_req.req) self.tree_cache.cache_finished_req(decode_req.req)
indices_to_remove.add(i) indices_to_remove.add(i)
if self.scheduler.enable_metrics:
self.scheduler.metrics_collector.increment_transfer_failed_reqs()
continue continue
elif poll == KVPoll.Success: elif poll == KVPoll.Success:
......
...@@ -238,6 +238,8 @@ class PrefillBootstrapQueue: ...@@ -238,6 +238,8 @@ class PrefillBootstrapQueue:
self.scheduler.stream_output([req], req.return_logprob) self.scheduler.stream_output([req], req.return_logprob)
indices_to_remove.add(i) indices_to_remove.add(i)
failed_reqs.append(req) failed_reqs.append(req)
if self.scheduler.enable_metrics:
self.scheduler.metrics_collector.increment_bootstrap_failed_reqs()
continue continue
# KV.WaitingForInput - init here # KV.WaitingForInput - init here
...@@ -522,6 +524,8 @@ class SchedulerDisaggregationPrefillMixin: ...@@ -522,6 +524,8 @@ class SchedulerDisaggregationPrefillMixin:
req, error_message, status_code=HTTPStatus.INTERNAL_SERVER_ERROR req, error_message, status_code=HTTPStatus.INTERNAL_SERVER_ERROR
) )
done_reqs.append(req) done_reqs.append(req)
if self.enable_metrics:
self.metrics_collector.increment_transfer_failed_reqs()
else: else:
assert False, f"Unexpected polling state {poll=}" assert False, f"Unexpected polling state {poll=}"
......
...@@ -125,6 +125,14 @@ class SchedulerMetricsMixin: ...@@ -125,6 +125,14 @@ class SchedulerMetricsMixin:
total_queue_latency += req.queue_time_end - req.queue_time_start total_queue_latency += req.queue_time_end - req.queue_time_start
self.stats.avg_request_queue_latency = total_queue_latency / num_new_seq self.stats.avg_request_queue_latency = total_queue_latency / num_new_seq
if self.disaggregation_mode == DisaggregationMode.PREFILL:
self.stats.num_prefill_prealloc_queue_reqs = len(
self.disagg_prefill_bootstrap_queue.queue
)
self.stats.num_prefill_inflight_queue_reqs = len(
self.disagg_prefill_inflight_queue
)
self.metrics_collector.log_stats(self.stats) self.metrics_collector.log_stats(self.stats)
self._emit_kv_metrics() self._emit_kv_metrics()
self._publish_kv_events() self._publish_kv_events()
...@@ -202,6 +210,13 @@ class SchedulerMetricsMixin: ...@@ -202,6 +210,13 @@ class SchedulerMetricsMixin:
self.stats.spec_accept_length = spec_accept_length self.stats.spec_accept_length = spec_accept_length
self.stats.total_retracted_reqs = self.total_retracted_reqs self.stats.total_retracted_reqs = self.total_retracted_reqs
self.metrics_collector.log_stats(self.stats) self.metrics_collector.log_stats(self.stats)
if self.disaggregation_mode == DisaggregationMode.DECODE:
self.stats.num_decode_prealloc_queue_reqs = len(
self.disagg_decode_prealloc_queue.queue
)
self.stats.num_decode_transfer_queue_reqs = len(
self.disagg_decode_transfer_queue.queue
)
self._emit_kv_metrics() self._emit_kv_metrics()
self._publish_kv_events() self._publish_kv_events()
......
...@@ -142,7 +142,7 @@ class SchedulerStats: ...@@ -142,7 +142,7 @@ class SchedulerStats:
spec_accept_length: float = 0.0 spec_accept_length: float = 0.0
avg_request_queue_latency: float = 0.0 avg_request_queue_latency: float = 0.0
num_prefill_prealloc_queue_reqs: int = 0 num_prefill_prealloc_queue_reqs: int = 0
num_prefill_infight_queue_reqs: int = 0 num_prefill_inflight_queue_reqs: int = 0
num_decode_prealloc_queue_reqs: int = 0 num_decode_prealloc_queue_reqs: int = 0
num_decode_transfer_queue_reqs: int = 0 num_decode_transfer_queue_reqs: int = 0
total_retracted_reqs: int = 0 total_retracted_reqs: int = 0
...@@ -235,9 +235,9 @@ class SchedulerMetricsCollector: ...@@ -235,9 +235,9 @@ class SchedulerMetricsCollector:
multiprocess_mode="mostrecent", multiprocess_mode="mostrecent",
) )
self.num_prefill_infight_queue_reqs = Gauge( self.num_prefill_inflight_queue_reqs = Gauge(
name="sglang:num_prefill_infight_queue_reqs", name="sglang:num_prefill_inflight_queue_reqs",
documentation="The number of requests in the prefill infight queue.", documentation="The number of requests in the prefill inflight queue.",
labelnames=labels.keys(), labelnames=labels.keys(),
multiprocess_mode="mostrecent", multiprocess_mode="mostrecent",
) )
...@@ -294,7 +294,7 @@ class SchedulerMetricsCollector: ...@@ -294,7 +294,7 @@ class SchedulerMetricsCollector:
self.num_prefill_prealloc_queue_reqs, stats.num_prefill_prealloc_queue_reqs self.num_prefill_prealloc_queue_reqs, stats.num_prefill_prealloc_queue_reqs
) )
self._log_gauge( self._log_gauge(
self.num_prefill_infight_queue_reqs, stats.num_prefill_infight_queue_reqs self.num_prefill_inflight_queue_reqs, stats.num_prefill_inflight_queue_reqs
) )
self._log_gauge( self._log_gauge(
self.num_decode_prealloc_queue_reqs, stats.num_decode_prealloc_queue_reqs self.num_decode_prealloc_queue_reqs, stats.num_decode_prealloc_queue_reqs
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment