Unverified Commit 091cdb51 authored by Hongkuan Zhou, committed by GitHub
Browse files

feat(planner): add live diagnostics dashboard HTTP endpoint (#8168)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent be70dae0
...@@ -137,7 +137,7 @@ class PlannerConfig(BaseModel): ...@@ -137,7 +137,7 @@ class PlannerConfig(BaseModel):
# Diagnostics report settings # Diagnostics report settings
report_interval_hours: Optional[float] = Field( report_interval_hours: Optional[float] = Field(
default=None, default=24.0,
description=( description=(
"Generate an HTML diagnostics report every N hours (simulated time). " "Generate an HTML diagnostics report every N hours (simulated time). "
"Set to None to disable periodic report generation." "Set to None to disable periodic report generation."
...@@ -147,6 +147,14 @@ class PlannerConfig(BaseModel): ...@@ -147,6 +147,14 @@ class PlannerConfig(BaseModel):
default="./planner_reports", default="./planner_reports",
description="Directory for HTML diagnostics reports.", description="Directory for HTML diagnostics reports.",
) )
live_dashboard_port: int = Field(
default=8080,
description=(
"Port for the live diagnostics dashboard HTTP server. "
"Set to 0 to disable. When enabled, visit http://host:port/ "
"to view a real-time Plotly report of accumulated snapshots."
),
)
@model_validator(mode="after") @model_validator(mode="after")
def _validate_config(self) -> "PlannerConfig": def _validate_config(self) -> "PlannerConfig":
......
...@@ -20,6 +20,7 @@ import logging ...@@ -20,6 +20,7 @@ import logging
import time import time
from typing import TYPE_CHECKING, Optional, Union from typing import TYPE_CHECKING, Optional, Union
import aiohttp.web
from prometheus_client import start_http_server from prometheus_client import start_http_server
from dynamo.planner.config.backend_components import WORKER_COMPONENT_NAMES from dynamo.planner.config.backend_components import WORKER_COMPONENT_NAMES
...@@ -42,6 +43,7 @@ from dynamo.planner.core.types import ( ...@@ -42,6 +43,7 @@ from dynamo.planner.core.types import (
WorkerCounts, WorkerCounts,
) )
from dynamo.planner.monitoring.diagnostics_recorder import DiagnosticsRecorder from dynamo.planner.monitoring.diagnostics_recorder import DiagnosticsRecorder
from dynamo.planner.monitoring.live_dashboard import start_live_dashboard
from dynamo.planner.monitoring.planner_metrics import PlannerPrometheusMetrics from dynamo.planner.monitoring.planner_metrics import PlannerPrometheusMetrics
from dynamo.planner.monitoring.traffic_metrics import Metrics, PrometheusAPIClient from dynamo.planner.monitoring.traffic_metrics import Metrics, PrometheusAPIClient
from dynamo.planner.monitoring.worker_info import WorkerInfo, resolve_worker_info from dynamo.planner.monitoring.worker_info import WorkerInfo, resolve_worker_info
...@@ -177,6 +179,9 @@ class NativePlannerBase: ...@@ -177,6 +179,9 @@ class NativePlannerBase:
# Diagnostics recorder # Diagnostics recorder
self._recorder = DiagnosticsRecorder(config=config) self._recorder = DiagnosticsRecorder(config=config)
# Live dashboard runner (started in _async_init)
self._dashboard_runner: Optional[aiohttp.web.AppRunner] = None
# State machine (created after WorkerInfo is resolved) # State machine (created after WorkerInfo is resolved)
self._state_machine: Optional[PlannerStateMachine] = None self._state_machine: Optional[PlannerStateMachine] = None
...@@ -266,6 +271,15 @@ class NativePlannerBase: ...@@ -266,6 +271,15 @@ class NativePlannerBase:
await self._bootstrap_regression() await self._bootstrap_regression()
# Start live dashboard if configured
if self.config.live_dashboard_port:
try:
self._dashboard_runner = await start_live_dashboard(
self._recorder, self.config.live_dashboard_port
)
except Exception as e:
logger.error(f"Failed to start live dashboard: {e}")
async def _init_worker_info(self) -> None: async def _init_worker_info(self) -> None:
connector = getattr(self, "connector", None) connector = getattr(self, "connector", None)
self.prefill_worker_info, self.decode_worker_info = resolve_worker_info( self.prefill_worker_info, self.decode_worker_info = resolve_worker_info(
...@@ -605,32 +619,37 @@ class NativePlannerBase: ...@@ -605,32 +619,37 @@ class NativePlannerBase:
next_tick = self.state_machine.initial_tick(time.time()) next_tick = self.state_machine.initial_tick(time.time())
poll_interval = self.config.load_adjustment_interval / 10 poll_interval = self.config.load_adjustment_interval / 10
while True: try:
now = time.time() while True:
if now < next_tick.at_s: now = time.time()
await asyncio.sleep(min(next_tick.at_s - now, poll_interval)) if now < next_tick.at_s:
continue await asyncio.sleep(min(next_tick.at_s - now, poll_interval))
continue
tick_input = await self._gather_tick_input(next_tick)
effects = self.state_machine.on_tick(next_tick, tick_input) tick_input = await self._gather_tick_input(next_tick)
await self._apply_effects(effects) effects = self.state_machine.on_tick(next_tick, tick_input)
self._report_diagnostics(effects.diagnostics) await self._apply_effects(effects)
self._report_diagnostics(effects.diagnostics)
if self._recorder.enabled:
try: if self._recorder.enabled:
self._recorder.record( try:
tick_input, self._recorder.record(
effects, tick_input,
self._last_metrics, effects,
self._cumulative_gpu_hours, self._last_metrics,
) self._cumulative_gpu_hours,
if self._recorder.should_generate_report(tick_input.now_s): )
self._recorder.generate_report() if self._recorder.should_generate_report(tick_input.now_s):
except Exception as e: self._recorder.generate_report()
logger.error(f"Diagnostics report failed: {e}") except Exception as e:
logger.error(f"Diagnostics report failed: {e}")
assert effects.next_tick is not None
next_tick = effects.next_tick assert effects.next_tick is not None
next_tick = effects.next_tick
finally:
self._recorder.finalize()
if self._dashboard_runner is not None:
await self._dashboard_runner.cleanup()
# ------------------------------------------------------------------ # ------------------------------------------------------------------
......
...@@ -17,6 +17,9 @@ from dataclasses import dataclass, field ...@@ -17,6 +17,9 @@ from dataclasses import dataclass, field
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional from typing import Optional
import plotly.graph_objects as go # type: ignore[import-untyped]
from plotly.subplots import make_subplots # type: ignore[import-untyped]
from dynamo.planner.config.planner_config import PlannerConfig from dynamo.planner.config.planner_config import PlannerConfig
from dynamo.planner.core.types import PlannerEffects, TickDiagnostics, TickInput from dynamo.planner.core.types import PlannerEffects, TickDiagnostics, TickInput
from dynamo.planner.monitoring.traffic_metrics import Metrics from dynamo.planner.monitoring.traffic_metrics import Metrics
...@@ -97,7 +100,7 @@ class DiagnosticsRecorder: ...@@ -97,7 +100,7 @@ class DiagnosticsRecorder:
@property @property
def enabled(self) -> bool: def enabled(self) -> bool:
return self._interval_s > 0 return self._interval_s > 0 or self.config.live_dashboard_port != 0
def record( def record(
self, self,
...@@ -193,21 +196,11 @@ class DiagnosticsRecorder: ...@@ -193,21 +196,11 @@ class DiagnosticsRecorder:
self._last_report_s = self._snapshots[0].timestamp_s self._last_report_s = self._snapshots[0].timestamp_s
return now_s - self._last_report_s >= self._interval_s return now_s - self._last_report_s >= self._interval_s
def generate_report(self) -> Optional[str]: def _build_report_html(self, snaps: list[TickSnapshot]) -> str:
if not self._snapshots: """Build the HTML report string from the given snapshots.
return None
try: This method has no side effects (no file I/O, no snapshot clearing).
import plotly.graph_objects as go # type: ignore[import-untyped] """
from plotly.subplots import make_subplots # type: ignore[import-untyped]
except ImportError:
logger.warning(
"plotly is not installed -- cannot generate HTML report. "
"Install with: pip install plotly"
)
return None
snaps = self._snapshots
ts = [s.timestamp_s for s in snaps] ts = [s.timestamp_s for s in snaps]
labels = [ labels = [
datetime.fromtimestamp(t, tz=timezone.utc).strftime("%H:%M:%S") for t in ts datetime.fromtimestamp(t, tz=timezone.utc).strftime("%H:%M:%S") for t in ts
...@@ -588,6 +581,17 @@ class DiagnosticsRecorder: ...@@ -588,6 +581,17 @@ class DiagnosticsRecorder:
margin=dict(t=100), margin=dict(t=100),
) )
return fig.to_html(include_plotlyjs=True, full_html=True)
def generate_report(self) -> Optional[str]:
"""Generate a periodic report, write it to disk, and clear snapshots."""
if not self._snapshots:
return None
snaps = list(self._snapshots)
html = self._build_report_html(snaps)
ts = [s.timestamp_s for s in snaps]
output_dir = self.config.report_output_dir output_dir = self.config.report_output_dir
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
self._report_count += 1 self._report_count += 1
...@@ -597,13 +601,24 @@ class DiagnosticsRecorder: ...@@ -597,13 +601,24 @@ class DiagnosticsRecorder:
filename = f"planner_report_{ts_label}_{self._report_count:03d}.html" filename = f"planner_report_{ts_label}_{self._report_count:03d}.html"
filepath = os.path.join(output_dir, filename) filepath = os.path.join(output_dir, filename)
fig.write_html(filepath, include_plotlyjs=True, full_html=True) with open(filepath, "w") as f:
f.write(html)
logger.info(f"Planner diagnostics report written to {filepath}") logger.info(f"Planner diagnostics report written to {filepath}")
self._last_report_s = ts[-1] self._last_report_s = ts[-1]
self._snapshots.clear() self._snapshots.clear()
return filepath return filepath
def render_live_html(self) -> Optional[str]:
    """Render the current accumulated snapshots as HTML without side effects.

    Unlike ``generate_report()``, this does NOT clear snapshots or write
    to disk. Intended for the live dashboard HTTP endpoint.

    Returns:
        The HTML string produced by ``_build_report_html()`` for the
        snapshots currently held, or ``None`` when there are no
        snapshots to render.
    """
    if not self._snapshots:
        return None
    # Pass a copy so the builder works on a list that is independent of
    # ``self._snapshots``.
    return self._build_report_html(list(self._snapshots))
def finalize(self) -> Optional[str]: def finalize(self) -> Optional[str]:
if self._snapshots: if self._snapshots:
return self.generate_report() return self.generate_report()
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Lightweight HTTP server for live planner diagnostics.
Serves the current accumulated snapshot data as an interactive Plotly HTML
report on demand, without clearing snapshots or writing to disk.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from aiohttp import web
if TYPE_CHECKING:
from dynamo.planner.monitoring.diagnostics_recorder import DiagnosticsRecorder
logger = logging.getLogger(__name__)
# Placeholder page served while the recorder has no snapshots to render.
# It reloads itself every 5 seconds. The refresh directive is placed in
# <head> because HTML only permits <meta http-equiv=...> elements there.
_NO_DATA_HTML = (
    "<html><head>"
    "<meta http-equiv='refresh' content='5'>"
    "</head><body><h3>Planner Live Dashboard</h3>"
    "<p>No snapshot data yet. Waiting for planner ticks...</p>"
    "</body></html>"
)

# Tag spliced in right after the rendered report's opening <head> tag so the
# dashboard page reloads itself every 30 seconds.
_AUTO_REFRESH_TAG = '<meta http-equiv="refresh" content="30">'
def _build_app(recorder: DiagnosticsRecorder) -> web.Application:
    """Create the aiohttp application that serves the live dashboard.

    The application exposes a single ``GET /`` route. Each request asks
    ``recorder`` for its current live HTML: when the recorder has nothing
    to render, a self-refreshing placeholder page is returned; otherwise
    the rendered report is returned with an auto-refresh tag added.
    """

    async def handle_live(request: web.Request) -> web.Response:
        rendered = recorder.render_live_html()
        if rendered is not None:
            # Splice the auto-refresh tag in right after the first <head>
            # so the page reloads every 30 seconds without a manual
            # browser refresh.
            page = rendered.replace("<head>", f"<head>{_AUTO_REFRESH_TAG}", 1)
        else:
            page = _NO_DATA_HTML
        return web.Response(text=page, content_type="text/html")

    dashboard = web.Application()
    dashboard.router.add_get("/", handle_live)
    return dashboard
async def start_live_dashboard(
    recorder: DiagnosticsRecorder, port: int, host: str = "0.0.0.0"
) -> web.AppRunner:
    """Start the live dashboard HTTP server.

    Args:
        recorder: Recorder whose ``render_live_html()`` output is served.
        port: TCP port to listen on.
        host: Interface address to bind. Defaults to ``"0.0.0.0"``. The
            dashboard has no authentication of its own, so pass
            ``"127.0.0.1"`` to keep it reachable from the local machine
            only.

    Returns:
        The ``AppRunner`` so the caller can clean it up on shutdown.

    Raises:
        Exception: Whatever ``runner.setup()`` or ``site.start()`` raises
            (for example ``OSError`` when the port is already in use). If
            starting the site fails, the runner is cleaned up before the
            error propagates.
    """
    app = _build_app(recorder)
    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, host, port)
        await site.start()
    except BaseException:
        # The caller only gets the runner back on success, so a failed
        # start would otherwise leave an initialized runner that nobody
        # can ever clean up.
        await runner.cleanup()
        raise
    logger.info(f"Live diagnostics dashboard running on http://{host}:{port}/")
    return runner
...@@ -61,6 +61,7 @@ def _make_config(tmp_dir: str, **overrides) -> PlannerConfig: ...@@ -61,6 +61,7 @@ def _make_config(tmp_dir: str, **overrides) -> PlannerConfig:
metric_reporting_prometheus_port=0, metric_reporting_prometheus_port=0,
report_interval_hours=0.5, report_interval_hours=0.5,
report_output_dir=tmp_dir, report_output_dir=tmp_dir,
live_dashboard_port=0,
) )
defaults.update(overrides) defaults.update(overrides)
return PlannerConfig.model_construct(**defaults) return PlannerConfig.model_construct(**defaults)
......
...@@ -5,9 +5,11 @@ ...@@ -5,9 +5,11 @@
aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@1fe8084c817194827706947cc175107ee9e6ea25 aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@1fe8084c817194827706947cc175107ee9e6ea25
aiofiles<=25.1.0 aiofiles<=25.1.0
aiohttp>=3.9.0,<4.0
filterpy==1.4.5 filterpy==1.4.5
kubernetes==32.0.1 kubernetes==32.0.1
kubernetes_asyncio==32.0.0 kubernetes_asyncio==32.0.0
plotly>=6.0.1
pmdarima==2.1.1 pmdarima==2.1.1
prometheus-api-client==0.6.0 prometheus-api-client==0.6.0
prophet==1.2.1 prophet==1.2.1
......
...@@ -232,7 +232,8 @@ The planner can emit periodic, self-contained HTML diagnostics files with intera ...@@ -232,7 +232,8 @@ The planner can emit periodic, self-contained HTML diagnostics files with intera
Configure this in `PlannerConfig` (or the equivalent YAML / constructor wiring your deployment uses): Configure this in `PlannerConfig` (or the equivalent YAML / constructor wiring your deployment uses):
- `report_interval_hours`: interval in **simulated** time between reports; set to `None` to disable. - `report_interval_hours`: interval in **simulated** time between reports (default `24.0` hours); set to `None` to disable.
- `report_output_dir`: directory where HTML files are written (default `./planner_reports`). - `report_output_dir`: directory where HTML files are written (default `./planner_reports`).
- `live_dashboard_port`: port for a real-time HTTP dashboard (default `8080`). Set to `0` to disable. An aiohttp server starts on the given port and serves the current accumulated snapshot data as an interactive Plotly report at `http://<host>:<port>/`. Unlike periodic reports, the live dashboard does **not** clear snapshots — it always shows all data accumulated since the last periodic report (or since startup if periodic reports are disabled).
Reports aggregate per-tick snapshots and use `TickInput.now_s` for timestamps, so they behave the same in live runs (wall clock) and in **replay** with a simulated clock. Typical charts cover worker counts, observed versus estimated latencies versus SLA targets, request rate, engine capacity, scaling decision timelines, and input/output sequence lengths. Reports aggregate per-tick snapshots and use `TickInput.now_s` for timestamps, so they behave the same in live runs (wall clock) and in **replay** with a simulated clock. Typical charts cover worker counts, observed versus estimated latencies versus SLA targets, request rate, engine capacity, scaling decision timelines, and input/output sequence lengths.
...@@ -124,8 +124,9 @@ When throughput-based scaling is enabled, the planner needs engine performance d ...@@ -124,8 +124,9 @@ When throughput-based scaling is enabled, the planner needs engine performance d
| Field | Type | Default | Description | | Field | Type | Default | Description |
|-------|------|---------|-------------| |-------|------|---------|-------------|
| `report_interval_hours` | float or `null` | `null` | Generate an HTML diagnostics report every N hours (simulated time). Set to `null` to disable periodic report generation. | | `report_interval_hours` | float or `null` | `24.0` | Generate an HTML diagnostics report every N hours (simulated time). Set to `null` to disable periodic report generation. |
| `report_output_dir` | string | `./planner_reports` | Directory for HTML diagnostics reports. | | `report_output_dir` | string | `./planner_reports` | Directory for HTML diagnostics reports. |
| `live_dashboard_port` | int | `8080` | Port for the live diagnostics dashboard HTTP server. Set to `0` to disable. When enabled, visit `http://host:port/` to view a real-time Plotly report of accumulated snapshots. |
The same diagnostic signals surfaced in these reports are also exported as Prometheus metrics under the `dynamo_planner_*` prefix—for example estimated TTFT/ITL (`dynamo_planner_estimated_ttft_ms`, `dynamo_planner_estimated_itl_ms`), per-engine capacity and FPM queue depths, and load/throughput scaling decision enums. The same diagnostic signals surfaced in these reports are also exported as Prometheus metrics under the `dynamo_planner_*` prefix—for example estimated TTFT/ITL (`dynamo_planner_estimated_ttft_ms`, `dynamo_planner_estimated_itl_ms`), per-engine capacity and FPM queue depths, and load/throughput scaling decision enums.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment