Commit 3da6f4d4 (unverified), authored by Hongkuan Zhou, committed by GitHub
Browse files

feat(planner): add advisory mode for scaling decisions (#8244)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 01a4b6d6
......@@ -25,7 +25,6 @@ class BasePlannerDefaults:
namespace = os.environ.get("DYN_NAMESPACE", "dynamo")
environment: Literal["kubernetes", "virtual", "global-planner"] = "kubernetes"
backend: Literal["vllm", "sglang", "trtllm", "mocker"] = "vllm"
no_operation = False
log_dir = None
throughput_adjustment_interval = 180 # in seconds
max_gpu_budget = 8
......@@ -76,6 +75,9 @@ class SLAPlannerDefaults(BasePlannerDefaults):
load_metric_samples = 10 # number of samples per interval
load_min_observations = 5 # cold start threshold
# Advisory mode: compute and log decisions without executing scaling
advisory = False
class SubComponentType(str, Enum):
PREFILL = "prefill"
......
......@@ -65,7 +65,6 @@ class PlannerConfig(BaseModel):
),
)
no_operation: bool = SLAPlannerDefaults.no_operation
log_dir: Optional[str] = SLAPlannerDefaults.log_dir
throughput_adjustment_interval: int = (
SLAPlannerDefaults.throughput_adjustment_interval
......@@ -135,6 +134,9 @@ class PlannerConfig(BaseModel):
load_metric_samples: int = SLAPlannerDefaults.load_metric_samples
load_min_observations: int = SLAPlannerDefaults.load_min_observations
# Advisory mode: compute and log decisions without executing scaling
advisory: bool = SLAPlannerDefaults.advisory
# Diagnostics report settings
report_interval_hours: Optional[float] = Field(
default=24.0,
......@@ -147,6 +149,14 @@ class PlannerConfig(BaseModel):
default="./planner_reports",
description="Directory for HTML diagnostics reports.",
)
report_filename: Optional[str] = Field(
default=None,
description=(
"Fixed filename for HTML diagnostics reports. "
"When set, reports are written to report_output_dir/report_filename "
"instead of the default timestamped name."
),
)
live_dashboard_port: int = Field(
default=8080,
description=(
......
......@@ -119,7 +119,6 @@ class NativePlannerBase:
# Connector
self.connector: ConnectorType
if not config.no_operation:
if config.environment == "global-planner":
assert config.global_planner_namespace is not None
assert runtime is not None
......@@ -235,7 +234,6 @@ class NativePlannerBase:
if hasattr(self, "connector") and hasattr(self.connector, "_async_init"):
await self.connector._async_init()
if not self.config.no_operation:
defaults = WORKER_COMPONENT_NAMES.get(self.config.backend)
logger.info("Validating deployment...")
await self.connector.validate_deployment(
......@@ -271,6 +269,13 @@ class NativePlannerBase:
await self._bootstrap_regression()
# Log operating mode at startup
if self.config.advisory:
logger.info(
"[ADVISORY] Planner started in advisory mode — "
"scaling decisions will be logged but NOT executed."
)
# Start live dashboard if configured
if self.config.live_dashboard_port:
try:
......@@ -288,7 +293,7 @@ class NativePlannerBase:
require_decode=self.require_decode,
connector=connector,
config_model_name=getattr(self.config, "model_name", ""),
no_operation=self.config.no_operation,
no_operation=False,
)
self.model_name = (
self.decode_worker_info.model_name or self.prefill_worker_info.model_name
......@@ -579,11 +584,60 @@ class NativePlannerBase:
async def _apply_scaling_targets(
self, targets: list[TargetReplica], blocking: bool = False
) -> None:
"""Shared helper: send scaling targets to connector."""
if self.config.no_operation or not targets:
"""Shared helper: send scaling targets to connector.
Skipped in advisory mode (decisions are logged but not executed).
"""
if self.config.advisory or not targets:
return
await self.connector.set_component_replicas(targets, blocking=blocking)
# ------------------------------------------------------------------
# Periodic decision summary
# ------------------------------------------------------------------
def _log_decision_summary(self, effects: PlannerEffects) -> None:
    """Emit one ``[summary]`` INFO line describing this tick's scaling decision.

    The recommended replica counts in ``effects.scale_to`` are compared with
    the state machine's current prefill/decode worker counts. The outcome is
    labelled HOLD, REBALANCE, SCALE_UP or SCALE_DOWN and logged together with
    the decision reasons and latency estimates from ``effects.diagnostics``.

    Args:
        effects: Result of the state-machine tick; ``scale_to`` may be
            ``None`` (or carry ``None`` counts) when nothing is recommended.
    """
    target = effects.scale_to
    diagnostics = effects.diagnostics
    state = self.state_machine

    prefill_now = state._num_p_workers
    decode_now = state._num_d_workers

    # A missing decision yields "no recommendation" for both components.
    if target:
        prefill_rec = target.num_prefill
        decode_rec = target.num_decode
    else:
        prefill_rec = None
        decode_rec = None

    # A component without a recommendation counts as unchanged.
    prefill_delta = 0 if prefill_rec is None else prefill_rec - prefill_now
    decode_delta = 0 if decode_rec is None else decode_rec - decode_now

    grows = prefill_delta > 0 or decode_delta > 0
    shrinks = prefill_delta < 0 or decode_delta < 0

    if target is None or not (grows or shrinks):
        label = "hold"
    elif grows and shrinks:
        # One component goes up while the other goes down.
        label = "rebalance"
    elif grows:
        label = "scale_up"
    else:
        label = "scale_down"

    prefill_shown = "-" if prefill_rec is None else prefill_rec
    decode_shown = "-" if decode_rec is None else decode_rec

    logger.info(
        "[summary] %s | current: prefill=%d decode=%d | "
        "recommended: prefill=%s decode=%s (delta: %+d / %+d) | "
        "load_reason=%s throughput_reason=%s | "
        "est_ttft=%.1fms est_itl=%.1fms",
        label.upper(),
        prefill_now,
        decode_now,
        prefill_shown,
        decode_shown,
        prefill_delta,
        decode_delta,
        diagnostics.load_decision_reason or "n/a",
        diagnostics.throughput_decision_reason or "n/a",
        diagnostics.estimated_ttft_ms or 0,
        diagnostics.estimated_itl_ms or 0,
    )
# ------------------------------------------------------------------
# Diagnostics reporting (shared across all adapters)
# ------------------------------------------------------------------
......@@ -630,6 +684,7 @@ class NativePlannerBase:
effects = self.state_machine.on_tick(next_tick, tick_input)
await self._apply_effects(effects)
self._report_diagnostics(effects.diagnostics)
self._log_decision_summary(effects)
if self._recorder.enabled:
try:
......
......@@ -385,7 +385,13 @@ class DiagnosticsRecorder:
break
y.append(val)
fig.add_trace(
go.Scatter(x=labels, y=y, name=f"P {eid} queued", mode="lines+markers"),
go.Scatter(
x=labels,
y=y,
name=f"P {eid} queued",
mode="lines+markers",
showlegend=False,
),
row=4,
col=1,
)
......@@ -409,6 +415,7 @@ class DiagnosticsRecorder:
y=y_queued,
name=f"D {eid} queued",
mode="lines+markers",
showlegend=False,
),
row=4,
col=2,
......@@ -420,6 +427,7 @@ class DiagnosticsRecorder:
name=f"D {eid} inflight",
mode="lines",
line=dict(dash="dot"),
showlegend=False,
),
row=4,
col=2,
......@@ -576,7 +584,7 @@ class DiagnosticsRecorder:
title=dict(text=summary, font=dict(size=14), y=0.99, yanchor="top"),
height=2000,
showlegend=True,
legend=dict(orientation="h", yanchor="bottom", y=-0.03),
legend=dict(orientation="h", yanchor="top", y=-0.05),
template="plotly_white",
margin=dict(t=100),
)
......@@ -595,6 +603,9 @@ class DiagnosticsRecorder:
output_dir = self.config.report_output_dir
os.makedirs(output_dir, exist_ok=True)
self._report_count += 1
if self.config.report_filename:
filename = self.config.report_filename
else:
ts_label = datetime.fromtimestamp(ts[-1], tz=timezone.utc).strftime(
"%Y%m%d_%H%M%S"
)
......
......@@ -6,10 +6,10 @@
The bridge (Rust, PyO3) runs the offline simulation step-by-step.
This adapter sits between the bridge and the planner state machine:
Bridge.advance_to(tick_ms) raw metrics dict
Adapter._build_tick_input() TickInput
StateMachine.on_tick() PlannerEffects
Adapter Bridge.apply_scaling(prefill, decode)
Bridge.advance_to(tick_ms) -> raw metrics dict
Adapter._build_tick_input() -> TickInput
StateMachine.on_tick() -> PlannerEffects
Adapter -> Bridge.apply_scaling(prefill, decode)
Supports both aggregated and disaggregated topologies. No I/O, no runtime
dependencies. Fully deterministic when used with offline replay.
......@@ -38,6 +38,8 @@ from dynamo.planner.core.types import (
WorkerCapabilities,
WorkerCounts,
)
from dynamo.planner.monitoring.diagnostics_recorder import DiagnosticsRecorder
from dynamo.planner.monitoring.traffic_metrics import Metrics
logger = logging.getLogger(__name__)
......@@ -61,6 +63,7 @@ class ReplayPlannerReport:
scaling_events: list[ScalingEvent] = field(default_factory=list)
diagnostics_log: list[TickDiagnostics] = field(default_factory=list)
total_ticks: int = 0
html_report_path: Optional[str] = None
def _build_fpm_from_dict(d: dict[str, Any]) -> ForwardPassMetrics:
......@@ -129,10 +132,16 @@ class ReplayPlannerAdapter:
self._prefill_fpm_cache: dict[tuple[str, int], ForwardPassMetrics] = {}
self._decode_fpm_cache: dict[tuple[str, int], ForwardPassMetrics] = {}
# Scaling targets used as `expected` in WorkerCounts
# Scaling targets -- used as `expected` in WorkerCounts
self._scaling_target_prefill: Optional[int] = None
self._scaling_target_decode: Optional[int] = None
# Diagnostics recorder for HTML report generation
self._recorder = DiagnosticsRecorder(config=planner_config)
self._cumulative_gpu_hours: float = 0.0
self._last_tick_s: float = 0.0
self._last_traffic: Metrics = Metrics()
if warmup_observations:
self._sm.warm_load_predictors(warmup_observations)
......@@ -155,6 +164,9 @@ class ReplayPlannerAdapter:
diagnostics_log.append(effects.diagnostics)
total_ticks += 1
# Update GPU-hours and record diagnostics snapshot
self._record_diagnostics(tick_input, effects, result)
# Clear scaling targets once active counts match
active_p = result["active_prefill_count"]
active_d = result["active_decode_count"]
......@@ -177,11 +189,49 @@ class ReplayPlannerAdapter:
next_tick = effects.next_tick
trace_report = self._bridge.finalize()
html_report_path = self._recorder.finalize()
return ReplayPlannerReport(
trace_report=trace_report,
scaling_events=scaling_events,
diagnostics_log=diagnostics_log,
total_ticks=total_ticks,
html_report_path=html_report_path,
)
def _record_diagnostics(
    self,
    tick_input: TickInput,
    effects: PlannerEffects,
    result: dict[str, Any],
) -> None:
    """Accumulate GPU-hours for this tick and pass a snapshot to the recorder.

    Does nothing when the diagnostics recorder is disabled. GPU-hours are only
    accumulated once a previous tick time greater than zero has been stored,
    so the first recorded tick contributes no GPU-hours.

    Args:
        tick_input: Input of the tick just processed; supplies ``now_s`` and
            the optional ``traffic`` observation.
        effects: Effects returned by the state machine for this tick.
        result: Mapping that provides ``active_prefill_count`` and
            ``active_decode_count`` for this tick.
    """
    recorder = self._recorder
    if not recorder.enabled:
        return

    current_s = tick_input.now_s
    previous_s = self._last_tick_s
    if previous_s > 0.0:
        elapsed_hours = (current_s - previous_s) / 3600.0
        # An unset per-engine GPU count is treated as zero GPUs.
        prefill_gpus = result["active_prefill_count"] * (
            self._config.prefill_engine_num_gpu or 0
        )
        decode_gpus = result["active_decode_count"] * (
            self._config.decode_engine_num_gpu or 0
        )
        self._cumulative_gpu_hours += (prefill_gpus + decode_gpus) * elapsed_hours
    self._last_tick_s = current_s

    # Refresh the observed traffic only when this tick carries one; otherwise
    # the previously stored Metrics object is reused.
    traffic = tick_input.traffic
    if traffic is not None:
        self._last_traffic = Metrics(
            num_req=traffic.num_req,
            isl=traffic.isl,
            osl=traffic.osl,
        )

    recorder.record(
        tick_input,
        effects,
        self._last_traffic,
        self._cumulative_gpu_hours,
    )
def _apply_scaling(
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Unit tests for advisory mode and decision summary logging."""
import sys
import types
from unittest.mock import MagicMock
import pytest
# ---------------------------------------------------------------
# Stub native Rust modules so planner config can be imported
# without building dynamo._core
# ---------------------------------------------------------------
# Module name -> attributes to expose on a stand-in module object.
_stubs = {
    "dynamo._core": {
        "Client": MagicMock,
        "DistributedRuntime": MagicMock,
        "VirtualConnectorCoordinator": MagicMock,
    },
    "dynamo.runtime": {
        "DistributedRuntime": MagicMock,
        # Callable that returns an identity decorator.
        "dynamo_worker": lambda: lambda f: f,
    },
    "dynamo.runtime.logging": {
        "configure_dynamo_logging": lambda: None,
    },
    "dynamo.llm": {
        "FpmEventSubscriber": MagicMock,
        "FpmEventRelay": MagicMock,
    },
    "dynamo.common.forward_pass_metrics": {
        "ForwardPassMetrics": MagicMock,
    },
}
# Register each stand-in in sys.modules. setdefault leaves an entry that is
# already present untouched, so a module imported earlier is not replaced.
for _mod_name, _attrs in _stubs.items():
    _m = types.ModuleType(_mod_name)
    for _k, _v in _attrs.items():
        setattr(_m, _k, _v)
    sys.modules.setdefault(_mod_name, _m)
from dynamo.planner.config.defaults import SLAPlannerDefaults # noqa: E402
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.unit,
]
class TestAdvisoryDefaults:
    """Checks the class-level default of ``SLAPlannerDefaults.advisory``."""

    def test_default_is_false(self):
        """The default must be the ``False`` singelton itself, not merely falsy."""
        assert SLAPlannerDefaults.advisory is False
class TestPlannerConfigAdvisory:
    """Checks the ``advisory`` field on ``PlannerConfig`` built via ``model_construct``."""

    def test_config_with_advisory(self):
        """An explicit ``advisory=True`` is kept on the constructed config."""
        from dynamo.planner.config.planner_config import PlannerConfig

        cfg = PlannerConfig.model_construct(mode="agg", advisory=True)
        assert cfg.advisory is True

    def test_config_default_is_false(self):
        """Leaving ``advisory`` out yields ``False``."""
        from dynamo.planner.config.planner_config import PlannerConfig

        cfg = PlannerConfig.model_construct(mode="agg")
        assert cfg.advisory is False
class TestAdvisoryGuard:
    # NOTE(review): both tests assert only on a local boolean and never call
    # _apply_scaling_targets, so they cannot fail if the guard regresses.
    # Consider exercising the real method with a mocked connector.

    def test_advisory_skips_scaling(self):
        advisory = True
        assert advisory  # _apply_scaling_targets returns early

    def test_non_advisory_applies_scaling(self):
        advisory = False
        assert not advisory  # _apply_scaling_targets proceeds
def _classify_action(delta_p: int, delta_d: int, decision_is_none: bool) -> str:
"""Mirror the action classification from _log_decision_summary."""
if decision_is_none or (delta_p == 0 and delta_d == 0):
return "hold"
if (delta_p > 0 or delta_d > 0) and (delta_p < 0 or delta_d < 0):
return "rebalance"
if delta_p > 0 or delta_d > 0:
return "scale_up"
return "scale_down"
class TestDecisionSummaryClassification:
    """Covers every branch of ``_classify_action``.

    Besides the both-components cases, this includes single-component changes
    (one delta is zero) and the "no decision" case with non-zero deltas.
    """

    def test_scale_up(self):
        assert _classify_action(1, 2, False) == "scale_up"

    def test_scale_up_prefill_only(self):
        assert _classify_action(1, 0, False) == "scale_up"

    def test_scale_up_decode_only(self):
        assert _classify_action(0, 2, False) == "scale_up"

    def test_scale_down(self):
        assert _classify_action(-1, -2, False) == "scale_down"

    def test_scale_down_prefill_only(self):
        assert _classify_action(-1, 0, False) == "scale_down"

    def test_scale_down_decode_only(self):
        assert _classify_action(0, -2, False) == "scale_down"

    def test_hold_no_change(self):
        assert _classify_action(0, 0, False) == "hold"

    def test_hold_no_decision(self):
        assert _classify_action(0, 0, True) == "hold"

    def test_hold_no_decision_ignores_deltas(self):
        # A missing decision wins over whatever the deltas say.
        assert _classify_action(2, -1, True) == "hold"

    def test_rebalance_prefill_up_decode_down(self):
        assert _classify_action(1, -2, False) == "rebalance"

    def test_rebalance_prefill_down_decode_up(self):
        assert _classify_action(-1, 2, False) == "rebalance"
......@@ -18,11 +18,14 @@ try:
except ImportError:
pytest.skip("msgspec required for FPM data", allow_module_level=True)
from dynamo.common.forward_pass_metrics import (
try:
from dynamo.common.forward_pass_metrics import (
ForwardPassMetrics,
QueuedRequestMetrics,
ScheduledRequestMetrics,
)
)
except ImportError:
pytest.skip("forward_pass_metrics not available", allow_module_level=True)
from dynamo.planner.config.planner_config import PlannerConfig
from dynamo.planner.core.types import (
FpmObservations,
......@@ -55,7 +58,6 @@ def _make_config(tmp_dir: str, **overrides) -> PlannerConfig:
enable_load_scaling=True,
enable_throughput_scaling=True,
load_predictor="constant",
no_operation=True,
backend="vllm",
metric_pulling_prometheus_endpoint="http://localhost:9090",
metric_reporting_prometheus_port=0,
......
......@@ -10,11 +10,14 @@ try:
except ImportError:
pytest.skip("msgspec required for FPM tests", allow_module_level=True)
from dynamo.common.forward_pass_metrics import (
try:
from dynamo.common.forward_pass_metrics import (
ForwardPassMetrics,
QueuedRequestMetrics,
ScheduledRequestMetrics,
)
)
except ImportError:
pytest.skip("forward_pass_metrics not available", allow_module_level=True)
from dynamo.planner.config.planner_config import PlannerConfig
from dynamo.planner.core.state_machine import PlannerStateMachine
from dynamo.planner.core.types import (
......@@ -87,7 +90,6 @@ def _easy_config(**overrides) -> PlannerConfig:
fpm_sample_bucket_size=16,
load_min_observations=5,
load_predictor="constant",
no_operation=True,
backend="vllm",
metric_pulling_prometheus_endpoint="http://localhost:9090",
metric_reporting_prometheus_port=0,
......
......@@ -16,11 +16,14 @@ try:
except ImportError:
pytest.skip("msgspec required for FPM tests", allow_module_level=True)
from dynamo.common.forward_pass_metrics import (
try:
from dynamo.common.forward_pass_metrics import (
ForwardPassMetrics,
QueuedRequestMetrics,
ScheduledRequestMetrics,
)
)
except ImportError:
pytest.skip("forward_pass_metrics not available", allow_module_level=True)
from dynamo.planner.core.perf_model import (
AggRegressionModel,
DecodeRegressionModel,
......
......@@ -10,11 +10,14 @@ try:
except ImportError:
pytest.skip("msgspec required for FPM tests", allow_module_level=True)
from dynamo.common.forward_pass_metrics import (
try:
from dynamo.common.forward_pass_metrics import (
ForwardPassMetrics,
QueuedRequestMetrics,
ScheduledRequestMetrics,
)
)
except ImportError:
pytest.skip("forward_pass_metrics not available", allow_module_level=True)
from dynamo.planner.config.planner_config import PlannerConfig
from dynamo.planner.core.state_machine import PlannerStateMachine
from dynamo.planner.core.types import (
......@@ -99,7 +102,6 @@ def _make_config(**overrides) -> PlannerConfig:
enable_load_scaling=True,
enable_throughput_scaling=True,
load_predictor="constant",
no_operation=True,
backend="vllm",
metric_pulling_prometheus_endpoint="http://localhost:9090",
metric_reporting_prometheus_port=0,
......
......@@ -11,7 +11,10 @@ from unittest.mock import patch
import pandas as pd
import pytest
from dynamo.llm import KvRouterConfig, MockEngineArgs
try:
from dynamo.llm import KvRouterConfig, MockEngineArgs
except ImportError:
pytest.skip("dynamo.llm bindings not available", allow_module_level=True)
from dynamo.profiler.utils import replay_optimize
from dynamo.profiler.utils.replay_optimize import (
DenseAggReplayState,
......
......@@ -5,7 +5,10 @@
import pytest
from dynamo.profiler.rapid import _DEFAULT_NAIVE_BACKEND
try:
from dynamo.profiler.rapid import _DEFAULT_NAIVE_BACKEND
except ImportError:
pytest.skip("dynamo.llm bindings not available", allow_module_level=True)
pytestmark = [
pytest.mark.unit,
......
......@@ -9,7 +9,10 @@ from unittest.mock import AsyncMock, patch
import pytest
from dynamo.profiler.utils.config_modifiers import CONFIG_MODIFIERS
try:
from dynamo.profiler.utils.config_modifiers import CONFIG_MODIFIERS
except ImportError:
pytest.skip("dynamo.llm bindings not available", allow_module_level=True)
from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
PickedParallelConfig,
)
......
......@@ -153,7 +153,7 @@ def _run_planner_replay(
from dynamo.planner.offline.replay_adapter import ReplayPlannerAdapter
planner_config = PlannerConfig.from_config_arg(planner_config_arg)
planner_config.no_operation = True
planner_config.advisory = True
if planner_config.mode == "agg":
if extra_engine_args is None:
......@@ -326,6 +326,10 @@ def main(argv: Sequence[str] | None = None) -> int:
sys.stdout.write("\n")
sys.stdout.write(f"Saved full report to: {report_path}\n")
sys.stdout.write(f"Planner ticks: {planner_report.total_ticks}\n")
if planner_report.html_report_path:
sys.stdout.write(
f"Planner diagnostics report: {planner_report.html_report_path}\n"
)
return 0
if using_trace_file:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment