Unverified Commit 732c1ed2 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat: add more metrics in planner (#4710)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
parent 96f80089
...@@ -58,6 +58,67 @@ class Metrics: ...@@ -58,6 +58,67 @@ class Metrics:
) )
class PlannerPrometheusMetrics:
"""Container for all Planner Prometheus metrics."""
def __init__(self, prefix: str = "planner"):
# Worker counts
self.num_p_workers = Gauge(
f"{prefix}:num_p_workers", "Number of prefill workers"
)
self.num_d_workers = Gauge(
f"{prefix}:num_d_workers", "Number of decode workers"
)
# Observed metrics
self.observed_ttft = Gauge(
f"{prefix}:observed_ttft", "Observed time to first token (ms)"
)
self.observed_itl = Gauge(
f"{prefix}:observed_itl", "Observed inter-token latency (ms)"
)
self.observed_request_rate = Gauge(
f"{prefix}:observed_request_rate", "Observed request rate (req/s)"
)
self.observed_request_duration = Gauge(
f"{prefix}:observed_request_duration", "Observed request duration (s)"
)
self.observed_isl = Gauge(
f"{prefix}:observed_isl", "Observed input sequence length"
)
self.observed_osl = Gauge(
f"{prefix}:observed_osl", "Observed output sequence length"
)
# Correction factors
self.p_correction_factor = Gauge(
f"{prefix}:p_correction_factor", "Prefill correction factor"
)
self.d_correction_factor = Gauge(
f"{prefix}:d_correction_factor", "Decode correction factor"
)
# Predicted metrics
self.predicted_request_rate = Gauge(
f"{prefix}:predicted_request_rate", "Predicted request rate (req/s)"
)
self.predicted_isl = Gauge(
f"{prefix}:predicted_isl", "Predicted input sequence length"
)
self.predicted_osl = Gauge(
f"{prefix}:predicted_osl", "Predicted output sequence length"
)
self.predicted_num_p = Gauge(
f"{prefix}:predicted_num_p", "Predicted number of prefill replicas"
)
self.predicted_num_d = Gauge(
f"{prefix}:predicted_num_d", "Predicted number of decode replicas"
)
# Cumulative GPU usage
self.gpu_hours = Gauge(f"{prefix}:gpu_hours", "Cumulative GPU hours used")
class Planner: class Planner:
def __init__( def __init__(
self, self,
...@@ -153,13 +214,10 @@ class Planner: ...@@ -153,13 +214,10 @@ class Planner:
self.prometheus_port = args.metric_reporting_prometheus_port self.prometheus_port = args.metric_reporting_prometheus_port
# Initialize Prometheus metrics # Initialize Prometheus metrics
# TODO: use proper naming self.prometheus_metrics = PlannerPrometheusMetrics()
self.num_p_workers_gauge = Gauge(
"num_p_workers", "Number of prefill workers" # Track cumulative GPU hours
) self.cumulative_gpu_hours = 0.0
self.num_d_workers_gauge = Gauge(
"num_d_workers", "Number of decode workers"
)
# Start Prometheus HTTP server if port is specified # Start Prometheus HTTP server if port is specified
if self.prometheus_port != 0: if self.prometheus_port != 0:
...@@ -246,8 +304,21 @@ class Planner: ...@@ -246,8 +304,21 @@ class Planner:
# Update Prometheus metrics if server is running # Update Prometheus metrics if server is running
if self.prometheus_port != 0: if self.prometheus_port != 0:
self.num_p_workers_gauge.set(len(self.p_endpoints)) self.prometheus_metrics.num_p_workers.set(len(self.p_endpoints))
self.num_d_workers_gauge.set(len(self.d_endpoints)) self.prometheus_metrics.num_d_workers.set(len(self.d_endpoints))
# Calculate and accumulate GPU hours for this interval
# TODO: track startup and shutdown times to get more accurate GPU hours
interval_gpu_hours = (
(
len(self.p_endpoints) * self.args.prefill_engine_num_gpu
+ len(self.d_endpoints) * self.args.decode_engine_num_gpu
)
* self.args.adjustment_interval
/ 3600
)
self.cumulative_gpu_hours += interval_gpu_hours
self.prometheus_metrics.gpu_hours.set(self.cumulative_gpu_hours)
# Prometheus returns seconds, convert to milliseconds # Prometheus returns seconds, convert to milliseconds
self.last_metrics.ttft = ( self.last_metrics.ttft = (
...@@ -294,6 +365,19 @@ class Planner: ...@@ -294,6 +365,19 @@ class Planner:
f"Observed ttft: {self.last_metrics.ttft:.2f}ms itl: {self.last_metrics.itl:.2f}ms" f"Observed ttft: {self.last_metrics.ttft:.2f}ms itl: {self.last_metrics.itl:.2f}ms"
) )
# Update observed metrics in Prometheus
if self.prometheus_port != 0:
self.prometheus_metrics.observed_ttft.set(self.last_metrics.ttft)
self.prometheus_metrics.observed_itl.set(self.last_metrics.itl)
self.prometheus_metrics.observed_request_rate.set(
self.last_metrics.num_req / self.args.adjustment_interval
)
self.prometheus_metrics.observed_request_duration.set(
self.last_metrics.request_duration
)
self.prometheus_metrics.observed_isl.set(self.last_metrics.isl)
self.prometheus_metrics.observed_osl.set(self.last_metrics.osl)
self.num_req_predictor.add_data_point(self.last_metrics.num_req) self.num_req_predictor.add_data_point(self.last_metrics.num_req)
self.isl_predictor.add_data_point(self.last_metrics.isl) self.isl_predictor.add_data_point(self.last_metrics.isl)
self.osl_predictor.add_data_point(self.last_metrics.osl) self.osl_predictor.add_data_point(self.last_metrics.osl)
...@@ -446,6 +530,15 @@ class Planner: ...@@ -446,6 +530,15 @@ class Planner:
logger.info( logger.info(
f"Correction factors: TTFT: {self.p_correction_factor:.3f}, ITL: {self.d_correction_factor:.3f}" f"Correction factors: TTFT: {self.p_correction_factor:.3f}, ITL: {self.d_correction_factor:.3f}"
) )
# Update correction factor metrics in Prometheus
if self.prometheus_port != 0:
self.prometheus_metrics.p_correction_factor.set(
self.p_correction_factor
)
self.prometheus_metrics.d_correction_factor.set(
self.d_correction_factor
)
except Exception as e: except Exception as e:
logger.error(f"Failed to correct prediction factors: {e}") logger.error(f"Failed to correct prediction factors: {e}")
return return
...@@ -453,10 +546,23 @@ class Planner: ...@@ -453,10 +546,23 @@ class Planner:
next_num_req, next_isl, next_osl = self.predict_load() next_num_req, next_isl, next_osl = self.predict_load()
if next_num_req is not None and next_isl is not None and next_osl is not None: if next_num_req is not None and next_isl is not None and next_osl is not None:
# Update predicted load metrics in Prometheus
if self.prometheus_port != 0:
self.prometheus_metrics.predicted_request_rate.set(
next_num_req / self.args.adjustment_interval
)
self.prometheus_metrics.predicted_isl.set(next_isl)
self.prometheus_metrics.predicted_osl.set(next_osl)
try: try:
next_num_p, next_num_d = self._compute_replica_requirements( next_num_p, next_num_d = self._compute_replica_requirements(
next_num_req, next_isl, next_osl next_num_req, next_isl, next_osl
) )
# Update predicted replica metrics in Prometheus
if self.prometheus_port != 0:
self.prometheus_metrics.predicted_num_p.set(next_num_p)
self.prometheus_metrics.predicted_num_d.set(next_num_d)
except Exception as e: except Exception as e:
logger.error(f"Failed to compute number of replicas: {e}") logger.error(f"Failed to compute number of replicas: {e}")
return return
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment