"tests/vscode:/vscode.git/clone" did not exist on "93208162753986f9449d3671d6a263dfc4f4381e"
Unverified commit 359765d3, authored by Hongkuan Zhou and committed by GitHub
Browse files

feat: load-based scaling in SLA Planner (#6145)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
parent 815b1291
...@@ -19,5 +19,28 @@ limitations under the License. ...@@ -19,5 +19,28 @@ limitations under the License.
SLA-driven autoscaling controller for Dynamo inference graphs. SLA-driven autoscaling controller for Dynamo inference graphs.
- **User docs**: [docs/planner/](/docs/pages/components/planner/) (deployment, configuration, examples) ## Scaling Modes
- **Design docs**: [docs/pages/design-docs/planner-design.md](/docs/pages/design-docs/planner-design.md) (architecture, algorithms)
The SLA Planner supports two scaling modes that can be used independently or together:
### Throughput-Based Scaling
Uses pre-deployment profiling data and traffic prediction to compute the number of prefill/decode replicas needed to meet TTFT and ITL SLA targets. Requires profiling data from the Dynamo profiler.
### Load-Based Scaling (Experimental)
Uses real-time per-worker load metrics (active prefill tokens, active KV blocks) from the router to make SLA-aware scaling decisions via online linear regression. Does not require profiling data. Responds quickly to traffic bursts.
When both modes are enabled, throughput-based scaling provides a lower bound on replicas while load-based scaling handles real-time adjustments.
### Support Matrix
| Deployment Type | Throughput-Based | Load-Based (Experimental) |
|-----------------|:----------------:|:-------------------------:|
| Disaggregated | Supported | Supported |
| Aggregated | Unsupported | Supported |
## Documentation
- **User docs**: [Planner Guide](../../../../docs/pages/components/planner/planner-guide.md) (deployment, configuration, examples)
- **Design docs**: [Planner Design](../../../../docs/pages/design-docs/planner-design.md) (architecture, algorithms)
...@@ -5,17 +5,12 @@ __all__ = [ ...@@ -5,17 +5,12 @@ __all__ = [
"PlannerConnector", "PlannerConnector",
"KubernetesConnector", "KubernetesConnector",
"VirtualConnector", "VirtualConnector",
"LoadPlannerDefaults",
"SLAPlannerDefaults", "SLAPlannerDefaults",
"TargetReplica", "TargetReplica",
"SubComponentType", "SubComponentType",
] ]
# Import the classes # Import the classes
from dynamo.planner.defaults import ( from dynamo.planner.defaults import SLAPlannerDefaults, SubComponentType
LoadPlannerDefaults,
SLAPlannerDefaults,
SubComponentType,
)
from dynamo.planner.kubernetes_connector import KubernetesConnector, TargetReplica from dynamo.planner.kubernetes_connector import KubernetesConnector, TargetReplica
from dynamo.planner.planner_connector import PlannerConnector from dynamo.planner.planner_connector import PlannerConnector
from dynamo.planner.virtual_connector import VirtualConnector from dynamo.planner.virtual_connector import VirtualConnector
......
...@@ -48,14 +48,6 @@ class BasePlannerDefaults: ...@@ -48,14 +48,6 @@ class BasePlannerDefaults:
metric_reporting_prometheus_port = int(os.environ.get("PLANNER_PROMETHEUS_PORT", 0)) metric_reporting_prometheus_port = int(os.environ.get("PLANNER_PROMETHEUS_PORT", 0))
class LoadPlannerDefaults(BasePlannerDefaults):
    """Default settings for the threshold-based load planner."""

    # How often metrics are pulled.
    metric_pulling_interval = 10  # in seconds

    # Decode thresholds (scale up above the first, scale down below the second).
    decode_kv_scale_up_threshold = 0.9
    decode_kv_scale_down_threshold = 0.5

    # Prefill queue thresholds (scale up above the first, scale down below the second).
    prefill_queue_scale_up_threshold = 5.0
    prefill_queue_scale_down_threshold = 0.2
class SLAPlannerDefaults(BasePlannerDefaults): class SLAPlannerDefaults(BasePlannerDefaults):
# Prometheus endpoint URL for pulling/querying metrics # Prometheus endpoint URL for pulling/querying metrics
metric_pulling_prometheus_endpoint = os.environ.get( metric_pulling_prometheus_endpoint = os.environ.get(
...@@ -81,6 +73,20 @@ class SLAPlannerDefaults(BasePlannerDefaults): ...@@ -81,6 +73,20 @@ class SLAPlannerDefaults(BasePlannerDefaults):
no_correction = False # disable correction factor, might be useful under some conditions like long cold start time no_correction = False # disable correction factor, might be useful under some conditions like long cold start time
mode = "disagg" # ["disagg", "prefill", "decode"] mode = "disagg" # ["disagg", "prefill", "decode"]
# Scaling mode flags
enable_throughput_scaling = True
enable_loadbased_scaling = False
# Load-based scaling settings
loadbased_router_metrics_url: Optional[
str
] = None # will be auto-discovered from the DGD in kubernetes mode if not provided
loadbased_adjustment_interval = 5 # in seconds, must be < adjustment_interval
loadbased_learning_window = 50 # sliding window size for regression
loadbased_scaling_down_sensitivity = 80 # 0-100
loadbased_metric_samples = 10 # number of samples per interval
loadbased_min_observations = 5 # cold start threshold
class VllmComponentName: class VllmComponentName:
prefill_worker_k8s_name = "VllmPrefillWorker" prefill_worker_k8s_name = "VllmPrefillWorker"
......
...@@ -278,6 +278,28 @@ class KubernetesConnector(PlannerConnector): ...@@ -278,6 +278,28 @@ class KubernetesConnector(PlannerConnector):
return prefill_gpu_count, decode_gpu_count return prefill_gpu_count, decode_gpu_count
def get_frontend_metrics_url(self, port: int = 8000) -> Optional[str]:
    """Locate the Frontend service in the DGD and build its metrics URL.

    The graph deployment is fetched through ``self.kube_api`` and the entries
    of ``spec.services`` are scanned in order. The first entry whose
    ``componentType`` equals ``"frontend"`` is used, and the in-cluster URL is
    assembled following the operator's naming convention:
        http://{dgd_name}-{service_key_lowercase}:{port}/metrics

    Args:
        port: Port number placed in the generated URL.

    Returns:
        The metrics URL string, or None if no frontend service is found.
    """
    dgd_name = self.graph_deployment_name
    dgd = self.kube_api.get_graph_deployment(dgd_name)
    service_map = dgd.get("spec", {}).get("services", {})

    # First service key declared as a frontend, or None when there is none.
    frontend_key = next(
        (
            key
            for key, spec in service_map.items()
            if spec.get("componentType", "") == "frontend"
        ),
        None,
    )
    if frontend_key is None:
        return None

    url = f"http://{dgd_name}-{frontend_key.lower()}:{port}/metrics"
    logger.info(f"Auto-discovered frontend metrics URL: {url}")
    return url
async def wait_for_deployment_ready(self): async def wait_for_deployment_ready(self):
"""Wait for the deployment to be ready""" """Wait for the deployment to be ready"""
await self.kube_api.wait_for_graph_deployment_ready( await self.kube_api.wait_for_graph_deployment_ready(
......
...@@ -13,13 +13,20 @@ ...@@ -13,13 +13,20 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import argparse
import asyncio import asyncio
import logging import logging
from pydantic import BaseModel from pydantic import BaseModel
from dynamo.planner.utils.planner_argparse import create_sla_planner_parser from dynamo.planner.utils.agg_planner import AggPlanner
from dynamo.planner.utils.planner_core import start_sla_planner from dynamo.planner.utils.decode_planner import DecodePlanner
from dynamo.planner.utils.disagg_planner import DisaggPlanner
from dynamo.planner.utils.planner_argparse import (
create_sla_planner_parser,
validate_sla_planner_args,
)
from dynamo.planner.utils.prefill_planner import PrefillPlanner
from dynamo.runtime import DistributedRuntime, dynamo_worker from dynamo.runtime import DistributedRuntime, dynamo_worker
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -33,6 +40,24 @@ class RequestType(BaseModel): ...@@ -33,6 +40,24 @@ class RequestType(BaseModel):
text: str text: str
async def start_sla_planner(runtime: DistributedRuntime, args: argparse.Namespace):
    """Validate the arguments, build the planner for ``args.mode`` and run it.

    ``args.mode`` defaults to ``"disagg"`` when the attribute is absent.

    Raises:
        ValueError: If the mode is not one of "disagg", "prefill", "decode"
            or "agg".
    """
    validate_sla_planner_args(args)

    # Mode name -> planner class implementing that deployment layout.
    planner_classes = {
        "disagg": DisaggPlanner,
        "prefill": PrefillPlanner,
        "decode": DecodePlanner,
        "agg": AggPlanner,
    }

    mode = getattr(args, "mode", "disagg")
    planner_cls = planner_classes.get(mode)
    if planner_cls is None:
        raise ValueError(f"Invalid planner mode: {mode}")

    planner = planner_cls(runtime, args)
    await planner._async_init()
    await planner.run()
@dynamo_worker() @dynamo_worker()
async def init_planner(runtime: DistributedRuntime, args): async def init_planner(runtime: DistributedRuntime, args):
await asyncio.sleep(INIT_PLANNER_START_DELAY) await asyncio.sleep(INIT_PLANNER_START_DELAY)
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import argparse
import asyncio
import logging
from typing import Optional
from dynamo.planner import SubComponentType, TargetReplica
from dynamo.planner.utils.load_based_regression import LoadBasedRegressionModel
from dynamo.planner.utils.planner_core import (
BasePlanner,
PlannerPrometheusMetrics,
PlannerSharedState,
_apply_component_gpu_budget,
_initialize_gpu_counts,
)
from dynamo.planner.utils.prometheus import CachedLoadMetrics
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
logger = logging.getLogger(__name__)
class AggPlanner:
    """Aggregated planner: load-based scaling only, single engine type.

    In aggregated mode, engines handle both prefill and decode (chunked prefill).
    Engine metrics are labeled "decode" by the router.

    Scaling logic:
    - TTFT and ITL regression models are both maintained.
    - Regression uses per-worker time-averaged metrics (not latest snapshot)
      because chunked prefill adds noise to instantaneous TTFT/ITL.
    - Scale up if either prefill or decode target is exceeded.
    - Scale down if both prefill and decode are below their boundaries.
    - A decision that ends up equal to the current replica count after the
      ``min_endpoint`` / GPU-budget clamps is not sent to the connector.
    """

    # Engine metrics from agg workers are labeled "decode" by the router
    ENGINE_WORKER_TYPE = "decode"

    def __init__(
        self, runtime: Optional[DistributedRuntime], args: argparse.Namespace
    ) -> None:
        """Validate the scaling flags and build the planner infrastructure.

        Raises:
            ValueError: If throughput scaling is enabled, or load-based
                scaling is not enabled.
        """
        self.args = args
        self.shared_state = PlannerSharedState()

        if getattr(args, "enable_throughput_scaling", False):
            raise ValueError(
                "Aggregated planner only supports load-based scaling. "
                "Please use --disable-throughput-scaling or do not set --enable-throughput-scaling."
            )
        if not getattr(args, "enable_loadbased_scaling", False):
            raise ValueError("Aggregated planner requires --enable-loadbased-scaling.")

        prometheus_metrics = PlannerPrometheusMetrics()

        # Use a single BasePlanner instance for infra (connector, prometheus, etc.)
        # We use DECODE component_type because engine metrics are labeled "decode"
        self.planner = BasePlanner(
            runtime,
            args,
            shared_state=self.shared_state,
            prometheus_metrics=prometheus_metrics,
            start_prometheus_server=True,
        )
        # Override: agg planner uses component_type DECODE for metrics fetching
        self.planner.component_type = SubComponentType.DECODE

        # Create both regression models (agg needs both TTFT and ITL)
        self.ttft_regression = LoadBasedRegressionModel(
            window_size=args.loadbased_learning_window,
            min_observations=args.loadbased_min_observations,
        )
        self.itl_regression = LoadBasedRegressionModel(
            window_size=args.loadbased_learning_window,
            min_observations=args.loadbased_min_observations,
        )
        self.cached_load_metrics = CachedLoadMetrics()

    async def _async_init(self):
        """Run the asynchronous initialization of the wrapped BasePlanner."""
        await self.planner._async_init()

    async def run(self):
        """Validate the deployment, resolve the model name and run the loops.

        Raises:
            ValueError: In no-operation mode when ``args.model_name`` is
                missing or empty.
        """
        if not self.args.no_operation:
            logger.info("Validating deployment...")
            # Agg mode: only decode component exists (engines serve both P and D)
            await self.planner.connector.validate_deployment(
                prefill_component_name=None,
                decode_component_name=self.planner.decode_component_name,
                require_prefill=False,
                require_decode=True,
            )
            logger.info("Successfully validated the deployment")
            _initialize_gpu_counts(
                self.args,
                self.planner.connector,
                require_prefill=False,
                require_decode=True,
            )
            await self.planner.connector.wait_for_deployment_ready()

        # Model name discovery runs in all modes (needed for metrics collection)
        if not self.args.no_operation:
            model_name = await self.planner._get_model_name(
                require_prefill=False, require_decode=True
            )
            logger.info(f"Detected model name from deployment: {model_name}")
            self.planner.model_name = model_name.lower()
        else:
            model_name = getattr(self.args, "model_name", None)
            if not model_name:
                raise ValueError(
                    "Model name is required in no-operation mode. "
                    "Please provide --model-name."
                )
            self.planner.model_name = model_name.lower()

        # The scaling loop and the metric sampling loop run concurrently.
        loops = [
            self._load_loop(),
            self.planner.prometheus_engine_client.run_sampling_loop(
                self.args.loadbased_metric_samples,
                self.args.loadbased_adjustment_interval,
            ),
        ]
        await asyncio.gather(*loops)

    async def _observe_engine_load_stats(self) -> None:
        """Fetch metrics and update regression models using per-worker time-averaged data.

        When the engine client has nothing to report, the previously cached
        metrics are left untouched and no observation is added.
        """
        result = self.planner.prometheus_engine_client.get_recent_and_averaged_metrics(
            self.ENGINE_WORKER_TYPE
        )
        if result is None:
            logger.warning(
                f"No per-worker metrics available yet for {self.ENGINE_WORKER_TYPE} (buffer empty)"
            )
            return

        recent, per_worker_averaged, cluster_averaged = result
        self.cached_load_metrics = CachedLoadMetrics(
            recent=recent,
            per_worker_averaged=per_worker_averaged,
            cluster_averaged=cluster_averaged,
        )

        # Agg uses per-worker time-averaged metrics for regression
        # because chunked prefill adds noise to instantaneous TTFT/ITL
        for wid, m in per_worker_averaged.items():
            # TTFT regression: (active_prefill_tokens + ISL) -> TTFT
            active_prefill = m.get("active_prefill_tokens", 0.0)
            last_isl = m.get("last_isl", 0.0)
            last_ttft = m.get("last_ttft", 0.0)
            if last_ttft > 0 and last_isl > 0:
                x = active_prefill + last_isl
                y = last_ttft * 1000  # seconds -> ms
                logger.info(
                    f"Agg Worker {wid} prefill observation: TTFT {y:.2f}ms @ tokens {x:.2f}"
                )
                self.ttft_regression.add_observation(x, y)

            # ITL regression: active_decode_blocks -> ITL
            active_decode = m.get("active_decode_blocks", 0.0)
            last_itl = m.get("last_itl", 0.0)
            if last_itl > 0 and active_decode > 0:
                x = active_decode
                y = last_itl * 1000  # seconds -> ms
                logger.info(
                    f"Agg Worker {wid} decode observation: ITL {y:.2f}ms @ blocks {x:.2f}"
                )
                self.itl_regression.add_observation(x, y)

    @staticmethod
    def _decide_from_worker_loads(
        recent, metric_key, target, num_workers, sensitivity_pct
    ) -> Optional[str]:
        """Apply the shared "all workers above / all workers below" rule.

        Args:
            recent: Mapping of worker id to that worker's metric dict.
            metric_key: Metric to read from each worker (missing -> 0.0).
            target: Per-worker load above which a worker counts as overloaded.
            num_workers: Current number of workers.
            sensitivity_pct: Scale-down sensitivity as a percentage.

        Returns:
            "up" when every worker is above ``target``; "down" when there is
            more than one worker and every worker is below
            ``target * (num_workers - 1) / num_workers * sensitivity``;
            otherwise None. An empty ``recent`` yields None.
        """
        loads = [m.get(metric_key, 0.0) for m in recent.values()]
        if not loads:
            # all() over nothing is True; never scale on missing data.
            return None

        # Scale up: ALL workers above target
        if all(load > target for load in loads):
            return "up"

        # Scale down: ALL workers below boundary
        if num_workers > 1:
            sensitivity = sensitivity_pct / 100.0
            boundary = target * (num_workers - 1) / num_workers * sensitivity
            if all(load < boundary for load in loads):
                return "down"
        return None

    @staticmethod
    def _combine_decisions(p_decision, d_decision) -> int:
        """Merge the prefill and decode votes into a replica delta.

        Returns:
            +1 if either vote is "up", -1 if both votes are "down", else 0.
        """
        if p_decision == "up" or d_decision == "up":
            return 1
        if p_decision == "down" and d_decision == "down":
            return -1
        return 0

    def _prefill_scaling_decision(self, num_workers: int) -> Optional[str]:
        """Returns "up", "down", or None for prefill dimension."""
        if not self.cached_load_metrics.recent:
            return None
        if not self.ttft_regression.has_sufficient_data():
            logger.info(
                f"TTFT regression: insufficient data ({self.ttft_regression.num_observations}"
                f"/{self.ttft_regression.min_observations}), skipping"
            )
            return None

        x_sla = self.ttft_regression.predict_x_from_sla(self.args.ttft)
        if x_sla is None:
            return None

        # The regression input is (active tokens + ISL), so the budget left
        # for active prefill tokens is x_sla minus the cluster-average ISL.
        avg_isl = self.cached_load_metrics.cluster_averaged.get("last_isl", 0.0)
        target = x_sla - avg_isl
        if target <= 0:
            logger.warning(
                f"Agg TTFT SLA unachievable at current ISL: x_sla={x_sla:.1f}, "
                f"avg_isl={avg_isl:.1f}, skipping prefill scaling decision"
            )
            return None

        logger.info(
            f"Agg prefill: x_sla={x_sla:.1f}, avg_isl={avg_isl:.1f}, "
            f"target_active_tokens={target:.1f}, workers={num_workers}"
        )
        return self._decide_from_worker_loads(
            self.cached_load_metrics.recent,
            "active_prefill_tokens",
            target,
            num_workers,
            self.args.loadbased_scaling_down_sensitivity,
        )

    def _decode_scaling_decision(self, num_workers: int) -> Optional[str]:
        """Returns "up", "down", or None for decode dimension."""
        if not self.cached_load_metrics.recent:
            return None
        if not self.itl_regression.has_sufficient_data():
            logger.info(
                f"ITL regression: insufficient data ({self.itl_regression.num_observations}"
                f"/{self.itl_regression.min_observations}), skipping"
            )
            return None

        x_sla = self.itl_regression.predict_x_from_sla(self.args.itl)
        if x_sla is None:
            return None
        if x_sla <= 0:
            logger.warning(
                f"Agg ITL SLA unachievable: x_sla={x_sla:.1f}, "
                "skipping decode scaling decision"
            )
            return None

        logger.info(f"Agg decode: x_sla={x_sla:.1f}, workers={num_workers}")
        # TODO: should we strictly enforce all workers below boundary?
        # how about user-configurable percentage?
        return self._decide_from_worker_loads(
            self.cached_load_metrics.recent,
            "active_decode_blocks",
            x_sla,
            num_workers,
            self.args.loadbased_scaling_down_sensitivity,
        )

    async def _load_loop(self) -> None:
        """Load-based scaling loop for aggregated mode.

        Each iteration sleeps for ``loadbased_adjustment_interval``, refreshes
        the worker count, observes engine metrics, and changes the replica
        count by at most one. The connector is only called when the clamped
        target differs from the current worker count.
        """
        while True:
            await asyncio.sleep(self.args.loadbased_adjustment_interval)
            logger.info("New agg load-based adjustment interval started!")

            # Query DGD for fresh worker counts
            _, num_d, _ = await self.planner.get_workers_info(
                require_prefill=False, require_decode=True
            )
            self.shared_state.num_d_workers = num_d
            num_workers = num_d

            # Observe per-worker metrics
            await self._observe_engine_load_stats()

            # Reconcile worker counts
            prom_count = len(self.cached_load_metrics.recent)
            if prom_count != num_workers:
                logger.warning(
                    f"Worker count mismatch: DGD reports {num_workers}, "
                    f"router metrics reports {prom_count}. Skipping."
                )
                continue
            if not self.cached_load_metrics.recent:
                continue

            # Make scaling decisions separately for prefill and decode
            p_decision = self._prefill_scaling_decision(num_workers)
            d_decision = self._decode_scaling_decision(num_workers)
            logger.info(
                f"Agg scaling decisions: prefill={p_decision}, decode={d_decision}"
            )

            # Scale up if EITHER needs scale up
            # Scale down if BOTH need scale down
            delta = self._combine_decisions(p_decision, d_decision)
            if delta == 0:
                logger.info("Agg scaling: no scaling needed")
                continue

            desired = max(num_workers + delta, self.args.min_endpoint)
            desired = _apply_component_gpu_budget(
                desired, self.args.decode_engine_num_gpu, self.args
            )

            if (
                self.planner.prometheus_port != 0
                and self.planner.prometheus_metrics is not None
            ):
                self.planner.prometheus_metrics.predicted_num_d.set(desired)

            # The clamps above can bring the target back to the current count
            # (e.g. a "down" vote at min_endpoint); skip the blocking no-op.
            if desired == num_workers:
                logger.info(
                    f"Agg load-based scaling: target stays at {num_workers} after "
                    "min_endpoint/GPU budget clamping, skipping"
                )
                continue

            logger.info(f"Agg load-based scaling: {num_workers} -> {desired}")
            if not self.args.no_operation:
                target_replicas = [
                    TargetReplica(
                        sub_component_type=SubComponentType.DECODE,
                        component_name=self.planner.decode_component_name,
                        desired_replicas=desired,
                    )
                ]
                await self.planner.connector.set_component_replicas(
                    target_replicas, blocking=True
                )
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import math
from typing import Optional
from dynamo.planner import SubComponentType
from dynamo.planner.utils.planner_core import BasePlanner
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
logger = logging.getLogger(__name__)
class DecodePlanner(BasePlanner):
    """Planner for the decode component (ITL-driven replica decisions)."""

    component_type = SubComponentType.DECODE

    def loadbased_plan_adjustment(self) -> Optional[int]:
        """Load-based scaling decision for decode. Returns desired_replicas or None.

        Returns the current worker count plus one when every worker's
        ``active_decode_blocks`` is above the regression-derived target, minus
        one when there is more than one worker and every worker is below the
        scale-down boundary, and None when no change is indicated or the
        inputs are not usable yet.
        """
        regression = self.itl_regression
        if not regression.has_sufficient_data():
            logger.info(
                f"ITL regression: insufficient data ({regression.num_observations}"
                f"/{regression.min_observations}), skipping load-based scaling"
            )
            return None

        x_sla = regression.predict_x_from_sla(self.args.itl)
        if x_sla is None:
            return None
        if x_sla <= 0:
            logger.warning(
                f"ITL SLA unachievable: x_sla={x_sla:.1f}, "
                "skipping load-based decode scaling"
            )
            return None

        recent = self.cached_load_metrics.recent
        if not recent:
            return None

        num_workers = self.shared_state.num_d_workers
        if num_workers == 0:
            return None

        logger.info(
            f"Load-based decode: x_sla={x_sla:.1f}, workers={num_workers}, "
            f"slope={regression.slope:.6f}, intercept={regression.intercept:.3f}"
        )

        # Per-worker load taken from the most recent metrics.
        block_loads = [m.get("active_decode_blocks", 0.0) for m in recent.values()]

        # Scale up: ALL workers above target
        if all(load > x_sla for load in block_loads):
            scaled_up = num_workers + 1
            logger.info(
                f"Load-based decode: ALL workers above target ({x_sla:.1f}), "
                f"scaling up to {scaled_up}"
            )
            return scaled_up

        # Scale down needs at least one worker to remain afterwards.
        if num_workers <= 1:
            return None

        sensitivity = self.args.loadbased_scaling_down_sensitivity / 100.0
        boundary = x_sla * (num_workers - 1) / num_workers * sensitivity
        # Scale down: ALL workers below boundary
        if all(load < boundary for load in block_loads):
            scaled_down = num_workers - 1
            logger.info(
                f"Load-based decode: ALL workers below boundary ({boundary:.1f}), "
                f"scaling down to {scaled_down}"
            )
            return scaled_down
        return None

    def _update_correction_factor(self) -> bool:
        """Refresh ``d_correction_factor`` as observed ITL over interpolated ITL.

        Returns True in every case; with zero decode workers the update is
        skipped and only a warning is logged.
        """
        num_workers = self.shared_state.num_d_workers
        if num_workers == 0:
            logger.warning(
                "No decode workers found for correction factor, skipping correction update"
            )
            return True

        metrics = self.last_metrics
        per_worker_concurrency = (
            metrics.num_req  # type: ignore
            / num_workers
            * metrics.request_duration  # type: ignore
            / self.args.adjustment_interval
        )
        context_length = metrics.isl + metrics.osl / 2  # type: ignore
        expect_itl = self.decode_interpolator.interpolate_itl(
            concurrency=per_worker_concurrency,
            context_length=context_length,
        )

        self.d_correction_factor = metrics.itl / expect_itl
        logger.info(f"Correction factor (decode ITL): {self.d_correction_factor:.3f}")

        metrics_enabled = (
            self.prometheus_port != 0 and self.prometheus_metrics is not None
        )
        if metrics_enabled:
            self.prometheus_metrics.d_correction_factor.set(self.d_correction_factor)
        return True

    def _compute_replica_requirements(
        self, next_num_req: float, next_isl: float, next_osl: float
    ) -> int:
        """Compute the number of decode replicas for the predicted load.

        The ITL target is divided by ``d_correction_factor`` (when positive)
        before querying the interpolator for the best per-GPU throughput.
        Falls back to ``args.min_endpoint`` when no throughput satisfies the
        target, and never returns less than ``args.min_endpoint``.
        """
        args = self.args

        if self.d_correction_factor <= 0:
            logger.warning(
                f"d_correction_factor is {self.d_correction_factor}, using default value of 1.0"
            )
            corrected_itl = args.itl
        else:
            corrected_itl = args.itl / self.d_correction_factor

        # Only the first element (throughput per GPU) of the 3-tuple is used.
        (
            thpt_per_gpu,
            _unused_first,
            _unused_second,
        ) = self.decode_interpolator.find_best_throughput_per_gpu(
            itl=corrected_itl, context_length=next_isl + next_osl / 2
        )
        if thpt_per_gpu <= 0:
            logger.warning(
                f"pred_decode_thpt_per_gpu is {thpt_per_gpu} "
                "(no throughput satisfies ITL target), falling back to min_endpoint"
            )
            return args.min_endpoint

        demand = next_num_req * next_osl / args.adjustment_interval
        replicas = math.ceil(demand / thpt_per_gpu / args.decode_engine_num_gpu)
        replicas = max(replicas, args.min_endpoint)

        logger.info(
            f"Decode calculation: {demand:.2f}(d_thpt) / "
            f"{thpt_per_gpu * args.decode_engine_num_gpu:.2f}(d_engine_cap) = "
            f"{replicas}(num_d)"
        )
        return replicas

    def update_predicted_replicas_metric(self, desired_replicas: int) -> None:
        """Publish ``desired_replicas`` to the ``predicted_num_d`` metric.

        Nothing is published when the Prometheus port is 0 or no metrics
        object is attached.
        """
        if self.prometheus_port == 0 or self.prometheus_metrics is None:
            return
        self.prometheus_metrics.predicted_num_d.set(desired_replicas)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import argparse
import asyncio
import logging
import time
from typing import Optional
from dynamo.planner import SubComponentType, TargetReplica
from dynamo.planner.utils.decode_planner import DecodePlanner
from dynamo.planner.utils.planner_core import (
PlannerPrometheusMetrics,
PlannerSharedState,
_apply_global_gpu_budget,
_initialize_gpu_counts,
)
from dynamo.planner.utils.prefill_planner import PrefillPlanner
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
logger = logging.getLogger(__name__)
class DisaggPlanner:
    """Planner for disaggregated deployments (separate prefill and decode).

    Owns one PrefillPlanner and one DecodePlanner that share state, metrics
    clients and the connector. Depending on the enabled flags it runs the
    throughput-based loop, the load-based loop, or both. When both run, the
    throughput loop only records lower bounds and the load-based loop is the
    one that applies replica changes, including raising the replica count to
    those lower bounds.
    """

    def __init__(
        self, runtime: Optional[DistributedRuntime], args: argparse.Namespace
    ) -> None:
        """Create the shared state and the prefill/decode sub-planners."""
        self.args = args
        self.shared_state = PlannerSharedState()
        prometheus_metrics = PlannerPrometheusMetrics()

        self.enable_throughput = getattr(args, "enable_throughput_scaling", True)
        self.enable_loadbased = getattr(args, "enable_loadbased_scaling", False)

        self.prefill_planner = PrefillPlanner(
            runtime,
            args,
            shared_state=self.shared_state,
            prometheus_metrics=prometheus_metrics,
            start_prometheus_server=True,
        )
        # The decode planner reuses the prefill planner's clients/connector
        # and does not start a second Prometheus server.
        self.decode_planner = DecodePlanner(
            runtime,
            args,
            shared_state=self.shared_state,
            prometheus_metrics=prometheus_metrics,
            prometheus_traffic_client=getattr(
                self.prefill_planner, "prometheus_traffic_client", None
            ),
            prometheus_engine_client=getattr(
                self.prefill_planner, "prometheus_engine_client", None
            ),
            connector=getattr(self.prefill_planner, "connector", None),
            start_prometheus_server=False,
        )

    async def _async_init(self):
        """Run the asynchronous initialization of the prefill planner."""
        # Prefill/Decode share the same connector instance in disagg mode.
        await self.prefill_planner._async_init()

    async def run(self):
        """Validate the deployment, resolve the model name and run the loops.

        Raises:
            ValueError: In no-operation mode when ``args.model_name`` is
                missing or empty.
        """
        if not self.args.no_operation:
            logger.info("Validating deployment...")
            await self.prefill_planner.connector.validate_deployment(
                prefill_component_name=self.prefill_planner.prefill_component_name,
                decode_component_name=self.prefill_planner.decode_component_name,
                require_prefill=True,
                require_decode=True,
            )
            logger.info("Successfully validated the deployment")
            # Initialize GPU counts
            _initialize_gpu_counts(
                self.args,
                self.prefill_planner.connector,
                require_prefill=True,
                require_decode=True,
            )
            await self.prefill_planner.connector.wait_for_deployment_ready()

        # Model name discovery runs in all modes (needed for metrics collection)
        if not self.args.no_operation:
            model_name = await self.prefill_planner._get_model_name(
                require_prefill=True, require_decode=True
            )
            logger.info(f"Detected model name from deployment: {model_name}")
            model_name = model_name.lower()
        else:
            model_name = getattr(self.args, "model_name", None)
            if not model_name:
                raise ValueError(
                    "Model name is required in no-operation mode. "
                    "Please provide --model-name."
                )
            model_name = model_name.lower()
        self.prefill_planner.model_name = model_name
        self.decode_planner.model_name = model_name

        self.shared_state.last_adjustment_time = time.time()
        self.shared_state.last_loadbased_adjustment_time = time.time()

        # Build list of concurrent loops based on enabled scaling modes
        loops = []
        if self.enable_throughput:
            loops.append(self._throughput_loop())
        if self.enable_loadbased:
            loops.append(self._load_loop())
            loops.append(
                self.prefill_planner.prometheus_engine_client.run_sampling_loop(
                    self.args.loadbased_metric_samples,
                    self.args.loadbased_adjustment_interval,
                )
            )
        await asyncio.gather(*loops)

    @staticmethod
    def _resolve_loadbased_targets(
        p_desired,
        d_desired,
        num_p,
        num_d,
        lower_bound_p=None,
        lower_bound_d=None,
    ) -> tuple:
        """Turn load-based decisions into concrete (prefill, decode) targets.

        A ``None`` decision keeps the current count for that component. Each
        target is then raised to its lower bound when one is given.

        Returns:
            ``(final_p, final_d)``.
        """
        final_p = num_p if p_desired is None else p_desired
        final_d = num_d if d_desired is None else d_desired
        if lower_bound_p is not None:
            final_p = max(final_p, lower_bound_p)
        if lower_bound_d is not None:
            final_d = max(final_d, lower_bound_d)
        return final_p, final_d

    def _build_target_replicas(self, num_p: int, num_d: int) -> list:
        """Build the prefill/decode TargetReplica list for the connector."""
        return [
            TargetReplica(
                sub_component_type=SubComponentType.PREFILL,
                component_name=self.prefill_planner.prefill_component_name,
                desired_replicas=num_p,
            ),
            TargetReplica(
                sub_component_type=SubComponentType.DECODE,
                component_name=self.prefill_planner.decode_component_name,
                desired_replicas=num_d,
            ),
        ]

    async def _throughput_loop(self) -> None:
        """Throughput-based scaling loop for disagg mode.

        Polls every ``adjustment_interval / 10`` seconds and acts once per
        ``adjustment_interval``. With load-based scaling also enabled it only
        stores the computed replica counts as lower bounds; otherwise it
        applies them (non-blocking) after the GPU-budget clamp.
        """
        while True:
            current_time = time.time()

            if (
                current_time - self.shared_state.last_adjustment_time
                >= self.args.adjustment_interval
            ):
                self.shared_state.last_adjustment_time = time.time()
                logger.info("New throughput adjustment interval started!")

                await self.prefill_planner.observe_traffic_stats(
                    require_prefill=True, require_decode=True
                )
                self.decode_planner.update_predictors_from_metrics(
                    self.shared_state.last_metrics
                )
                next_num_p = self.prefill_planner.plan_adjustment()
                next_num_d = self.decode_planner.plan_adjustment()
                if next_num_p is None or next_num_d is None:
                    await asyncio.sleep(self.args.adjustment_interval / 10)
                    continue

                if self.enable_loadbased:
                    # When load-based is also enabled: just set lower bounds
                    self.shared_state.throughput_lower_bound_p = next_num_p
                    self.shared_state.throughput_lower_bound_d = next_num_d
                    logger.info(
                        f"Throughput lower bounds set: prefill={next_num_p}, decode={next_num_d}"
                    )
                else:
                    # Throughput-only: apply scaling directly
                    next_num_p, next_num_d = _apply_global_gpu_budget(
                        next_num_p, next_num_d, self.args
                    )
                    self.prefill_planner.update_predicted_replicas_metric(next_num_p)
                    self.decode_planner.update_predicted_replicas_metric(next_num_d)

                    if not self.args.no_operation:
                        await self.prefill_planner.connector.set_component_replicas(
                            self._build_target_replicas(next_num_p, next_num_d),
                            blocking=False,
                        )

            await asyncio.sleep(self.args.adjustment_interval / 10)

    async def _load_loop(self) -> None:
        """Load-based scaling loop for disagg mode at shorter interval.

        The throughput lower bounds and the GPU budget are applied before
        checking whether anything changes, so a raised lower bound takes
        effect even when the load-based decision itself is "no change", and a
        target clamped back to the current counts is not sent to the
        connector.
        """
        while True:
            await asyncio.sleep(self.args.loadbased_adjustment_interval)
            logger.info("New load-based adjustment interval started!")

            # Query DGD for fresh worker counts
            num_p, num_d, _ = await self.prefill_planner.get_workers_info(
                require_prefill=True, require_decode=True
            )
            self.shared_state.num_p_workers = num_p
            self.shared_state.num_d_workers = num_d

            # Observe per-worker metrics from router
            await self.prefill_planner.observe_engine_load_stats()
            await self.decode_planner.observe_engine_load_stats()

            # Reconcile DGD worker counts with router Prometheus counts
            p_prom_count = len(self.prefill_planner.cached_load_metrics.recent)
            d_prom_count = len(self.decode_planner.cached_load_metrics.recent)
            if p_prom_count != num_p or d_prom_count != num_d:
                logger.warning(
                    f"Worker count mismatch: DGD reports P={num_p}, D={num_d}; "
                    f"router metrics reports P={p_prom_count}, D={d_prom_count}. "
                    "Skipping load-based scaling adjustment."
                )
                continue

            # Scale prefill and decode independently
            p_desired = self.prefill_planner.loadbased_plan_adjustment()
            d_desired = self.decode_planner.loadbased_plan_adjustment()

            current_p = self.shared_state.num_p_workers
            current_d = self.shared_state.num_d_workers

            # Enforce lower bounds from throughput-based (only when enabled)
            if self.enable_throughput:
                final_p, final_d = self._resolve_loadbased_targets(
                    p_desired,
                    d_desired,
                    current_p,
                    current_d,
                    self.shared_state.throughput_lower_bound_p,
                    self.shared_state.throughput_lower_bound_d,
                )
            else:
                final_p, final_d = self._resolve_loadbased_targets(
                    p_desired, d_desired, current_p, current_d
                )

            # Apply GPU budget
            final_p, final_d = _apply_global_gpu_budget(final_p, final_d, self.args)

            # Compare only after bounds and budget so the check reflects the
            # replica counts that would actually be requested.
            if final_p == current_p and final_d == current_d:
                logger.info("Load-based scaling: no scaling needed")
                continue

            logger.info(
                f"Load-based disagg scaling: prefill {current_p}->{final_p}, "
                f"decode {current_d}->{final_d}"
            )

            self.prefill_planner.update_predicted_replicas_metric(final_p)
            self.decode_planner.update_predicted_replicas_metric(final_d)

            if not self.args.no_operation:
                await self.prefill_planner.connector.set_component_replicas(
                    self._build_target_replicas(final_p, final_d), blocking=True
                )
...@@ -4,18 +4,24 @@ ...@@ -4,18 +4,24 @@
import argparse import argparse
from typing import Optional from typing import Optional
from dynamo.planner.utils.decode_planner import DecodePlanner
from dynamo.planner.utils.dryrun_plot_utils import create_dryrun_plot from dynamo.planner.utils.dryrun_plot_utils import create_dryrun_plot
from dynamo.planner.utils.planner_core import ( from dynamo.planner.utils.planner_core import (
DecodePlanner,
PlannerSharedState, PlannerSharedState,
PrefillPlanner,
_apply_component_gpu_budget, _apply_component_gpu_budget,
_apply_global_gpu_budget, _apply_global_gpu_budget,
) )
from dynamo.planner.utils.prefill_planner import PrefillPlanner
from dynamo.planner.utils.trace_data_extractor import extract_metrics_from_mooncake from dynamo.planner.utils.trace_data_extractor import extract_metrics_from_mooncake
def run_sla_planner_dryrun(args: argparse.Namespace) -> None: def run_sla_planner_dryrun(args: argparse.Namespace) -> None:
if getattr(args, "enable_loadbased_scaling", False):
raise ValueError(
"Load-based scaling is not supported in dryrun mode. "
"Disable --enable-loadbased-scaling to use dryrun."
)
# Dryrun mode: use defaults if GPU counts not provided (no DGD available) # Dryrun mode: use defaults if GPU counts not provided (no DGD available)
if args.prefill_engine_num_gpu is None: if args.prefill_engine_num_gpu is None:
args.prefill_engine_num_gpu = 1 args.prefill_engine_num_gpu = 1
...@@ -90,7 +96,7 @@ def run_sla_planner_dryrun(args: argparse.Namespace) -> None: ...@@ -90,7 +96,7 @@ def run_sla_planner_dryrun(args: argparse.Namespace) -> None:
compute_safe_p_thpt(args.start_num_p, isl[0], args.ttft) compute_safe_p_thpt(args.start_num_p, isl[0], args.ttft)
* args.adjustment_interval * args.adjustment_interval
] ]
prefill_planner.dryrun_observe_metrics(rr[0], isl[0], osl[0]) prefill_planner.dryrun_observe_traffic_stats(rr[0], isl[0], osl[0])
else: else:
num_p = [0] num_p = [0]
p_thpt = [0] p_thpt = [0]
...@@ -103,7 +109,7 @@ def run_sla_planner_dryrun(args: argparse.Namespace) -> None: ...@@ -103,7 +109,7 @@ def run_sla_planner_dryrun(args: argparse.Namespace) -> None:
compute_safe_d_thpt(args.start_num_d, isl[0], osl[0], args.itl) compute_safe_d_thpt(args.start_num_d, isl[0], osl[0], args.itl)
* args.adjustment_interval * args.adjustment_interval
] ]
decode_planner.dryrun_observe_metrics(rr[0], isl[0], osl[0]) decode_planner.dryrun_observe_traffic_stats(rr[0], isl[0], osl[0])
else: else:
num_d = [0] num_d = [0]
d_thpt = [0] d_thpt = [0]
...@@ -152,7 +158,7 @@ def run_sla_planner_dryrun(args: argparse.Namespace) -> None: ...@@ -152,7 +158,7 @@ def run_sla_planner_dryrun(args: argparse.Namespace) -> None:
# update load predictor # update load predictor
for planner in [prefill_planner, decode_planner]: for planner in [prefill_planner, decode_planner]:
if planner is not None: if planner is not None:
planner.dryrun_observe_metrics( planner.dryrun_observe_traffic_stats(
metric["request_count"], metric["avg_isl"], metric["avg_osl"] metric["request_count"], metric["avg_isl"], metric["avg_osl"]
) )
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from collections import deque
from typing import Optional
import numpy as np
from sklearn.linear_model import LinearRegression
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
logger = logging.getLogger(__name__)
class LoadBasedRegressionModel:
    """Sliding-window linear regression for load-based scaling.

    Maintains a fixed-size window of (x, y) observations, fits the line
    y = m * x + b over them, and inverts it to answer "what is the largest
    load x that still meets a target latency y?" via x = (y - b) / m.

    Used to map:
    - Prefill: (active_prefill_tokens + ISL) -> TTFT
    - Decode: active_decode_blocks -> ITL
    """

    def __init__(self, window_size: int, min_observations: int = 5):
        """Create an empty model.

        Args:
            window_size: Maximum number of observations retained in the
                sliding window; appending beyond it drops the oldest entry.
            min_observations: Number of observations required before the
                model is fitted (cold-start guard). Values below 1 are
                treated as 1 because a regression cannot be fitted on an
                empty window.
        """
        self.window_size = window_size
        self.min_observations = min_observations
        self._observations: deque = deque(maxlen=window_size)
        self._model = LinearRegression()
        self._is_fitted = False

    def _required_observations(self) -> int:
        """Return the effective cold-start threshold (never below 1)."""
        return max(1, self.min_observations)

    def add_observation(self, x: float, y: float) -> None:
        """Add an (x, y) observation to the sliding window."""
        self._observations.append((x, y))
        # The window changed, so any previous fit is stale; refit lazily.
        self._is_fitted = False

    def fit(self) -> bool:
        """Fit the linear regression model on the current observations.

        Returns:
            True if fitting succeeded, False if there is insufficient data
            (including an empty window, regardless of ``min_observations``).
        """
        if len(self._observations) < self._required_observations():
            return False
        # The regressor expects a 2-D feature matrix: one column, one row per sample.
        X = np.array([x for x, _ in self._observations]).reshape(-1, 1)
        y = np.array([y for _, y in self._observations])
        self._model.fit(X, y)
        self._is_fitted = True
        return True

    def predict_x_from_sla(self, target_y: float) -> Optional[float]:
        """Reverse prediction: given a target latency (SLA), find the max load.

        Solves: x = (y - b) / m

        Safety guards:
        - Returns None if insufficient data (cold start)
        - Falls back to observation-based heuristic if slope <= 0
        - Clamps result to non-negative

        Args:
            target_y: Target latency SLA value (e.g., TTFT in ms, ITL in ms)

        Returns:
            Maximum load value that satisfies the SLA, or None if insufficient data.
        """
        if not self._is_fitted and not self.fit():
            return None

        coef = float(self._model.coef_[0])
        intercept = float(self._model.intercept_)

        # A flat or decreasing line cannot be inverted into a meaningful
        # "max load", so defer to the raw observations instead.
        if coef <= 0:
            logger.warning(
                f"Regression slope is non-positive ({coef:.6f}), "
                "falling back to observation-based heuristic"
            )
            return self._fallback_x_from_observations(target_y)

        x_sla = (target_y - intercept) / coef
        return max(0.0, x_sla)

    def _fallback_x_from_observations(self, target_y: float) -> float:
        """Fallback when regression slope is non-positive.

        Returns the minimum x among observations where y < target_y.
        If all observations have y >= target_y, returns the smallest x overall.
        The result is clamped to be non-negative.
        """
        below = [(x, y) for x, y in self._observations if y < target_y]
        if below:
            result = min(x for x, _ in below)
        else:
            result = min(x for x, _ in self._observations)
        logger.info(
            f"Fallback x from observations: {result:.1f} "
            f"(points below SLA: {len(below)}/{len(self._observations)})"
        )
        return max(0.0, result)

    def has_sufficient_data(self) -> bool:
        """Check if enough observations have been collected (cold start guard).

        Uses the same threshold as ``fit()``, so an empty window is never
        reported as sufficient.
        """
        return len(self._observations) >= self._required_observations()

    @property
    def num_observations(self) -> int:
        """Return the number of observations currently in the window."""
        return len(self._observations)

    @property
    def slope(self) -> Optional[float]:
        """Return the current regression slope, or None if not fitted."""
        if not self._is_fitted and not self.fit():
            return None
        return float(self._model.coef_[0])

    @property
    def intercept(self) -> Optional[float]:
        """Return the current regression intercept, or None if not fitted."""
        if not self._is_fitted and not self.fit():
            return None
        return float(self._model.intercept_)
...@@ -45,8 +45,8 @@ def create_sla_planner_parser() -> argparse.ArgumentParser: ...@@ -45,8 +45,8 @@ def create_sla_planner_parser() -> argparse.ArgumentParser:
parser.add_argument( parser.add_argument(
"--mode", "--mode",
default=SLAPlannerDefaults.mode, default=SLAPlannerDefaults.mode,
choices=["disagg", "prefill", "decode"], choices=["disagg", "prefill", "decode", "agg"],
help="Planner mode: disagg (prefill+decode), prefill-only, or decode-only", help="Planner mode: disagg (prefill+decode), prefill-only, decode-only, or agg (aggregated)",
) )
parser.add_argument( parser.add_argument(
"--no-operation", "--no-operation",
...@@ -176,4 +176,126 @@ def create_sla_planner_parser() -> argparse.ArgumentParser: ...@@ -176,4 +176,126 @@ def create_sla_planner_parser() -> argparse.ArgumentParser:
type=str, type=str,
help="Model name of deployment (only required for virtual environment)", help="Model name of deployment (only required for virtual environment)",
) )
# Scaling mode flags
parser.add_argument(
"--enable-throughput-scaling",
action="store_true",
default=SLAPlannerDefaults.enable_throughput_scaling,
help="Enable throughput-based scaling (default: True)",
)
parser.add_argument(
"--disable-throughput-scaling",
action="store_true",
default=False,
help="Disable throughput-based scaling",
)
parser.add_argument(
"--enable-loadbased-scaling",
action="store_true",
default=SLAPlannerDefaults.enable_loadbased_scaling,
help="Enable load-based scaling",
)
# Load-based scaling settings
parser.add_argument(
"--loadbased-router-metrics-url",
type=str,
default=SLAPlannerDefaults.loadbased_router_metrics_url,
help="URL to router's /metrics endpoint for direct load metric queries (default: auto-discovered from the DGD)",
)
parser.add_argument(
"--loadbased-adjustment-interval",
type=int,
default=SLAPlannerDefaults.loadbased_adjustment_interval,
help="Load-based adjustment interval in seconds (must be < --adjustment-interval)",
)
parser.add_argument(
"--loadbased-learning-window",
type=int,
default=SLAPlannerDefaults.loadbased_learning_window,
help="Sliding window size for load-based regression (number of observations)",
)
parser.add_argument(
"--loadbased-scaling-down-sensitivity",
type=int,
default=SLAPlannerDefaults.loadbased_scaling_down_sensitivity,
help="Scale-down sensitivity 0-100 (0=never scale down, 100=aggressive)",
)
parser.add_argument(
"--loadbased-metric-samples",
type=int,
default=SLAPlannerDefaults.loadbased_metric_samples,
help="Number of metric samples to average per load-based adjustment interval",
)
parser.add_argument(
"--loadbased-min-observations",
type=int,
default=SLAPlannerDefaults.loadbased_min_observations,
help="Minimum regression observations before load-based scaling starts (cold start)",
)
return parser return parser
def validate_sla_planner_args(args: argparse.Namespace) -> None:
    """Check and normalize parsed SLA planner arguments in place.

    Reconciles the enable/disable throughput flags, verifies that at least
    one scaling mode is active, enforces the load-based constraints, and
    forces ``args.no_correction`` to True when load-based scaling is on.
    Intended to run after parsing and before any planner is constructed.

    Raises:
        ValueError: If argument constraints are violated
    """
    # An explicit disable flag overrides the enable flag.
    if getattr(args, "disable_throughput_scaling", False):
        args.enable_throughput_scaling = False

    throughput_on = getattr(args, "enable_throughput_scaling", True)
    loadbased_on = getattr(args, "enable_loadbased_scaling", False)

    if not (throughput_on or loadbased_on):
        raise ValueError(
            "At least one scaling mode must be enabled "
            "(--enable-throughput-scaling or --enable-loadbased-scaling)"
        )

    # Everything below only concerns load-based scaling.
    if not loadbased_on:
        return

    # Outside kubernetes mode the router metrics URL cannot be discovered
    # from the DGD, so it has to be given explicitly.
    in_kubernetes = getattr(args, "environment", "kubernetes") == "kubernetes"
    router_url = getattr(args, "loadbased_router_metrics_url", None)
    if not router_url and not in_kubernetes:
        raise ValueError(
            "--loadbased-router-metrics-url is required when "
            "load-based scaling is enabled outside kubernetes mode"
        )

    # With both loops active, the load-based loop must tick faster than the
    # throughput loop.
    if (
        throughput_on
        and args.loadbased_adjustment_interval >= args.adjustment_interval
    ):
        raise ValueError(
            f"--loadbased-adjustment-interval ({args.loadbased_adjustment_interval}s) "
            f"must be shorter than --adjustment-interval ({args.adjustment_interval}s). "
            "Load-based scaling is the fast reactive loop; throughput-based is the "
            "slow predictive loop."
        )

    # Correction already off: nothing left to normalize.
    if getattr(args, "no_correction", False):
        return

    import logging

    log = logging.getLogger(__name__)
    # TODO: enable correction after we can gather engine forward pass metrics
    log.warning(
        "Correction factor is automatically disabled when load-based "
        "scaling is enabled. Load-based scaling already accounts for "
        "actual latency conditions."
    )
    args.no_correction = True
...@@ -25,7 +25,12 @@ from dynamo.planner.utils.perf_interpolation import ( ...@@ -25,7 +25,12 @@ from dynamo.planner.utils.perf_interpolation import (
PrefillInterpolator, PrefillInterpolator,
) )
from dynamo.planner.utils.pre_swept_results_utils import PreSweptResultsHelper from dynamo.planner.utils.pre_swept_results_utils import PreSweptResultsHelper
from dynamo.planner.utils.prometheus import PrometheusAPIClient from dynamo.planner.utils.prometheus import (
CachedLoadMetrics,
DirectRouterMetricsClient,
Metrics,
PrometheusAPIClient,
)
from dynamo.planner.utils.trace_data_extractor import extract_metrics_from_mooncake from dynamo.planner.utils.trace_data_extractor import extract_metrics_from_mooncake
from dynamo.runtime import DistributedRuntime from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
...@@ -34,31 +39,6 @@ configure_dynamo_logging() ...@@ -34,31 +39,6 @@ configure_dynamo_logging()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass
class Metrics:
    """Snapshot of observed traffic and latency metrics; every field is optional."""

    # Latency figures (the planner stores TTFT/ITL after converting seconds to ms).
    ttft: Optional[float] = None
    itl: Optional[float] = None
    # Request volume and average sequence lengths.
    num_req: Optional[float] = None
    isl: Optional[float] = None
    osl: Optional[float] = None
    request_duration: Optional[float] = None
    # Prefill / decode load values.
    p_load: Optional[float] = None
    d_load: Optional[float] = None

    def is_valid(self) -> bool:
        """Return True when TTFT, ITL, ISL and OSL are all set and none is NaN."""
        required = (self.ttft, self.itl, self.isl, self.osl)
        # Rule out missing values first so isnan only ever sees real numbers.
        if any(value is None for value in required):
            return False
        return not any(math.isnan(value) for value in required)
class PlannerPrometheusMetrics: class PlannerPrometheusMetrics:
"""Container for all Planner Prometheus metrics.""" """Container for all Planner Prometheus metrics."""
...@@ -127,6 +107,11 @@ class PlannerSharedState: ...@@ -127,6 +107,11 @@ class PlannerSharedState:
num_d_workers: int = 0 num_d_workers: int = 0
cumulative_gpu_hours: float = 0.0 cumulative_gpu_hours: float = 0.0
last_adjustment_time: float = 0.0 last_adjustment_time: float = 0.0
# Lower bounds from throughput-based scaling (used when both modes enabled)
throughput_lower_bound_p: int = 1
throughput_lower_bound_d: int = 1
# Separate timestamp for load-based adjustment loop
last_loadbased_adjustment_time: float = 0.0
def _apply_global_gpu_budget( def _apply_global_gpu_budget(
...@@ -265,7 +250,8 @@ class BasePlanner: ...@@ -265,7 +250,8 @@ class BasePlanner:
dryrun: bool = False, dryrun: bool = False,
shared_state: Optional[PlannerSharedState] = None, shared_state: Optional[PlannerSharedState] = None,
prometheus_metrics: Optional[PlannerPrometheusMetrics] = None, prometheus_metrics: Optional[PlannerPrometheusMetrics] = None,
prometheus_api_client: Optional[PrometheusAPIClient] = None, prometheus_traffic_client: Optional[PrometheusAPIClient] = None,
prometheus_engine_client: Optional[DirectRouterMetricsClient] = None,
connector=None, connector=None,
start_prometheus_server: bool = True, start_prometheus_server: bool = True,
): ):
...@@ -296,9 +282,12 @@ class BasePlanner: ...@@ -296,9 +282,12 @@ class BasePlanner:
else: else:
raise ValueError(f"Invalid environment: {args.environment}") raise ValueError(f"Invalid environment: {args.environment}")
self.prometheus_api_client = prometheus_api_client or PrometheusAPIClient( self.prometheus_traffic_client = (
args.metric_pulling_prometheus_endpoint, prometheus_traffic_client
args.namespace, or PrometheusAPIClient(
args.metric_pulling_prometheus_endpoint,
args.namespace,
)
) )
predictor_cls = LOAD_PREDICTORS[args.load_predictor] predictor_cls = LOAD_PREDICTORS[args.load_predictor]
...@@ -337,35 +326,46 @@ class BasePlanner: ...@@ -337,35 +326,46 @@ class BasePlanner:
if hasattr(p, "reset_idle_skip"): if hasattr(p, "reset_idle_skip"):
p.reset_idle_skip() p.reset_idle_skip()
if "use-pre-swept-results" in args.profile_results_dir: # Load-based scaling flags.
config_list = args.profile_results_dir.split(":") # Argument validation (flag resolution, constraint checks, correction factor
configs = { # auto-disable) is handled by validate_sla_planner_args() in planner_argparse.
"gpu_type": config_list[1], self.enable_loadbased = getattr(args, "enable_loadbased_scaling", False)
"model": config_list[2], self.enable_throughput = getattr(args, "enable_throughput_scaling", True)
"framework": config_list[3],
"framework_version": config_list[4], # Only create interpolators when throughput-based scaling is enabled
"tp": int(config_list[5]), # (they require profiling data that isn't needed for load-based-only mode)
"dp": int(config_list[6]), if self.enable_throughput:
"pp": int(config_list[7]), if "use-pre-swept-results" in args.profile_results_dir:
"block_size": int(config_list[8]), config_list = args.profile_results_dir.split(":")
"max_batch_size": int(config_list[9]), configs = {
"gpu_count": int(config_list[10]), "gpu_type": config_list[1],
} "model": config_list[2],
if self.dryrun: "framework": config_list[3],
pre_swept_results_helper = PreSweptResultsHelper( "framework_version": config_list[4],
configs["gpu_type"], configs["framework"], configs["model"] "tp": int(config_list[5]),
) "dp": int(config_list[6]),
raw_data = pre_swept_results_helper.select_data("prefill", configs) "pp": int(config_list[7]),
self.prefill_interpolator = PrefillInterpolator(raw_data=raw_data) "block_size": int(config_list[8]),
raw_data = pre_swept_results_helper.select_data("decode", configs) "max_batch_size": int(config_list[9]),
self.decode_interpolator = DecodeInterpolator(raw_data=raw_data) "gpu_count": int(config_list[10]),
}
if self.dryrun:
pre_swept_results_helper = PreSweptResultsHelper(
configs["gpu_type"], configs["framework"], configs["model"]
)
raw_data = pre_swept_results_helper.select_data("prefill", configs)
self.prefill_interpolator = PrefillInterpolator(raw_data=raw_data)
raw_data = pre_swept_results_helper.select_data("decode", configs)
self.decode_interpolator = DecodeInterpolator(raw_data=raw_data)
else:
raise ValueError(
"Cannot set profile_results_dir to 'use-pre-swept-results' in non-dryrun mode"
)
else: else:
raise ValueError( self.prefill_interpolator = PrefillInterpolator(
"Cannot set profile_results_dir to 'use-pre-swept-results' in non-dryrun mode" args.profile_results_dir
) )
else: self.decode_interpolator = DecodeInterpolator(args.profile_results_dir)
self.prefill_interpolator = PrefillInterpolator(args.profile_results_dir)
self.decode_interpolator = DecodeInterpolator(args.profile_results_dir)
self.prefill_component_name = WORKER_COMPONENT_NAMES[ self.prefill_component_name = WORKER_COMPONENT_NAMES[
self.args.backend self.args.backend
...@@ -405,6 +405,48 @@ class BasePlanner: ...@@ -405,6 +405,48 @@ class BasePlanner:
else: else:
self.no_correction = args.no_correction self.no_correction = args.no_correction
if self.enable_loadbased:
if prometheus_engine_client is not None:
self.prometheus_engine_client = prometheus_engine_client
else:
# Auto-discover frontend metrics URL in Kubernetes mode
if not args.loadbased_router_metrics_url and isinstance(
getattr(self, "connector", None), KubernetesConnector
):
args.loadbased_router_metrics_url = (
self.connector.get_frontend_metrics_url()
)
if not args.loadbased_router_metrics_url:
raise ValueError(
"Could not auto-discover frontend metrics URL from DGD. "
"No service with componentType 'frontend' found. "
"Please provide --loadbased-router-metrics-url explicitly."
)
else:
logger.info(
f"Auto-discovered frontend metrics URL: {args.loadbased_router_metrics_url}"
)
self.prometheus_engine_client = DirectRouterMetricsClient(
args.loadbased_router_metrics_url, args.namespace
)
self.cached_load_metrics = CachedLoadMetrics()
from dynamo.planner.utils.load_based_regression import (
LoadBasedRegressionModel,
)
if self.component_type == SubComponentType.PREFILL:
self.ttft_regression = LoadBasedRegressionModel(
window_size=self.args.loadbased_learning_window,
min_observations=self.args.loadbased_min_observations,
)
elif self.component_type == SubComponentType.DECODE:
self.itl_regression = LoadBasedRegressionModel(
window_size=self.args.loadbased_learning_window,
min_observations=self.args.loadbased_min_observations,
)
@property @property
def last_metrics(self) -> Metrics: def last_metrics(self) -> Metrics:
return self.shared_state.last_metrics return self.shared_state.last_metrics
...@@ -508,7 +550,7 @@ class BasePlanner: ...@@ -508,7 +550,7 @@ class BasePlanner:
return num_p_workers, num_d_workers, True # Always stable for non-K8s return num_p_workers, num_d_workers, True # Always stable for non-K8s
async def observe_metrics( async def observe_traffic_stats(
self, require_prefill: bool = True, require_decode: bool = True self, require_prefill: bool = True, require_decode: bool = True
) -> None: ) -> None:
""" """
...@@ -546,37 +588,39 @@ class BasePlanner: ...@@ -546,37 +588,39 @@ class BasePlanner:
# Prometheus returns seconds, convert to milliseconds # Prometheus returns seconds, convert to milliseconds
self.last_metrics.ttft = ( self.last_metrics.ttft = (
self.prometheus_api_client.get_avg_time_to_first_token( self.prometheus_traffic_client.get_avg_time_to_first_token(
f"{self.args.adjustment_interval}s", f"{self.args.adjustment_interval}s",
self.model_name, self.model_name,
) )
* 1000 * 1000
) )
self.last_metrics.itl = ( self.last_metrics.itl = (
self.prometheus_api_client.get_avg_inter_token_latency( self.prometheus_traffic_client.get_avg_inter_token_latency(
f"{self.args.adjustment_interval}s", f"{self.args.adjustment_interval}s",
self.model_name, self.model_name,
) )
* 1000 * 1000
) )
self.last_metrics.num_req = self.prometheus_api_client.get_avg_request_count( self.last_metrics.num_req = (
f"{self.args.adjustment_interval}s", self.prometheus_traffic_client.get_avg_request_count(
self.model_name, f"{self.args.adjustment_interval}s",
self.model_name,
)
) )
self.last_metrics.request_duration = ( self.last_metrics.request_duration = (
self.prometheus_api_client.get_avg_request_duration( self.prometheus_traffic_client.get_avg_request_duration(
f"{self.args.adjustment_interval}s", f"{self.args.adjustment_interval}s",
self.model_name, self.model_name,
) )
) )
self.last_metrics.isl = ( self.last_metrics.isl = (
self.prometheus_api_client.get_avg_input_sequence_tokens( self.prometheus_traffic_client.get_avg_input_sequence_tokens(
f"{self.args.adjustment_interval}s", f"{self.args.adjustment_interval}s",
self.model_name, self.model_name,
) )
) )
self.last_metrics.osl = ( self.last_metrics.osl = (
self.prometheus_api_client.get_avg_output_sequence_tokens( self.prometheus_traffic_client.get_avg_output_sequence_tokens(
f"{self.args.adjustment_interval}s", f"{self.args.adjustment_interval}s",
self.model_name, self.model_name,
) )
...@@ -623,7 +667,9 @@ class BasePlanner: ...@@ -623,7 +667,9 @@ class BasePlanner:
logger.error(f"Failed to predict load: {e}") logger.error(f"Failed to predict load: {e}")
return None, None, None return None, None, None
def dryrun_observe_metrics(self, num_req: int, isl_avg: float, osl_avg: float): def dryrun_observe_traffic_stats(
self, num_req: int, isl_avg: float, osl_avg: float
):
self.num_req_predictor.add_data_point(num_req) self.num_req_predictor.add_data_point(num_req)
self.isl_predictor.add_data_point(isl_avg) self.isl_predictor.add_data_point(isl_avg)
self.osl_predictor.add_data_point(osl_avg) self.osl_predictor.add_data_point(osl_avg)
...@@ -700,44 +746,73 @@ class BasePlanner: ...@@ -700,44 +746,73 @@ class BasePlanner:
] ]
await self.connector.set_component_replicas(target_replicas, blocking=False) await self.connector.set_component_replicas(target_replicas, blocking=False)
async def run(self): async def _apply_scaling_blocking(self, desired_replicas: int) -> None:
"""Main loop for the planner""" """Apply scaling with blocking=True (wait for deployment ready)."""
if not self.args.no_operation: if self.args.no_operation:
logger.info("Validating deployment...") return
require_prefill = self.component_type == SubComponentType.PREFILL target_replicas = [
require_decode = self.component_type == SubComponentType.DECODE TargetReplica(
await self.connector.validate_deployment( sub_component_type=self.component_type,
prefill_component_name=( component_name=self._component_name(),
self.prefill_component_name if require_prefill else None desired_replicas=desired_replicas,
),
decode_component_name=(
self.decode_component_name if require_decode else None
),
require_prefill=require_prefill,
require_decode=require_decode,
) )
logger.info("Successfully validated the deployment") ]
await self.connector.set_component_replicas(target_replicas, blocking=True)
# Initialize GPU counts async def observe_engine_load_stats(self) -> None:
_initialize_gpu_counts( """Query DirectRouterMetricsClient for per-worker metrics, update regression."""
self.args, worker_type = self.component_type.value # "prefill" or "decode"
self.connector, result = self.prometheus_engine_client.get_recent_and_averaged_metrics(
require_prefill=require_prefill, worker_type
require_decode=require_decode, )
if result is None:
logger.warning(
f"No per-worker metrics available yet for {worker_type} (buffer empty)"
) )
return
await self.connector.wait_for_deployment_ready() recent, per_worker_averaged, cluster_averaged = result
self.cached_load_metrics = CachedLoadMetrics(
recent=recent,
per_worker_averaged=per_worker_averaged,
cluster_averaged=cluster_averaged,
)
model_name = await self._get_model_name( if self.component_type == SubComponentType.PREFILL:
require_prefill=require_prefill, require_decode=require_decode for wid, m in recent.items():
) active_prefill = m.get("active_prefill_tokens", 0.0)
logger.info(f"Detected model name from deployment: {model_name}") last_isl = m.get("last_isl", 0.0)
self.model_name = ( last_ttft = m.get("last_ttft", 0.0)
model_name.lower() if last_ttft > 0 and last_isl > 0:
) # normalize model name to lowercase (MDC) x = active_prefill + last_isl
# last_ttft is in seconds from Prometheus, convert to ms
y = last_ttft * 1000
logger.info(
f"{SubComponentType.PREFILL.value} Worker {wid} observed status: TTFT {y:.2f}ms @ prefill tokens {x:.2f}"
)
self.ttft_regression.add_observation(x, y)
elif self.component_type == SubComponentType.DECODE:
for wid, m in recent.items():
active_decode = m.get("active_decode_blocks", 0.0)
last_itl = m.get("last_itl", 0.0)
if last_itl > 0 and active_decode > 0:
x = active_decode
# last_itl is in seconds from Prometheus, convert to ms
y = last_itl * 1000
logger.info(
f"{SubComponentType.DECODE.value} Worker {wid} observed status: ITL {y:.2f}ms @ decode blocks {x:.2f}"
)
self.itl_regression.add_observation(x, y)
self.shared_state.last_adjustment_time = time.time() def loadbased_plan_adjustment(self) -> Optional[int]:
"""Load-based scaling decision. Override in subclasses."""
raise NotImplementedError
async def _throughput_loop(
self, require_prefill: bool, require_decode: bool
) -> None:
"""Throughput-based scaling loop (existing behavior, extracted from run())."""
while True: while True:
current_time = time.time() current_time = time.time()
...@@ -746,235 +821,141 @@ class BasePlanner: ...@@ -746,235 +821,141 @@ class BasePlanner:
>= self.args.adjustment_interval >= self.args.adjustment_interval
): ):
self.shared_state.last_adjustment_time = time.time() self.shared_state.last_adjustment_time = time.time()
logger.info("New adjustment interval started!") logger.info("New throughput adjustment interval started!")
await self.observe_metrics( await self.observe_traffic_stats(
require_prefill=require_prefill, require_decode=require_decode require_prefill=require_prefill, require_decode=require_decode
) )
desired_replicas = self.plan_adjustment() desired_replicas = self.plan_adjustment()
if desired_replicas is not None: if desired_replicas is not None:
desired_replicas = self.apply_component_budget(desired_replicas) if self.enable_loadbased:
self.update_predicted_replicas_metric(desired_replicas) # When load-based is also enabled: just set lower bound
await self._apply_scaling(desired_replicas) if self.component_type == SubComponentType.PREFILL:
self.shared_state.throughput_lower_bound_p = (
desired_replicas
)
else:
self.shared_state.throughput_lower_bound_d = (
desired_replicas
)
logger.info(
f"Throughput lower bound set to {desired_replicas} for {self.component_type.value}"
)
else:
# Throughput-only: apply scaling directly
desired_replicas = self.apply_component_budget(desired_replicas)
self.update_predicted_replicas_metric(desired_replicas)
# Throughput planner does not need blocking scaling because it monitors
# and predicts the load, not relying on the current status of the engine.
await self._apply_scaling(desired_replicas)
# sleep for a while to avoid busy-waiting but not too long to miss the next adjustment
await asyncio.sleep(self.args.adjustment_interval / 10) await asyncio.sleep(self.args.adjustment_interval / 10)
async def _load_loop(self, require_prefill: bool, require_decode: bool) -> None:
"""Load-based scaling loop at shorter interval."""
while True:
await asyncio.sleep(self.args.loadbased_adjustment_interval)
logger.info("New load-based adjustment interval started!")
class PrefillPlanner(BasePlanner): # Query DGD for fresh worker counts
component_type = SubComponentType.PREFILL num_p, num_d, _ = await self.get_workers_info(
require_prefill=require_prefill, require_decode=require_decode
def _update_correction_factor(self) -> bool:
expect_ttft = self.prefill_interpolator.interpolate_ttft(self.last_metrics.isl)
self.p_correction_factor = self.last_metrics.ttft / expect_ttft
logger.info(f"Correction factor (prefill TTFT): {self.p_correction_factor:.3f}")
if self.prometheus_port != 0 and self.prometheus_metrics is not None:
self.prometheus_metrics.p_correction_factor.set(self.p_correction_factor)
return True
def _compute_replica_requirements(
self, next_num_req: float, next_isl: float, next_osl: float
) -> int:
pred_prefill_throughput = (
next_num_req
* next_isl
/ self.args.adjustment_interval
* min(1, self.p_correction_factor)
)
p_thpt_per_gpu = self.prefill_interpolator.interpolate_thpt_per_gpu(next_isl)
next_num_p = math.ceil(
pred_prefill_throughput / p_thpt_per_gpu / self.args.prefill_engine_num_gpu
)
next_num_p = max(next_num_p, self.args.min_endpoint)
logger.info(
f"Prefill calculation: {pred_prefill_throughput:.2f}(p_thpt) / "
f"{p_thpt_per_gpu * self.args.prefill_engine_num_gpu:.2f}(p_engine_cap) = "
f"{next_num_p}(num_p)"
)
return next_num_p
def update_predicted_replicas_metric(self, desired_replicas: int) -> None:
if self.prometheus_port != 0 and self.prometheus_metrics is not None:
self.prometheus_metrics.predicted_num_p.set(desired_replicas)
class DecodePlanner(BasePlanner):
component_type = SubComponentType.DECODE
def _update_correction_factor(self) -> bool:
if self.shared_state.num_d_workers == 0:
logger.warning(
"No decode workers found for correction factor, skipping correction update"
)
return True
expect_itl = self.decode_interpolator.interpolate_itl(
concurrency=self.last_metrics.num_req # type: ignore
/ self.shared_state.num_d_workers
* self.last_metrics.request_duration # type: ignore
/ self.args.adjustment_interval,
context_length=self.last_metrics.isl + self.last_metrics.osl / 2, # type: ignore
)
self.d_correction_factor = self.last_metrics.itl / expect_itl
logger.info(f"Correction factor (decode ITL): {self.d_correction_factor:.3f}")
if self.prometheus_port != 0 and self.prometheus_metrics is not None:
self.prometheus_metrics.d_correction_factor.set(self.d_correction_factor)
return True
def _compute_replica_requirements(
self, next_num_req: float, next_isl: float, next_osl: float
) -> int:
if self.d_correction_factor <= 0:
logger.warning(
f"d_correction_factor is {self.d_correction_factor}, using default value of 1.0"
) )
corrected_itl = self.args.itl self.shared_state.num_p_workers = num_p
else: self.shared_state.num_d_workers = num_d
corrected_itl = self.args.itl / self.d_correction_factor
(
pred_decode_thpt_per_gpu,
_,
_,
) = self.decode_interpolator.find_best_throughput_per_gpu(
itl=corrected_itl, context_length=next_isl + next_osl / 2
)
pred_decode_throughput = next_num_req * next_osl / self.args.adjustment_interval
next_num_d = math.ceil(
pred_decode_throughput
/ pred_decode_thpt_per_gpu
/ self.args.decode_engine_num_gpu
)
next_num_d = max(next_num_d, self.args.min_endpoint)
logger.info(
f"Decode calculation: {pred_decode_throughput:.2f}(d_thpt) / "
f"{pred_decode_thpt_per_gpu * self.args.decode_engine_num_gpu:.2f}(d_engine_cap) = "
f"{next_num_d}(num_d)"
)
return next_num_d
def update_predicted_replicas_metric(self, desired_replicas: int) -> None: # Observe per-worker metrics from router
if self.prometheus_port != 0 and self.prometheus_metrics is not None: await self.observe_engine_load_stats()
self.prometheus_metrics.predicted_num_d.set(desired_replicas)
# Reconcile DGD worker count with router Prometheus count
class DisaggPlanner: prom_count = len(self.cached_load_metrics.recent)
def __init__( dgd_count = (
self, runtime: Optional[DistributedRuntime], args: argparse.Namespace num_p if self.component_type == SubComponentType.PREFILL else num_d
) -> None: )
self.args = args if prom_count != dgd_count:
self.shared_state = PlannerSharedState() logger.warning(
prometheus_metrics = PlannerPrometheusMetrics() f"Worker count mismatch: DGD reports {dgd_count} workers, "
f"router metrics reports {prom_count} workers. "
self.prefill_planner = PrefillPlanner( "Skipping load-based scaling adjustment."
runtime, )
args, continue
shared_state=self.shared_state,
prometheus_metrics=prometheus_metrics, desired_replicas = self.loadbased_plan_adjustment()
start_prometheus_server=True,
) if desired_replicas is not None:
self.decode_planner = DecodePlanner( # Enforce lower bound from throughput-based
runtime, if self.enable_throughput:
args, if self.component_type == SubComponentType.PREFILL:
shared_state=self.shared_state, lower_bound = self.shared_state.throughput_lower_bound_p
prometheus_metrics=prometheus_metrics, else:
prometheus_api_client=getattr( lower_bound = self.shared_state.throughput_lower_bound_d
self.prefill_planner, "prometheus_api_client", None desired_replicas = max(desired_replicas, lower_bound)
), desired_replicas = self.apply_component_budget(desired_replicas)
connector=getattr(self.prefill_planner, "connector", None), self.update_predicted_replicas_metric(desired_replicas)
start_prometheus_server=False, # Load-based planner needs blocking scaling because it only checks
) # the current status of the engine, not the predicted load.
# We need to wait for the deployment to be steady before making another one.
async def _async_init(self): await self._apply_scaling_blocking(desired_replicas)
# Prefill/Decode share the same connector instance in disagg mode.
await self.prefill_planner._async_init()
async def run(self): async def run(self):
"""Main loop for the planner"""
require_prefill = self.component_type == SubComponentType.PREFILL
require_decode = self.component_type == SubComponentType.DECODE
if not self.args.no_operation: if not self.args.no_operation:
logger.info("Validating deployment...") logger.info("Validating deployment...")
await self.prefill_planner.connector.validate_deployment( await self.connector.validate_deployment(
prefill_component_name=self.prefill_planner.prefill_component_name, prefill_component_name=(
decode_component_name=self.prefill_planner.decode_component_name, self.prefill_component_name if require_prefill else None
require_prefill=True, ),
require_decode=True, decode_component_name=(
self.decode_component_name if require_decode else None
),
require_prefill=require_prefill,
require_decode=require_decode,
) )
logger.info("Successfully validated the deployment") logger.info("Successfully validated the deployment")
# Initialize GPU counts # Initialize GPU counts
_initialize_gpu_counts( _initialize_gpu_counts(
self.args, self.args,
self.prefill_planner.connector, self.connector,
require_prefill=True, require_prefill=require_prefill,
require_decode=True, require_decode=require_decode,
) )
await self.prefill_planner.connector.wait_for_deployment_ready() await self.connector.wait_for_deployment_ready()
model_name = await self.prefill_planner._get_model_name( # Model name discovery runs in all modes (needed for metrics collection)
require_prefill=True, require_decode=True if not self.args.no_operation:
model_name = await self._get_model_name(
require_prefill=require_prefill, require_decode=require_decode
) )
logger.info(f"Detected model name from deployment: {model_name}") logger.info(f"Detected model name from deployment: {model_name}")
model_name = model_name.lower() self.model_name = model_name.lower()
self.prefill_planner.model_name = model_name else:
self.decode_planner.model_name = model_name model_name = getattr(self.args, "model_name", None)
if not model_name:
self.shared_state.last_adjustment_time = time.time() raise ValueError(
"Model name is required in no-operation mode. "
while True: "Please provide --model-name."
current_time = time.time()
if (
current_time - self.shared_state.last_adjustment_time
>= self.args.adjustment_interval
):
self.shared_state.last_adjustment_time = time.time()
logger.info("New adjustment interval started!")
await self.prefill_planner.observe_metrics(
require_prefill=True, require_decode=True
)
self.decode_planner.update_predictors_from_metrics(
self.shared_state.last_metrics
) )
next_num_p = self.prefill_planner.plan_adjustment() self.model_name = model_name.lower()
next_num_d = self.decode_planner.plan_adjustment()
if next_num_p is None or next_num_d is None:
continue
next_num_p, next_num_d = _apply_global_gpu_budget( self.shared_state.last_adjustment_time = time.time()
next_num_p, next_num_d, self.args self.shared_state.last_loadbased_adjustment_time = time.time()
# Build list of concurrent loops based on enabled scaling modes
loops = []
if self.enable_throughput:
loops.append(self._throughput_loop(require_prefill, require_decode))
if self.enable_loadbased:
loops.append(self._load_loop(require_prefill, require_decode))
loops.append(
self.prometheus_engine_client.run_sampling_loop(
self.args.loadbased_metric_samples,
self.args.loadbased_adjustment_interval,
) )
self.prefill_planner.update_predicted_replicas_metric(next_num_p) )
self.decode_planner.update_predicted_replicas_metric(next_num_d)
if not self.args.no_operation:
target_replicas = [
TargetReplica(
sub_component_type=SubComponentType.PREFILL,
component_name=self.prefill_planner.prefill_component_name,
desired_replicas=next_num_p,
),
TargetReplica(
sub_component_type=SubComponentType.DECODE,
component_name=self.prefill_planner.decode_component_name,
desired_replicas=next_num_d,
),
]
await self.prefill_planner.connector.set_component_replicas(
target_replicas, blocking=False
)
# sleep for a while to avoid busy-waiting but not too long to miss the next adjustment
await asyncio.sleep(self.args.adjustment_interval / 10)
async def start_sla_planner(runtime: DistributedRuntime, args: argparse.Namespace): await asyncio.gather(*loops)
mode = getattr(args, "mode", "disagg")
if mode == "disagg":
planner = DisaggPlanner(runtime, args)
elif mode == "prefill":
planner = PrefillPlanner(runtime, args)
elif mode == "decode":
planner = DecodePlanner(runtime, args)
else:
raise ValueError(f"Invalid planner mode: {mode}")
await planner._async_init()
await planner.run()
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import math
from typing import Optional
from dynamo.planner import SubComponentType
from dynamo.planner.utils.planner_core import BasePlanner
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
logger = logging.getLogger(__name__)
class PrefillPlanner(BasePlanner):
    """Planner specialisation for the prefill sub-component.

    Supplies the prefill-specific pieces used by the base planner: the
    load-based scaling decision, the TTFT correction factor, the
    throughput-based replica computation, and the predicted-replica gauge.
    """

    component_type = SubComponentType.PREFILL

    def loadbased_plan_adjustment(self) -> Optional[int]:
        """Decide a load-based replica count for prefill.

        Compares each worker's most recent ``active_prefill_tokens`` against a
        target derived from the TTFT regression and the cluster-averaged
        ``last_isl``.

        Returns:
            ``num_workers + 1`` when every worker is above the target,
            ``num_workers - 1`` when there is more than one worker and every
            worker is below the scale-down boundary, otherwise ``None`` (no
            change, or not enough information to decide).
        """
        regression = self.ttft_regression
        if not regression.has_sufficient_data():
            logger.info(
                f"TTFT regression: insufficient data ({self.ttft_regression.num_observations}"
                f"/{self.ttft_regression.min_observations}), skipping load-based scaling"
            )
            return None

        # NOTE(review): x_sla is presumably the regression input at which the
        # predicted TTFT equals the SLA target -- confirm against the
        # regression model.
        x_sla = regression.predict_x_from_sla(self.args.ttft)
        if x_sla is None:
            return None

        load_metrics = self.cached_load_metrics
        if not load_metrics.recent:
            return None
        latest_by_worker = load_metrics.recent

        # Averaged ISL across all workers in the past adjustment interval
        avg_isl = load_metrics.cluster_averaged.get("last_isl", 0.0)
        target_active_tokens = x_sla - avg_isl
        if target_active_tokens <= 0:
            logger.warning(
                f"TTFT SLA unachievable at current ISL: x_sla={x_sla:.1f}, "
                f"avg_isl={avg_isl:.1f}, skipping load-based prefill scaling"
            )
            return None

        num_workers = self.shared_state.num_p_workers
        if num_workers == 0:
            return None

        logger.info(
            f"Load-based prefill: x_sla={x_sla:.1f}, avg_isl={avg_isl:.1f}, "
            f"target_active_tokens={target_active_tokens:.1f}, workers={num_workers}, "
            f"slope={self.ttft_regression.slope:.6f}, intercept={self.ttft_regression.intercept:.3f}"
        )

        # Latest per-worker load; a worker without the metric counts as idle.
        worker_loads = [
            stats.get("active_prefill_tokens", 0.0)
            for stats in latest_by_worker.values()
        ]

        # Scale up: ALL workers above target (use recent metrics)
        if all(load > target_active_tokens for load in worker_loads):
            logger.info(
                f"Load-based prefill: ALL workers above target ({target_active_tokens:.1f}), "
                f"scaling up to {num_workers + 1}"
            )
            return num_workers + 1

        # A single worker is never scaled down.
        if num_workers <= 1:
            return None

        # Scale down: ALL workers below boundary (use recent metrics). The
        # boundary is the target scaled by (n - 1) / n and by the configured
        # sensitivity percentage.
        sensitivity = self.args.loadbased_scaling_down_sensitivity / 100.0
        boundary = target_active_tokens * (num_workers - 1) / num_workers * sensitivity
        if all(load < boundary for load in worker_loads):
            logger.info(
                f"Load-based prefill: ALL workers below boundary ({boundary:.1f}), "
                f"scaling down to {num_workers - 1}"
            )
            return num_workers - 1

        return None

    def _update_correction_factor(self) -> bool:
        """Refresh the prefill TTFT correction factor from the last metrics.

        The factor is the observed TTFT divided by the TTFT the prefill
        interpolator expects at the observed ISL. It is logged and, when the
        Prometheus exporter is active, published on ``p_correction_factor``.

        Returns:
            Always True.
        """
        observed = self.last_metrics
        expected_ttft = self.prefill_interpolator.interpolate_ttft(observed.isl)
        self.p_correction_factor = observed.ttft / expected_ttft
        logger.info(f"Correction factor (prefill TTFT): {self.p_correction_factor:.3f}")

        if self.prometheus_port == 0 or self.prometheus_metrics is None:
            return True
        self.prometheus_metrics.p_correction_factor.set(self.p_correction_factor)
        return True

    def _compute_replica_requirements(
        self, next_num_req: float, next_isl: float, next_osl: float
    ) -> int:
        """Compute the prefill replica count needed for the predicted load.

        Args:
            next_num_req: Predicted number of requests in the next interval.
            next_isl: Predicted input sequence length.
            next_osl: Predicted output sequence length (not used for prefill).

        Returns:
            The required replica count, never below ``args.min_endpoint``.
            Falls back to ``args.min_endpoint`` when the interpolator reports a
            non-positive per-GPU throughput.
        """
        # The correction factor is capped at 1, so it can only reduce the
        # predicted demand, never inflate it.
        tokens_per_second = next_num_req * next_isl / self.args.adjustment_interval
        pred_prefill_throughput = tokens_per_second * min(1, self.p_correction_factor)

        p_thpt_per_gpu = self.prefill_interpolator.interpolate_thpt_per_gpu(next_isl)
        if p_thpt_per_gpu <= 0:
            logger.warning(
                f"p_thpt_per_gpu is {p_thpt_per_gpu} "
                "(no throughput satisfies TTFT target), falling back to min_endpoint"
            )
            return self.args.min_endpoint

        engine_gpus = self.args.prefill_engine_num_gpu
        next_num_p = max(
            math.ceil(pred_prefill_throughput / p_thpt_per_gpu / engine_gpus),
            self.args.min_endpoint,
        )
        logger.info(
            f"Prefill calculation: {pred_prefill_throughput:.2f}(p_thpt) / "
            f"{p_thpt_per_gpu * self.args.prefill_engine_num_gpu:.2f}(p_engine_cap) = "
            f"{next_num_p}(num_p)"
        )
        return next_num_p

    def update_predicted_replicas_metric(self, desired_replicas: int) -> None:
        """Publish ``desired_replicas`` on the ``predicted_num_p`` gauge.

        Does nothing when the Prometheus port is 0 or no metrics object is set.
        """
        if self.prometheus_port == 0 or self.prometheus_metrics is None:
            return
        self.prometheus_metrics.predicted_num_p.set(desired_replicas)
...@@ -13,10 +13,16 @@ ...@@ -13,10 +13,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import asyncio
import logging import logging
import math
import typing import typing
from dataclasses import dataclass, field
from typing import Optional
import aiohttp
from prometheus_api_client import PrometheusConnect from prometheus_api_client import PrometheusConnect
from prometheus_client.parser import text_string_to_metric_families
from pydantic import BaseModel, ValidationError from pydantic import BaseModel, ValidationError
from dynamo import prometheus_names from dynamo import prometheus_names
...@@ -26,6 +32,48 @@ configure_dynamo_logging() ...@@ -26,6 +32,48 @@ configure_dynamo_logging()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass
class Metrics:
    """Aggregate traffic and latency observations; every field defaults to None.

    ``is_valid`` treats ttft, itl, isl, osl, num_req and request_duration as
    required. ``p_load`` and ``d_load`` are optional extras that it ignores.
    """

    ttft: Optional[float] = None
    itl: Optional[float] = None
    num_req: Optional[float] = None
    isl: Optional[float] = None
    osl: Optional[float] = None
    request_duration: Optional[float] = None
    p_load: Optional[float] = None
    d_load: Optional[float] = None

    def is_valid(self) -> bool:
        """Return True only if every required metric is set and is not NaN."""
        required = (
            self.ttft,
            self.itl,
            self.isl,
            self.osl,
            self.num_req,
            self.request_duration,
        )
        for value in required:
            # The None check must come first: math.isnan(None) would raise.
            if value is None or math.isnan(value):
                return False
        return True
@dataclass
class CachedLoadMetrics:
    """Three views of the load metrics consumed by load-based scaling.

    Each field defaults to its own fresh empty dict, so instances never share
    state.
    """

    # Latest sample only: worker_id -> {metric_name: value}.
    recent: dict[str, dict[str, float]] = field(default_factory=dict)
    # Averaged over time for each worker separately (not across workers):
    # worker_id -> {metric_name: value}.
    per_worker_averaged: dict[str, dict[str, float]] = field(default_factory=dict)
    # Averaged over time and over all workers: flat {metric_name: value}.
    cluster_averaged: dict[str, float] = field(default_factory=dict)
class FrontendMetric(BaseModel): class FrontendMetric(BaseModel):
container: typing.Optional[str] = None container: typing.Optional[str] = None
dynamo_namespace: typing.Optional[str] = None dynamo_namespace: typing.Optional[str] = None
...@@ -180,3 +228,181 @@ def parse_frontend_metric_containers( ...@@ -180,3 +228,181 @@ def parse_frontend_metric_containers(
logger.error(f"Error parsing frontend metric container: {e}") logger.error(f"Error parsing frontend metric container: {e}")
continue continue
return metrics_containers return metrics_containers
# Metric names for per-worker load metrics (gauge-type, queried directly from router).
# Maps the short field name used in parsed per-worker results (key) to the full
# Prometheus metric name exposed on the /metrics endpoint (value). The parser
# inverts this mapping to translate scraped metric families back to field names.
_WORKER_METRIC_NAMES = {
"active_prefill_tokens": f"{prometheus_names.name_prefix.FRONTEND}_{prometheus_names.frontend_service.WORKER_ACTIVE_PREFILL_TOKENS}",
"active_decode_blocks": f"{prometheus_names.name_prefix.FRONTEND}_{prometheus_names.frontend_service.WORKER_ACTIVE_DECODE_BLOCKS}",
"last_ttft": f"{prometheus_names.name_prefix.FRONTEND}_{prometheus_names.frontend_service.WORKER_LAST_TIME_TO_FIRST_TOKEN_SECONDS}",
"last_isl": f"{prometheus_names.name_prefix.FRONTEND}_{prometheus_names.frontend_service.WORKER_LAST_INPUT_SEQUENCE_TOKENS}",
"last_itl": f"{prometheus_names.name_prefix.FRONTEND}_{prometheus_names.frontend_service.WORKER_LAST_INTER_TOKEN_LATENCY_SECONDS}",
}
class DirectRouterMetricsClient:
    """Query router's /metrics endpoint directly for real-time per-worker metrics.

    Runs a continuous background sampling loop that collects metrics at
    evenly-spaced intervals (interval / num_samples). At decision time,
    the load-based loop reads the buffer via get_recent_and_averaged_metrics().
    """

    def __init__(self, router_metrics_url: str, dynamo_namespace: str):
        """Store the endpoint URL and start with an empty sample buffer.

        Args:
            router_metrics_url: URL of the router's /metrics endpoint.
            dynamo_namespace: Dynamo namespace of the deployment.
                NOTE(review): stored but not used for filtering anywhere in
                this class -- confirm whether samples should be filtered by it.
        """
        self.router_metrics_url = router_metrics_url
        self.dynamo_namespace = dynamo_namespace
        # Rolling window of parsed samples, oldest first. Each sample has the
        # shape {worker_type: {worker_id: {metric_name: value}}}.
        self._sample_buffer: list[dict[str, dict[str, dict[str, float]]]] = []
        self._num_samples: int = 10

    def _parse_prometheus_text(
        self, text: str
    ) -> dict[str, dict[str, dict[str, float]]]:
        """Parse Prometheus text exposition format and extract per-worker metrics.

        Uses prometheus_client.parser to parse the text exposition format.
        Groups results by worker_type label (prefill/decode) so callers
        can access only the workers they care about. Samples missing the
        worker_type or worker_id label are filed under "unknown".

        Args:
            text: Raw Prometheus text from /metrics endpoint

        Returns:
            {"prefill": {worker_id: {metric: float, ...}},
             "decode": {worker_id: {metric: float, ...}}}
        """
        # Full Prometheus metric name -> short field name.
        reverse_map = {v: k for k, v in _WORKER_METRIC_NAMES.items()}
        result: dict[str, dict[str, dict[str, float]]] = {}

        for family in text_string_to_metric_families(text):
            field_name = reverse_map.get(family.name)
            if field_name is None:
                # Not one of the per-worker load metrics we track.
                continue
            for sample in family.samples:
                labels = sample.labels
                worker_type = labels.get("worker_type", "unknown")
                worker_id = labels.get("worker_id", "unknown")
                workers = result.setdefault(worker_type, {})
                workers.setdefault(worker_id, {})[field_name] = sample.value

        return result

    async def _fetch_and_parse(self) -> dict[str, dict[str, dict[str, float]]]:
        """Fetch /metrics from router and parse into per-worker metrics.

        Best-effort: any failure (connection error, timeout, HTTP error status,
        unparsable body) is logged as a warning and reported as an empty dict,
        so the sampling loop simply skips that sample.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self.router_metrics_url, timeout=aiohttp.ClientTimeout(total=2)
                ) as response:
                    # Without this, a 4xx/5xx error page would be handed to the
                    # parser as if it were a metrics payload.
                    response.raise_for_status()
                    text = await response.text()
            return self._parse_prometheus_text(text)
        except Exception as e:
            logger.warning(f"Failed to fetch router metrics: {e}")
            return {}

    async def run_sampling_loop(self, num_samples: int, interval: float) -> None:
        """Background coroutine: continuously sample at evenly-spaced intervals.

        Runs alongside the load-based loop via asyncio.gather().
        sample_interval = interval / num_samples (e.g., 5s / 10 = 0.5s)
        Keeps only the last num_samples in the buffer (rolling window).

        Args:
            num_samples: Size of the rolling window; must be positive.
            interval: Length in seconds of the period the window covers; must
                be positive.

        Raises:
            ValueError: If num_samples or interval is not positive. A zero
                num_samples would otherwise divide by zero, and a non-positive
                sleep would poll the router in a tight loop.
        """
        if num_samples <= 0:
            raise ValueError(f"num_samples must be positive, got {num_samples}")
        if interval <= 0:
            raise ValueError(f"interval must be positive, got {interval}")

        self._num_samples = num_samples
        sample_interval = interval / num_samples

        while True:
            metrics = await self._fetch_and_parse()
            if metrics:
                self._sample_buffer.append(metrics)
                # Trim in place to the newest num_samples entries. The slice
                # also shrinks a buffer left over from a larger window.
                del self._sample_buffer[:-num_samples]
            await asyncio.sleep(sample_interval)

    def get_recent_and_averaged_metrics(
        self, worker_type: str
    ) -> typing.Optional[
        tuple[
            dict[str, dict[str, float]],
            dict[str, dict[str, float]],
            dict[str, float],
        ]
    ]:
        """Return recent, per-worker time-averaged, and cluster-averaged metrics.

        Called by the load-based loop at decision time. Non-blocking.

        Args:
            worker_type: "prefill" or "decode" -- only workers matching
                the worker_type label are included.

        Returns:
            A tuple of (recent, per_worker_averaged, cluster_averaged):
            - recent: {worker_id: {metric: float}} from the latest sample
            - per_worker_averaged: {worker_id: {metric: float}} averaged over time per worker
            - cluster_averaged: {metric: float} averaged over all samples and all workers
            Returns None if the sample buffer is empty or no buffered sample
            contains a worker of the requested type.
        """
        if not self._sample_buffer:
            return None

        # --- Recent: last sample only (copied so callers cannot mutate the buffer) ---
        recent: dict[str, dict[str, float]] = {
            worker_id: dict(metrics)
            for worker_id, metrics in self._sample_buffer[-1]
            .get(worker_type, {})
            .items()
        }

        # --- Per-worker sums and counts: across time, grouped by worker_id ---
        pw_sums: dict[str, dict[str, float]] = {}
        pw_counts: dict[str, dict[str, int]] = {}
        for sample in self._sample_buffer:
            for worker_id, metrics in sample.get(worker_type, {}).items():
                sums = pw_sums.setdefault(worker_id, {})
                counts = pw_counts.setdefault(worker_id, {})
                for metric_name, value in metrics.items():
                    sums[metric_name] = sums.get(metric_name, 0.0) + value
                    counts[metric_name] = counts.get(metric_name, 0) + 1

        # The latest sample is part of the buffer, so an empty pw_sums implies
        # an empty `recent` as well: nothing is known about this worker type.
        if not pw_sums:
            return None

        per_worker_averaged: dict[str, dict[str, float]] = {
            worker_id: {
                metric_name: total / pw_counts[worker_id][metric_name]
                for metric_name, total in sums.items()
            }
            for worker_id, sums in pw_sums.items()
        }

        # --- Cluster averaged: across time AND worker_id ---
        # Weighted by observation count, so a worker seen in more samples
        # contributes proportionally more.
        cluster_sums: dict[str, float] = {}
        cluster_counts: dict[str, int] = {}
        for worker_id, sums in pw_sums.items():
            for metric_name, total in sums.items():
                cluster_sums[metric_name] = cluster_sums.get(metric_name, 0.0) + total
                cluster_counts[metric_name] = (
                    cluster_counts.get(metric_name, 0)
                    + pw_counts[worker_id][metric_name]
                )

        cluster_averaged: dict[str, float] = {
            metric_name: total / cluster_counts[metric_name]
            for metric_name, total in cluster_sums.items()
        }

        return recent, per_worker_averaged, cluster_averaged
...@@ -8,27 +8,37 @@ title: Planner ...@@ -8,27 +8,37 @@ title: Planner
The Planner monitors system performance and automatically scales prefill/decode workers to meet latency SLAs. It runs as a component inside the Dynamo inference graph on Kubernetes. The Planner monitors system performance and automatically scales prefill/decode workers to meet latency SLAs. It runs as a component inside the Dynamo inference graph on Kubernetes.
The SLA Planner supports two scaling modes:
- **Throughput-based scaling**: Uses pre-deployment profiling data and traffic prediction to compute the number of replicas needed to meet TTFT and ITL SLA targets. This is the primary scaling mode for production deployments.
- **Load-based scaling (Experimental)**: Uses real-time per-worker load metrics (active prefill tokens, active KV blocks) from the router to make SLA-aware scaling decisions via online linear regression. Does not require profiling data. Responds quickly to traffic bursts.
When both modes are enabled, throughput-based scaling provides a lower bound on replicas (long-term capacity planning) while load-based scaling handles real-time adjustments (burst response).
> **New to the Planner?** Start with the [SLA Planner Quick Start Guide](planner-guide.md) for a complete workflow including profiling and deployment. > **New to the Planner?** Start with the [SLA Planner Quick Start Guide](planner-guide.md) for a complete workflow including profiling and deployment.
## Feature Matrix ## Feature Matrix
| Category | Feature | Status | | Feature | Throughput-Based | Load-Based (Experimental) |
|----------|---------|--------| |---------|:----------------:|:-------------------------:|
| **Backend** | Local (bare metal) | Deprecated | | **Deployment** | | |
| | Kubernetes | Supported | | Disaggregated | Supported | Supported |
| **LLM Framework** | vLLM | Supported | | Aggregated | Unsupported | Supported |
| | TensorRT-LLM | Supported | | **LLM Framework** | | |
| | SGLang | Supported | | vLLM | Supported | Supported |
| **Serving Type** | Aggregated | Unsupported | | TensorRT-LLM | Supported | Supported |
| | Disaggregated | Supported | | SGLang | Supported | Supported |
| **Scaling Mode** | SLA-based (TTFT/ITL targets) | Supported (primary) | | **Requires Profiling Data** | Yes | No |
| | Load-based (KV cache/queue thresholds) | Deprecated | | **Load Predictors** | ARIMA, Prophet, Kalman, Constant | N/A |
| **Load Predictors** | ARIMA | Supported | | **Connectors** | | |
| | Prophet | Supported | | KubernetesConnector | Supported | Supported |
| | Kalman filter | Supported | | VirtualConnector | Supported | Supported |
| | Constant (current = next) | Supported |
| **Connectors** | KubernetesConnector (native DGD scaling) | Supported | ## When to Use Which Mode
| | VirtualConnector (external environments) | Supported |
- **Throughput-based scaling** should be enabled whenever engine profiling data is available (through pre-deployment profiling). It provides stable, prediction-based capacity planning.
- **Load-based scaling** should be enabled when traffic is bursty or hard to predict. It reacts quickly to real-time load changes without requiring profiling data.
- **Both modes together**: For the best of both worlds, enable both. Throughput-based scaling provides a lower bound (long-term capacity), while load-based scaling handles bursts above that floor. When both are enabled, use a longer `--adjustment-interval` for throughput-based scaling.
## Quick Start ## Quick Start
...@@ -36,21 +46,35 @@ The Planner monitors system performance and automatically scales prefill/decode ...@@ -36,21 +46,35 @@ The Planner monitors system performance and automatically scales prefill/decode
- Dynamo platform installed on Kubernetes ([Installation Guide](../../kubernetes/installation-guide.md)) - Dynamo platform installed on Kubernetes ([Installation Guide](../../kubernetes/installation-guide.md))
- kube-prometheus-stack installed ([Metrics Setup](../../kubernetes/observability/metrics.md)) - kube-prometheus-stack installed ([Metrics Setup](../../kubernetes/observability/metrics.md))
- Pre-deployment profiling completed ([Profiling Guide](../profiler/profiler-guide.md))
### Deploy with DGDR (Recommended) For throughput-based scaling, pre-deployment profiling is also required ([Profiling Guide](../profiler/profiler-guide.md)).
### Throughput-Based Scaling (with DGDR)
The fastest path to a planner-enabled deployment is through a DynamoGraphDeploymentRequest: The fastest path to a throughput-based planner deployment is through a DynamoGraphDeploymentRequest, which automatically profiles your model:
```bash ```bash
kubectl apply -f components/src/dynamo/profiler/deploy/profile_sla_aic_dgdr.yaml -n $NAMESPACE kubectl apply -f components/src/dynamo/profiler/deploy/profile_sla_aic_dgdr.yaml -n $NAMESPACE
``` ```
This automatically profiles your model and deploys with the SLA planner. See [SLA Planner Guide](planner-guide.md) for the full workflow. See [Planner Guide](planner-guide.md) for the full workflow.
### Load-Based Scaling (without profiling)
To deploy with load-based scaling only (no profiling required), add these arguments to the planner service in your DGD:
```yaml
args:
- --enable-loadbased-scaling
- --disable-throughput-scaling
- --loadbased-adjustment-interval=5
```
The planner will auto-discover the frontend metrics endpoint from the DGD. See [disagg_planner_load.yaml](../../../../tests/planner/scaling/disagg_planner_load.yaml) for a complete example.
### Deploy with DGD (Manual) ### Manual DGD Deployment
For manual control, use the disaggregated planner templates: For manual control with throughput-based scaling, use the disaggregated planner templates:
```bash ```bash
# After profiling is complete # After profiling is complete
...@@ -63,9 +87,6 @@ kubectl apply -f examples/backends/vllm/deploy/disagg_planner.yaml -n $NAMESPACE ...@@ -63,9 +87,6 @@ kubectl apply -f examples/backends/vllm/deploy/disagg_planner.yaml -n $NAMESPACE
|----------|-------------| |----------|-------------|
| [Planner Guide](planner-guide.md) | Deployment, configuration, integration, troubleshooting | | [Planner Guide](planner-guide.md) | Deployment, configuration, integration, troubleshooting |
| [Planner Examples](planner-examples.md) | DGDR YAML examples, sample configurations, advanced patterns | | [Planner Examples](planner-examples.md) | DGDR YAML examples, sample configurations, advanced patterns |
| [SLA Planner Guide](planner-guide.md) | End-to-end DGDR workflow: define SLAs, profile, deploy, monitor |
| [SLA-based Planner](planner-guide.md) | Scaling algorithm, correction factors, load prediction details |
| [Load-based Planner](README.md) | Legacy load-based scaling (deprecated) |
| [SLA-Driven Profiling](../profiler/profiler-guide.md) | Pre-deployment profiling process and configuration | | [SLA-Driven Profiling](../profiler/profiler-guide.md) | Pre-deployment profiling process and configuration |
| [Planner Design](../../design-docs/planner-design.md) | Architecture deep-dive for contributors | | [Planner Design](../../design-docs/planner-design.md) | Architecture deep-dive for contributors |
...@@ -75,22 +96,33 @@ kubectl apply -f examples/backends/vllm/deploy/disagg_planner.yaml -n $NAMESPACE ...@@ -75,22 +96,33 @@ kubectl apply -f examples/backends/vllm/deploy/disagg_planner.yaml -n $NAMESPACE
| Argument | Default | Description | | Argument | Default | Description |
|----------|---------|-------------| |----------|---------|-------------|
| **Common** | | |
| `--namespace` | `$DYN_NAMESPACE` or `dynamo` | Dynamo logical namespace | | `--namespace` | `$DYN_NAMESPACE` or `dynamo` | Dynamo logical namespace |
| `--backend` | `vllm` | Backend framework (`vllm`, `sglang`, `trtllm`) | | `--backend` | `vllm` | Backend framework (`vllm`, `sglang`, `trtllm`) |
| `--mode` | `disagg` | Planner mode (`disagg`, `prefill`, `decode`, `agg`) |
| `--environment` | `kubernetes` | Deployment environment | | `--environment` | `kubernetes` | Deployment environment |
| `--adjustment-interval` | `180` | Seconds between scaling decisions |
| `--ttft` | `500.0` | Target Time To First Token (ms) | | `--ttft` | `500.0` | Target Time To First Token (ms) |
| `--itl` | `50.0` | Target Inter-Token Latency (ms) | | `--itl` | `50.0` | Target Inter-Token Latency (ms) |
| `--isl` | `3000` | Expected average input sequence length |
| `--osl` | `150` | Expected average output sequence length |
| `--load-predictor` | `arima` | Prediction model (`arima`, `prophet`, `kalman`, `constant`) |
| `--max-gpu-budget` | `8` | Maximum GPUs across all workers | | `--max-gpu-budget` | `8` | Maximum GPUs across all workers |
| `--min-endpoint` | `1` | Minimum replicas per worker type | | `--min-endpoint` | `1` | Minimum replicas per worker type |
| `--decode-engine-num-gpu` | `1` | GPUs per decode engine | | `--decode-engine-num-gpu` | `1` | GPUs per decode engine |
| `--prefill-engine-num-gpu` | `1` | GPUs per prefill engine | | `--prefill-engine-num-gpu` | `1` | GPUs per prefill engine |
| `--no-operation` | `false` | Observation mode (no actual scaling) | | `--no-operation` | `false` | Observation mode (no actual scaling) |
| `--no-correction` | `false` | Disable correction factors | | **Throughput-based scaling** | | |
| `--enable-throughput-scaling` | `true` | Enable throughput-based scaling |
| `--adjustment-interval` | `180` | Seconds between throughput-based scaling decisions |
| `--profile-results-dir` | `profiling_results` | Path to profiling data (NPZ/JSON) | | `--profile-results-dir` | `profiling_results` | Path to profiling data (NPZ/JSON) |
| `--load-predictor` | `arima` | Prediction model (`arima`, `prophet`, `kalman`, `constant`) |
| `--no-correction` | `false` | Disable correction factors |
| **Load-based scaling (Experimental)** | | |
| `--enable-loadbased-scaling` | `false` | Enable load-based scaling |
| `--disable-throughput-scaling` | `false` | Disable throughput-based scaling (required for `agg` mode) |
| `--loadbased-router-metrics-url` | auto-discovered | URL to router's `/metrics` endpoint |
| `--loadbased-adjustment-interval` | `5` | Seconds between load-based scaling decisions |
| `--loadbased-learning-window` | `50` | Sliding window size for regression model |
| `--loadbased-scaling-down-sensitivity` | `80` | Scale-down sensitivity 0-100 (0=never, 100=aggressive) |
| `--loadbased-metric-samples` | `10` | Number of metric samples per adjustment interval |
| `--loadbased-min-observations` | `5` | Minimum observations before regression activates |
### Environment Variables ### Environment Variables
...@@ -119,7 +151,12 @@ The dashboard shows: ...@@ -119,7 +151,12 @@ The dashboard shows:
### Prometheus Metrics ### Prometheus Metrics
The planner queries the frontend's `/metrics` endpoint via Prometheus. Required metrics: **Throughput-based scaling** pulls traffic metrics from the cluster-wide Prometheus server:
- Request count and duration - Request count and duration
- TTFT and ITL distributions - TTFT and ITL distributions
- Input/output sequence lengths - Input/output sequence lengths
**Load-based scaling** pulls per-engine status directly from the frontend's `/metrics` endpoint:
- Active prefill tokens per worker
- Active decode blocks per worker
- Last observed TTFT, ITL, and ISL per worker
...@@ -4,9 +4,9 @@ ...@@ -4,9 +4,9 @@
title: Planner Examples title: Planner Examples
--- ---
# Planner Examples # Planner Examples: Throughput-Based Scaling
Practical examples for deploying the SLA Planner with different configurations. For deployment concepts, see the [Planner Guide](planner-guide.md). For a quick overview, see the [Planner README](README.md). Practical examples for deploying the SLA Planner with throughput-based scaling. All examples below use the DGDR workflow with pre-deployment profiling. For deployment concepts, see the [Planner Guide](planner-guide.md). For a quick overview, see the [Planner README](README.md).
## Basic Examples ## Basic Examples
......
...@@ -8,6 +8,28 @@ title: Planner Guide ...@@ -8,6 +8,28 @@ title: Planner Guide
Deployment, configuration, and integration guide for the Dynamo SLA Planner. For a quick overview, see the [Planner README](README.md). For architecture internals, see [Planner Design](../../design-docs/planner-design.md). Deployment, configuration, and integration guide for the Dynamo SLA Planner. For a quick overview, see the [Planner README](README.md). For architecture internals, see [Planner Design](../../design-docs/planner-design.md).
## Scaling Modes
The SLA Planner supports two scaling modes:
- **Throughput-based scaling**: Uses pre-deployment profiling data and traffic prediction. Best for stable, predictable workloads where profiling data is available.
- **Load-based scaling (Experimental)**: Uses real-time per-worker engine metrics and online regression. Best for bursty or unpredictable traffic. Does not require profiling data.
**When to use which mode:**
- Enable **throughput-based scaling** whenever engine profiling data is available. It provides stable, prediction-based capacity planning.
- Enable **load-based scaling** when traffic is bursty or hard to predict. It reacts quickly to real-time load changes.
- Enable **both modes together** for the best of both worlds: throughput-based scaling provides a lower bound (long-term capacity), while load-based scaling handles bursts above that floor. When both are enabled, use a longer `--adjustment-interval` for throughput-based scaling.
**DGDR and scaling modes:** Deploying via DGDR automatically triggers profiling and enables throughput-based scaling. To additionally enable load-based scaling, pass the planner arguments through the DGDR's planner config section:
```yaml
profilingConfig:
config:
planner:
plannerEnableLoadbasedScaling: true
plannerLoadbasedAdjustmentInterval: 5
```
## Deployment ## Deployment
### Prerequisites ### Prerequisites
...@@ -191,7 +213,7 @@ For detailed comparison, supported configurations, and limitations, see [SLA-Dri ...@@ -191,7 +213,7 @@ For detailed comparison, supported configurations, and limitations, see [SLA-Dri
### Load Predictors ### Load Predictors
The SLA planner forecasts the number of requests, ISL, and OSL in the next adjustment interval. Four prediction models are supported: The throughput-based scaling mode forecasts the number of requests, ISL, and OSL in the next adjustment interval. Four prediction models are supported:
#### Constant Predictor #### Constant Predictor
- **Use case**: Stable workloads with long prediction intervals - **Use case**: Stable workloads with long prediction intervals
...@@ -231,15 +253,13 @@ You can warm-start load predictors with a mooncake-style JSONL trace file: ...@@ -231,15 +253,13 @@ You can warm-start load predictors with a mooncake-style JSONL trace file:
- **CLI argument**: `--load-predictor-warmup-trace <path/to/trace.jsonl>` - **CLI argument**: `--load-predictor-warmup-trace <path/to/trace.jsonl>`
- **Effect**: preloads predictors with historical request-count / ISL / OSL samples extracted from the trace - **Effect**: preloads predictors with historical request-count / ISL / OSL samples extracted from the trace
### Planner Scaling Parameters ### Throughput-Based Scaling Parameters
| Argument | Default | Description | | Argument | Default | Description |
|----------|---------|-------------| |----------|---------|-------------|
| `--adjustment-interval` | `180` | Seconds between scaling decisions | | `--adjustment-interval` | `180` | Seconds between scaling decisions |
| `--ttft` | `500.0` | Target Time To First Token (ms) | | `--ttft` | `500.0` | Target Time To First Token (ms) |
| `--itl` | `50.0` | Target Inter-Token Latency (ms) | | `--itl` | `50.0` | Target Inter-Token Latency (ms) |
| `--isl` | `3000` | Expected average input sequence length |
| `--osl` | `150` | Expected average output sequence length |
| `--max-gpu-budget` | `8` | Maximum GPUs across all workers | | `--max-gpu-budget` | `8` | Maximum GPUs across all workers |
| `--min-endpoint` | `1` | Minimum replicas per worker type | | `--min-endpoint` | `1` | Minimum replicas per worker type |
| `--decode-engine-num-gpu` | `1` | GPUs per decode engine | | `--decode-engine-num-gpu` | `1` | GPUs per decode engine |
...@@ -247,6 +267,8 @@ You can warm-start load predictors with a mooncake-style JSONL trace file: ...@@ -247,6 +267,8 @@ You can warm-start load predictors with a mooncake-style JSONL trace file:
| `--no-operation` | `false` | Observation mode (no actual scaling) | | `--no-operation` | `false` | Observation mode (no actual scaling) |
| `--no-correction` | `false` | Disable correction factors | | `--no-correction` | `false` | Disable correction factors |
For the full list of arguments including load-based scaling options, see the [Planner README](README.md#key-arguments).
#### Planner Configuration Passthrough #### Planner Configuration Passthrough
Add planner-specific settings in the DGDR: Add planner-specific settings in the DGDR:
......
...@@ -10,9 +10,9 @@ title: Planner Design ...@@ -10,9 +10,9 @@ title: Planner Design
## Overview ## Overview
The Planner is Dynamo's autoscaling controller. It observes system metrics, predicts future load, and adjusts prefill/decode worker replica counts to proactively meet SLA targets. This document covers the internal architecture, algorithms, and design trade-offs. The Planner is Dynamo's autoscaling controller. It supports two scaling modes: **throughput-based** (using profiling data and traffic prediction) and **load-based** (using real-time engine metrics and online regression). This document covers the internal architecture, algorithms, and design trade-offs for both modes.
## Architecture ## Throughput-Based Scaling
![Planner architecture showing Metric Collector, Load Predictor, and Performance Interpolator feeding into the Scaling Algorithm and Connector Layer](../../assets/img/planner-architecture.svg) ![Planner architecture showing Metric Collector, Load Predictor, and Performance Interpolator feeding into the Scaling Algorithm and Connector Layer](../../assets/img/planner-architecture.svg)
...@@ -167,17 +167,48 @@ After the delay: ...@@ -167,17 +167,48 @@ After the delay:
- **Interpolation accuracy vs profiling cost**: Higher `prefillInterpolationGranularity` and `decodeInterpolationGranularity` in the profiling sweep produce more accurate interpolation but increase profiling time linearly. Default granularity (16 prefill, 6 decode) balances accuracy with profiling duration. - **Interpolation accuracy vs profiling cost**: Higher `prefillInterpolationGranularity` and `decodeInterpolationGranularity` in the profiling sweep produce more accurate interpolation but increase profiling time linearly. Default granularity (16 prefill, 6 decode) balances accuracy with profiling duration.
- **Predictor warm-up period**: All predictors need observation history before making reliable forecasts. ARIMA and Prophet need multiple adjustment intervals of data. Kalman starts forecasting after `--kalman-min-points` observations. During warm-up, the planner uses the constant predictor as fallback. - **Predictor warm-up period**: All predictors need observation history before making reliable forecasts. ARIMA and Prophet need multiple adjustment intervals of data. Kalman starts forecasting after `--kalman-min-points` observations. During warm-up, the planner uses the constant predictor as fallback.
## Load-Based Scaling (Experimental)
The load-based mode uses real-time per-worker metrics from the router to make SLA-aware scaling decisions without requiring profiling data.
### Metrics
The planner pulls per-worker load metrics directly from the frontend's `/metrics` endpoint:
- **Active prefill tokens**: pending prefill tokens per worker
- **Active decode blocks**: active KV blocks per worker
- **Last TTFT, ITL, ISL**: most recently observed TTFT and ITL latencies and input sequence length (ISL) per worker
### Regression Model
A sliding-window linear regression maps load to latency:
- Prefill: `(active_prefill_tokens + ISL)` -> `TTFT`
- Decode: `active_decode_blocks` -> `ITL`
Given a TTFT/ITL SLA target, the model reverse-solves for the maximum load that satisfies the SLA.
### Scaling Decisions
- **Scale up**: if ALL workers' recent load exceeds the regression-derived target
- **Scale down**: if ALL workers' recent load is below the target adjusted by `(num_workers - 1) / num_workers * sensitivity / 100`
- Scales by at most one replica (+/-1) per adjustment interval (blocking)
### Co-existence with Throughput-Based Scaling
When both modes are enabled, throughput-based scaling (longer interval) sets a lower bound on replicas while load-based scaling (shorter interval) handles real-time adjustments above that floor.
### Aggregated Mode
In aggregated mode (`--mode agg`), engines handle both prefill and decode via chunked prefill. The planner maintains both TTFT and ITL regression models but trains them on per-worker time-averaged metrics (rather than instantaneous values) to smooth out chunked-prefill noise. It scales up if either the prefill or the decode signal indicates overload, and scales down only if both indicate underload.
## Known Limitations ## Known Limitations
1. **30-second startup delay**: Hardcoded wait for component registration. It should be replaced with runtime readiness probing. 1. **30-second startup delay**: Hardcoded wait for component registration. It should be replaced with runtime readiness probing.
2. **Adjustment interval vs scaling latency**: If `adjustment_interval` \< time to scale, scaling decisions can pile up. The planner logs warnings but doesn't queue. 2. **Adjustment interval vs scaling latency**: If `adjustment_interval` \< time to scale, scaling decisions can pile up. The planner logs warnings but doesn't queue.
3. **Average-based interpolation**: The planner uses average ISL/OSL, which may not represent bimodal or heavy-tailed distributions well. 3. **Average-based interpolation**: Throughput-based scaling uses average ISL/OSL, which may not represent bimodal or heavy-tailed distributions well.
4. **Single DGD scope**: Each planner instance manages exactly one DGD. Multi-model/multi-DGD coordination is not supported. 4. **Single DGD scope**: Each planner instance manages exactly one DGD. Multi-model/multi-DGD coordination is not supported.
5. **Load-based planner deprecated**: The load-based code path exists but is non-functional with current backends (no prefill queue metrics).
## Future Work ## Future Work
- Support aggregated (non-disaggregated) scaling mode for single-worker deployments
- Multi-DGD coordination for shared-cluster scenarios - Multi-DGD coordination for shared-cluster scenarios
- Distribution-aware interpolation (beyond mean ISL/OSL) - Distribution-aware interpolation (beyond mean ISL/OSL)
- Adaptive adjustment interval based on observed scaling latency - Adaptive adjustment interval based on observed scaling latency
...@@ -185,17 +216,22 @@ After the delay: ...@@ -185,17 +216,22 @@ After the delay:
## File Map ## File Map
| File | Size | Purpose | | File | Purpose |
| ---------------------------- | ---- | ----------------------------------------------------- | | ---------------------------- | ----------------------------------------------------- |
| `planner_core.py` | 36k | Main scaling loop, algorithm implementation | | `planner_core.py` | Base planner, shared scaling loop, algorithm core |
| `perf_interpolation.py` | 13k | NPZ data loading and throughput/latency interpolation | | `disagg_planner.py` | Disaggregated mode orchestrator (prefill + decode) |
| `load_predictor.py` | 16k | ARIMA, Prophet, Kalman, Constant predictors | | `agg_planner.py` | Aggregated mode orchestrator (load-based only) |
| `pre_swept_results_utils.py` | 12k | Pre-computed H100/H200 profiling data loader | | `prefill_planner.py` | Prefill-specific scaling logic |
| `kubernetes_connector.py` | 11k | K8s API integration for DGD scaling | | `decode_planner.py` | Decode-specific scaling logic |
| `kube.py` | 7.4k | Low-level K8s client wrapper | | `load_based_regression.py` | Sliding-window linear regression for load-based scaling |
| `exceptions.py` | 7.2k | Custom exception hierarchy | | `prometheus.py` | Prometheus/router metrics clients, data classes |
| `prometheus.py` | 7.3k | Prometheus query builder and client | | `perf_interpolation.py` | NPZ data loading and throughput/latency interpolation |
| `defaults.py` | 8.1k | Default configs, backend name mappings | | `load_predictor.py` | ARIMA, Prophet, Kalman, Constant predictors |
| `planner_argparse.py` | 6.2k | CLI argument definitions | | `pre_swept_results_utils.py` | Pre-computed H100/H200 profiling data loader |
| `kubernetes_connector.py` | K8s API integration for DGD scaling |
| `kube.py` | Low-level K8s client wrapper |
| `exceptions.py` | Custom exception hierarchy |
| `defaults.py` | Default configs, backend name mappings |
| `planner_argparse.py` | CLI argument definitions |
...@@ -112,6 +112,10 @@ STUB_MODULES = [ ...@@ -112,6 +112,10 @@ STUB_MODULES = [
"gpu_memory_service", "gpu_memory_service",
"gpu_memory_service.common", "gpu_memory_service.common",
"gpu_memory_service.common.utils", "gpu_memory_service.common.utils",
"prometheus_client",
"prometheus_client.parser",
"sklearn",
"sklearn.linear_model",
] ]
# Project paths for local imports # Project paths for local imports
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: vllm-disagg-planner
spec:
services:
Frontend:
componentType: frontend
replicas: 1
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
workingDir: /workspace/examples/backends/vllm
command:
- python3
args:
- -m
- dynamo.frontend
- --router-mode
- kv
Planner:
componentType: planner
replicas: 1
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
workingDir: /workspace/components/src/dynamo/planner
command:
- python3
- -m
- planner_sla
args:
- --environment=kubernetes
- --backend=vllm
- --enable-loadbased-scaling
- --disable-throughput-scaling
- --loadbased-adjustment-interval=5
- --loadbased-min-observations=5
VllmDecodeWorker:
envFromSecret: hf-token-secret
componentType: worker
subComponentType: decode
replicas: 1
resources:
limits:
gpu: "1"
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
workingDir: /workspace/examples/backends/vllm
command:
- python3
args:
- -m
- dynamo.vllm
- --model
- nvidia/Llama-3.1-8B-Instruct-FP8
VllmPrefillWorker:
envFromSecret: hf-token-secret
componentType: worker
subComponentType: prefill
replicas: 1
resources:
limits:
gpu: "1"
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag
workingDir: /workspace/examples/backends/vllm
command:
- python3
args:
- -m
- dynamo.vllm
- --model
- nvidia/Llama-3.1-8B-Instruct-FP8
- --is-prefill-worker
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment