Unverified commit 9cf9ef18, authored by Hongkuan Zhou, committed by GitHub
Browse files

feat(planner): add optimization_target for easy-mode scaling (#8137)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 2cabf441
...@@ -55,6 +55,15 @@ class PlannerConfig(BaseModel): ...@@ -55,6 +55,15 @@ class PlannerConfig(BaseModel):
) )
backend: Literal["vllm", "sglang", "trtllm", "mocker"] = SLAPlannerDefaults.backend backend: Literal["vllm", "sglang", "trtllm", "mocker"] = SLAPlannerDefaults.backend
mode: Literal["disagg", "prefill", "decode", "agg"] = SLAPlannerDefaults.mode mode: Literal["disagg", "prefill", "decode", "agg"] = SLAPlannerDefaults.mode
optimization_target: Literal["throughput", "latency", "sla"] = Field(
default="throughput",
description=(
"Scaling optimization target. "
"'throughput' (default) and 'latency' use static thresholds on queue "
"depth and KV cache utilization — no SLA targets or profiling needed. "
"'sla' uses regression-based scaling that targets specific ttft/itl values."
),
)
no_operation: bool = SLAPlannerDefaults.no_operation no_operation: bool = SLAPlannerDefaults.no_operation
log_dir: Optional[str] = SLAPlannerDefaults.log_dir log_dir: Optional[str] = SLAPlannerDefaults.log_dir
...@@ -163,6 +172,20 @@ class PlannerConfig(BaseModel): ...@@ -163,6 +172,20 @@ class PlannerConfig(BaseModel):
"Please specify the namespace where GlobalPlanner is running." "Please specify the namespace where GlobalPlanner is running."
) )
# Easy mode: force load scaling on, throughput scaling off
if self.optimization_target != "sla":
self.enable_load_scaling = True
self.enable_throughput_scaling = False
if (
self.ttft != SLAPlannerDefaults.ttft
or self.itl != SLAPlannerDefaults.itl
):
logger.warning(
"optimization_target=%s ignores ttft/itl values; "
"set optimization_target='sla' to use SLA-based scaling",
self.optimization_target,
)
# At least one scaling mode must be enabled # At least one scaling mode must be enabled
if not self.enable_throughput_scaling and not self.enable_load_scaling: if not self.enable_throughput_scaling and not self.enable_load_scaling:
raise ValueError( raise ValueError(
......
...@@ -77,6 +77,7 @@ def _engine_caps( ...@@ -77,6 +77,7 @@ def _engine_caps(
else None, else None,
max_num_seqs=worker_info.max_num_seqs if worker_info else None, max_num_seqs=worker_info.max_num_seqs if worker_info else None,
context_length=worker_info.context_length if worker_info else None, context_length=worker_info.context_length if worker_info else None,
max_kv_tokens=worker_info.max_kv_tokens if worker_info else None,
) )
......
...@@ -20,6 +20,19 @@ if TYPE_CHECKING: ...@@ -20,6 +20,19 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# -- Easy-mode static thresholds (optimization_target != "sla") -----------
# The easy-mode decision methods compare a per-engine load figure against
# these values: one worker is added when ANY engine crosses the scale-up
# value, and one is removed when ALL engines are under the scale-down value.
# "THROUGHPUT" and "LATENCY" select the pair used for the matching
# optimization_target.
#
# Prefill: ratio of queued_prefill_tokens / context_length
_PREFILL_THROUGHPUT_SCALE_UP = 1.0  # queued >= context_length
_PREFILL_THROUGHPUT_SCALE_DOWN = 0.1  # queued < context_length / 10
_PREFILL_LATENCY_SCALE_UP = 0.1  # queued >= context_length / 10
_PREFILL_LATENCY_SCALE_DOWN = 0.0  # queued == 0
# Decode/Agg: KV cache utilization (scheduled + queued) / max_kv_tokens
# (agg additionally counts queued prefill tokens in the numerator)
_DECODE_THROUGHPUT_SCALE_UP = 1.0  # util > 100%
_DECODE_THROUGHPUT_SCALE_DOWN = 0.6  # util < 60%
_DECODE_LATENCY_SCALE_UP = 0.4  # util > 40%
_DECODE_LATENCY_SCALE_DOWN = 0.1  # util < 10%
class LoadScalingMixin: class LoadScalingMixin:
"""FPM-driven load-based scaling decisions.""" """FPM-driven load-based scaling decisions."""
...@@ -60,6 +73,14 @@ class LoadScalingMixin: ...@@ -60,6 +73,14 @@ class LoadScalingMixin:
self._diag_load_reason = "worker_count_mismatch" self._diag_load_reason = "worker_count_mismatch"
return None return None
easy = self._config.optimization_target != "sla"
if easy:
desired = (
self._prefill_easy_decision(fpm_stats, num_workers)
if component == "prefill"
else self._decode_easy_decision(fpm_stats, num_workers)
)
else:
desired = ( desired = (
self._prefill_load_decision(fpm_stats, num_workers) self._prefill_load_decision(fpm_stats, num_workers)
if component == "prefill" if component == "prefill"
...@@ -113,13 +134,22 @@ class LoadScalingMixin: ...@@ -113,13 +134,22 @@ class LoadScalingMixin:
self._diag_load_reason = "worker_count_mismatch" self._diag_load_reason = "worker_count_mismatch"
return None return None
easy = self._config.optimization_target != "sla"
p_desired = ( p_desired = (
self._prefill_load_decision(p_stats, self._num_p_workers) (
self._prefill_easy_decision(p_stats, self._num_p_workers)
if easy
else self._prefill_load_decision(p_stats, self._num_p_workers)
)
if p_stats if p_stats
else None else None
) )
d_desired = ( d_desired = (
self._decode_load_decision(d_stats, self._num_d_workers) (
self._decode_easy_decision(d_stats, self._num_d_workers)
if easy
else self._decode_load_decision(d_stats, self._num_d_workers)
)
if d_stats if d_stats
else None else None
) )
...@@ -174,6 +204,34 @@ class LoadScalingMixin: ...@@ -174,6 +204,34 @@ class LoadScalingMixin:
if not self._reconcile_fpm_worker_count(fpm_stats, num_workers, "agg"): if not self._reconcile_fpm_worker_count(fpm_stats, num_workers, "agg"):
self._diag_load_reason = "worker_count_mismatch" self._diag_load_reason = "worker_count_mismatch"
return None return None
easy = self._config.optimization_target != "sla"
if easy:
desired = self._agg_easy_decision(fpm_stats, num_workers)
# For agg easy mode, we directly get a single decision
# _agg_easy_decision already sets _diag_load_reason before returning None
if desired is None:
return None
original_desired = desired
desired = max(desired, self._config.min_endpoint)
if self._config.enable_throughput_scaling:
desired = max(desired, self._throughput_lower_bound_d)
desired = self._apply_single_budget(desired, "decode")
if desired < num_workers:
if desired > original_desired:
self._diag_load_reason = "scale_down_capped_by_throughput"
else:
self._diag_load_reason = "scale_down"
elif desired > num_workers:
self._diag_load_reason = "scale_up"
else:
self._diag_load_reason = "no_change"
logger.info(f"Agg easy-mode scaling: {num_workers} -> {desired}")
return ScalingDecision(num_decode=desired)
if not self._agg_regression.has_sufficient_data(): if not self._agg_regression.has_sufficient_data():
logger.info( logger.info(
f"Agg regression: insufficient data " f"Agg regression: insufficient data "
...@@ -358,6 +416,193 @@ class LoadScalingMixin: ...@@ -358,6 +416,193 @@ class LoadScalingMixin:
return self._scale_decision(estimates, self._config.itl, num_workers, "agg ITL") return self._scale_decision(estimates, self._config.itl, num_workers, "agg ITL")
# ------------------------------------------------------------------
# Easy-mode decision methods (optimization_target != "sla")
# ------------------------------------------------------------------
def _prefill_easy_decision(
    self, fpm_stats: dict[tuple[str, int], ForwardPassMetrics], num_workers: int
) -> Optional[int]:
    """Pick a prefill worker count from static queue-depth thresholds.

    Each engine's load is its queued prefill tokens divided by the prefill
    context length. One worker is added when the busiest engine reaches the
    scale-up threshold. One worker is removed (never going below
    ``min_endpoint``) when more than one worker is running and every engine
    is under the scale-down threshold; in latency mode that means every
    queue is empty.

    Args:
        fpm_stats: Latest forward-pass metrics keyed by (worker id, dp rank).
        num_workers: Number of prefill workers currently counted.

    Returns:
        The desired worker count, or ``None`` when no change is proposed.
        Whenever ``None`` is returned the reason ("insufficient_data" or
        "no_change") is stored in ``self._diag_load_reason``.
    """
    prefill_caps = self._capabilities.prefill
    ctx_len = prefill_caps.context_length if prefill_caps else None
    if not ctx_len or ctx_len <= 0:
        logger.warning(
            "context_length not available, skipping easy prefill scaling"
        )
        self._diag_load_reason = "insufficient_data"
        return None
    if num_workers == 0:
        self._diag_load_reason = "insufficient_data"
        return None

    latency_mode = self._config.optimization_target == "latency"
    if latency_mode:
        up_thresh = _PREFILL_LATENCY_SCALE_UP
        down_thresh = _PREFILL_LATENCY_SCALE_DOWN
    else:
        up_thresh = _PREFILL_THROUGHPUT_SCALE_UP
        down_thresh = _PREFILL_THROUGHPUT_SCALE_DOWN

    per_engine: list[float] = []
    for (wid, dp), metrics in fpm_stats.items():
        queued = metrics.queued_requests.sum_prefill_tokens
        ratio = queued / ctx_len
        per_engine.append(ratio)
        logger.info(
            f"Easy prefill {wid}:dp{dp}: queued={queued}, "
            f"context_length={ctx_len}, ratio={ratio:.3f}"
        )

    if not per_engine:
        self._diag_load_reason = "insufficient_data"
        return None

    # "Any engine at/above X" and "all engines below X" both reduce to a
    # comparison against the single busiest engine.
    busiest = max(per_engine)

    if busiest >= up_thresh:
        logger.info(
            f"Easy prefill: engine(s) above scale-up threshold "
            f"({up_thresh}), scaling up to {num_workers + 1}"
        )
        return num_workers + 1

    if num_workers > 1:
        if latency_mode and busiest <= down_thresh:
            # Latency mode only shrinks once every queue is empty.
            desired = max(num_workers - 1, self._config.min_endpoint)
            logger.info(f"Easy prefill: all engines at zero queue, -> {desired}")
            return desired
        if not latency_mode and busiest < down_thresh:
            desired = max(num_workers - 1, self._config.min_endpoint)
            logger.info(
                f"Easy prefill: all engines below scale-down threshold "
                f"({down_thresh}), -> {desired}"
            )
            return desired

    self._diag_load_reason = "no_change"
    return None
def _decode_easy_decision(
    self, fpm_stats: dict[tuple[str, int], ForwardPassMetrics], num_workers: int
) -> Optional[int]:
    """Easy-mode decision for decode workers.

    Utilization per engine is (scheduled + queued decode KV tokens) divided
    by the decode engine's ``max_kv_tokens``.

    Returns:
        The desired worker count, or ``None`` when no change is proposed
        (``self._diag_load_reason`` then holds "insufficient_data" or
        "no_change").
    """
    return self._kv_util_easy_decision(
        fpm_stats, num_workers, label="decode", include_queued_prefill=False
    )


def _agg_easy_decision(
    self, fpm_stats: dict[tuple[str, int], ForwardPassMetrics], num_workers: int
) -> Optional[int]:
    """Easy-mode decision for agg: uses combined KV utilization including queued prefill.

    Returns:
        The desired worker count, or ``None`` when no change is proposed
        (``self._diag_load_reason`` then holds "insufficient_data" or
        "no_change").
    """
    return self._kv_util_easy_decision(
        fpm_stats, num_workers, label="agg", include_queued_prefill=True
    )


def _kv_util_easy_decision(
    self,
    fpm_stats: dict[tuple[str, int], ForwardPassMetrics],
    num_workers: int,
    *,
    label: str,
    include_queued_prefill: bool,
) -> Optional[int]:
    """Shared KV-utilization threshold logic for decode and agg easy mode.

    One worker is added when ANY engine's utilization exceeds the scale-up
    threshold. One worker is removed (never going below ``min_endpoint``)
    when more than one worker is running and ALL engines are under the
    scale-down threshold.

    Args:
        fpm_stats: Latest forward-pass metrics keyed by (worker id, dp rank).
        num_workers: Number of workers currently counted for this component.
        label: Component name used in log messages ("decode" or "agg").
        include_queued_prefill: When true, queued prefill tokens are added
            to the utilization numerator and reported in the per-engine log.

    Returns:
        The desired worker count, or ``None`` when no change is proposed.
        Whenever ``None`` is returned the reason is stored in
        ``self._diag_load_reason``.
    """
    d_caps = self._capabilities.decode
    max_kv = d_caps.max_kv_tokens if d_caps else None
    if not max_kv or max_kv <= 0:
        logger.warning(
            f"max_kv_tokens not available, skipping easy {label} scaling"
        )
        self._diag_load_reason = "insufficient_data"
        return None
    if num_workers == 0:
        self._diag_load_reason = "insufficient_data"
        return None

    is_latency = self._config.optimization_target == "latency"
    up_thresh = (
        _DECODE_LATENCY_SCALE_UP if is_latency else _DECODE_THROUGHPUT_SCALE_UP
    )
    down_thresh = (
        _DECODE_LATENCY_SCALE_DOWN if is_latency else _DECODE_THROUGHPUT_SCALE_DOWN
    )

    utils: list[float] = []
    for (wid, dp), fpm in fpm_stats.items():
        sched_kv = fpm.scheduled_requests.sum_decode_kv_tokens
        queued_kv = fpm.queued_requests.sum_decode_kv_tokens
        demand = sched_kv + queued_kv
        detail = f"sched_kv={sched_kv}, queued_kv={queued_kv}, "
        if include_queued_prefill:
            # Only read the prefill queue when it is part of the metric, so
            # the decode path touches exactly the fields it did before.
            queued_prefill = fpm.queued_requests.sum_prefill_tokens
            demand += queued_prefill
            detail += f"queued_prefill={queued_prefill}, "
        util = demand / max_kv
        utils.append(util)
        logger.info(
            f"Easy {label} {wid}:dp{dp}: {detail}"
            f"max_kv={max_kv}, util={util:.3f}"
        )

    if not utils:
        self._diag_load_reason = "insufficient_data"
        return None

    if any(u > up_thresh for u in utils):
        logger.info(
            f"Easy {label}: engine(s) above scale-up threshold "
            f"({up_thresh}), scaling up to {num_workers + 1}"
        )
        return num_workers + 1

    if num_workers > 1 and all(u < down_thresh for u in utils):
        desired = max(num_workers - 1, self._config.min_endpoint)
        logger.info(
            f"Easy {label}: all engines below scale-down threshold "
            f"({down_thresh}), -> {desired}"
        )
        return desired

    self._diag_load_reason = "no_change"
    return None
# ------------------------------------------------------------------
# SLA-based per-engine latency estimation
# ------------------------------------------------------------------
def _scale_decision( def _scale_decision(
self, estimates: list[float], sla: float, num_workers: int, label: str self, estimates: list[float], sla: float, num_workers: int, label: str
) -> Optional[int]: ) -> Optional[int]:
......
...@@ -66,7 +66,10 @@ class PlannerStateMachine(LoadScalingMixin, ThroughputScalingMixin): ...@@ -66,7 +66,10 @@ class PlannerStateMachine(LoadScalingMixin, ThroughputScalingMixin):
self._is_agg = config.mode == "agg" self._is_agg = config.mode == "agg"
self._has_prefill = config.mode in ("disagg", "prefill") self._has_prefill = config.mode in ("disagg", "prefill")
self._has_decode = config.mode in ("disagg", "decode", "agg") self._has_decode = config.mode in ("disagg", "decode", "agg")
self._is_easy = config.optimization_target != "sla"
# Easy mode uses static thresholds -- no regression or predictors needed
if not self._is_easy:
if self._is_agg: if self._is_agg:
self._agg_regression = AggRegressionModel( self._agg_regression = AggRegressionModel(
max_num_fpm_samples=config.max_num_fpm_samples, max_num_fpm_samples=config.max_num_fpm_samples,
...@@ -132,6 +135,9 @@ class PlannerStateMachine(LoadScalingMixin, ThroughputScalingMixin): ...@@ -132,6 +135,9 @@ class PlannerStateMachine(LoadScalingMixin, ThroughputScalingMixin):
decode_fpms: Optional[list[ForwardPassMetrics]] = None, decode_fpms: Optional[list[ForwardPassMetrics]] = None,
agg_fpms: Optional[list[ForwardPassMetrics]] = None, agg_fpms: Optional[list[ForwardPassMetrics]] = None,
) -> None: ) -> None:
if self._is_easy:
logger.debug("Skipping benchmark FPM loading in easy mode")
return
if agg_fpms and self._is_agg: if agg_fpms and self._is_agg:
self._agg_regression.load_benchmark_fpms(agg_fpms) self._agg_regression.load_benchmark_fpms(agg_fpms)
logger.info(f"Bootstrapped agg regression with {len(agg_fpms)} FPMs") logger.info(f"Bootstrapped agg regression with {len(agg_fpms)} FPMs")
...@@ -145,6 +151,9 @@ class PlannerStateMachine(LoadScalingMixin, ThroughputScalingMixin): ...@@ -145,6 +151,9 @@ class PlannerStateMachine(LoadScalingMixin, ThroughputScalingMixin):
logger.info(f"Bootstrapped decode regression with {len(decode_fpms)} FPMs") logger.info(f"Bootstrapped decode regression with {len(decode_fpms)} FPMs")
def warm_load_predictors(self, observations: list[TrafficObservation]) -> None: def warm_load_predictors(self, observations: list[TrafficObservation]) -> None:
if self._is_easy:
logger.debug("Skipping load predictor warmup in easy mode")
return
for obs in observations: for obs in observations:
self._num_req_predictor.add_data_point(obs.num_req) self._num_req_predictor.add_data_point(obs.num_req)
self._isl_predictor.add_data_point(obs.isl) self._isl_predictor.add_data_point(obs.isl)
...@@ -163,6 +172,7 @@ class PlannerStateMachine(LoadScalingMixin, ThroughputScalingMixin): ...@@ -163,6 +172,7 @@ class PlannerStateMachine(LoadScalingMixin, ThroughputScalingMixin):
if tick.run_load_scaling: if tick.run_load_scaling:
if tick_input.fpm_observations is not None: if tick_input.fpm_observations is not None:
if not self._is_easy:
self._observe_fpm(tick_input.fpm_observations) self._observe_fpm(tick_input.fpm_observations)
load_decision = self._advance_load(tick_input.fpm_observations) load_decision = self._advance_load(tick_input.fpm_observations)
if load_decision is not None: if load_decision is not None:
......
...@@ -134,6 +134,7 @@ class EngineCapabilities: ...@@ -134,6 +134,7 @@ class EngineCapabilities:
max_num_batched_tokens: Optional[int] = None max_num_batched_tokens: Optional[int] = None
max_num_seqs: Optional[int] = None max_num_seqs: Optional[int] = None
context_length: Optional[int] = None context_length: Optional[int] = None
max_kv_tokens: Optional[int] = None
@dataclass @dataclass
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Tests for easy-mode scaling (optimization_target = throughput | latency)."""
import pytest
try:
import msgspec # noqa: F401
except ImportError:
pytest.skip("msgspec required for FPM tests", allow_module_level=True)
from dynamo.common.forward_pass_metrics import (
ForwardPassMetrics,
QueuedRequestMetrics,
ScheduledRequestMetrics,
)
from dynamo.planner.config.planner_config import PlannerConfig
from dynamo.planner.core.state_machine import PlannerStateMachine
from dynamo.planner.core.types import (
EngineCapabilities,
FpmObservations,
ScheduledTick,
TickInput,
WorkerCapabilities,
WorkerCounts,
)
# Module-level pytest markers: applied to every test defined in this file.
pytestmark = [
    pytest.mark.gpu_0,
    pytest.mark.pre_merge,
    pytest.mark.unit,
    pytest.mark.planner,
]
def _tick_for(tick_input: TickInput) -> ScheduledTick:
    """Build the ScheduledTick that goes with *tick_input*.

    Load scaling (and the worker-FPM requirement) is enabled only when the
    input carries FPM observations; throughput scaling is always disabled.
    """
    fpm_present = tick_input.fpm_observations is not None
    schedule = {
        "at_s": tick_input.now_s,
        "run_load_scaling": fpm_present,
        "run_throughput_scaling": False,
        "need_worker_states": True,
        "need_worker_fpm": fpm_present,
    }
    return ScheduledTick(**schedule)
def _make_fpm(
    *,
    sum_prefill_tokens: int = 0,
    num_prefill_requests: int = 0,
    sum_decode_kv_tokens: int = 0,
    num_decode_requests: int = 0,
    queued_prefill_tokens: int = 0,
    queued_decode_kv_tokens: int = 0,
    wall_time: float = 0.01,
    worker_id: str = "w1",
    dp_rank: int = 0,
) -> ForwardPassMetrics:
    """Create a ForwardPassMetrics sample; every field defaults to idle.

    The ``sum_*``/``num_*`` arguments fill the scheduled-request section,
    the ``queued_*`` arguments fill the queued-request section.
    """
    scheduled = ScheduledRequestMetrics(
        sum_prefill_tokens=sum_prefill_tokens,
        num_prefill_requests=num_prefill_requests,
        sum_decode_kv_tokens=sum_decode_kv_tokens,
        num_decode_requests=num_decode_requests,
    )
    queued = QueuedRequestMetrics(
        sum_prefill_tokens=queued_prefill_tokens,
        sum_decode_kv_tokens=queued_decode_kv_tokens,
    )
    return ForwardPassMetrics(
        worker_id=worker_id,
        dp_rank=dp_rank,
        wall_time=wall_time,
        scheduled_requests=scheduled,
        queued_requests=queued,
    )
def _easy_config(**overrides) -> PlannerConfig:
    """Return an easy-mode PlannerConfig, with *overrides* replacing defaults.

    Built with ``model_construct``, so the values are taken as given.
    """
    settings = {
        "mode": "disagg",
        "optimization_target": "throughput",
        "enable_load_scaling": True,
        "enable_throughput_scaling": False,
        "min_endpoint": 1,
        "max_gpu_budget": -1,
        "load_adjustment_interval": 5,
        "max_num_fpm_samples": 50,
        "fpm_sample_bucket_size": 16,
        "load_min_observations": 5,
        "load_predictor": "constant",
        "no_operation": True,
        "backend": "vllm",
        "metric_pulling_prometheus_endpoint": "http://localhost:9090",
        "metric_reporting_prometheus_port": 0,
        **overrides,
    }
    return PlannerConfig.model_construct(**settings)
# Capability values shared by the capability helpers and the test inputs;
# queue depths and KV token counts in the tests are chosen relative to these.
CONTEXT_LENGTH = 4096
MAX_KV_TOKENS = 100000
def _prefill_caps() -> WorkerCapabilities:
    """Capabilities with both a prefill engine (context length set) and a
    decode engine (KV capacity set)."""
    prefill_engine = EngineCapabilities(
        num_gpu=1, context_length=CONTEXT_LENGTH, max_num_batched_tokens=2048
    )
    decode_engine = EngineCapabilities(
        num_gpu=1, max_kv_tokens=MAX_KV_TOKENS, max_num_batched_tokens=2048
    )
    return WorkerCapabilities(prefill=prefill_engine, decode=decode_engine)
def _decode_caps() -> WorkerCapabilities:
    """Capabilities describing only a decode engine with KV capacity set."""
    decode_engine = EngineCapabilities(
        num_gpu=1, max_kv_tokens=MAX_KV_TOKENS, max_num_batched_tokens=2048
    )
    return WorkerCapabilities(decode=decode_engine)
def _agg_caps() -> WorkerCapabilities:
    """Capabilities for agg mode: one engine, stored in the decode slot,
    with both KV capacity and context length set."""
    agg_engine = EngineCapabilities(
        num_gpu=1,
        max_kv_tokens=MAX_KV_TOKENS,
        max_num_batched_tokens=2048,
        context_length=CONTEXT_LENGTH,
    )
    return WorkerCapabilities(decode=agg_engine)
def _make_core(config=None, caps=None, **overrides) -> PlannerStateMachine:
    """Build a PlannerStateMachine for a test.

    Without *config*, an easy-mode config is built from *overrides*; without
    *caps*, the prefill+decode capabilities are used.
    """
    if not config:
        config = _easy_config(**overrides)
    if not caps:
        caps = _prefill_caps()
    return PlannerStateMachine(config, caps)
# ── Config validation ────────────────────────────────────────────────
class TestEasyConfig:
    """Config validation and state-machine construction for easy mode."""

    def test_throughput_forces_load_on_throughput_off(self):
        validated = PlannerConfig.model_validate({"optimization_target": "throughput"})
        assert validated.enable_load_scaling is True
        assert validated.enable_throughput_scaling is False

    def test_latency_forces_load_on_throughput_off(self):
        validated = PlannerConfig.model_validate({"optimization_target": "latency"})
        assert validated.enable_load_scaling is True
        assert validated.enable_throughput_scaling is False

    def test_sla_mode_preserves_original_flags(self):
        sla_settings = {
            "optimization_target": "sla",
            "enable_load_scaling": True,
            "enable_throughput_scaling": True,
            "pre_deployment_sweeping_mode": "rapid",
            "throughput_adjustment_interval": 60,
            "load_adjustment_interval": 5,
        }
        validated = PlannerConfig.model_validate(sla_settings)
        assert validated.enable_load_scaling is True
        assert validated.enable_throughput_scaling is True

    def test_no_regression_in_easy_mode(self):
        planner = _make_core(optimization_target="throughput", mode="prefill")
        assert not hasattr(planner, "_prefill_regression")

    def test_no_predictors_in_easy_mode(self):
        planner = _make_core(optimization_target="throughput", mode="prefill")
        assert not hasattr(planner, "_num_req_predictor")
# ── Prefill throughput scaling ───────────────────────────────────────
class TestPrefillThroughputEasy:
    """Prefill-only planner with optimization_target="throughput"."""

    @staticmethod
    def _run(*queued_tokens):
        """Tick a fresh planner with one prefill worker (w1, w2, ...) per
        queued-token value and return the resulting effects."""
        planner = _make_core(mode="prefill", optimization_target="throughput")
        observed = {
            (f"w{idx}", 0): _make_fpm(
                worker_id=f"w{idx}", queued_prefill_tokens=queued
            )
            for idx, queued in enumerate(queued_tokens, start=1)
        }
        tick_input = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(prefill=observed),
            worker_counts=WorkerCounts(ready_num_prefill=len(observed)),
        )
        return planner.on_tick(_tick_for(tick_input), tick_input)

    def test_scale_up_at_context_length(self):
        outcome = self._run(CONTEXT_LENGTH)
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_prefill == 2
        assert outcome.diagnostics.load_decision_reason == "scale_up"

    def test_no_change_between_thresholds(self):
        # queued = context_length / 2 -> between 0.1 and 1.0
        outcome = self._run(CONTEXT_LENGTH // 2, CONTEXT_LENGTH // 2)
        assert outcome.scale_to is None
        assert outcome.diagnostics.load_decision_reason == "no_change"

    def test_scale_down_below_tenth(self):
        # queued < context_length / 10
        outcome = self._run(CONTEXT_LENGTH // 20, CONTEXT_LENGTH // 20)
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_prefill == 1
        assert outcome.diagnostics.load_decision_reason == "scale_down"

    def test_no_scale_down_at_min(self):
        outcome = self._run(0)
        # Already at 1 worker, can't scale down further
        assert outcome.scale_to is None
# ── Prefill latency scaling ──────────────────────────────────────────
class TestPrefillLatencyEasy:
    """Prefill-only planner with optimization_target="latency"."""

    @staticmethod
    def _run(*queued_tokens):
        """Tick a fresh planner with one prefill worker (w1, w2, ...) per
        queued-token value and return the resulting effects."""
        planner = _make_core(mode="prefill", optimization_target="latency")
        observed = {
            (f"w{idx}", 0): _make_fpm(
                worker_id=f"w{idx}", queued_prefill_tokens=queued
            )
            for idx, queued in enumerate(queued_tokens, start=1)
        }
        tick_input = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(prefill=observed),
            worker_counts=WorkerCounts(ready_num_prefill=len(observed)),
        )
        return planner.on_tick(_tick_for(tick_input), tick_input)

    def test_scale_up_at_tenth(self):
        # One token past the floored tenth, so integer division cannot leave
        # the ratio just under the 0.1 threshold.
        outcome = self._run(CONTEXT_LENGTH // 10 + 1)
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_prefill == 2

    def test_scale_down_at_zero(self):
        outcome = self._run(0, 0)
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_prefill == 1

    def test_no_scale_down_with_any_queued(self):
        outcome = self._run(10, 10)
        assert outcome.scale_to is None
# ── Decode throughput scaling ────────────────────────────────────────
class TestDecodeThroughputEasy:
    """Decode-only planner with optimization_target="throughput"."""

    @staticmethod
    def _run(*kv_loads):
        """Tick a fresh planner with one decode worker (w1, w2, ...) per
        (scheduled_kv, queued_kv) pair and return the resulting effects."""
        planner = _make_core(
            mode="decode", optimization_target="throughput", caps=_decode_caps()
        )
        observed = {
            (f"w{idx}", 0): _make_fpm(
                worker_id=f"w{idx}",
                sum_decode_kv_tokens=scheduled,
                queued_decode_kv_tokens=queued,
            )
            for idx, (scheduled, queued) in enumerate(kv_loads, start=1)
        }
        tick_input = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(decode=observed),
            worker_counts=WorkerCounts(ready_num_decode=len(observed)),
        )
        return planner.on_tick(_tick_for(tick_input), tick_input)

    def test_scale_up_above_100_pct(self):
        # util = (80000 + 30000) / 100000 = 1.1 > 1.0
        outcome = self._run((80000, 30000))
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_decode == 2
        assert outcome.diagnostics.load_decision_reason == "scale_up"

    def test_scale_down_below_60_pct(self):
        # util = (40000 + 0) / 100000 = 0.4 < 0.6
        outcome = self._run((40000, 0), (40000, 0))
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_decode == 1

    def test_no_change_between_thresholds(self):
        # util = (70000 + 0) / 100000 = 0.7 -> between 0.6 and 1.0
        outcome = self._run((70000, 0), (70000, 0))
        assert outcome.scale_to is None
# ── Decode latency scaling ───────────────────────────────────────────
class TestDecodeLatencyEasy:
    """Decode-only planner with optimization_target="latency"."""

    @staticmethod
    def _run(*kv_loads):
        """Tick a fresh planner with one decode worker (w1, w2, ...) per
        (scheduled_kv, queued_kv) pair and return the resulting effects."""
        planner = _make_core(
            mode="decode", optimization_target="latency", caps=_decode_caps()
        )
        observed = {
            (f"w{idx}", 0): _make_fpm(
                worker_id=f"w{idx}",
                sum_decode_kv_tokens=scheduled,
                queued_decode_kv_tokens=queued,
            )
            for idx, (scheduled, queued) in enumerate(kv_loads, start=1)
        }
        tick_input = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(decode=observed),
            worker_counts=WorkerCounts(ready_num_decode=len(observed)),
        )
        return planner.on_tick(_tick_for(tick_input), tick_input)

    def test_scale_up_above_40_pct(self):
        # util = (45000 + 0) / 100000 = 0.45 > 0.4
        outcome = self._run((45000, 0))
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_decode == 2

    def test_scale_down_below_10_pct(self):
        # util = (5000 + 0) / 100000 = 0.05 < 0.1
        outcome = self._run((5000, 0), (5000, 0))
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_decode == 1
# ── ANY-up / ALL-down logic ─────────────────────────────────────────
class TestAnyUpAllDown:
    """Scale-up needs ANY engine over the threshold; scale-down needs ALL
    engines under it (decode, throughput target)."""

    @staticmethod
    def _run(*kv_loads):
        """Tick a fresh planner with one decode worker (w1, w2, ...) per
        (scheduled_kv, queued_kv) pair and return the resulting effects."""
        planner = _make_core(
            mode="decode", optimization_target="throughput", caps=_decode_caps()
        )
        observed = {
            (f"w{idx}", 0): _make_fpm(
                worker_id=f"w{idx}",
                sum_decode_kv_tokens=scheduled,
                queued_decode_kv_tokens=queued,
            )
            for idx, (scheduled, queued) in enumerate(kv_loads, start=1)
        }
        tick_input = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(decode=observed),
            worker_counts=WorkerCounts(ready_num_decode=len(observed)),
        )
        return planner.on_tick(_tick_for(tick_input), tick_input)

    def test_any_engine_above_triggers_scale_up(self):
        """One engine above threshold, one below -> scale up."""
        within_limits = (50000, 0)
        overloaded = (80000, 30000)
        outcome = self._run(within_limits, overloaded)
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_decode == 3

    def test_mixed_no_scale_down(self):
        """One engine below scale-down threshold, one above -> no change."""
        lightly_loaded = (20000, 0)
        moderately_loaded = (70000, 0)
        outcome = self._run(lightly_loaded, moderately_loaded)
        assert outcome.scale_to is None

    def test_all_below_triggers_scale_down(self):
        """All engines below scale-down threshold -> scale down."""
        outcome = self._run((20000, 0), (30000, 0), (25000, 0))
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_decode == 2
# ── Disagg mode ──────────────────────────────────────────────────────
class TestDisaggEasy:
    """Disaggregated planner (one prefill + one decode worker), throughput target."""

    @staticmethod
    def _run(*, queued_prefill, scheduled_kv, queued_kv):
        """Tick a fresh disagg planner with a single prefill and a single
        decode observation and return the resulting effects."""
        planner = _make_core(mode="disagg", optimization_target="throughput")
        prefill_sample = _make_fpm(queued_prefill_tokens=queued_prefill)
        decode_sample = _make_fpm(
            sum_decode_kv_tokens=scheduled_kv, queued_decode_kv_tokens=queued_kv
        )
        tick_input = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(
                prefill={("w1", 0): prefill_sample},
                decode={("w1", 0): decode_sample},
            ),
            worker_counts=WorkerCounts(ready_num_prefill=1, ready_num_decode=1),
        )
        return planner.on_tick(_tick_for(tick_input), tick_input)

    def test_disagg_scale_up_prefill(self):
        outcome = self._run(
            queued_prefill=CONTEXT_LENGTH, scheduled_kv=50000, queued_kv=0
        )
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_prefill == 2

    def test_disagg_scale_up_decode(self):
        outcome = self._run(queued_prefill=0, scheduled_kv=80000, queued_kv=30000)
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_decode == 2
# ── Agg mode ─────────────────────────────────────────────────────────
class TestAggEasy:
    """Aggregated planner with optimization_target="throughput"."""

    @staticmethod
    def _run(*loads):
        """Tick a fresh agg planner with one worker (w1, w2, ...) per
        (scheduled_kv, queued_kv, queued_prefill) triple and return the
        resulting effects."""
        planner = _make_core(
            mode="agg", optimization_target="throughput", caps=_agg_caps()
        )
        observed = {
            (f"w{idx}", 0): _make_fpm(
                worker_id=f"w{idx}",
                sum_decode_kv_tokens=scheduled,
                queued_decode_kv_tokens=queued_kv,
                queued_prefill_tokens=queued_prefill,
            )
            for idx, (scheduled, queued_kv, queued_prefill) in enumerate(
                loads, start=1
            )
        }
        tick_input = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(decode=observed),
            worker_counts=WorkerCounts(ready_num_decode=len(observed)),
        )
        return planner.on_tick(_tick_for(tick_input), tick_input)

    def test_agg_scale_up_decode_heavy(self):
        # util = (80000 + 30000 + 0) / 100000 = 1.1 > 1.0
        outcome = self._run((80000, 30000, 0))
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_decode == 2

    def test_agg_scale_up_prefill_heavy(self):
        """Agg includes queued prefill in utilization calc."""
        # util = (20000 + 0 + 90000) / 100000 = 1.1 > 1.0
        outcome = self._run((20000, 0, 90000))
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_decode == 2

    def test_agg_scale_down(self):
        # util = (30000 + 0 + 0) / 100000 = 0.3 < 0.6
        outcome = self._run((30000, 0, 0), (30000, 0, 0))
        assert outcome.scale_to is not None
        assert outcome.scale_to.num_decode == 1
# ── Missing capabilities ─────────────────────────────────────────────
class TestMissingCapabilities:
    """Ticks must yield no scaling decision when a needed capability is absent."""

    def test_missing_context_length_skips_prefill(self):
        """Prefill caps without ``context_length`` report ``insufficient_data``."""
        incomplete_caps = WorkerCapabilities(
            # Prefill side deliberately omits context_length.
            prefill=EngineCapabilities(num_gpu=1),
            decode=EngineCapabilities(num_gpu=1, max_kv_tokens=MAX_KV_TOKENS),
        )
        planner = _make_core(
            mode="prefill", optimization_target="throughput", caps=incomplete_caps
        )
        worker_metrics = _make_fpm(queued_prefill_tokens=10000)
        tick = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(prefill={("w1", 0): worker_metrics}),
            worker_counts=WorkerCounts(ready_num_prefill=1),
        )

        outcome = planner.on_tick(_tick_for(tick), tick)
        assert outcome.scale_to is None
        assert outcome.diagnostics.load_decision_reason == "insufficient_data"

    def test_missing_max_kv_tokens_skips_decode(self):
        """Decode caps without ``max_kv_tokens`` report ``insufficient_data``."""
        incomplete_caps = WorkerCapabilities(
            # Decode side deliberately omits max_kv_tokens.
            decode=EngineCapabilities(num_gpu=1),
        )
        planner = _make_core(
            mode="decode", optimization_target="throughput", caps=incomplete_caps
        )
        worker_metrics = _make_fpm(
            sum_decode_kv_tokens=80000, queued_decode_kv_tokens=30000
        )
        tick = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(decode={("w1", 0): worker_metrics}),
            worker_counts=WorkerCounts(ready_num_decode=1),
        )

        outcome = planner.on_tick(_tick_for(tick), tick)
        assert outcome.scale_to is None
        assert outcome.diagnostics.load_decision_reason == "insufficient_data"
# ── Scaling in progress ──────────────────────────────────────────────
class TestScalingInProgress:
    """No new decision is issued while ready and expected counts disagree."""

    def test_no_decision_when_scaling(self):
        """ready=1 vs. expected=2 prefill workers yields ``scaling_in_progress``."""
        planner = _make_core(mode="prefill", optimization_target="throughput")
        worker_metrics = _make_fpm(queued_prefill_tokens=CONTEXT_LENGTH * 2)
        # expected_num_prefill differs from ready_num_prefill for this tick.
        counts = WorkerCounts(ready_num_prefill=1, expected_num_prefill=2)
        tick = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(prefill={("w1", 0): worker_metrics}),
            worker_counts=counts,
        )

        outcome = planner.on_tick(_tick_for(tick), tick)
        assert outcome.scale_to is None
        assert outcome.diagnostics.load_decision_reason == "scaling_in_progress"
# ── Budget clamping ──────────────────────────────────────────────────
class TestBudgetClamping:
    """Scaling decisions stay within ``min_endpoint`` and ``max_gpu_budget``."""

    def test_min_endpoint_respected(self):
        """An idle pool already at ``min_endpoint=2`` is not scaled down."""
        planner = _make_core(
            mode="prefill", optimization_target="throughput", min_endpoint=2
        )
        idle_metrics = _make_fpm(queued_prefill_tokens=0)
        tick = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(prefill={("w1", 0): idle_metrics}),
            worker_counts=WorkerCounts(ready_num_prefill=2),
        )

        outcome = planner.on_tick(_tick_for(tick), tick)
        # Dropping below min_endpoint=2 is not allowed, so nothing is emitted.
        assert outcome.scale_to is None

    def test_gpu_budget_caps_scale_up(self):
        """With 4-GPU replicas and a 4-GPU budget, scale-up cannot exceed one."""
        four_gpu_caps = WorkerCapabilities(
            prefill=EngineCapabilities(
                num_gpu=4, context_length=CONTEXT_LENGTH, max_num_batched_tokens=2048
            ),
        )
        planner = _make_core(
            mode="prefill",
            optimization_target="throughput",
            max_gpu_budget=4,
            caps=four_gpu_caps,
        )
        busy_metrics = _make_fpm(queued_prefill_tokens=CONTEXT_LENGTH * 2)
        tick = TickInput(
            now_s=5.0,
            fpm_observations=FpmObservations(prefill={("w1", 0): busy_metrics}),
            worker_counts=WorkerCounts(ready_num_prefill=1),
        )

        outcome = planner.on_tick(_tick_for(tick), tick)
        # Load alone would ask for 2 replicas (8 GPUs); the 4-GPU budget allows 1.
        # Either no decision is emitted, or the emitted one fits in the budget.
        if outcome.scale_to is not None:
            assert outcome.scale_to.num_prefill is not None
            assert outcome.scale_to.num_prefill * 4 <= 4
...@@ -114,6 +114,7 @@ def test_agg_mode_supports_throughput_scaling(): ...@@ -114,6 +114,7 @@ def test_agg_mode_supports_throughput_scaling():
config = PlannerConfig( config = PlannerConfig(
namespace="test-ns", namespace="test-ns",
mode="agg", mode="agg",
optimization_target="sla",
enable_throughput_scaling=True, enable_throughput_scaling=True,
enable_load_scaling=False, enable_load_scaling=False,
) )
......
...@@ -85,6 +85,7 @@ def _make_fpm( ...@@ -85,6 +85,7 @@ def _make_fpm(
def _make_config(**overrides) -> PlannerConfig: def _make_config(**overrides) -> PlannerConfig:
defaults = dict( defaults = dict(
mode="disagg", mode="disagg",
optimization_target="sla",
ttft=500.0, ttft=500.0,
itl=50.0, itl=50.0,
min_endpoint=1, min_endpoint=1,
......
...@@ -13,6 +13,7 @@ sla: ...@@ -13,6 +13,7 @@ sla:
itl: 50.0 itl: 50.0
features: features:
planner: planner:
optimization_target: sla
pre_deployment_sweeping_mode: rapid pre_deployment_sweeping_mode: rapid
enable_throughput_scaling: true enable_throughput_scaling: true
enable_load_scaling: false enable_load_scaling: false
......
...@@ -13,6 +13,7 @@ sla: ...@@ -13,6 +13,7 @@ sla:
itl: 50.0 itl: 50.0
features: features:
planner: planner:
optimization_target: sla
pre_deployment_sweeping_mode: rapid pre_deployment_sweeping_mode: rapid
enable_throughput_scaling: true enable_throughput_scaling: true
enable_load_scaling: false enable_load_scaling: false
......
...@@ -15,6 +15,7 @@ sla: ...@@ -15,6 +15,7 @@ sla:
itl: 50.0 itl: 50.0
features: features:
planner: planner:
optimization_target: sla
pre_deployment_sweeping_mode: rapid pre_deployment_sweeping_mode: rapid
enable_throughput_scaling: true enable_throughput_scaling: true
enable_load_scaling: false enable_load_scaling: false
......
...@@ -17,6 +17,7 @@ sla: ...@@ -17,6 +17,7 @@ sla:
searchStrategy: thorough searchStrategy: thorough
features: features:
planner: planner:
optimization_target: sla
pre_deployment_sweeping_mode: rapid pre_deployment_sweeping_mode: rapid
enable_throughput_scaling: true enable_throughput_scaling: true
enable_load_scaling: false enable_load_scaling: false
......
...@@ -15,6 +15,7 @@ sla: ...@@ -15,6 +15,7 @@ sla:
searchStrategy: thorough searchStrategy: thorough
features: features:
planner: planner:
optimization_target: sla
pre_deployment_sweeping_mode: thorough pre_deployment_sweeping_mode: thorough
enable_throughput_scaling: true enable_throughput_scaling: true
enable_load_scaling: false enable_load_scaling: false
......
...@@ -77,6 +77,7 @@ def _make_dgdr(**overrides) -> DynamoGraphDeploymentRequestSpec: ...@@ -77,6 +77,7 @@ def _make_dgdr(**overrides) -> DynamoGraphDeploymentRequestSpec:
def _make_planner(**overrides) -> PlannerConfig: def _make_planner(**overrides) -> PlannerConfig:
base = dict( base = dict(
optimization_target="sla",
enable_throughput_scaling=True, enable_throughput_scaling=True,
enable_load_scaling=False, enable_load_scaling=False,
pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid, pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
......
...@@ -17,6 +17,18 @@ LLM inference breaks these assumptions: ...@@ -17,6 +17,18 @@ LLM inference breaks these assumptions:
The Dynamo **Planner** is an autoscaler purpose-built for these constraints. It understands engine profiling data, tracks per-worker GPU utilization, predicts traffic patterns, and makes scaling decisions that directly target TTFT and ITL SLAs — not proxy metrics.
## Getting Started: Optimization Targets
The planner offers three `optimization_target` settings that control how scaling decisions are made:
| Target | Description | Requires SLA? | Requires Profiling? |
|--------|-------------|:-------------:|:-------------------:|
| **`throughput`** (default) | Maximizes throughput by scaling based on queue depth and KV cache utilization. Scales up when engines are saturated, scales down when utilization drops. | No | No |
| **`latency`** | Minimizes latency by scaling aggressively to keep queues short. Scales up at lower utilization thresholds. | No | No |
| **`sla`** | Targets specific TTFT/ITL SLA values using regression-based performance models. Most precise, but requires configuration. | Yes (`ttft`, `itl`) | Recommended |
**We recommend starting with the default `throughput` target** — it works out of the box with zero configuration. Switch to `latency` if your workload is latency-sensitive, or to `sla` when you need precise SLA targeting with pre-deployment profiling.
> **New to the Planner?** Start with the [Planner Guide](planner-guide.md) for a complete workflow including profiling and deployment.
> **Need multi-DGD coordination?** See the [Global Planner Guide](global-planner.md) for shared-policy coordination across multiple DGDs and single-endpoint multi-pool deployments.
...@@ -63,40 +75,51 @@ When both modes are enabled, throughput-based scaling provides a capacity floor ...@@ -63,40 +75,51 @@ When both modes are enabled, throughput-based scaling provides a capacity floor
- Dynamo platform installed on Kubernetes ([Installation Guide](../../kubernetes/installation-guide.md)) - Dynamo platform installed on Kubernetes ([Installation Guide](../../kubernetes/installation-guide.md))
- kube-prometheus-stack installed ([Metrics Setup](../../kubernetes/observability/metrics.md)) - kube-prometheus-stack installed ([Metrics Setup](../../kubernetes/observability/metrics.md))
For throughput-based scaling, pre-deployment engine performance data is also required (via self-benchmark mode or [Profiling Guide](../profiler/profiler-guide.md)). ### Default Mode (zero config)
### Throughput-Based Scaling (with DGDR)
The fastest path to a throughput-based planner deployment is through a DynamoGraphDeploymentRequest, which automatically profiles your model: The planner works out of the box with no configuration needed. By default, `optimization_target` is set to `throughput`, which uses static thresholds on queue depth and KV cache utilization — no SLAs or profiling required:
```bash ```yaml
kubectl apply -f components/src/dynamo/profiler/deploy/profile_sla_aic_dgdr.yaml -n $NAMESPACE # Minimal planner config — uses throughput optimization by default
features:
planner:
mode: disagg
backend: vllm
``` ```
See [Planner Guide](planner-guide.md) for the full workflow. For latency-sensitive workloads:
### Load-Based Scaling (without profiling)
To deploy with load-based scaling only (no profiling required), add these arguments to the planner service in your DGD:
```yaml ```yaml
args: features:
- --enable-loadbased-scaling planner:
- --disable-throughput-scaling mode: disagg
- --loadbased-adjustment-interval=5 backend: vllm
optimization_target: latency
``` ```
The planner will auto-discover the frontend metrics endpoint from the DGD. See [disagg_planner.yaml](https://github.com/ai-dynamo/dynamo/blob/main/examples/backends/vllm/deploy/disagg_planner.yaml) for a complete example. ### SLA-Based Scaling (advanced)
### Manual DGD Deployment For precise SLA targeting with pre-deployment profiling, set `optimization_target: sla`:
```yaml
features:
planner:
optimization_target: sla
enable_throughput_scaling: true
enable_load_scaling: true
ttft: 500.0
itl: 50.0
pre_deployment_sweeping_mode: rapid
```
For manual control with throughput-based scaling, use the disaggregated planner templates: The fastest path to SLA-based scaling is through a DynamoGraphDeploymentRequest, which automatically profiles your model:
```bash ```bash
# After profiling is complete kubectl apply -f components/src/dynamo/profiler/deploy/profile_sla_aic_dgdr.yaml -n $NAMESPACE
kubectl apply -f examples/backends/vllm/deploy/disagg_planner.yaml -n $NAMESPACE
``` ```
See [Planner Guide](planner-guide.md) for the full workflow.
## Current Limitations ## Current Limitations
### Load-based scaling ### Load-based scaling
...@@ -128,6 +151,7 @@ Load-based scaling has the following known limitations. Throughput-based scaling ...@@ -128,6 +151,7 @@ Load-based scaling has the following known limitations. Throughput-based scaling
| `--namespace` | `$DYN_NAMESPACE` or `dynamo` | Dynamo logical namespace | | `--namespace` | `$DYN_NAMESPACE` or `dynamo` | Dynamo logical namespace |
| `--backend` | `vllm` | Backend framework (`sglang`, `trtllm`, `vllm`) | | `--backend` | `vllm` | Backend framework (`sglang`, `trtllm`, `vllm`) |
| `--mode` | `disagg` | Planner mode (`disagg`, `prefill`, `decode`, `agg`) | | `--mode` | `disagg` | Planner mode (`disagg`, `prefill`, `decode`, `agg`) |
| `--optimization-target` | `throughput` | Scaling target: `throughput` (queue/util thresholds), `latency` (aggressive low-latency), `sla` (regression-based SLA targeting) |
| `--environment` | `kubernetes` | Deployment environment | | `--environment` | `kubernetes` | Deployment environment |
| `--ttft` | `500.0` | Target Time To First Token (ms) | | `--ttft` | `500.0` | Target Time To First Token (ms) |
| `--itl` | `50.0` | Target Inter-Token Latency (ms) | | `--itl` | `50.0` | Target Inter-Token Latency (ms) |
......
...@@ -10,16 +10,17 @@ For a quick overview, see the [Planner overview](README.md). For architecture in ...@@ -10,16 +10,17 @@ For a quick overview, see the [Planner overview](README.md). For architecture in
## Scaling Modes ## Scaling Modes
The planner supports two scaling modes that can be used independently or together: The planner supports three optimization targets that determine how scaling decisions are made:
- **Throughput-based scaling** (`enable_throughput_scaling: true`): Uses pre-deployment engine performance data (from self-benchmark or profiler) and traffic prediction to plan capacity. Best for stable, predictable workloads. - **`throughput`** (default): Uses static thresholds on queue depth and KV cache utilization. No SLA targets or profiling needed. Works out of the box.
- **Load-based scaling** (`enable_load_scaling: true`): Uses real-time ForwardPassMetrics (FPM) from the Dynamo event plane and online regression to make scaling decisions. Best for bursty or unpredictable traffic. Does not require pre-deployment data. - **`latency`**: Same approach as `throughput` but with more aggressive thresholds — scales up earlier and tolerates less queuing. Ideal for latency-sensitive workloads.
- **`sla`**: Uses regression-based performance models with specific TTFT/ITL targets. Supports both throughput-based (predictive) and load-based (reactive) scaling modes. For advanced users who need precise SLA control.
**When to use which:** **When to use which:**
- Enable **throughput-based scaling** whenever pre-deployment performance data is available (via self-benchmark or profiler). It provides stable, prediction-based capacity planning. - Start with **`throughput`** (the default) — it works immediately with no configuration.
- Enable **load-based scaling** when traffic is bursty. It reacts quickly to real-time load changes. - Switch to **`latency`** if your workload has strict latency requirements and you prefer to over-provision rather than queue.
- Enable **both** for the best of both worlds: throughput-based provides a capacity floor, load-based handles bursts above it. When both are enabled, use a longer `throughput_adjustment_interval`. - Use **`sla`** when you have pre-deployment profiling data and want to target specific TTFT/ITL values.
## PlannerConfig Reference ## PlannerConfig Reference
...@@ -28,6 +29,17 @@ The planner is configured via a `PlannerConfig` JSON/YAML object. When using the ...@@ -28,6 +29,17 @@ The planner is configured via a `PlannerConfig` JSON/YAML object. When using the
```yaml ```yaml
features: features:
planner: planner:
mode: disagg
backend: vllm
# optimization_target defaults to "throughput" — works out of the box
```
For SLA-based scaling:
```yaml
features:
planner:
optimization_target: sla
enable_throughput_scaling: true enable_throughput_scaling: true
enable_load_scaling: false enable_load_scaling: false
pre_deployment_sweeping_mode: rapid pre_deployment_sweeping_mode: rapid
...@@ -35,14 +47,22 @@ features: ...@@ -35,14 +47,22 @@ features:
backend: vllm backend: vllm
``` ```
### Scaling Mode Fields ### Optimization Target
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `optimization_target` | string | `throughput` | `throughput`: scale based on queue/utilization thresholds. `latency`: aggressive low-latency thresholds. `sla`: regression-based scaling with ttft/itl targets. |
When `optimization_target` is `throughput` or `latency`, load-based scaling is automatically enabled and throughput-based scaling is disabled. The `ttft`/`itl` fields are ignored.
### Scaling Mode Fields (SLA mode)
| Field | Type | Default | Description | | Field | Type | Default | Description |
|-------|------|---------|-------------| |-------|------|---------|-------------|
| `enable_throughput_scaling` | bool | `true` | Enable throughput-based scaling (requires pre-deployment performance data). | | `enable_throughput_scaling` | bool | `true` | Enable throughput-based scaling (requires pre-deployment performance data). Only used when `optimization_target: sla`. |
| `enable_load_scaling` | bool | `false` | Enable load-based scaling. | | `enable_load_scaling` | bool | `false` | Enable load-based scaling. Only used when `optimization_target: sla`. |
At least one scaling mode must be enabled. At least one scaling mode must be enabled when using `optimization_target: sla`.
### Pre-Deployment Sweeping ### Pre-Deployment Sweeping
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment