Unverified commit dfde02c5, authored by Hongkuan Zhou, committed by GitHub
Browse files

fix(planner): restore documented Prometheus metric behaviour [DYN-2855] (#8575)


Signed-off-by: Hongkuan Zhou <hongkuanz@nvidia.com>
Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
parent 71fafe3f
......@@ -174,6 +174,7 @@ class NativePlannerBase:
# Shared metrics state
self._last_metrics = Metrics()
self._cumulative_gpu_hours: float = 0.0
self._last_gpu_hours_update_ts: Optional[float] = None
# Diagnostics recorder
self._recorder = DiagnosticsRecorder(config=config)
......@@ -495,22 +496,6 @@ class NativePlannerBase:
async def _collect_traffic(self) -> Optional[TrafficObservation]:
"""Pull traffic metrics from Prometheus over the throughput interval."""
num_p, num_d, _ = await self._get_worker_counts_raw()
if self.prometheus_port != 0:
self.prometheus_metrics.num_prefill_replicas.set(num_p)
self.prometheus_metrics.num_decode_replicas.set(num_d)
gpu_hours = (
(
num_p * (self.config.prefill_engine_num_gpu or 0)
+ num_d * (self.config.decode_engine_num_gpu or 0)
)
* self.config.throughput_adjustment_interval
/ 3600
)
self._cumulative_gpu_hours += gpu_hours
self.prometheus_metrics.gpu_hours.set(self._cumulative_gpu_hours)
assert self.model_name is not None
interval_str = f"{self.config.throughput_adjustment_interval}s"
m = self._last_metrics
......@@ -773,7 +758,41 @@ class NativePlannerBase:
# Diagnostics reporting (shared across all adapters)
# ------------------------------------------------------------------
def _report_diagnostics(self, diag: TickDiagnostics) -> None:
def _publish_inventory_and_gpu_hours(self, tick_input: TickInput) -> None:
    """Accrue GPU-hours for the elapsed interval and export inventory gauges.

    Reads the ready prefill/decode replica counts from
    ``tick_input.worker_counts`` and the tick timestamp from
    ``tick_input.now_s``.  The original author notes that worker counts
    are populated on every tick (``need_worker_states=True``), so this
    runs independently of ``enable_throughput_scaling``.

    Behaviour:
      * If ``worker_counts`` is ``None`` nothing is touched: no
        accumulation, no timestamp update, no gauge writes.
      * The first call with counts only records the timestamp; it
        contributes zero GPU-hours because there is no previous
        timestamp to diff against.
      * On later calls the replica counts seen on *this* tick are
        applied to the whole interval since the previously recorded
        timestamp.  A negative interval is clamped to zero.
      * ``_cumulative_gpu_hours`` is updated even when
        ``prometheus_port == 0``; only the three gauge writes are gated
        on the port.  Per the original author's note this keeps the
        HTML recorder / live dashboard accurate when Prometheus export
        is disabled.
    """
    counts = tick_input.worker_counts
    if counts is None:
        return

    # A missing (None) ready count is treated as zero replicas.
    prefill_replicas = counts.ready_num_prefill or 0
    decode_replicas = counts.ready_num_decode or 0
    tick_ts = tick_input.now_s

    previous_ts = self._last_gpu_hours_update_ts
    if previous_ts is not None:
        # Clamp so a timestamp that moves backwards never subtracts hours.
        elapsed_s = max(0.0, tick_ts - previous_ts)
        gpus_in_use = (
            prefill_replicas * (self.config.prefill_engine_num_gpu or 0)
            + decode_replicas * (self.config.decode_engine_num_gpu or 0)
        )
        self._cumulative_gpu_hours += gpus_in_use * elapsed_s / 3600.0
    self._last_gpu_hours_update_ts = tick_ts

    # Gauge export is optional; the accounting above is not.
    if self.prometheus_port != 0:
        gauges = self.prometheus_metrics
        gauges.num_prefill_replicas.set(prefill_replicas)
        gauges.num_decode_replicas.set(decode_replicas)
        gauges.gpu_hours.set(self._cumulative_gpu_hours)
def _report_diagnostics(self, tick: ScheduledTick, diag: TickDiagnostics) -> None:
if self.prometheus_port == 0:
return
pm = self.prometheus_metrics
......@@ -793,8 +812,12 @@ class NativePlannerBase:
pm.engine_prefill_capacity_requests_per_second.set(diag.engine_rps_prefill or 0)
pm.engine_decode_capacity_requests_per_second.set(diag.engine_rps_decode or 0)
pm.load_scaling_decision.state(diag.load_decision_reason or "unset")
pm.throughput_scaling_decision.state(diag.throughput_decision_reason or "unset")
if tick.run_load_scaling:
pm.load_scaling_decision.state(diag.load_decision_reason or "unset")
if tick.run_throughput_scaling:
pm.throughput_scaling_decision.state(
diag.throughput_decision_reason or "unset"
)
# ------------------------------------------------------------------
# Main loop
......@@ -814,9 +837,10 @@ class NativePlannerBase:
self._refresh_worker_info_from_connector()
tick_input = await self._gather_tick_input(next_tick)
self._publish_inventory_and_gpu_hours(tick_input)
effects = self.state_machine.on_tick(next_tick, tick_input)
await self._apply_effects(effects)
self._report_diagnostics(effects.diagnostics)
self._report_diagnostics(next_tick, effects.diagnostics)
self._log_decision_summary(effects)
if self._recorder.enabled:
......
......@@ -143,24 +143,36 @@ class LoadScalingMixin:
return None
easy = self._config.optimization_target != "sla"
p_desired = (
(
# Sub-decisions may set self._diag_load_reason to an informative
# value (e.g. "insufficient_data") before returning None. The
# per-component aggregation below only emits {scale_up,
# scale_down, scale_down_capped_by_throughput, no_change}, which
# would silently overwrite them. Isolate each component's
# contribution so both can be restored in the no-scaling-needed
# branch; otherwise sequential sub-decision calls would clobber
# each other on the shared field.
p_reason: Optional[str] = None
p_desired: Optional[int] = None
if p_stats:
self._diag_load_reason = None
p_desired = (
self._prefill_easy_decision(p_stats, self._num_p_workers)
if easy
else self._prefill_load_decision(p_stats, self._num_p_workers)
)
if p_stats
else None
)
d_desired = (
(
p_reason = self._diag_load_reason
d_reason: Optional[str] = None
d_desired: Optional[int] = None
if d_stats:
self._diag_load_reason = None
d_desired = (
self._decode_easy_decision(d_stats, self._num_d_workers)
if easy
else self._decode_load_decision(d_stats, self._num_d_workers)
)
if d_stats
else None
)
d_reason = self._diag_load_reason
final_p = p_desired if p_desired is not None else self._num_p_workers
final_d = d_desired if d_desired is not None else self._num_d_workers
......@@ -219,6 +231,20 @@ class LoadScalingMixin:
if final_p == self._num_p_workers and final_d == self._num_d_workers:
logger.info("Load-based scaling: no scaling needed")
# Restore per-component sub-decision reasons that the
# aggregation step overwrote with "no_change", so operators
# can tell which side is stalled (e.g. prefill
# insufficient_data while decode is fine).
if p_reason is not None and p_reason != "no_change":
self._diag_load_reason_prefill = p_reason
if d_reason is not None and d_reason != "no_change":
self._diag_load_reason_decode = d_reason
# Aggregate reason: surface the most informative of the two
# so the non-per-component Enum/HTML view also reflects it.
for candidate in (p_reason, d_reason):
if candidate is not None and candidate != "no_change":
self._diag_load_reason = candidate
break
return None
logger.info(
......
......@@ -231,7 +231,7 @@ class DiagnosticsRecorder:
]
fig = make_subplots(
rows=6,
rows=7,
cols=2,
subplot_titles=(
"Replica Counts",
......@@ -246,6 +246,8 @@ class DiagnosticsRecorder:
"Sequence Lengths (Observed vs Predicted)",
"Load Scaling Decisions",
"Throughput Scaling Decisions",
"Cumulative GPU Hours",
"",
),
vertical_spacing=0.055,
horizontal_spacing=0.08,
......@@ -705,6 +707,18 @@ class DiagnosticsRecorder:
2,
)
# -- Row 7: Cumulative GPU hours ---------------------------------
fig.add_trace(
go.Scatter(
x=labels,
y=_vals("gpu_hours"),
name="Cumulative GPU Hours",
mode="lines+markers",
),
row=7,
col=1,
)
# -- Layout -------------------------------------------------------
# Count actual replica transitions, not just ticks where a decision
......@@ -740,7 +754,7 @@ class DiagnosticsRecorder:
)
fig.update_layout(
title=dict(text=summary, font=dict(size=14), y=0.99, yanchor="top"),
height=2000,
height=2300,
showlegend=True,
legend=dict(orientation="h", yanchor="top", y=-0.05),
template="plotly_white",
......
......@@ -493,6 +493,50 @@ class TestDisaggEasy:
assert effects.scale_to is not None
assert effects.scale_to.num_decode == 2
def test_disagg_preserves_insufficient_data_when_both_sides_stuck(self):
    """Aggregate load reason stays "insufficient_data" when both sides bail.

    The capabilities below omit the capacity fields (see inline notes),
    so both the prefill and the decode sub-decision give up with
    "insufficient_data".  The wrap-up step previously replaced that
    with "no_change"; the expected behaviour is that the real reason
    survives so operators can see why scaling stalled.
    """
    degraded_caps = WorkerCapabilities(
        prefill=EngineCapabilities(num_gpu=1),  # no context_length
        decode=EngineCapabilities(num_gpu=1),  # no max_kv_tokens
    )
    state_machine = _make_core(
        mode="disagg", optimization_target="throughput", caps=degraded_caps
    )

    worker_key = ("w1", 0)
    idle_prefill = _make_fpm(queued_prefill_tokens=0)
    idle_decode = _make_fpm(sum_decode_kv_tokens=0, queued_decode_kv_tokens=0)
    observations = FpmObservations(
        prefill={worker_key: idle_prefill},
        decode={worker_key: idle_decode},
    )
    ready = WorkerCounts(ready_num_prefill=1, ready_num_decode=1)
    tick = TickInput(now_s=5.0, fpm_observations=observations, worker_counts=ready)

    outcome = state_machine.on_tick(_tick_for(tick), tick)

    assert outcome.scale_to is None
    assert outcome.diagnostics.load_decision_reason == "insufficient_data"
def test_disagg_no_change_when_sub_decisions_return_clean_no_change(self):
    """Aggregate load reason is "no_change" when both sides report it.

    One prefill and one decode worker are loaded at half of
    ``CONTEXT_LENGTH`` / ``MAX_KV_TOKENS``; per the original author's
    note this sits between the scale-up and scale-down thresholds, so
    both sub-decisions yield a genuine "no_change".  The aggregate must
    report "no_change" too, not a stale null.
    """
    state_machine = _make_core(mode="disagg", optimization_target="throughput")

    worker_key = ("w1", 0)
    half_loaded_prefill = _make_fpm(queued_prefill_tokens=CONTEXT_LENGTH // 2)
    half_loaded_decode = _make_fpm(sum_decode_kv_tokens=MAX_KV_TOKENS // 2)
    observations = FpmObservations(
        prefill={worker_key: half_loaded_prefill},
        decode={worker_key: half_loaded_decode},
    )
    ready = WorkerCounts(ready_num_prefill=1, ready_num_decode=1)
    tick = TickInput(now_s=5.0, fpm_observations=observations, worker_counts=ready)

    outcome = state_machine.on_tick(_tick_for(tick), tick)

    assert outcome.scale_to is None
    assert outcome.diagnostics.load_decision_reason == "no_change"
# ── Agg mode ─────────────────────────────────────────────────────────
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Tests for Prometheus metric publication in NativePlannerBase.
Covers:
- ``_publish_inventory_and_gpu_hours``: replica counts + cumulative
gpu_hours must publish on every tick (not just throughput ticks) and
accumulate using wall-clock deltas.
- ``_report_diagnostics``: scaling-decision Enum gauges must only be
written for the scaling path that actually ran this tick, so
load-only ticks don't wipe the throughput Enum (and vice versa).
"""
import os
from unittest.mock import Mock, patch
import pytest
from dynamo.planner.config.planner_config import PlannerConfig
from dynamo.planner.core.base import NativePlannerBase
from dynamo.planner.core.types import (
ScheduledTick,
TickDiagnostics,
TickInput,
WorkerCounts,
)
# Module-level pytest markers: pytest applies every marker in this list to
# all tests collected from this file, so they can be selected or deselected
# with ``-m`` expressions.
# NOTE(review): what each marker selects (e.g. ``gpu_0``, ``pre_merge``) is
# defined by the project's pytest configuration — not visible here.
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.unit,
pytest.mark.planner,
]
def _make_planner(prometheus_enabled: bool = True) -> NativePlannerBase:
    """Construct a ``NativePlannerBase`` whose Prometheus metrics are a ``Mock``.

    While the planner is being constructed the following are patched:
    ``PlannerPrometheusMetrics`` and ``start_http_server`` in
    ``dynamo.planner.core.base`` (so no real HTTP port is bound — tests
    would otherwise fail in parallel or on shared hosts),
    ``KubernetesAPI`` in ``dynamo.planner.connectors.kubernetes``, and
    the ``DYN_PARENT_DGD_K8S_NAME`` environment variable.

    Args:
        prometheus_enabled: When true ``planner.prometheus_port`` is set
            to ``1`` after construction, otherwise to ``0``.  The value
            only acts as an enabled/disabled gate for the methods under
            test; it is never used for I/O.

    Returns:
        The constructed planner.
    """
    config_fields = dict(
        throughput_adjustment_interval=60,
        prefill_engine_num_gpu=2,
        decode_engine_num_gpu=4,
        min_endpoint=1,
        max_gpu_budget=-1,
        ttft=500.0,
        itl=50.0,
        backend="vllm",
        no_operation=True,
        metric_pulling_prometheus_endpoint="http://localhost:9090",
        metric_reporting_prometheus_port=0,
        load_predictor="constant",
        environment="kubernetes",
        namespace="test-namespace",
        mode="disagg",
        enable_load_scaling=True,
        enable_throughput_scaling=True,
        load_adjustment_interval=5,
        max_num_fpm_samples=50,
        fpm_sample_bucket_size=16,
        load_scaling_down_sensitivity=80,
        load_metric_samples=10,
        load_min_observations=5,
    )

    with (
        patch("dynamo.planner.core.base.PlannerPrometheusMetrics") as metrics_cls,
        patch("dynamo.planner.core.base.start_http_server"),
        patch("dynamo.planner.connectors.kubernetes.KubernetesAPI"),
        patch.dict(os.environ, {"DYN_PARENT_DGD_K8S_NAME": "test-graph"}),
    ):
        metrics_cls.return_value = Mock()
        # model_construct builds the config from the given fields as-is.
        planner_config = PlannerConfig.model_construct(**config_fields)
        planner = NativePlannerBase(None, planner_config)

    # Gate the methods under test without binding a real port.
    if prometheus_enabled:
        planner.prometheus_port = 1
    else:
        planner.prometheus_port = 0
    return planner
def _tick_input(now_s: float, num_p: int = 2, num_d: int = 3) -> TickInput:
    """Build a ``TickInput`` at ``now_s`` with the given ready replica counts.

    ``num_p`` / ``num_d`` become ``ready_num_prefill`` / ``ready_num_decode``
    of the attached ``WorkerCounts``; no other field is set.
    """
    ready = WorkerCounts(ready_num_prefill=num_p, ready_num_decode=num_d)
    return TickInput(now_s=now_s, worker_counts=ready)
# ── Bug 1: inventory & gpu_hours publish every tick ─────────────────
class TestPublishInventoryAndGpuHours:
    """Inventory/gpu_hours gauges must publish on every tick regardless of
    whether the throughput-scaling path is enabled.

    Every test drives ``_publish_inventory_and_gpu_hours`` directly with a
    hand-built ``TickInput`` and inspects the mocked Prometheus gauges
    and/or the planner's ``_cumulative_gpu_hours`` accumulator.  The
    planner from ``_make_planner`` uses 2 GPUs per prefill engine and
    4 GPUs per decode engine unless a test overrides them.
    """

    def test_replica_gauges_set_on_first_call(self):
        """The very first tick already publishes both replica gauges."""
        planner = _make_planner()
        pm = planner.prometheus_metrics

        planner._publish_inventory_and_gpu_hours(_tick_input(now_s=1000.0))

        pm.num_prefill_replicas.set.assert_called_once_with(2)
        pm.num_decode_replicas.set.assert_called_once_with(3)

    def test_first_call_contributes_zero_gpu_hours(self):
        """With no previous timestamp there is no interval to bill."""
        planner = _make_planner()
        pm = planner.prometheus_metrics

        planner._publish_inventory_and_gpu_hours(_tick_input(now_s=1000.0))

        # First call has no prior timestamp -> delta is zero.
        assert planner._cumulative_gpu_hours == 0.0
        pm.gpu_hours.set.assert_called_once_with(0.0)

    def test_cumulative_gpu_hours_uses_wall_clock_delta(self):
        """GPU-hours follow the timestamp delta between ticks."""
        planner = _make_planner()
        pm = planner.prometheus_metrics

        planner._publish_inventory_and_gpu_hours(_tick_input(now_s=1000.0))
        planner._publish_inventory_and_gpu_hours(
            _tick_input(now_s=1180.0, num_p=2, num_d=3)
        )

        # dt = 180s, gpu_count = 2*2 + 3*4 = 16, gpu_hours = 16 * 180 / 3600 = 0.8
        assert planner._cumulative_gpu_hours == pytest.approx(0.8)
        pm.gpu_hours.set.assert_called_with(pytest.approx(0.8))

    def test_accumulates_across_multiple_ticks(self):
        """Successive intervals add up rather than overwrite each other."""
        planner = _make_planner()
        # Two 5-second ticks with 1 prefill + 1 decode worker each on single GPUs.
        planner.config.prefill_engine_num_gpu = 1
        planner.config.decode_engine_num_gpu = 1

        for now_s in (0.0, 5.0, 10.0):
            planner._publish_inventory_and_gpu_hours(
                _tick_input(now_s=now_s, num_p=1, num_d=1)
            )

        # Two deltas of 5s each, 2 GPUs total -> 2 * (2 * 5 / 3600) = 20/3600
        assert planner._cumulative_gpu_hours == pytest.approx(20.0 / 3600.0)

    def test_prometheus_disabled_still_accumulates_gpu_hours(self):
        """Disabling export skips the gauges but not the accounting."""
        # Prometheus export off, but the HTML recorder / live dashboard
        # still consumes _cumulative_gpu_hours, so accumulation must
        # continue. Only the gauge publishes should be skipped.
        planner = _make_planner(prometheus_enabled=False)
        pm = planner.prometheus_metrics

        planner._publish_inventory_and_gpu_hours(_tick_input(now_s=1000.0))
        planner._publish_inventory_and_gpu_hours(
            _tick_input(now_s=1180.0, num_p=2, num_d=3)
        )

        # dt = 180s, gpu_count = 2*2 + 3*4 = 16, gpu_hours = 16 * 180 / 3600 = 0.8
        assert planner._cumulative_gpu_hours == pytest.approx(0.8)
        pm.num_prefill_replicas.set.assert_not_called()
        pm.num_decode_replicas.set.assert_not_called()
        pm.gpu_hours.set.assert_not_called()

    def test_handles_none_worker_counts(self):
        """A tick without worker counts is a complete no-op."""
        planner = _make_planner()
        pm = planner.prometheus_metrics

        planner._publish_inventory_and_gpu_hours(
            TickInput(now_s=1000.0, worker_counts=None)
        )

        pm.num_prefill_replicas.set.assert_not_called()
        pm.num_decode_replicas.set.assert_not_called()
        # The early return must also leave the cost accounting alone:
        # no gpu_hours publish and nothing added to the accumulator.
        pm.gpu_hours.set.assert_not_called()
        assert planner._cumulative_gpu_hours == 0.0
# ── Bug 3: per-path enum publishes ──────────────────────────────────
def _diag(
    load_reason: str | None = None, throughput_reason: str | None = None
) -> TickDiagnostics:
    """Build a ``TickDiagnostics`` carrying only the two decision reasons.

    Either reason may be left as ``None`` to model a scaling path that
    did not populate one.
    """
    reasons = {
        "load_decision_reason": load_reason,
        "throughput_decision_reason": throughput_reason,
    }
    return TickDiagnostics(**reasons)
def _tick(run_load: bool, run_throughput: bool) -> ScheduledTick:
    """Build a ``ScheduledTick`` at ``at_s=0.0`` for the requested paths.

    Worker states are always requested; FPM data is requested exactly
    when load scaling runs and traffic metrics exactly when throughput
    scaling runs.
    """
    schedule = {
        "at_s": 0.0,
        "run_load_scaling": run_load,
        "run_throughput_scaling": run_throughput,
        "need_worker_states": True,
        "need_worker_fpm": run_load,
        "need_traffic_metrics": run_throughput,
    }
    return ScheduledTick(**schedule)
class TestReportDiagnosticsEnumGating:
    """Scaling-decision Enum gauges must only be written for the path that
    actually ran this tick, so load-only ticks don't clobber the
    throughput Enum (and vice versa).

    Each test calls ``_report_diagnostics`` with a ``ScheduledTick`` from
    ``_tick`` and a ``TickDiagnostics`` from ``_diag`` and then inspects
    the ``state`` calls on the mocked Enum gauges.
    """

    def test_load_only_tick_does_not_touch_throughput_enum(self):
        """A load-only tick writes the load Enum and nothing else."""
        subject = _make_planner()
        gauges = subject.prometheus_metrics
        scheduled = _tick(run_load=True, run_throughput=False)
        diagnostics = _diag(load_reason="scale_up")

        subject._report_diagnostics(scheduled, diagnostics)

        gauges.throughput_scaling_decision.state.assert_not_called()
        gauges.load_scaling_decision.state.assert_called_once_with("scale_up")

    def test_throughput_only_tick_does_not_touch_load_enum(self):
        """A throughput-only tick writes the throughput Enum and nothing else."""
        subject = _make_planner()
        gauges = subject.prometheus_metrics
        scheduled = _tick(run_load=False, run_throughput=True)
        diagnostics = _diag(throughput_reason="scale")

        subject._report_diagnostics(scheduled, diagnostics)

        gauges.load_scaling_decision.state.assert_not_called()
        gauges.throughput_scaling_decision.state.assert_called_once_with("scale")

    def test_combined_tick_publishes_both(self):
        """When both paths run, both Enums receive their own reason."""
        subject = _make_planner()
        gauges = subject.prometheus_metrics
        scheduled = _tick(run_load=True, run_throughput=True)
        diagnostics = _diag(
            load_reason="no_change", throughput_reason="set_lower_bound"
        )

        subject._report_diagnostics(scheduled, diagnostics)

        gauges.load_scaling_decision.state.assert_called_once_with("no_change")
        gauges.throughput_scaling_decision.state.assert_called_once_with(
            "set_lower_bound"
        )

    def test_run_tick_with_no_reason_still_writes_unset(self):
        """A path that ran without a reason is published as "unset"."""
        # Defensive: if a scaling path ran but didn't populate a reason,
        # explicitly write "unset" so stale state from a prior tick
        # doesn't linger.
        subject = _make_planner()
        gauges = subject.prometheus_metrics
        scheduled = _tick(run_load=True, run_throughput=False)
        diagnostics = _diag()

        subject._report_diagnostics(scheduled, diagnostics)

        gauges.throughput_scaling_decision.state.assert_not_called()
        gauges.load_scaling_decision.state.assert_called_once_with("unset")

    def test_skipped_when_prometheus_disabled(self):
        """With export disabled neither Enum is written, even if both ran."""
        subject = _make_planner(prometheus_enabled=False)
        gauges = subject.prometheus_metrics
        scheduled = _tick(run_load=True, run_throughput=True)
        diagnostics = _diag(load_reason="scale_up", throughput_reason="scale")

        subject._report_diagnostics(scheduled, diagnostics)

        for enum_gauge in (
            gauges.load_scaling_decision,
            gauges.throughput_scaling_decision,
        ):
            enum_gauge.state.assert_not_called()
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment