Unverified commit f8920708, authored by Hongkuan Zhou and committed by GitHub
Browse files

feat(planner): add diagnostics metrics, rename to dynamo_planner_*, and add HTML report (#8078)

parent f6976e7f
......@@ -126,8 +126,30 @@ class PlannerConfig(BaseModel):
load_metric_samples: int = SLAPlannerDefaults.load_metric_samples
load_min_observations: int = SLAPlannerDefaults.load_min_observations
# Diagnostics report settings
report_interval_hours: Optional[float] = Field(
default=None,
description=(
"Generate an HTML diagnostics report every N hours (simulated time). "
"Set to None to disable periodic report generation."
),
)
report_output_dir: str = Field(
default="./planner_reports",
description="Directory for HTML diagnostics reports.",
)
@model_validator(mode="after")
def _validate_config(self) -> "PlannerConfig":
if self.report_interval_hours is not None:
if (
not math.isfinite(self.report_interval_hours)
or self.report_interval_hours <= 0
):
raise ValueError(
"report_interval_hours must be a positive finite number or None"
)
sqrt = math.isqrt(self.fpm_sample_bucket_size)
if sqrt * sqrt != self.fpm_sample_bucket_size:
raise ValueError(
......
......@@ -44,7 +44,7 @@ class PrefillPlanner(NativePlannerBase):
return
desired = effects.scale_to.num_prefill
if self.prometheus_port != 0:
self.prometheus_metrics.predicted_num_p.set(desired)
self.prometheus_metrics.predicted_num_prefill_replicas.set(desired)
await self._apply_scaling_targets(
[
TargetReplica(
......@@ -82,7 +82,7 @@ class DecodePlanner(NativePlannerBase):
return
desired = effects.scale_to.num_decode
if self.prometheus_port != 0:
self.prometheus_metrics.predicted_num_d.set(desired)
self.prometheus_metrics.predicted_num_decode_replicas.set(desired)
await self._apply_scaling_targets(
[
TargetReplica(
......@@ -120,7 +120,7 @@ class AggPlanner(NativePlannerBase):
return
desired = effects.scale_to.num_decode
if self.prometheus_port != 0:
self.prometheus_metrics.predicted_num_d.set(desired)
self.prometheus_metrics.predicted_num_decode_replicas.set(desired)
await self._apply_scaling_targets(
[
TargetReplica(
......@@ -168,9 +168,13 @@ class DisaggPlanner(NativePlannerBase):
decision = effects.scale_to
if decision.num_prefill is not None and self.prometheus_port != 0:
self.prometheus_metrics.predicted_num_p.set(decision.num_prefill)
self.prometheus_metrics.predicted_num_prefill_replicas.set(
decision.num_prefill
)
if decision.num_decode is not None and self.prometheus_port != 0:
self.prometheus_metrics.predicted_num_d.set(decision.num_decode)
self.prometheus_metrics.predicted_num_decode_replicas.set(
decision.num_decode
)
targets = []
if decision.num_prefill is not None:
......
......@@ -35,11 +35,13 @@ from dynamo.planner.core.types import (
FpmObservations,
PlannerEffects,
ScheduledTick,
TickDiagnostics,
TickInput,
TrafficObservation,
WorkerCapabilities,
WorkerCounts,
)
from dynamo.planner.monitoring.diagnostics_recorder import DiagnosticsRecorder
from dynamo.planner.monitoring.planner_metrics import PlannerPrometheusMetrics
from dynamo.planner.monitoring.traffic_metrics import Metrics, PrometheusAPIClient
from dynamo.planner.monitoring.worker_info import WorkerInfo, resolve_worker_info
......@@ -171,6 +173,9 @@ class NativePlannerBase:
self._last_metrics = Metrics()
self._cumulative_gpu_hours: float = 0.0
# Diagnostics recorder
self._recorder = DiagnosticsRecorder(config=config)
# State machine (created after WorkerInfo is resolved)
self._state_machine: Optional[PlannerStateMachine] = None
......@@ -393,8 +398,8 @@ class NativePlannerBase:
num_p, num_d, _ = await self._get_worker_counts_raw()
if self.prometheus_port != 0:
self.prometheus_metrics.num_p_workers.set(num_p)
self.prometheus_metrics.num_d_workers.set(num_d)
self.prometheus_metrics.num_prefill_replicas.set(num_p)
self.prometheus_metrics.num_decode_replicas.set(num_d)
gpu_hours = (
(
num_p * (self.config.prefill_engine_num_gpu or 0)
......@@ -439,14 +444,16 @@ class NativePlannerBase:
)
if self.prometheus_port != 0:
self.prometheus_metrics.observed_ttft.set(m.ttft)
self.prometheus_metrics.observed_itl.set(m.itl)
self.prometheus_metrics.observed_request_rate.set(
self.prometheus_metrics.observed_ttft_ms.set(m.ttft)
self.prometheus_metrics.observed_itl_ms.set(m.itl)
self.prometheus_metrics.observed_requests_per_second.set(
m.num_req / self.config.throughput_adjustment_interval
)
self.prometheus_metrics.observed_request_duration.set(m.request_duration)
self.prometheus_metrics.observed_isl.set(m.isl)
self.prometheus_metrics.observed_osl.set(m.osl)
self.prometheus_metrics.observed_request_duration_seconds.set(
m.request_duration
)
self.prometheus_metrics.observed_input_sequence_tokens.set(m.isl)
self.prometheus_metrics.observed_output_sequence_tokens.set(m.osl)
if not m.is_valid():
logger.info("Metrics contain None or NaN values, skipping")
......@@ -477,8 +484,38 @@ class NativePlannerBase:
_log_fpm(wid, dp, fpm, "decode")
decode_stats = stats
if self.prometheus_port != 0:
self._emit_per_engine_fpm(prefill_stats, decode_stats)
return FpmObservations(prefill=prefill_stats, decode=decode_stats)
def _emit_per_engine_fpm(
    self,
    prefill_stats: Optional[dict] = None,
    decode_stats: Optional[dict] = None,
) -> None:
    """Publish per-engine FPM token counts to the labelled Prometheus gauges.

    All three per-engine gauges are cleared on every call, so once this
    returns they only carry series for the engines present in the given
    stats.

    Args:
        prefill_stats: Mapping of ``(worker_id, dp_rank)`` to the FPM record
            of a prefill engine. ``None`` or empty publishes nothing.
        decode_stats: Same shape, for decode engines.
    """
    metrics = self.prometheus_metrics
    queued_prefill = metrics.engine_queued_prefill_tokens
    queued_decode_kv = metrics.engine_queued_decode_kv_tokens
    inflight_decode_kv = metrics.engine_inflight_decode_kv_tokens

    # Wipe previously published label sets before writing the current ones.
    for gauge in (queued_prefill, queued_decode_kv, inflight_decode_kv):
        gauge.clear()

    for (worker_id, dp_rank), record in (prefill_stats or {}).items():
        # Label values are exported as strings, hence str() on the rank.
        queued_prefill.labels(worker_id=worker_id, dp_rank=str(dp_rank)).set(
            record.queued_requests.sum_prefill_tokens
        )

    for (worker_id, dp_rank), record in (decode_stats or {}).items():
        engine = {"worker_id": worker_id, "dp_rank": str(dp_rank)}
        queued_decode_kv.labels(**engine).set(
            record.queued_requests.sum_decode_kv_tokens
        )
        inflight_decode_kv.labels(**engine).set(
            record.scheduled_requests.sum_decode_kv_tokens
        )
async def _collect_worker_counts(self) -> WorkerCounts:
num_p, num_d, is_stable = await self._get_worker_counts_raw()
return WorkerCounts(
......@@ -532,6 +569,33 @@ class NativePlannerBase:
return
await self.connector.set_component_replicas(targets, blocking=blocking)
# ------------------------------------------------------------------
# Diagnostics reporting (shared across all adapters)
# ------------------------------------------------------------------
def _report_diagnostics(self, diag: TickDiagnostics) -> None:
    """Mirror one tick's diagnostics onto the planner's Prometheus metrics.

    Does nothing when ``prometheus_port`` is 0. Numeric fields that are
    ``None`` (or otherwise falsy) are exported as 0, and a missing decision
    reason is exported as the ``"unset"`` enum state.

    Args:
        diag: Diagnostics attached to the effects of the tick just processed.
    """
    if self.prometheus_port == 0:
        return

    metrics = self.prometheus_metrics
    interval_s = self.config.throughput_adjustment_interval

    metrics.estimated_ttft_ms.set(diag.estimated_ttft_ms or 0)
    metrics.estimated_itl_ms.set(diag.estimated_itl_ms or 0)

    # The prediction is a request count per throughput interval; export it
    # as a rate, falling back to 0 when it is missing or cannot be divided.
    if diag.predicted_num_req is not None and interval_s > 0:
        predicted_rps = diag.predicted_num_req / interval_s
    else:
        predicted_rps = 0
    metrics.predicted_requests_per_second.set(predicted_rps)

    for gauge, value in (
        (metrics.predicted_input_sequence_tokens, diag.predicted_isl),
        (metrics.predicted_output_sequence_tokens, diag.predicted_osl),
        (
            metrics.engine_prefill_capacity_requests_per_second,
            diag.engine_rps_prefill,
        ),
        (
            metrics.engine_decode_capacity_requests_per_second,
            diag.engine_rps_decode,
        ),
    ):
        gauge.set(value or 0)

    load_state = diag.load_decision_reason or "unset"
    throughput_state = diag.throughput_decision_reason or "unset"
    metrics.load_scaling_decision.state(load_state)
    metrics.throughput_scaling_decision.state(throughput_state)
# ------------------------------------------------------------------
# Main loop
# ------------------------------------------------------------------
......@@ -549,6 +613,21 @@ class NativePlannerBase:
tick_input = await self._gather_tick_input(next_tick)
effects = self.state_machine.on_tick(next_tick, tick_input)
await self._apply_effects(effects)
self._report_diagnostics(effects.diagnostics)
if self._recorder.enabled:
try:
self._recorder.record(
tick_input,
effects,
self._last_metrics,
self._cumulative_gpu_hours,
)
if self._recorder.should_generate_report(tick_input.now_s):
self._recorder.generate_report()
except Exception as e:
logger.error(f"Diagnostics report failed: {e}")
assert effects.next_tick is not None
next_tick = effects.next_tick
......
......@@ -24,8 +24,14 @@ logger = logging.getLogger(__name__)
class LoadScalingMixin:
"""FPM-driven load-based scaling decisions."""
# Scratch fields owned by PlannerStateMachine, declared here for mypy
_diag_estimated_ttft_ms: Optional[float]
_diag_estimated_itl_ms: Optional[float]
_diag_load_reason: Optional[str]
def _advance_load(self, obs: FpmObservations) -> Optional[ScalingDecision]:
if not self._config.enable_load_scaling:
self._diag_load_reason = "disabled"
return None
mode = self._config.mode
if mode == "agg":
......@@ -39,6 +45,7 @@ class LoadScalingMixin:
) -> Optional[ScalingDecision]:
if self._scaling_in_progress(component):
logger.info(f"Scaling in progress for {component}, observing only")
self._diag_load_reason = "scaling_in_progress"
return None
fpm_stats = obs.prefill if component == "prefill" else obs.decode
......@@ -47,8 +54,10 @@ class LoadScalingMixin:
)
if not fpm_stats:
self._diag_load_reason = "no_fpm_data"
return None
if not self._reconcile_fpm_worker_count(fpm_stats, num_workers, component):
self._diag_load_reason = "worker_count_mismatch"
return None
desired = (
......@@ -59,6 +68,7 @@ class LoadScalingMixin:
if desired is None:
return None
original_desired = desired
if self._config.enable_throughput_scaling:
bound = (
self._throughput_lower_bound_p
......@@ -68,6 +78,17 @@ class LoadScalingMixin:
desired = max(desired, bound)
desired = self._apply_single_budget(desired, component)
if desired < num_workers:
if desired > original_desired:
self._diag_load_reason = "scale_down_capped_by_throughput"
else:
self._diag_load_reason = "scale_down"
elif desired > num_workers:
self._diag_load_reason = "scale_up"
else:
self._diag_load_reason = "no_change"
return (
ScalingDecision(num_prefill=desired)
if component == "prefill"
......@@ -79,14 +100,17 @@ class LoadScalingMixin:
if not p_stats and not d_stats:
logger.warning("No FPM data for either prefill or decode, skipping")
self._diag_load_reason = "no_fpm_data"
return None
if p_stats and not self._reconcile_fpm_worker_count(
p_stats, self._num_p_workers, "prefill"
):
self._diag_load_reason = "worker_count_mismatch"
return None
if d_stats and not self._reconcile_fpm_worker_count(
d_stats, self._num_d_workers, "decode"
):
self._diag_load_reason = "worker_count_mismatch"
return None
p_desired = (
......@@ -105,8 +129,10 @@ class LoadScalingMixin:
if final_p == self._num_p_workers and final_d == self._num_d_workers:
logger.info("Load-based scaling: no scaling needed")
self._diag_load_reason = "no_change"
return None
original_p, original_d = final_p, final_d
if self._config.enable_throughput_scaling:
final_p = max(final_p, self._throughput_lower_bound_p)
final_d = max(final_d, self._throughput_lower_bound_d)
......@@ -115,6 +141,17 @@ class LoadScalingMixin:
final_d = max(final_d, self._config.min_endpoint)
final_p, final_d = self._apply_global_budget(final_p, final_d)
if (final_p > original_p or final_d > original_d) and (
original_p < self._num_p_workers or original_d < self._num_d_workers
):
self._diag_load_reason = "scale_down_capped_by_throughput"
elif final_p > self._num_p_workers or final_d > self._num_d_workers:
self._diag_load_reason = "scale_up"
elif final_p < self._num_p_workers or final_d < self._num_d_workers:
self._diag_load_reason = "scale_down"
else:
self._diag_load_reason = "no_change"
logger.info(
f"Load-based disagg scaling: prefill {self._num_p_workers}->{final_p}, "
f"decode {self._num_d_workers}->{final_d}"
......@@ -124,6 +161,7 @@ class LoadScalingMixin:
def _advance_load_agg(self, obs: FpmObservations) -> Optional[ScalingDecision]:
fpm_stats = obs.decode
if not fpm_stats:
self._diag_load_reason = "no_fpm_data"
return None
num_workers = self._num_d_workers
......@@ -131,20 +169,24 @@ class LoadScalingMixin:
logger.info(
f"Scaling in progress ({num_workers} -> {self._expected_num_d}), observing only"
)
self._diag_load_reason = "scaling_in_progress"
return None
if not self._reconcile_fpm_worker_count(fpm_stats, num_workers, "agg"):
self._diag_load_reason = "worker_count_mismatch"
return None
if not self._agg_regression.has_sufficient_data():
logger.info(
f"Agg regression: insufficient data "
f"({self._agg_regression.num_observations}/{self._agg_regression.min_observations})"
)
self._diag_load_reason = "insufficient_data"
return None
d_caps = self._capabilities.decode
max_tokens = d_caps.max_num_batched_tokens if d_caps else None
if not max_tokens or max_tokens <= 0:
logger.warning("max_num_batched_tokens not available, skipping agg scaling")
self._diag_load_reason = "insufficient_data"
return None
p_desired = self._agg_prefill_scaling(fpm_stats, num_workers, max_tokens)
......@@ -167,13 +209,25 @@ class LoadScalingMixin:
desired = max(p_desired, d_desired)
else:
logger.info("Agg scaling: no scaling needed")
self._diag_load_reason = "no_change"
return None
original_desired = desired
desired = max(desired, self._config.min_endpoint)
if self._config.enable_throughput_scaling:
desired = max(desired, self._throughput_lower_bound_d)
desired = self._apply_single_budget(desired, "decode")
if desired < num_workers:
if desired > original_desired:
self._diag_load_reason = "scale_down_capped_by_throughput"
else:
self._diag_load_reason = "scale_down"
elif desired > num_workers:
self._diag_load_reason = "scale_up"
else:
self._diag_load_reason = "no_change"
logger.info(f"Agg load-based scaling: {num_workers} -> {desired}")
return ScalingDecision(num_decode=desired)
......@@ -189,8 +243,10 @@ class LoadScalingMixin:
f"TTFT regression: insufficient data "
f"({self._prefill_regression.num_observations}/{self._prefill_regression.min_observations})"
)
self._diag_load_reason = "insufficient_data"
return None
if num_workers == 0:
self._diag_load_reason = "insufficient_data"
return None
p_caps = self._capabilities.prefill
......@@ -199,6 +255,7 @@ class LoadScalingMixin:
logger.warning(
"max_num_batched_tokens not available, skipping prefill load scaling"
)
self._diag_load_reason = "insufficient_data"
return None
estimates: list[float] = []
......@@ -215,6 +272,10 @@ class LoadScalingMixin:
f"(queued={fpm.queued_requests.sum_prefill_tokens}, "
f"avg_isl={self._prefill_regression.avg_isl:.1f})"
)
if estimates:
self._diag_estimated_ttft_ms = max(estimates)
return self._scale_decision(
estimates, self._config.ttft, num_workers, "prefill TTFT"
)
......@@ -227,8 +288,10 @@ class LoadScalingMixin:
f"ITL regression: insufficient data "
f"({self._decode_regression.num_observations}/{self._decode_regression.min_observations})"
)
self._diag_load_reason = "insufficient_data"
return None
if num_workers == 0:
self._diag_load_reason = "insufficient_data"
return None
estimates: list[float] = []
......@@ -245,6 +308,10 @@ class LoadScalingMixin:
f"(sched_kv={fpm.scheduled_requests.sum_decode_kv_tokens}, "
f"queued_kv={fpm.queued_requests.sum_decode_kv_tokens})"
)
if estimates:
self._diag_estimated_itl_ms = max(estimates)
return self._scale_decision(
estimates, self._config.itl, num_workers, "decode ITL"
)
......@@ -264,6 +331,10 @@ class LoadScalingMixin:
)
if est is not None:
estimates.append(est * 1000)
if estimates:
self._diag_estimated_ttft_ms = max(estimates)
return self._scale_decision(
estimates, self._config.ttft, num_workers, "agg TTFT"
)
......@@ -281,12 +352,17 @@ class LoadScalingMixin:
)
if est is not None:
estimates.append(est * 1000)
if estimates:
self._diag_estimated_itl_ms = max(estimates)
return self._scale_decision(estimates, self._config.itl, num_workers, "agg ITL")
def _scale_decision(
self, estimates: list[float], sla: float, num_workers: int, label: str
) -> Optional[int]:
if not estimates:
self._diag_load_reason = "insufficient_data"
return None
sensitivity = self._config.load_scaling_down_sensitivity / 100.0
......@@ -310,4 +386,5 @@ class LoadScalingMixin:
)
return desired
self._diag_load_reason = "no_change"
return None
......@@ -35,6 +35,7 @@ from dynamo.planner.core.types import (
FpmObservations,
PlannerEffects,
ScheduledTick,
TickDiagnostics,
TickInput,
TrafficObservation,
WorkerCapabilities,
......@@ -102,6 +103,17 @@ class PlannerStateMachine(LoadScalingMixin, ThroughputScalingMixin):
self._next_load_s: float = float("inf")
self._next_throughput_s: float = float("inf")
# Diagnostics scratch fields populated by mixins, read by on_tick
self._diag_estimated_ttft_ms: Optional[float] = None
self._diag_estimated_itl_ms: Optional[float] = None
self._diag_predicted_num_req: Optional[float] = None
self._diag_predicted_isl: Optional[float] = None
self._diag_predicted_osl: Optional[float] = None
self._diag_engine_rps_prefill: Optional[float] = None
self._diag_engine_rps_decode: Optional[float] = None
self._diag_load_reason: Optional[str] = None
self._diag_throughput_reason: Optional[str] = None
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
......@@ -144,6 +156,7 @@ class PlannerStateMachine(LoadScalingMixin, ThroughputScalingMixin):
def on_tick(self, tick: ScheduledTick, tick_input: TickInput) -> PlannerEffects:
effects = PlannerEffects()
self._reset_diag()
if tick_input.worker_counts is not None:
self._update_inventory(tick_input.worker_counts)
......@@ -167,9 +180,34 @@ class PlannerStateMachine(LoadScalingMixin, ThroughputScalingMixin):
tick_input.now_s + self._config.throughput_adjustment_interval
)
effects.diagnostics = self._build_diagnostics()
effects.next_tick = self._next_scheduled_tick()
return effects
def _reset_diag(self) -> None:
    """Clear every diagnostics scratch field back to ``None``.

    Called at the start of ``on_tick`` so values recorded during an earlier
    tick never leak into the diagnostics built for the current one.
    """
    # Scratch written by the load-scaling path.
    self._diag_estimated_ttft_ms = self._diag_estimated_itl_ms = None
    self._diag_load_reason = None

    # Scratch written by the throughput-scaling path.
    self._diag_predicted_num_req = None
    self._diag_predicted_isl = self._diag_predicted_osl = None
    self._diag_engine_rps_prefill = self._diag_engine_rps_decode = None
    self._diag_throughput_reason = None
def _build_diagnostics(self) -> TickDiagnostics:
    """Snapshot the diagnostics scratch fields into a ``TickDiagnostics``.

    Returns:
        A new ``TickDiagnostics`` whose fields copy the current ``_diag_*``
        scratch values one-to-one.
    """
    load_side = {
        "estimated_ttft_ms": self._diag_estimated_ttft_ms,
        "estimated_itl_ms": self._diag_estimated_itl_ms,
        "load_decision_reason": self._diag_load_reason,
    }
    throughput_side = {
        "predicted_num_req": self._diag_predicted_num_req,
        "predicted_isl": self._diag_predicted_isl,
        "predicted_osl": self._diag_predicted_osl,
        "engine_rps_prefill": self._diag_engine_rps_prefill,
        "engine_rps_decode": self._diag_engine_rps_decode,
        "throughput_decision_reason": self._diag_throughput_reason,
    }
    return TickDiagnostics(**load_side, **throughput_side)
# ------------------------------------------------------------------
# Tick scheduling
# ------------------------------------------------------------------
......
......@@ -22,10 +22,19 @@ logger = logging.getLogger(__name__)
class ThroughputScalingMixin:
"""Traffic-driven throughput-based scaling decisions."""
# Scratch fields owned by PlannerStateMachine, declared here for mypy
_diag_predicted_num_req: Optional[float]
_diag_predicted_isl: Optional[float]
_diag_predicted_osl: Optional[float]
_diag_engine_rps_prefill: Optional[float]
_diag_engine_rps_decode: Optional[float]
_diag_throughput_reason: Optional[str]
def _advance_throughput(
self, traffic: TrafficObservation
) -> Optional[ScalingDecision]:
if not self._config.enable_throughput_scaling:
self._diag_throughput_reason = "disabled"
return None
next_num_req, next_isl, next_osl = self._predict_load()
......@@ -34,6 +43,7 @@ class ThroughputScalingMixin:
if traffic.duration_s <= 0:
logger.warning("Traffic observation has non-positive duration, skipping")
self._diag_throughput_reason = "no_traffic_data"
return None
demand_rps = next_num_req / traffic.duration_s
mode = self._config.mode
......@@ -52,9 +62,13 @@ class ThroughputScalingMixin:
logger.info(
f"Predicted load: num_req={nr:.2f}, isl={isl:.2f}, osl={osl:.2f}"
)
self._diag_predicted_num_req = nr
self._diag_predicted_isl = isl
self._diag_predicted_osl = osl
return nr, isl, osl
except Exception as e:
logger.error(f"Failed to predict load: {e}")
self._diag_throughput_reason = "predict_failed"
return None, None, None
def _throughput_single(
......@@ -74,9 +88,11 @@ class ThroughputScalingMixin:
else:
self._throughput_lower_bound_d = desired
logger.info(f"Throughput lower bound set to {desired} for {component}")
self._diag_throughput_reason = "set_lower_bound"
return None
desired = self._apply_single_budget(desired, component)
self._diag_throughput_reason = "scale"
return (
ScalingDecision(num_prefill=desired)
if component == "prefill"
......@@ -95,9 +111,11 @@ class ThroughputScalingMixin:
self._throughput_lower_bound_p = num_p
self._throughput_lower_bound_d = num_d
logger.info(f"Throughput lower bounds set: prefill={num_p}, decode={num_d}")
self._diag_throughput_reason = "set_lower_bound"
return None
num_p, num_d = self._apply_global_budget(num_p, num_d)
self._diag_throughput_reason = "scale"
return ScalingDecision(num_prefill=num_p, num_decode=num_d)
def _throughput_agg(
......@@ -109,6 +127,7 @@ class ThroughputScalingMixin:
logger.warning(
"max_num_batched_tokens not available, skipping agg throughput"
)
self._diag_throughput_reason = "model_not_ready"
return None
(
......@@ -124,12 +143,16 @@ class ThroughputScalingMixin:
)
if engine_rps <= 0:
logger.warning("Agg perf model not ready, skipping throughput scaling")
self._diag_throughput_reason = "model_not_ready"
return None
if actual_ttft > self._config.ttft or actual_itl > self._config.itl:
logger.warning(
f"Agg SLA not fully met: TTFT={actual_ttft:.1f}ms, ITL={actual_itl:.1f}ms"
)
self._diag_engine_rps_prefill = engine_rps
self._diag_engine_rps_decode = engine_rps
desired = max(math.ceil(demand_rps / engine_rps), self._config.min_endpoint)
logger.info(
f"Agg: {demand_rps:.2f} rps / {engine_rps:.2f} engine_rps = {desired} replicas"
......@@ -138,9 +161,11 @@ class ThroughputScalingMixin:
if self._config.enable_load_scaling:
self._throughput_lower_bound_d = desired
logger.info(f"Agg throughput lower bound set to {desired}")
self._diag_throughput_reason = "set_lower_bound"
return None
desired = self._apply_single_budget(desired, "decode")
self._diag_throughput_reason = "scale"
return ScalingDecision(num_decode=desired)
def _compute_prefill_replicas(
......@@ -151,11 +176,15 @@ class ThroughputScalingMixin:
)
if engine_rps <= 0:
logger.warning("Prefill perf model not ready, skipping throughput scaling")
self._diag_throughput_reason = "model_not_ready"
return None
if ttft_ms > self._config.ttft:
logger.warning(
f"Prefill TTFT SLA not met: {ttft_ms:.1f}ms > {self._config.ttft:.1f}ms"
)
self._diag_engine_rps_prefill = engine_rps
result = max(math.ceil(demand_rps / engine_rps), self._config.min_endpoint)
logger.info(
f"Prefill: {demand_rps:.2f} rps / {engine_rps:.2f} = {result}, est_ttft={ttft_ms:.1f}ms"
......@@ -172,11 +201,15 @@ class ThroughputScalingMixin:
)
if engine_rps <= 0:
logger.warning("Decode perf model not ready, skipping throughput scaling")
self._diag_throughput_reason = "model_not_ready"
return None
if itl_ms > self._config.itl:
logger.warning(
f"Decode ITL SLA not met: {itl_ms:.1f}ms > {self._config.itl:.1f}ms"
)
self._diag_engine_rps_decode = engine_rps
result = max(math.ceil(demand_rps / engine_rps), self._config.min_endpoint)
logger.info(
f"Decode: {demand_rps:.2f} rps / {engine_rps:.2f} = {result}, est_itl={itl_ms:.1f}ms"
......
......@@ -11,7 +11,7 @@ based on the previous tick's ``ScheduledTick`` requirements.
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
......@@ -92,12 +92,38 @@ class ScalingDecision:
num_decode: Optional[int] = None
@dataclass
class TickDiagnostics:
    """Per-tick decision diagnostics produced by the planner state machine.

    Every field defaults to ``None``, meaning no value was recorded for it
    during the tick. The adapter layer reads these to set Prometheus metrics
    and to feed the diagnostics recorder.
    """

    # --- Load scaling: maximum estimated latency across engines (ms) ---
    estimated_ttft_ms: Optional[float] = None
    estimated_itl_ms: Optional[float] = None

    # --- Throughput scaling: traffic predicted for the next interval ---
    predicted_num_req: Optional[float] = None
    predicted_isl: Optional[float] = None
    predicted_osl: Optional[float] = None

    # --- Throughput scaling: single-engine capacity under SLA (req/s) ---
    engine_rps_prefill: Optional[float] = None
    engine_rps_decode: Optional[float] = None

    # --- Reason strings recorded by whichever scaling mixin ran ---
    load_decision_reason: Optional[str] = None
    throughput_decision_reason: Optional[str] = None
@dataclass
class PlannerEffects:
    """Result handed back by the planner core after it processes a tick."""

    # Replica targets to apply; None when the tick requests no scaling.
    scale_to: Optional[ScalingDecision] = None

    # The tick the caller should schedule next.
    next_tick: Optional[ScheduledTick] = None

    # Observability data for this tick; each instance gets its own object.
    diagnostics: TickDiagnostics = field(default_factory=TickDiagnostics)
@dataclass
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from prometheus_client import Gauge
from prometheus_client import Enum, Gauge
# Shared name prefix: metrics in this module are named f"{PREFIX}_<metric>".
PREFIX = "dynamo_planner"

# Allowed states of the load-scaling decision Enum metric.
# "unset" is exported when a tick recorded no load-scaling reason; the other
# entries are the reason strings assigned by the load-scaling code paths.
# NOTE(review): prometheus_client's Enum is expected to reject a state that is
# not listed here, so any new reason string must also be added to this list
# (confirm against the prometheus_client version in use).
LOAD_DECISION_STATES = [
    "unset",
    "disabled",
    "no_fpm_data",
    "scaling_in_progress",
    "worker_count_mismatch",
    "insufficient_data",
    "no_change",
    "scale_up",
    "scale_down",
    "scale_down_capped_by_throughput",
]

# Allowed states of the throughput-scaling decision Enum metric.
# "unset" is exported when a tick recorded no throughput-scaling reason; the
# other entries are the reason strings assigned by the throughput-scaling
# code paths. The same keep-in-sync caveat as above applies.
THROUGHPUT_DECISION_STATES = [
    "unset",
    "disabled",
    "no_traffic_data",
    "predict_failed",
    "model_not_ready",
    "set_lower_bound",
    "scale",
]
class PlannerPrometheusMetrics:
"""Container for all Planner Prometheus metrics."""
"""Container for all Planner Prometheus metrics.
def __init__(self, prefix: str = "planner"):
# Worker counts
self.num_p_workers = Gauge(
f"{prefix}:num_p_workers", "Number of prefill workers"
All metric names follow the ``dynamo_planner_*`` convention, using
underscores (not colons) and Prometheus-standard unit suffixes.
"""
def __init__(self) -> None:
# -- Worker counts ------------------------------------------------
self.num_prefill_replicas = Gauge(
f"{PREFIX}_num_prefill_replicas",
"Current number of prefill replicas",
)
self.num_d_workers = Gauge(
f"{prefix}:num_d_workers", "Number of decode workers"
self.num_decode_replicas = Gauge(
f"{PREFIX}_num_decode_replicas",
"Current number of decode replicas",
)
# Observed metrics
self.observed_ttft = Gauge(
f"{prefix}:observed_ttft", "Observed time to first token (ms)"
# -- Observed metrics ---------------------------------------------
self.observed_ttft_ms = Gauge(
f"{PREFIX}_observed_ttft_ms",
"Observed time to first token (ms)",
)
self.observed_itl_ms = Gauge(
f"{PREFIX}_observed_itl_ms",
"Observed inter-token latency (ms)",
)
self.observed_itl = Gauge(
f"{prefix}:observed_itl", "Observed inter-token latency (ms)"
self.observed_requests_per_second = Gauge(
f"{PREFIX}_observed_requests_per_second",
"Observed request rate (req/s)",
)
self.observed_request_rate = Gauge(
f"{prefix}:observed_request_rate", "Observed request rate (req/s)"
self.observed_request_duration_seconds = Gauge(
f"{PREFIX}_observed_request_duration_seconds",
"Observed average request duration (seconds)",
)
self.observed_request_duration = Gauge(
f"{prefix}:observed_request_duration", "Observed request duration (s)"
self.observed_input_sequence_tokens = Gauge(
f"{PREFIX}_observed_input_sequence_tokens",
"Observed average input sequence length (tokens)",
)
self.observed_isl = Gauge(
f"{prefix}:observed_isl", "Observed input sequence length"
self.observed_output_sequence_tokens = Gauge(
f"{PREFIX}_observed_output_sequence_tokens",
"Observed average output sequence length (tokens)",
)
self.observed_osl = Gauge(
f"{prefix}:observed_osl", "Observed output sequence length"
# -- Predicted metrics (throughput scaling) -----------------------
self.predicted_requests_per_second = Gauge(
f"{PREFIX}_predicted_requests_per_second",
"Predicted request rate for next interval (req/s)",
)
self.predicted_input_sequence_tokens = Gauge(
f"{PREFIX}_predicted_input_sequence_tokens",
"Predicted input sequence length for next interval (tokens)",
)
self.predicted_output_sequence_tokens = Gauge(
f"{PREFIX}_predicted_output_sequence_tokens",
"Predicted output sequence length for next interval (tokens)",
)
# Predicted metrics
self.predicted_request_rate = Gauge(
f"{prefix}:predicted_request_rate", "Predicted request rate (req/s)"
# -- Predicted replica counts -------------------------------------
self.predicted_num_prefill_replicas = Gauge(
f"{PREFIX}_predicted_num_prefill_replicas",
"Decided number of prefill replicas",
)
self.predicted_isl = Gauge(
f"{prefix}:predicted_isl", "Predicted input sequence length"
self.predicted_num_decode_replicas = Gauge(
f"{PREFIX}_predicted_num_decode_replicas",
"Decided number of decode replicas",
)
self.predicted_osl = Gauge(
f"{prefix}:predicted_osl", "Predicted output sequence length"
# -- Cumulative GPU usage -----------------------------------------
self.gpu_hours = Gauge(
f"{PREFIX}_gpu_hours",
"Cumulative GPU hours consumed",
)
self.predicted_num_p = Gauge(
f"{prefix}:predicted_num_p", "Predicted number of prefill replicas"
# -- Diagnostics: estimated latencies -----------------------------
self.estimated_ttft_ms = Gauge(
f"{PREFIX}_estimated_ttft_ms",
"Max estimated TTFT from regression across engines (ms)",
)
self.predicted_num_d = Gauge(
f"{prefix}:predicted_num_d", "Predicted number of decode replicas"
self.estimated_itl_ms = Gauge(
f"{PREFIX}_estimated_itl_ms",
"Max estimated ITL from regression across engines (ms)",
)
# Cumulative GPU usage
self.gpu_hours = Gauge(f"{prefix}:gpu_hours", "Cumulative GPU hours used")
# -- Diagnostics: engine capacity ---------------------------------
self.engine_prefill_capacity_requests_per_second = Gauge(
f"{PREFIX}_engine_prefill_capacity_requests_per_second",
"Single prefill engine capacity under SLA (req/s)",
)
self.engine_decode_capacity_requests_per_second = Gauge(
f"{PREFIX}_engine_decode_capacity_requests_per_second",
"Single decode engine capacity under SLA (req/s)",
)
# -- Diagnostics: scaling decision enums --------------------------
self.load_scaling_decision = Enum(
f"{PREFIX}_load_scaling_decision",
"Load-based scaling decision reason",
states=LOAD_DECISION_STATES,
)
self.throughput_scaling_decision = Enum(
f"{PREFIX}_throughput_scaling_decision",
"Throughput-based scaling decision reason",
states=THROUGHPUT_DECISION_STATES,
)
# -- Diagnostics: per-engine FPM queue depths ---------------------
_engine_labels = ["worker_id", "dp_rank"]
self.engine_queued_prefill_tokens = Gauge(
f"{PREFIX}_engine_queued_prefill_tokens",
"Queued prefill tokens per engine (from FPM)",
labelnames=_engine_labels,
)
self.engine_queued_decode_kv_tokens = Gauge(
f"{PREFIX}_engine_queued_decode_kv_tokens",
"Queued decode KV tokens per engine (from FPM)",
labelnames=_engine_labels,
)
self.engine_inflight_decode_kv_tokens = Gauge(
f"{PREFIX}_engine_inflight_decode_kv_tokens",
"Inflight (scheduled) decode KV tokens per engine (from FPM)",
labelnames=_engine_labels,
)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Tests for DiagnosticsRecorder and HTML report generation."""
import os
import tempfile
import pytest
try:
import plotly # noqa: F401
except ImportError:
pytest.skip("plotly required for report tests", allow_module_level=True)
try:
import msgspec # noqa: F401
except ImportError:
pytest.skip("msgspec required for FPM data", allow_module_level=True)
from dynamo.common.forward_pass_metrics import (
ForwardPassMetrics,
QueuedRequestMetrics,
ScheduledRequestMetrics,
)
from dynamo.planner.config.planner_config import PlannerConfig
from dynamo.planner.core.types import (
FpmObservations,
PlannerEffects,
ScalingDecision,
TickDiagnostics,
TickInput,
WorkerCounts,
)
from dynamo.planner.monitoring.diagnostics_recorder import DiagnosticsRecorder
from dynamo.planner.monitoring.traffic_metrics import Metrics
# Module-level pytest marks: pytest applies each of these to every test
# collected from this file.
pytestmark = [
    pytest.mark.gpu_0,
    pytest.mark.pre_merge,
    pytest.mark.unit,
    pytest.mark.planner,
]
def _make_config(tmp_dir: str, **overrides) -> PlannerConfig:
    """Build a ``PlannerConfig`` for the diagnostics-recorder tests.

    The config is created with ``model_construct``, so the supplied values
    are taken as-is rather than being run through model validation.

    Args:
        tmp_dir: Directory used as ``report_output_dir``.
        **overrides: Field values that replace or extend the baseline below.

    Returns:
        The constructed ``PlannerConfig``.
    """
    baseline = {
        "mode": "disagg",
        "ttft": 500.0,
        "itl": 50.0,
        "min_endpoint": 1,
        "max_gpu_budget": -1,
        "throughput_adjustment_interval": 60,
        "load_adjustment_interval": 5,
        "enable_load_scaling": True,
        "enable_throughput_scaling": True,
        "load_predictor": "constant",
        "no_operation": True,
        "backend": "vllm",
        "metric_pulling_prometheus_endpoint": "http://localhost:9090",
        "metric_reporting_prometheus_port": 0,
        "report_interval_hours": 0.5,
        "report_output_dir": tmp_dir,
    }
    # Caller-supplied values win over the baseline.
    settings = {**baseline, **overrides}
    return PlannerConfig.model_construct(**settings)
def _traffic_for_phase(phase: float) -> tuple[float, float, float]:
    """Return ``(rps, isl, osl)`` for a normalized scenario position in [0, 1)."""
    if phase < 0.3:
        # Ramp up.
        return 2.0 + 8.0 * (phase / 0.3), 800.0 + 200 * phase, 120.0 + 30 * phase
    if phase < 0.7:
        # Plateau.
        return 10.0, 1000.0, 150.0
    # Ramp down.
    tail = phase - 0.7
    return 10.0 - 8.0 * (tail / 0.3), 1000.0 - 200 * tail, 150.0 - 30 * tail


def _fpm_for_tick(rps: float, num_p: int, num_d: int) -> tuple[dict, dict]:
    """Build per-engine prefill and decode FPM maps keyed by ``(worker_id, dp_rank)``."""
    prefill_fpm = {
        (f"pw{j}", 0): ForwardPassMetrics(
            worker_id=f"pw{j}",
            dp_rank=0,
            wall_time=0.01,
            scheduled_requests=ScheduledRequestMetrics(
                sum_prefill_tokens=int(500 + rps * 50),
                num_prefill_requests=max(1, int(rps)),
                sum_decode_kv_tokens=0,
                num_decode_requests=0,
            ),
            queued_requests=QueuedRequestMetrics(
                sum_prefill_tokens=int(200 * rps + j * 100),
                sum_decode_kv_tokens=0,
            ),
        )
        for j in range(num_p)
    }
    decode_fpm = {
        (f"dw{j}", 0): ForwardPassMetrics(
            worker_id=f"dw{j}",
            dp_rank=0,
            wall_time=0.01,
            scheduled_requests=ScheduledRequestMetrics(
                sum_prefill_tokens=0,
                num_prefill_requests=0,
                sum_decode_kv_tokens=int(3000 + rps * 200 + j * 500),
                num_decode_requests=max(1, int(rps * 2)),
            ),
            queued_requests=QueuedRequestMetrics(
                sum_prefill_tokens=0,
                sum_decode_kv_tokens=int(1000 * rps + j * 300),
            ),
        )
        for j in range(num_d)
    }
    return prefill_fpm, decode_fpm


def _synthetic_ticks(
    num_ticks: int = 40,
    start_s: float = 1000.0,
    interval_s: float = 60.0,
) -> list[tuple[TickInput, PlannerEffects, Metrics, float]]:
    """Generate a multi-phase scaling scenario for recorder tests.

    Traffic ramps up, plateaus, then ramps down. Decisions move through
    warm-up, scale-up, hold, capped scale-down and scale-down phases.

    Args:
        num_ticks: Number of ticks to generate.
        start_s: Timestamp of the first tick (``TickInput.now_s``).
        interval_s: Seconds between consecutive ticks.

    Returns:
        One ``(tick_input, effects, observed_metrics, cumulative_gpu_hours)``
        tuple per tick, in time order.
    """
    data = []
    gpu_hours = 0.0
    num_p, num_d = 1, 1
    for i in range(num_ticks):
        t = start_s + i * interval_s
        phase = i / num_ticks
        rps, isl, osl = _traffic_for_phase(phase)
        observed_ttft = 200.0 + rps * 30
        observed_itl = 20.0 + rps * 3

        # A single warm-up predicate drives both the decision reasons and the
        # engine-capacity values, so a tick that lands exactly on the boundary
        # cannot report a throughput decision without engine capacity.
        warming_up = phase < 0.15

        # Only the scale-up and scale-down phases emit a scaling target.
        scale_p, scale_d = None, None
        tp_reason = "set_lower_bound"
        if warming_up:
            load_reason = "insufficient_data"
            tp_reason = "model_not_ready"
            est_ttft, est_itl = None, None
        elif phase < 0.3:
            load_reason = "scale_up"
            est_ttft = observed_ttft * 1.2
            est_itl = observed_itl * 1.1
            num_p = min(num_p + 1, 5)
            num_d = min(num_d + 1, 5)
            scale_p, scale_d = num_p, num_d
        elif phase < 0.7:
            load_reason = "no_change"
            est_ttft = observed_ttft * 0.8
            est_itl = observed_itl * 0.9
        elif phase < 0.85:
            load_reason = "scale_down_capped_by_throughput"
            est_ttft = observed_ttft * 0.5
            est_itl = observed_itl * 0.5
        else:
            load_reason = "scale_down"
            est_ttft = observed_ttft * 0.3
            est_itl = observed_itl * 0.3
            num_p = max(num_p - 1, 1)
            num_d = max(num_d - 1, 1)
            scale_p, scale_d = num_p, num_d

        gpu_hours += (num_p + num_d) * interval_s / 3600.0
        prefill_fpm, decode_fpm = _fpm_for_tick(rps, num_p, num_d)

        tick_input = TickInput(
            now_s=t,
            worker_counts=WorkerCounts(ready_num_prefill=num_p, ready_num_decode=num_d),
            fpm_observations=FpmObservations(prefill=prefill_fpm, decode=decode_fpm),
        )
        effects = PlannerEffects(
            scale_to=(
                ScalingDecision(num_prefill=scale_p, num_decode=scale_d)
                if scale_p is not None or scale_d is not None
                else None
            ),
            diagnostics=TickDiagnostics(
                estimated_ttft_ms=est_ttft,
                estimated_itl_ms=est_itl,
                predicted_num_req=rps * interval_s,
                predicted_isl=isl,
                predicted_osl=osl,
                engine_rps_prefill=None if warming_up else 5.0,
                engine_rps_decode=None if warming_up else 8.0,
                load_decision_reason=load_reason,
                throughput_decision_reason=tp_reason,
            ),
        )
        observed = Metrics(
            ttft=observed_ttft,
            itl=observed_itl,
            num_req=rps * interval_s,
            isl=isl,
            osl=osl,
            request_duration=2.5,
        )
        data.append((tick_input, effects, observed, gpu_hours))
    return data
class TestDiagnosticsRecorder:
    """Tests for DiagnosticsRecorder enablement, snapshot handling and HTML output."""

    @staticmethod
    def _record_ticks(recorder, **tick_kwargs):
        """Feed synthetic ticks into *recorder* and return the tick data used."""
        ticks = _synthetic_ticks(**tick_kwargs)
        for tick_input, effects, observed, gpu_hours in ticks:
            recorder.record(tick_input, effects, observed, gpu_hours)
        return ticks

    def test_disabled_when_no_interval(self, tmp_path):
        cfg = _make_config(str(tmp_path), report_interval_hours=None)
        recorder = DiagnosticsRecorder(config=cfg)
        assert not recorder.enabled

    def test_enabled_when_interval_set(self, tmp_path):
        cfg = _make_config(str(tmp_path), report_interval_hours=1.0)
        recorder = DiagnosticsRecorder(config=cfg)
        assert recorder.enabled

    def test_record_accumulates_snapshots(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            recorder = DiagnosticsRecorder(config=_make_config(tmp_dir))
            self._record_ticks(recorder, num_ticks=5)
            assert len(recorder._snapshots) == 5

    def test_should_generate_report_after_interval(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            cfg = _make_config(tmp_dir, report_interval_hours=0.5)
            recorder = DiagnosticsRecorder(config=cfg)
            # 40 ticks at 60 s span 39 minutes, longer than the 0.5 h interval.
            ticks = self._record_ticks(recorder, num_ticks=40, interval_s=60.0)
            last_t = ticks[-1][0].now_s
            assert recorder.should_generate_report(last_t)

    def test_generate_report_creates_html(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            recorder = DiagnosticsRecorder(config=_make_config(tmp_dir))
            self._record_ticks(recorder, num_ticks=40)
            filepath = recorder.generate_report()
            assert filepath is not None
            assert os.path.exists(filepath)
            assert filepath.endswith(".html")
            # Decode explicitly instead of relying on the locale default. Only
            # ASCII markers are checked below, so undecodable bytes are
            # replaced rather than failing the read.
            with open(filepath, encoding="utf-8", errors="replace") as f:
                content = f.read()
            assert len(content) > 1000
            assert "plotly" in content.lower()
            expected_titles = (
                "Replica Counts",
                "Observed TTFT vs SLA",
                "Observed ITL vs SLA",
                "Estimated TTFT vs SLA",
                "Estimated ITL vs SLA",
                "Prefill Engine Load",
                "Decode Engine Load",
                "Request Rate",
                "Engine Capacity",
                "Load Scaling Decisions",
                "Throughput Scaling Decisions",
                "Planner Diagnostics Report",
            )
            for title in expected_titles:
                assert title in content, f"missing report section: {title}"

    def test_generate_report_clears_snapshots(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            recorder = DiagnosticsRecorder(config=_make_config(tmp_dir))
            self._record_ticks(recorder, num_ticks=10)
            recorder.generate_report()
            assert len(recorder._snapshots) == 0

    def test_finalize_generates_final_report(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            recorder = DiagnosticsRecorder(config=_make_config(tmp_dir))
            self._record_ticks(recorder, num_ticks=5)
            filepath = recorder.finalize()
            assert filepath is not None
            assert os.path.exists(filepath)

    def test_finalize_noop_when_empty(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            recorder = DiagnosticsRecorder(config=_make_config(tmp_dir))
            assert recorder.finalize() is None

    def test_record_without_fpm_data(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            recorder = DiagnosticsRecorder(config=_make_config(tmp_dir))
            tick_input = TickInput(
                now_s=1000.0,
                worker_counts=WorkerCounts(ready_num_prefill=2, ready_num_decode=3),
                fpm_observations=None,
            )
            effects = PlannerEffects(
                diagnostics=TickDiagnostics(load_decision_reason="no_fpm_data"),
            )
            observed = Metrics(ttft=100.0, itl=10.0, num_req=50, isl=800, osl=120)
            recorder.record(tick_input, effects, observed, 1.0)
            assert len(recorder._snapshots) == 1
            snap = recorder._snapshots[0]
            # With no FPM observations the snapshot has no per-engine entries.
            assert snap.prefill_engines == []
            assert snap.decode_engines == []
            filepath = recorder.generate_report()
            assert filepath is not None
            assert os.path.exists(filepath)
......@@ -260,6 +260,9 @@ class TestPrefillLoadScaling:
assert effects.scale_to is not None
assert effects.scale_to.num_prefill is not None
assert effects.scale_to.num_prefill > 1
assert effects.diagnostics.estimated_ttft_ms is not None
assert effects.diagnostics.estimated_ttft_ms > 0
assert effects.diagnostics.load_decision_reason == "scale_up"
def test_no_scaling_when_insufficient_data(self):
core = _make_core(mode="prefill")
......@@ -273,6 +276,7 @@ class TestPrefillLoadScaling:
)
effects = core.on_tick(_tick_for(tick), tick)
assert effects.scale_to is None
assert effects.diagnostics.load_decision_reason == "insufficient_data"
def test_no_scaling_when_load_disabled(self):
core = _make_core(mode="prefill", enable_load_scaling=False)
......@@ -291,6 +295,7 @@ class TestPrefillLoadScaling:
)
effects = core.on_tick(_tick_for(tick), tick)
assert effects.scale_to is None
assert effects.diagnostics.load_decision_reason == "disabled"
# ── Load-based scaling (decode) ───────────────────────────────────────
......@@ -317,6 +322,9 @@ class TestDecodeLoadScaling:
assert effects.scale_to is not None
assert effects.scale_to.num_decode is not None
assert effects.scale_to.num_decode > 1
assert effects.diagnostics.estimated_itl_ms is not None
assert effects.diagnostics.estimated_itl_ms > 0
assert effects.diagnostics.load_decision_reason == "scale_up"
# ── Disagg load scaling ───────────────────────────────────────────────
......@@ -378,6 +386,9 @@ class TestThroughputScaling:
assert effects.scale_to is not None
assert effects.scale_to.num_prefill is not None
assert effects.scale_to.num_prefill >= 1
assert effects.diagnostics.predicted_num_req is not None
assert effects.diagnostics.engine_rps_prefill is not None
assert effects.diagnostics.throughput_decision_reason == "scale"
def test_throughput_sets_lower_bound_when_load_enabled(self):
core = _make_core(enable_load_scaling=True, enable_throughput_scaling=True)
......@@ -398,6 +409,7 @@ class TestThroughputScaling:
assert effects.scale_to is None
assert core._throughput_lower_bound_p >= 1
assert core._throughput_lower_bound_d >= 1
assert effects.diagnostics.throughput_decision_reason == "set_lower_bound"
def test_next_tick_scheduled_after_traffic(self):
core = _make_core(mode="prefill")
......@@ -443,6 +455,7 @@ class TestFpmReconciliation:
effects = core.on_tick(_tick_for(tick), tick)
# FPM reports 2 workers but ready count is 3 -> skip scaling
assert effects.scale_to is None
assert effects.diagnostics.load_decision_reason == "worker_count_mismatch"
# ── Agg planner core ──────────────────────────────────────────────────
......@@ -509,3 +522,84 @@ class TestAggPlannerStateMachine:
assert effects.scale_to is not None
assert effects.scale_to.num_decode is not None
assert effects.scale_to.num_decode >= 1
# ── Diagnostics ──────────────────────────────────────────────────────
class TestDiagnostics:
    """Check that each tick's effects carry correctly populated TickDiagnostics."""

    def test_diagnostics_always_present(self):
        """A tick with no observations still yields a diagnostics object."""
        planner = _make_core(mode="prefill")
        bare_tick = TickInput(
            now_s=5.0,
            worker_counts=WorkerCounts(ready_num_prefill=1),
        )
        result = planner.on_tick(_tick_for(bare_tick), bare_tick)
        assert result.diagnostics is not None

    def test_diagnostics_reset_each_tick(self):
        """Values from a load-scaling tick do not carry into the next tick."""
        planner = _make_core(mode="prefill", ttft=5.0)
        _train_prefill_regression(planner)
        engine_fpm = _make_fpm(
            queued_prefill_tokens=10000,
            sum_prefill_tokens=500,
            num_prefill_requests=1,
            wall_time=0.5,
        )

        # First tick carries FPM data, so a TTFT estimate is expected.
        first_input = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(prefill={("w1", 0): engine_fpm}),
            worker_counts=WorkerCounts(ready_num_prefill=1),
        )
        first_effects = planner.on_tick(_tick_for(first_input), first_input)
        assert first_effects.diagnostics.estimated_ttft_ms is not None

        # Second tick runs neither scaling mode, so the fields must be empty.
        second_input = TickInput(
            now_s=10.0,
            worker_counts=WorkerCounts(ready_num_prefill=1),
        )
        idle_schedule = ScheduledTick(
            at_s=10.0,
            run_load_scaling=False,
            run_throughput_scaling=False,
            need_worker_states=True,
        )
        second_effects = planner.on_tick(idle_schedule, second_input)
        second_diag = second_effects.diagnostics
        assert second_diag.estimated_ttft_ms is None
        assert second_diag.load_decision_reason is None

    def test_no_fpm_data_reason(self):
        """Missing prefill FPM observations are reported as ``no_fpm_data``."""
        planner = _make_core(mode="prefill")
        _train_prefill_regression(planner)
        empty_fpm_tick = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(prefill=None),
            worker_counts=WorkerCounts(ready_num_prefill=1),
        )
        result = planner.on_tick(_tick_for(empty_fpm_tick), empty_fpm_tick)
        assert result.diagnostics.load_decision_reason == "no_fpm_data"

    def test_throughput_predicted_load_populated(self):
        """A throughput tick fills in predicted load and prefill engine RPS."""
        planner = _make_core(
            mode="prefill", enable_load_scaling=False, enable_throughput_scaling=True
        )
        _train_prefill_regression(planner)
        # Two separate but equal observations: one seeds the planner's traffic
        # history, the other rides on the tick itself.
        traffic_fields = {"duration_s": 60, "num_req": 100, "isl": 1000, "osl": 150}
        planner._observe_traffic(TrafficObservation(**traffic_fields))
        traffic_tick = TickInput(
            now_s=60.0,
            traffic=TrafficObservation(**traffic_fields),
            worker_counts=WorkerCounts(ready_num_prefill=1),
        )
        result = planner.on_tick(_tick_for(traffic_tick), traffic_tick)
        diagnostics = result.diagnostics
        assert diagnostics.predicted_num_req is not None
        assert diagnostics.predicted_isl is not None
        assert diagnostics.predicted_osl is not None
        assert diagnostics.engine_rps_prefill is not None
        assert diagnostics.engine_rps_prefill > 0
......@@ -97,7 +97,7 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:num_p_workers{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_num_prefill_replicas{namespace=~\"$namespace\"}",
"legendFormat": "Prefill Workers",
"range": true,
"refId": "A"
......@@ -156,7 +156,7 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:num_d_workers{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_num_decode_replicas{namespace=~\"$namespace\"}",
"legendFormat": "Decode Workers",
"range": true,
"refId": "A"
......@@ -216,7 +216,7 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:gpu_hours{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_gpu_hours{namespace=~\"$namespace\"}",
"legendFormat": "GPU Hours",
"range": true,
"refId": "A"
......@@ -339,14 +339,14 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:num_p_workers{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_num_prefill_replicas{namespace=~\"$namespace\"}",
"legendFormat": "Prefill Workers",
"range": true,
"refId": "A"
},
{
"editorMode": "code",
"expr": "planner:num_d_workers{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_num_decode_replicas{namespace=~\"$namespace\"}",
"legendFormat": "Decode Workers",
"range": true,
"refId": "B"
......@@ -497,14 +497,14 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:observed_ttft{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_observed_ttft_ms{namespace=~\"$namespace\"}",
"legendFormat": "TTFT",
"range": true,
"refId": "A"
},
{
"editorMode": "code",
"expr": "planner:observed_itl{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_observed_itl_ms{namespace=~\"$namespace\"}",
"legendFormat": "ITL",
"range": true,
"refId": "B"
......@@ -641,14 +641,14 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:observed_request_rate{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_observed_requests_per_second{namespace=~\"$namespace\"}",
"legendFormat": "Request Rate",
"range": true,
"refId": "A"
},
{
"editorMode": "code",
"expr": "planner:observed_request_duration{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_observed_request_duration_seconds{namespace=~\"$namespace\"}",
"legendFormat": "Request Duration",
"range": true,
"refId": "B"
......@@ -770,14 +770,14 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:observed_isl{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_observed_input_sequence_tokens{namespace=~\"$namespace\"}",
"legendFormat": "ISL",
"range": true,
"refId": "A"
},
{
"editorMode": "code",
"expr": "planner:observed_osl{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_observed_output_sequence_tokens{namespace=~\"$namespace\"}",
"legendFormat": "OSL",
"range": true,
"refId": "B"
......@@ -901,7 +901,7 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:predicted_request_rate{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_predicted_requests_per_second{namespace=~\"$namespace\"}",
"legendFormat": "Predicted Request Rate",
"range": true,
"refId": "A"
......@@ -1027,14 +1027,14 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:predicted_isl{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_predicted_input_sequence_tokens{namespace=~\"$namespace\"}",
"legendFormat": "Predicted ISL",
"range": true,
"refId": "A"
},
{
"editorMode": "code",
"expr": "planner:predicted_osl{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_predicted_output_sequence_tokens{namespace=~\"$namespace\"}",
"legendFormat": "Predicted OSL",
"range": true,
"refId": "B"
......@@ -1161,14 +1161,14 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:predicted_num_p{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_predicted_num_prefill_replicas{namespace=~\"$namespace\"}",
"legendFormat": "Predicted Prefill",
"range": true,
"refId": "A"
},
{
"editorMode": "code",
"expr": "planner:predicted_num_d{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_predicted_num_decode_replicas{namespace=~\"$namespace\"}",
"legendFormat": "Predicted Decode",
"range": true,
"refId": "B"
......@@ -1250,7 +1250,7 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:p_correction_factor{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_p_correction_factor{namespace=~\"$namespace\"}",
"legendFormat": "Prefill CF",
"range": true,
"refId": "A"
......@@ -1319,7 +1319,7 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:d_correction_factor{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_d_correction_factor{namespace=~\"$namespace\"}",
"legendFormat": "Decode CF",
"range": true,
"refId": "A"
......@@ -1449,14 +1449,14 @@ data:
"targets": [
{
"editorMode": "code",
"expr": "planner:p_correction_factor{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_p_correction_factor{namespace=~\"$namespace\"}",
"legendFormat": "Prefill CF",
"range": true,
"refId": "A"
},
{
"editorMode": "code",
"expr": "planner:d_correction_factor{namespace=~\"$namespace\"}",
"expr": "dynamo_planner_d_correction_factor{namespace=~\"$namespace\"}",
"legendFormat": "Decode CF",
"range": true,
"refId": "B"
......@@ -1494,14 +1494,14 @@ data:
"type": "prometheus",
"uid": "${datasource}"
},
"definition": "label_values(planner:num_p_workers, namespace)",
"definition": "label_values(dynamo_planner_num_prefill_replicas, namespace)",
"hide": 0,
"includeAll": true,
"label": "Namespace",
"multi": true,
"name": "namespace",
"options": [],
"query": "label_values(planner:num_p_workers, namespace)",
"query": "label_values(dynamo_planner_num_prefill_replicas, namespace)",
"refresh": 2,
"regex": "",
"skipUrlSync": false,
......
......@@ -177,6 +177,8 @@ The dashboard shows:
### Prometheus Metrics
When `PLANNER_PROMETHEUS_PORT` is set, the planner serves its own metrics endpoint. Exported series use the `dynamo_planner_*` naming convention (underscores and standard unit suffixes), replacing older `planner:*`-style names.
**Throughput-based scaling** pulls traffic metrics from the cluster-wide Prometheus server:
- Request count and duration
- TTFT and ITL distributions
......@@ -186,3 +188,27 @@ The dashboard shows:
- Per-iteration wall time, scheduled prefill/decode tokens, and queued request status
- Delivered via `FpmEventSubscriber` with automatic engine discovery and lifecycle tracking
- No router `/metrics` scraping required
Core gauges on the planner port include replica counts (`dynamo_planner_num_prefill_replicas`, `dynamo_planner_num_decode_replicas`), observed traffic (`dynamo_planner_observed_*`), replica decisions (`dynamo_planner_predicted_num_prefill_replicas`, `dynamo_planner_predicted_num_decode_replicas`), and cumulative `dynamo_planner_gpu_hours`.
Throughput prediction gauges `dynamo_planner_predicted_requests_per_second`, `dynamo_planner_predicted_input_sequence_tokens`, and `dynamo_planner_predicted_output_sequence_tokens` are wired from throughput-scaling traffic prediction and exposed alongside observed sequence-length metrics.
#### Diagnostics metrics
Additional series support dashboards and offline analysis:
- **Regression-based latency estimates:** `dynamo_planner_estimated_ttft_ms` and `dynamo_planner_estimated_itl_ms` reflect the maximum estimated TTFT and ITL from the online regression across engines.
- **Engine capacity:** `dynamo_planner_engine_prefill_requests_per_second` and `dynamo_planner_engine_decode_requests_per_second` report single-engine prefill and decode capacity under the configured SLA.
- **Scaling decision reasons:** `dynamo_planner_load_scaling_decision` and `dynamo_planner_throughput_scaling_decision` are Enum gauges whose state labels encode why each mode chose to scale, hold, or skip (for example `scale_up`, `no_fpm_data`, `set_lower_bound`).
- **Per-engine FPM queue depths:** `dynamo_planner_engine_queued_prefill_tokens`, `dynamo_planner_engine_queued_decode_kv_tokens`, and `dynamo_planner_engine_inflight_decode_kv_tokens` are labeled with `worker_id` and `dp_rank` for each engine.
### HTML diagnostics reports
The planner can emit periodic, self-contained HTML diagnostics files with interactive Plotly charts.
Configure this in `PlannerConfig` (or the equivalent YAML / constructor wiring your deployment uses):
- `report_interval_hours`: interval between reports, in hours of planner time (`TickInput.now_s` — wall clock in live runs, **simulated** clock in replay); set to `None` to disable.
- `report_output_dir`: directory where HTML files are written (default `./planner_reports`).
Reports aggregate per-tick snapshots and use `TickInput.now_s` for timestamps, so they behave the same in live runs (wall clock) and in **replay** with a simulated clock. Typical charts cover worker counts, observed versus estimated latencies versus SLA targets, request rate, engine capacity, scaling decision timelines, and input/output sequence lengths.
......@@ -100,6 +100,15 @@ When throughput-based scaling is enabled, the planner needs engine performance d
| `kalman_r` | float | `10.0` | Measurement noise. |
| `kalman_min_points` | int | `5` | Minimum data points before Kalman predictions activate. |
### Diagnostics Reports
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `report_interval_hours` | float or `null` | `null` | Generate an HTML diagnostics report every N hours (simulated time). Set to `null` to disable periodic report generation. |
| `report_output_dir` | string | `./planner_reports` | Directory for HTML diagnostics reports. |
The same diagnostic signals surfaced in these reports are also exported as Prometheus metrics under the `dynamo_planner_*` prefix—for example estimated TTFT/ITL (`dynamo_planner_estimated_ttft_ms`, `dynamo_planner_estimated_itl_ms`), per-engine capacity and FPM queue depths, and load/throughput scaling decision enums.
## Integration with Profiler
When the profiler runs with planner enabled, it:
......
......@@ -178,6 +178,16 @@ Each engine emits per-iteration `ForwardPassMetrics` via ZMQ -> FpmEventRelay ->
- **queued_requests**: queued prefill/decode load for TTFT/ITL simulation
- Idle heartbeats (wall_time=0) are skipped
### Diagnostics
Each tick, the scaling state machine fills `TickDiagnostics` with intermediate decision data—estimated latencies, predicted load, per-engine RPS, and decision reasons—via internal `_diag_*` fields. The adapter layer reads this from `PlannerEffects.diagnostics` and:
- Sets Prometheus gauges (e.g. `dynamo_planner_estimated_ttft_ms` and related estimates)
- Records enum metrics for load- and throughput-scaling decision reasons (`dynamo_planner_load_scaling_decision`, `dynamo_planner_throughput_scaling_decision`)
- Feeds `DiagnosticsRecorder`, which accumulates per-tick snapshots and emits Plotly-based HTML reports on a schedule
Per-engine FPM queue depths from `_collect_fpm()` are exported as labeled Prometheus gauges.
### Regression Models
Three specialized regression models (`fpm_regression.py`):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment