"lib/parsers/src/vscode:/vscode.git/clone" did not exist on "57bdbdf1eec917ca74a3b76bc15c90cc9a2cbc9e"
Unverified Commit 3f53a78e authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files
parent 219cfa1f
...@@ -949,6 +949,7 @@ if __name__ == "__main__": ...@@ -949,6 +949,7 @@ if __name__ == "__main__":
y_context_length=np.array(y_context_length), y_context_length=np.array(y_context_length),
z_itl=np.array(z_itl), z_itl=np.array(z_itl),
z_thpt_per_gpu=np.array(z_thpt_per_gpu), z_thpt_per_gpu=np.array(z_thpt_per_gpu),
max_kv_tokens=np.array([max_kv_tokens]),
) )
logger.info(f"Saved data points to {save_path}") logger.info(f"Saved data points to {save_path}")
......
...@@ -18,12 +18,13 @@ __all__ = [ ...@@ -18,12 +18,13 @@ __all__ = [
"LocalConnector", "LocalConnector",
"PlannerConnector", "PlannerConnector",
"KubernetesConnector", "KubernetesConnector",
"PlannerDefaults", "LoadPlannerDefaults",
"SLAPlannerDefaults",
] ]
# Import the classes # Import the classes
from dynamo.planner.circusd import CircusController from dynamo.planner.circusd import CircusController
from dynamo.planner.defaults import PlannerDefaults from dynamo.planner.defaults import LoadPlannerDefaults, SLAPlannerDefaults
from dynamo.planner.kubernetes_connector import KubernetesConnector from dynamo.planner.kubernetes_connector import KubernetesConnector
from dynamo.planner.local_connector import LocalConnector from dynamo.planner.local_connector import LocalConnector
from dynamo.planner.planner_connector import PlannerConnector from dynamo.planner.planner_connector import PlannerConnector
...@@ -163,6 +163,7 @@ class CircusController: ...@@ -163,6 +163,7 @@ class CircusController:
waiting: bool = True, waiting: bool = True,
max_retries: int = 3, max_retries: int = 3,
retry_delay: float = 2.0, retry_delay: float = 2.0,
blocking: bool = True,
timeout: int = 600, # 10 minutes timeout: int = 600, # 10 minutes
) -> bool: ) -> bool:
""" """
...@@ -185,8 +186,11 @@ class CircusController: ...@@ -185,8 +186,11 @@ class CircusController:
response = self.client.send_message("signal", name=name, signum="SIGTERM") response = self.client.send_message("signal", name=name, signum="SIGTERM")
if response.get("status") != "ok": if response.get("status") != "ok":
logger.warning(f"Failed to send SIGTERM to {name}: {response}") logger.warning(f"Failed to send SIGTERM to {name}: {response}")
if not blocking:
return True
except Exception as e: except Exception as e:
logger.warning(f"Error sending SIGTERM to {name}: {e}") logger.warning(f"Error sending SIGTERM to {name}: {e}")
return False
# Now wait for the process to exit gracefully # Now wait for the process to exit gracefully
exited = await self._wait_for_process_graceful_exit(name, timeout) exited = await self._wait_for_process_graceful_exit(name, timeout)
......
...@@ -15,18 +15,32 @@ ...@@ -15,18 +15,32 @@
# Source of truth for planner defaults # Source of truth for planner defaults
class PlannerDefaults: class BasePlannerDefaults:
namespace = "dynamo" namespace = "dynamo"
environment = "local" environment = "local"
no_operation = False no_operation = False
log_dir = None log_dir = None
adjustment_interval = 10 adjustment_interval = 180 # in seconds
metric_pulling_interval = 1
max_gpu_budget = 8 max_gpu_budget = 8
min_endpoint = 1 min_endpoint = 1 # applies to both decode and prefill
decode_engine_num_gpu = 1
prefill_engine_num_gpu = 1
class LoadPlannerDefaults(BasePlannerDefaults):
metric_pulling_interval = 10 # in seconds
decode_kv_scale_up_threshold = 0.9 decode_kv_scale_up_threshold = 0.9
decode_kv_scale_down_threshold = 0.5 decode_kv_scale_down_threshold = 0.5
prefill_queue_scale_up_threshold = 5.0 prefill_queue_scale_up_threshold = 5.0
prefill_queue_scale_down_threshold = 0.2 prefill_queue_scale_down_threshold = 0.2
decode_engine_num_gpu = 1
prefill_engine_num_gpu = 1
class SLAPlannerDefaults(BasePlannerDefaults):
prometheus_endpoint = "http://localhost:9090"
profile_results_dir = "profiling_results"
isl = 3000 # in number of tokens
osl = 150 # in number of tokens
ttft = 0.5 # in seconds
itl = 0.05 # in seconds
load_predictor = "arima" # ["constant", "arima", "prophet"]
load_prediction_window_size = 50 # predict load using how many recent load samples
...@@ -231,7 +231,10 @@ class LocalConnector(PlannerConnector): ...@@ -231,7 +231,10 @@ class LocalConnector(PlannerConnector):
target_watcher = matching_components[highest_suffix] target_watcher = matching_components[highest_suffix]
logger.info(f"Removing watcher {target_watcher}") logger.info(f"Removing watcher {target_watcher}")
success = await self.circus.remove_watcher(name=target_watcher) success = await self.circus.remove_watcher(
name=target_watcher, blocking=blocking
)
if not blocking:
logger.info( logger.info(
f"Circus remove_watcher for {target_watcher} {'succeeded' if success else 'failed'}" f"Circus remove_watcher for {target_watcher} {'succeeded' if success else 'failed'}"
) )
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import asyncio
import logging
from pydantic import BaseModel
from dynamo.planner.defaults import SLAPlannerDefaults
from dynamo.planner.utils.planner_core import start_sla_planner
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk import async_on_start, dynamo_context, endpoint, service
from dynamo.sdk.core.protocol.interface import ComponentType
from dynamo.sdk.lib.config import ServiceConfig
from dynamo.sdk.lib.image import DYNAMO_IMAGE
logger = logging.getLogger(__name__)
# start planner 30 seconds after the other components to make sure planner can see them
# TODO: remove this delay
INIT_PLANNER_START_DELAY = 30
# Request payload type for the planner's `generate` endpoint.
# Comments are used instead of a docstring so the pydantic model itself is unchanged.
class RequestType(BaseModel):
# Single string field; the `generate` endpoint in this file never reads it.
text: str
@service(
    dynamo={
        "namespace": "dynamo",
        "component_type": ComponentType.PLANNER,
    },
    resources={"cpu": "10", "memory": "20Gi"},
    workers=1,
    image=DYNAMO_IMAGE,
)
class Planner:
    """Service component that hosts the SLA planner.

    The constructor builds an ``argparse.Namespace`` from the "Planner"
    section of the service config, using ``SLAPlannerDefaults`` for every
    option that is not configured. The planner loop is started from
    ``async_init``.
    """

    def __init__(self):
        configure_dynamo_logging(service_name="Planner")
        logger.info("Starting planner")

        self.runtime = dynamo_context["runtime"]
        service_config = ServiceConfig.get_instance()

        # The active namespace is taken from dynamo_context, not from the config.
        self.namespace = dynamo_context["namespace"]

        planner_section = service_config.get("Planner", {})

        # Attribute names on the resulting namespace. The matching config key
        # is the same name with "_" replaced by "-", and the fallback value is
        # the attribute of the same name on SLAPlannerDefaults.
        option_names = (
            "environment",
            "no_operation",
            "log_dir",
            "adjustment_interval",
            "max_gpu_budget",
            "min_endpoint",
            "decode_engine_num_gpu",
            "prefill_engine_num_gpu",
            "prometheus_endpoint",
            "profile_results_dir",
            "isl",
            "osl",
            "ttft",
            "itl",
            "load_predictor",
            "load_prediction_window_size",
        )
        resolved = {}
        for option in option_names:
            config_key = option.replace("_", "-")
            fallback = getattr(SLAPlannerDefaults, option)
            resolved[option] = planner_section.get(config_key, fallback)

        self.args = argparse.Namespace(namespace=self.namespace, **resolved)

    @async_on_start
    async def async_init(self):
        """Wait INIT_PLANNER_START_DELAY seconds, then run the SLA planner."""
        await asyncio.sleep(INIT_PLANNER_START_DELAY)
        logger.info("Calling start_planner")
        await start_sla_planner(self.runtime, self.args)
        logger.info("Planner started")

    @endpoint()
    async def generate(self, request: RequestType):
        """Dummy endpoint to satisfy that each component has an endpoint"""
        yield "mock endpoint"
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import subprocess
import tempfile
import yaml
from dynamo.sdk import service
from dynamo.sdk.lib.config import ServiceConfig
from dynamo.sdk.lib.image import DYNAMO_IMAGE
logger = logging.getLogger(__name__)
@service(
    dynamo={
        "namespace": "dynamo",
    },
    workers=1,
    image=DYNAMO_IMAGE,
)
class Prometheus:
    """Service that launches a ``prometheus`` server as a child process."""

    def __init__(self):
        """Load the parsed "Prometheus" service config and start the server."""
        self.config = ServiceConfig.get_parsed_config("Prometheus")
        self.process = None
        logger.info(f"Prometheus config: {self.config}")
        self.start_prometheus_server()

    def start_prometheus_server(self):
        """Dump the config to a temporary YAML file and spawn ``prometheus``.

        The temporary file object is kept on ``self.temp_file`` and the
        ``subprocess.Popen`` handle on ``self.process``. Nothing in this class
        removes the file or stops the process afterwards.
        """
        logger.info("Starting prometheus server...")

        # delete=False keeps the file on disk after it is closed, so the child
        # process can read it. The with-block guarantees the handle is closed
        # even when yaml.dump raises.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".yml", delete=False
        ) as config_file:
            self.temp_file = config_file
            yaml.dump(self.config, config_file)

        config_path = self.temp_file.name

        cmd = [
            "prometheus",
            f"--config.file={config_path}",
        ]

        logger.info(f"Prometheus cmd: {cmd}")

        # stdout/stderr of None means the child inherits this process's streams.
        self.process = subprocess.Popen(
            cmd,
            stdout=None,
            stderr=None,
        )
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import math
import warnings
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import pandas as pd
import pmdarima
from prophet import Prophet
# NOTE: the module-level name `logger` is bound to the third-party "cmdstanpy"
# logger, not to a logger for this module. It is silenced below: a NullHandler
# is attached, propagation to parent loggers is disabled and the level is
# raised to CRITICAL.
logger = logging.getLogger("cmdstanpy")
logger.addHandler(logging.NullHandler())
logger.propagate = False
logger.setLevel(logging.CRITICAL)
# Suppress sklearn deprecation warnings
# NOTE(review): these filters are not limited to sklearn. Without a `module`
# argument they ignore every FutureWarning and UserWarning process-wide as soon
# as this module is imported. Confirm that this scope is intended.
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
class BasePredictor(ABC):
    """Shared sample buffer and interface for the load predictors."""

    def __init__(self, minimum_data_points=5):
        self.data_buffer = []
        self.minimum_data_points = minimum_data_points

    def add_data_point(self, value):
        """Append one observation to the buffer, recording 0 in place of NaN."""
        self.data_buffer.append(0 if math.isnan(value) else value)

    def get_last_value(self):
        """Return the newest buffered observation, or 0 if the buffer is empty."""
        return self.data_buffer[-1] if self.data_buffer else 0

    @abstractmethod
    def predict_next(self):
        """Return the forecast for the next observation."""
class ConstantPredictor(BasePredictor):
    """Naive predictor: the next load is taken to equal the latest observed load."""

    def __init__(self, **kwargs):
        # Keyword arguments are accepted and ignored, so this class can be
        # constructed with the same call as the windowed predictors.
        super().__init__(minimum_data_points=1)

    def predict_next(self):
        """Return the most recent observation (0 when none has been recorded)."""
        latest = self.get_last_value()
        return latest
# Auto ARIMA model from pmdarima
class ARIMAPredictor(BasePredictor):
    """Sliding-window predictor that refits ``pmdarima.auto_arima`` per forecast.

    Args:
        window_size: maximum number of most recent observations kept.
        minimum_data_points: below this many observations the predictor
            returns the last observed value instead of fitting a model.
    """

    def __init__(self, window_size=100, minimum_data_points=5):
        super().__init__(minimum_data_points=minimum_data_points)
        self.window_size = window_size  # How many past points to use
        self.model = None

    def add_data_point(self, value):
        """Record one observation and drop anything older than the window."""
        super().add_data_point(value)
        # Keep only the last window_size points
        if len(self.data_buffer) > self.window_size:
            self.data_buffer = self.data_buffer[-self.window_size :]

    def predict_next(self):
        """Predict the next value.

        Returns the last observation while fewer than ``minimum_data_points``
        samples are buffered, the constant itself when every buffered sample
        is identical, and otherwise the one-step-ahead auto-ARIMA forecast.
        """
        if len(self.data_buffer) < self.minimum_data_points:
            return self.get_last_value()

        # A window where every sample is identical has nothing to fit: the
        # only sensible forecast is that same value, and skipping the fit
        # avoids a needless model search.
        # NOTE(review): auto_arima is reported to degenerate to a trivial
        # (0, 0, 0) model on completely constant input; confirm against the
        # installed pmdarima version.
        if len(set(self.data_buffer)) == 1:
            return self.data_buffer[0]

        # Fit auto ARIMA model on the current window
        self.model = pmdarima.auto_arima(
            self.data_buffer,
            suppress_warnings=True,
            error_action="ignore",
        )

        # One-step-ahead forecast
        forecast = self.model.predict(n_periods=1)
        return forecast[0]
# Time-series forecasting model from Meta
class ProphetPredictor(BasePredictor):
    """Sliding-window predictor that fits a fresh Prophet model per forecast.

    Observations are stored as ``{"ds": timestamp, "y": value}`` records.
    Timestamps are synthetic: one second apart, counted from ``start_date``.
    """

    def __init__(self, window_size=100, step_size=3600, minimum_data_points=5):
        super().__init__(minimum_data_points=minimum_data_points)
        self.window_size = window_size
        # NOTE(review): step_size is stored but not used when timestamps are
        # generated below; confirm whether that is intended.
        self.step_size = step_size
        self.curr_step = 0
        self.start_date = datetime(2024, 1, 1)  # origin of the synthetic timeline
        self.data_buffer = []  # holds dict records rather than bare values

    def _timestamp_at(self, step):
        """Return the synthetic timestamp that belongs to sample number ``step``."""
        return self.start_date + timedelta(seconds=step)

    def add_data_point(self, value):
        """Record one observation (NaN is stored as 0) and trim to the window."""
        stamp = self._timestamp_at(self.curr_step)
        record = {"ds": stamp, "y": 0 if math.isnan(value) else value}
        self.data_buffer.append(record)
        self.curr_step += 1

        # Drop everything older than the last window_size records
        if len(self.data_buffer) > self.window_size:
            self.data_buffer = self.data_buffer[-self.window_size :]

    def get_last_value(self):
        """Return the ``y`` of the newest record, or 0 if the buffer is empty."""
        return self.data_buffer[-1]["y"] if self.data_buffer else 0

    def predict_next(self):
        """Forecast the value at the next synthetic timestamp.

        Falls back to the last observation while fewer than
        ``minimum_data_points`` records are buffered.
        """
        if len(self.data_buffer) < self.minimum_data_points:
            return self.get_last_value()

        history = pd.DataFrame(self.data_buffer)

        model = Prophet()
        model.fit(history)

        upcoming = pd.DataFrame({"ds": [self._timestamp_at(self.curr_step)]})
        prediction = model.predict(upcoming)
        return prediction["yhat"].iloc[0]
# Registry mapping a predictor name to the predictor class it selects.
# Every class listed here can be constructed with a `window_size` keyword
# (ConstantPredictor accepts and ignores it).
LOAD_PREDICTORS = {
"constant": ConstantPredictor,
"arima": ARIMAPredictor,
"prophet": ProphetPredictor,
}
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import scipy
class PrefillInterpolator:
    """
    Takes input from results of pre-deployment performance profiling to interpolate
    throughput/gpu and TTFT for a given ISL.

    The profiling data is read from
    ``<profile_results_dir>/selected_prefill_interpolation/raw_data.npz`` and
    must contain the arrays ``prefill_isl``, ``prefill_ttft`` and
    ``prefill_thpt_per_gpu``. Cubic 1-D interpolation is used, and queries
    outside the profiled ISL range are clamped to its nearest end.
    """

    def __init__(self, profile_results_dir: str):
        # A bare `import scipy` does not expose the `interpolate` submodule on
        # every SciPy release, so import it explicitly where it is used.
        import scipy.interpolate

        prefill_npz_fn = (
            f"{profile_results_dir}/selected_prefill_interpolation/raw_data.npz"
        )

        with np.load(prefill_npz_fn) as raw_data:
            self.prefill_isl = raw_data["prefill_isl"]
            self.prefill_ttft = raw_data["prefill_ttft"]
            self.prefill_thpt_per_gpu = raw_data["prefill_thpt_per_gpu"]

        # Bounds of the profiled range, used to clamp queries
        self.min_isl = min(self.prefill_isl)
        self.max_isl = max(self.prefill_isl)

        # perform 1d interpolation
        self.ttft_interpolator = scipy.interpolate.interp1d(
            self.prefill_isl, self.prefill_ttft, kind="cubic"
        )
        self.thpt_interpolator = scipy.interpolate.interp1d(
            self.prefill_isl, self.prefill_thpt_per_gpu, kind="cubic"
        )

    def _clamp_isl(self, isl: float) -> float:
        """Clamp ``isl`` into the profiled [min_isl, max_isl] range."""
        return max(self.min_isl, min(isl, self.max_isl))

    def interpolate_ttft(self, isl: float) -> float:
        """Return the interpolated TTFT for ``isl`` as a plain float."""
        # interp1d returns a 0-d array; convert it so the annotated type holds.
        return float(self.ttft_interpolator(self._clamp_isl(isl)))

    def interpolate_thpt_per_gpu(self, isl: float) -> float:
        """Return the interpolated throughput per GPU for ``isl`` as a plain float."""
        return float(self.thpt_interpolator(self._clamp_isl(isl)))
class DecodeInterpolator:
    """
    Takes input from results of pre-deployment performance profiling to interpolate
    throughput/gpu and ITL for a given decode context length.

    The profiling data is read from
    ``<profile_results_dir>/selected_decode_interpolation/raw_data.npz``. The
    scattered samples are interpolated once onto a ``resolution`` x
    ``resolution`` grid over (kv usage in [0, 1], context length in
    [0, max profiled]); later queries are nearest-cell lookups in that grid.
    Both grids are indexed ``[iy, ix]`` (context length first, kv usage second).
    """

    def __init__(self, profile_results_dir: str, resolution: int = 100):
        # The index math divides by the spacing between the first two grid
        # points, so at least two points per axis are required.
        if resolution < 2:
            raise ValueError(f"resolution must be at least 2, got {resolution}")

        decode_npz_fn = (
            f"{profile_results_dir}/selected_decode_interpolation/raw_data.npz"
        )

        with np.load(decode_npz_fn) as raw_data:
            self.x_kv_usage = raw_data["x_kv_usage"]
            self.y_context_length = raw_data["y_context_length"]
            self.z_itl = raw_data["z_itl"]
            self.z_thpt_per_gpu = raw_data["z_thpt_per_gpu"]
            self.max_kv_tokens = raw_data["max_kv_tokens"][0]

        # pre-compute the interpolation grid for fast lookup
        self.resolution = resolution
        self.xi = np.linspace(0, 1, resolution)
        self.yi = np.linspace(0, max(self.y_context_length), resolution)
        self.X, self.Y = np.meshgrid(self.xi, self.yi)

        self.itl_interpolator = self._interpolate_on_grid(self.z_itl)
        self.itl_interpolator /= 1000  # convert ms to s
        self.thpt_interpolator = self._interpolate_on_grid(self.z_thpt_per_gpu)

    def _interpolate_on_grid(self, values):
        """Interpolate scattered ``values`` onto the lookup grid.

        Cubic interpolation is used where it is defined; cells it leaves as
        NaN are filled from nearest-neighbour interpolation.
        """
        # A bare `import scipy` does not expose the `interpolate` submodule on
        # every SciPy release, so import it explicitly where it is used.
        import scipy.interpolate

        sample_points = (self.x_kv_usage, self.y_context_length)
        grid = scipy.interpolate.griddata(
            sample_points, values, (self.X, self.Y), method="cubic"
        )
        nan_mask = np.isnan(grid)
        if np.any(nan_mask):
            nearest = scipy.interpolate.griddata(
                sample_points, values, (self.X, self.Y), method="nearest"
            )
            grid[nan_mask] = nearest[nan_mask]
        return grid

    def _nearest_idx(self, value: float, axis) -> int:
        """Return the index of the grid point on ``axis`` nearest to ``value``.

        ``axis`` is one of the evenly spaced ``xi`` / ``yi`` arrays. The
        result is clipped to the valid index range.
        """
        step = axis[1] - axis[0]
        return int(np.clip(np.round((value - axis[0]) / step), 0, self.resolution - 1))

    def compute_idx(self, concurrency: float, context_length: float) -> tuple[int, int]:
        """Map (concurrency, context_length) to grid indices ``(ix, iy)``.

        KV usage is computed as ``concurrency * context_length / max_kv_tokens``.
        """
        kv_usage = concurrency * context_length / self.max_kv_tokens
        ix = self._nearest_idx(kv_usage, self.xi)
        iy = self._nearest_idx(context_length, self.yi)
        return ix, iy

    def interpolate_itl(self, concurrency: float, context_length: float) -> float:
        """Return the grid ITL (in seconds) for the given load point."""
        ix, iy = self.compute_idx(concurrency, context_length)
        return self.itl_interpolator[iy, ix]

    def interpolate_thpt_per_gpu(
        self, concurrency: float, context_length: float
    ) -> float:
        """Return the grid throughput per GPU for the given load point."""
        ix, iy = self.compute_idx(concurrency, context_length)
        return self.thpt_interpolator[iy, ix]

    def find_best_throughput_per_gpu(self, itl: float, context_length: float) -> float:
        """Return throughput/gpu at the highest kv usage whose ITL is <= ``itl``.

        When no grid cell on the row for ``context_length`` meets the target,
        the throughput at the lowest kv usage is returned.
        """
        iy = self._nearest_idx(context_length, self.yi)
        # here we cannot use binary search as interpolated itl might not be monotonic
        feasible = np.flatnonzero(self.itl_interpolator[iy] <= itl)
        ix = feasible[-1] if feasible.size else 0
        return self.thpt_interpolator[iy, ix]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import asyncio
import logging
import math
import time
from dataclasses import dataclass
from typing import Optional
from dynamo.planner import KubernetesConnector, LocalConnector
from dynamo.planner.defaults import SLAPlannerDefaults
from dynamo.planner.utils.load_predictor import LOAD_PREDICTORS
from dynamo.planner.utils.perf_interpolation import (
DecodeInterpolator,
PrefillInterpolator,
)
from dynamo.planner.utils.prometheus import PrometheusAPIClient
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
logger = logging.getLogger(__name__)
@dataclass
class Metrics:
    """Most recent metric values held by the planner; every field starts as None."""

    # Average time to first token and inter-token latency (logged in seconds).
    ttft: Optional[float] = None
    itl: Optional[float] = None
    # Request count over the observation interval.
    num_req: Optional[float] = None
    # Average input / output sequence length in tokens.
    isl: Optional[float] = None
    osl: Optional[float] = None
    # Average request duration.
    request_duration: Optional[float] = None
    # NOTE(review): p_load / d_load are not written anywhere in the visible
    # planner code; presumably prefill / decode load. Confirm before relying on them.
    p_load: Optional[float] = None
    d_load: Optional[float] = None
class Planner:
    """SLA planner: periodically resizes the prefill and decode worker pools.

    Every ``args.adjustment_interval`` seconds the planner reads averaged
    metrics from Prometheus, feeds them to the load predictors, derives the
    number of prefill and decode replicas from the pre-deployment profiling
    interpolators, fits the result into ``args.max_gpu_budget``, and (unless
    ``args.no_operation`` is set) asks the connector to add or remove workers.
    """

    def __init__(self, runtime: DistributedRuntime, args: argparse.Namespace):
        """Create the connector, metric client, predictors and interpolators.

        Raises:
            ValueError: ``args.environment`` is neither "local" nor
                "kubernetes" (checked only when ``no_operation`` is false).
        """
        self.runtime = runtime
        self.args = args
        self.namespace = args.namespace

        # In no-operation mode the planner only observes, so no connector is built.
        if not args.no_operation:
            if args.environment == "local":
                self.connector = LocalConnector(args.namespace, runtime)
            elif args.environment == "kubernetes":
                self.connector = KubernetesConnector(args.namespace)
            else:
                raise ValueError(f"Invalid environment: {args.environment}")

        self.prometheus_api_client = PrometheusAPIClient(args.prometheus_endpoint)

        # One independent predictor per predicted quantity
        self.num_req_predictor = LOAD_PREDICTORS[args.load_predictor](
            window_size=args.load_prediction_window_size,
        )
        self.isl_predictor = LOAD_PREDICTORS[args.load_predictor](
            window_size=args.load_prediction_window_size,
        )
        self.osl_predictor = LOAD_PREDICTORS[args.load_predictor](
            window_size=args.load_prediction_window_size,
        )

        self.prefill_interpolator = PrefillInterpolator(args.profile_results_dir)
        self.decode_interpolator = DecodeInterpolator(args.profile_results_dir)

        # Runtime clients are created lazily in get_workers_info()
        self.prefill_client = None
        self.workers_client = None
        self.p_endpoints = []  # type: ignore
        self.d_endpoints = []  # type: ignore

        self.last_adjustment_time = time.time()
        self.last_metrics = Metrics()

        # Ratios of observed to profiled latency, refreshed on every adjustment
        self.p_correction_factor = 1.0
        self.d_correction_factor = 1.0

    async def get_workers_info(self):
        """Return ``(prefill_instance_ids, decode_instance_ids)``.

        A failure while looking up prefill workers is logged and yields an
        empty prefill list.

        Raises:
            RuntimeError: the decode worker endpoints could not be obtained.
        """
        try:
            if self.prefill_client is None:
                self.prefill_client = (
                    await self.runtime.namespace(self.namespace)
                    .component("PrefillWorker")
                    .endpoint("mock")
                    .client()
                )
                # TODO: remove this sleep after rust client() is blocking until watching state
                await asyncio.sleep(0.1)
            # TODO: use etcd events instead of pulling instance_ids
            p_endpoints = self.prefill_client.instance_ids()  # type: ignore
        except Exception:
            p_endpoints = []
            logger.warning(
                "No prefill workers found, aggregated mode is not supported yet"
            )
        try:
            if self.workers_client is None:
                self.workers_client = (
                    await self.runtime.namespace(self.namespace)
                    .component("VllmWorker")
                    .endpoint("generate")
                    .client()
                )
                # TODO: remove this sleep after rust client() is blocking until watching state
                await asyncio.sleep(0.1)
            # TODO: use etcd events instead of pulling instance_ids
            d_endpoints = self.workers_client.instance_ids()  # type: ignore
        except Exception as e:
            raise RuntimeError(f"Failed to get decode worker endpoints: {e}") from e
        return p_endpoints, d_endpoints

    def observe_metrics(self):
        """Fetch metrics for the last adjustment interval and feed the predictors.

        The values are stored on ``self.last_metrics``; ``num_req``, ``isl``
        and ``osl`` are also appended to their respective predictors.
        """
        interval = f"{self.args.adjustment_interval}s"
        client = self.prometheus_api_client

        self.last_metrics.ttft = client.get_avg_time_to_first_token(interval)
        self.last_metrics.itl = client.get_avg_inter_token_latency(interval)
        self.last_metrics.num_req = client.get_avg_request_count(interval)
        self.last_metrics.request_duration = client.get_avg_request_duration(interval)
        self.last_metrics.isl = client.get_avg_input_sequence_tokens(interval)
        self.last_metrics.osl = client.get_avg_output_sequence_tokens(interval)

        logger.info(
            f"Observed num_req: {self.last_metrics.num_req:.2f} isl: {self.last_metrics.isl:.2f} osl: {self.last_metrics.osl:.2f}"
        )
        logger.info(
            f"Observed ttft: {self.last_metrics.ttft:.3f}s itl: {self.last_metrics.itl:.3f}s"
        )

        self.num_req_predictor.add_data_point(self.last_metrics.num_req)
        self.isl_predictor.add_data_point(self.last_metrics.isl)
        self.osl_predictor.add_data_point(self.last_metrics.osl)

    @staticmethod
    def _fit_to_gpu_budget(
        num_p, num_d, prefill_engine_num_gpu, decode_engine_num_gpu, max_gpu_budget, min_endpoint
    ):
        """Shrink the replica counts so their GPU usage fits ``max_gpu_budget``.

        Counts that already fit are returned unchanged. Otherwise prefill is
        scaled down proportionally and decode receives whatever whole engines
        fit in the remaining GPUs. The decode count is rounded down, because
        rounding to nearest could hand out more GPUs than the budget allows.
        Neither count drops below ``min_endpoint``, so the budget can still
        be exceeded when the minimum itself does not fit.

        Returns:
            ``(num_p, num_d)`` after fitting.
        """
        total_gpu_required = (
            num_p * prefill_engine_num_gpu + num_d * decode_engine_num_gpu
        )
        if total_gpu_required <= max_gpu_budget:
            return num_p, num_d

        scale = max_gpu_budget / total_gpu_required
        num_p = max(min_endpoint, round(num_p * scale))
        remaining_gpus = max_gpu_budget - num_p * prefill_engine_num_gpu
        num_d = max(min_endpoint, math.floor(remaining_gpus / decode_engine_num_gpu))
        return num_p, num_d

    async def _scale_component(self, component_name, current, target):
        """Add or remove ``component_name`` workers to move ``current`` to ``target``.

        Every connector call is made with ``blocking=False``. The call result
        is awaited when it is a coroutine or future: an un-awaited coroutine
        never runs, so the scaling request would otherwise be silently dropped.
        """
        if target == current:
            return
        if target > current:
            action = self.connector.add_component
        else:
            action = self.connector.remove_component
        for _ in range(abs(target - current)):
            outcome = action(component_name, blocking=False)
            if asyncio.iscoroutine(outcome) or asyncio.isfuture(outcome):
                await outcome

    async def make_adjustments(self):
        """Run one planning step: correct, predict, size and (optionally) scale.

        A failure in any stage is logged and ends the step without scaling.
        """
        try:
            self.p_endpoints, self.d_endpoints = await self.get_workers_info()
            logger.info(
                f"Number of prefill workers: {len(self.p_endpoints)}, number of decode workers: {len(self.d_endpoints)}"
            )

            # first correct the prediction correction factor
            # TTFT factor = observed TTFT / profiled TTFT at the observed ISL.
            # NOTE(review): the original comment expected this to be << 1 due
            # to queueing delay, but queueing inflates the observed TTFT and so
            # pushes this ratio above 1. The factor is capped at 1 where it is
            # applied below; confirm the intended direction.
            expect_ttft = self.prefill_interpolator.interpolate_ttft(
                self.last_metrics.isl
            )
            self.p_correction_factor = self.last_metrics.ttft / expect_ttft
            # for ITL, we expect the correction factor to be close to 1
            expect_itl = self.decode_interpolator.interpolate_itl(
                concurrency=self.last_metrics.num_req  # type: ignore
                / len(self.d_endpoints)
                * self.last_metrics.request_duration  # type: ignore
                / self.args.adjustment_interval,
                context_length=self.last_metrics.isl + self.last_metrics.osl / 2,  # type: ignore
            )
            self.d_correction_factor = self.last_metrics.itl / expect_itl
            logger.info(
                f"Correction factors: TTFT: {self.p_correction_factor:.3f}, ITL: {self.d_correction_factor:.3f}"
            )
        except Exception as e:
            logger.error(f"Failed to correct prediction factors: {e}")
            return

        try:
            # predict the next load
            next_num_req = self.num_req_predictor.predict_next()
            next_isl = self.isl_predictor.predict_next()
            next_osl = self.osl_predictor.predict_next()
            logger.info(
                f"Predicted load: num_req={next_num_req:.2f}, isl={next_isl:.2f}, osl={next_osl:.2f}"
            )
        except Exception as e:
            logger.error(f"Failed to predict load: {e}")
            return

        try:
            # compute how many replicas are needed for prefill
            # here we assume the prefill bias is purely due to request queueing
            # and we increase the number of prefill replicas linearly to account for the queueing delay
            pred_prefill_load_per_gpu = (
                next_num_req
                * next_isl
                / self.args.adjustment_interval
                * min(1, self.p_correction_factor)
            )
            next_num_p = math.ceil(
                pred_prefill_load_per_gpu
                / self.prefill_interpolator.interpolate_thpt_per_gpu(next_isl)
                / self.args.prefill_engine_num_gpu
            )

            # compute how many replicas are needed for decode
            # 1. apply d_correction_factor to the ITL SLA
            corrected_itl = self.args.itl / self.d_correction_factor
            # 2. reversely find out what is best throughput/gpu that can achieve corrected_itl under the predicted context length
            pred_decode_thpt_per_gpu = (
                self.decode_interpolator.find_best_throughput_per_gpu(
                    itl=corrected_itl, context_length=next_isl + next_osl / 2
                )
            )
            # 3. compute number of decode replicas needed
            next_num_d = math.ceil(
                next_num_req
                * next_osl
                / self.args.adjustment_interval
                / pred_decode_thpt_per_gpu
                / self.args.decode_engine_num_gpu
            )

            # enforce the minimum replica count, then the gpu budget
            next_num_p = max(next_num_p, self.args.min_endpoint)
            next_num_d = max(next_num_d, self.args.min_endpoint)
            logger.info(
                f"Predicted number of engine replicas: prefill={next_num_p}, decode={next_num_d}"
            )

            total_gpu_required = (
                next_num_p * self.args.prefill_engine_num_gpu
                + next_num_d * self.args.decode_engine_num_gpu
            )
            if total_gpu_required > self.args.max_gpu_budget:
                next_num_p, next_num_d = self._fit_to_gpu_budget(
                    next_num_p,
                    next_num_d,
                    self.args.prefill_engine_num_gpu,
                    self.args.decode_engine_num_gpu,
                    self.args.max_gpu_budget,
                    self.args.min_endpoint,
                )
                logger.warning(
                    f"Total number of GPUs required ({total_gpu_required}) exceeds the max GPU budget ({self.args.max_gpu_budget}), scaling down to {next_num_p} prefill and {next_num_d} decode replicas"
                )
        except Exception as e:
            logger.error(f"Failed to compute number of replicas: {e}")
            return

        if not self.args.no_operation:
            # scale up/down the number of prefill/decode non-blockingly
            # TODO: add a check to avoid scaling before the previous scaling is completed
            try:
                await self._scale_component(
                    "PrefillWorker", len(self.p_endpoints), next_num_p
                )
                await self._scale_component(
                    "VllmWorker", len(self.d_endpoints), next_num_d
                )
            except Exception as e:
                # Same policy as the stages above: log and end this step
                # rather than letting a connector error stop the planner loop.
                logger.error(f"Failed to scale components: {e}")
                return

    async def run(self):
        """Main loop for the planner"""
        self.last_adjustment_time = time.time()

        while True:
            current_time = time.time()

            if (
                current_time - self.last_adjustment_time
                >= self.args.adjustment_interval
            ):
                self.last_adjustment_time = time.time()
                logger.info("New adjustment interval started!")
                self.observe_metrics()
                await self.make_adjustments()

            # sleep for a while to avoid busy-waiting but not too long to miss the next adjustment
            await asyncio.sleep(self.args.adjustment_interval / 10)
async def start_sla_planner(runtime: DistributedRuntime, args: argparse.Namespace):
    """Build a Planner from ``runtime`` and ``args`` and run its main loop."""
    await Planner(runtime, args).run()
if __name__ == "__main__":
    # One row per command-line flag: (flag, value type, help text).
    # A value type of None marks a boolean "store_true" switch.
    # The default of each flag is the SLAPlannerDefaults attribute whose name
    # is the flag without its leading dashes and with "-" replaced by "_".
    cli_options = [
        # Common planner arguments
        ("--namespace", str, "Namespace planner will look at"),
        (
            "--environment",
            str,
            "Environment to run the planner in (local, kubernetes)",
        ),
        (
            "--no-operation",
            None,
            "Do not make any adjustments, just observe the metrics",
        ),
        ("--log-dir", str, "Tensorboard logging directory"),
        (
            "--adjustment-interval",
            int,
            "Interval in seconds between scaling adjustments",
        ),
        ("--max-gpu-budget", int, "Maximum number of GPUs to use"),
        (
            "--min-endpoint",
            int,
            "Minimum number of endpoints to keep for prefill/decode workers",
        ),
        ("--decode-engine-num-gpu", int, "Number of GPUs per decode engine"),
        ("--prefill-engine-num-gpu", int, "Number of GPUs per prefill engine"),
        # SLA-planner specific arguments
        ("--prometheus-endpoint", str, "Prometheus endpoint url"),
        (
            "--profile-results-dir",
            str,
            "Directory to pre-deployment profiling results",
        ),
        ("--isl", int, "Input sequence length"),
        ("--osl", int, "Output sequence length"),
        ("--ttft", float, "Time to first token (in seconds)"),
        ("--itl", float, "Inter-token latency (in seconds)"),
        ("--load-predictor", str, "Load predictor to use"),
        (
            "--load-prediction-window-size",
            int,
            "Window size for load prediction",
        ),
    ]

    parser = argparse.ArgumentParser()
    for flag, value_type, help_text in cli_options:
        default = getattr(SLAPlannerDefaults, flag.lstrip("-").replace("-", "_"))
        if value_type is None:
            parser.add_argument(
                flag, action="store_true", default=default, help=help_text
            )
        else:
            parser.add_argument(
                flag, type=value_type, default=default, help=help_text
            )

    args = parser.parse_args()
    asyncio.run(dynamo_worker()(start_sla_planner)(args))
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from prometheus_api_client import PrometheusConnect
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
logger = logging.getLogger(__name__)
class PrometheusAPIClient:
    """Small query helper around ``PrometheusConnect`` for frontend metrics.

    Every public getter takes a Prometheus range string (``interval``, e.g.
    ``"30s"``), issues one PromQL query against ``self.prom`` and returns a
    number.  Failures are never propagated: they are logged with
    ``logger.error`` and the getter returns ``0``.

    NOTE(review): ``float()`` passes a ``"NaN"`` sample value through as
    ``nan`` (presumably what Prometheus reports when the ``_count`` series did
    not increase in the interval) -- callers should be prepared for that.
    """

    # Common prefix shared by every metric name queried by this client.
    _METRIC_PREFIX = "nv_llm_http_service"

    def __init__(self, url: str):
        """Create the underlying ``PrometheusConnect`` client for *url*."""
        self.prom = PrometheusConnect(url=url, disable_ssl=True)

    def _get_average_metric(self, metric: str, interval: str, description: str):
        """Return ``increase(<metric>_sum) / increase(<metric>_count)``.

        Args:
            metric: Metric base name without the common prefix and without the
                ``_sum`` / ``_count`` suffix.
            interval: Prometheus range string placed inside ``[...]``.
            description: Human-readable label used in error log messages.

        Returns:
            The value of the first returned sample as a float, or ``0`` when
            the query fails or yields no samples.
        """
        base = f"{self._METRIC_PREFIX}_{metric}"
        query = (
            f"increase({base}_sum[{interval}])/increase({base}_count[{interval}])"
        )
        try:
            result = self.prom.custom_query(query=query)
            if not result:
                # Report the empty result explicitly instead of letting it
                # surface as an opaque "list index out of range" error.
                logger.error(f"Error getting {description}: query returned no data")
                return 0
            # Each sample is read as {"value": [<timestamp>, "<number>"]}.
            return float(result[0]["value"][1])
        except Exception as e:
            # Best-effort by design: log and fall back to 0 rather than raise.
            logger.error(f"Error getting {description}: {e}")
            return 0

    def get_avg_inter_token_latency(self, interval: str):
        """Return the average inter-token latency over *interval*."""
        return self._get_average_metric(
            "inter_token_latency_seconds", interval, "avg inter token latency"
        )

    def get_avg_time_to_first_token(self, interval: str):
        """Return the average time to first token over *interval*."""
        return self._get_average_metric(
            "time_to_first_token_seconds", interval, "avg time to first token"
        )

    def get_avg_request_duration(self, interval: str):
        """Return the average request duration over *interval*."""
        return self._get_average_metric(
            "request_duration_seconds", interval, "avg request duration"
        )

    def get_avg_request_count(self, interval: str):
        """Return the total increase of the request counter over *interval*.

        Despite the name, this is a sum over every series returned by the
        query (all success/failed and stream/non-stream requests), not an
        average.  Returns ``0.0`` when the query yields no series and ``0``
        when it fails.
        """
        try:
            raw_res = self.prom.custom_query(
                query=f"increase({self._METRIC_PREFIX}_requests_total[{interval}])"
            )
            # Start from 0.0 so an empty result still yields a float.
            return sum((float(res["value"][1]) for res in raw_res), 0.0)
        except Exception as e:
            logger.error(f"Error getting avg request count: {e}")
            return 0

    def get_avg_input_sequence_tokens(self, interval: str):
        """Return the average number of input tokens per request over *interval*."""
        return self._get_average_metric(
            "input_sequence_tokens", interval, "avg input sequence tokens"
        )

    def get_avg_output_sequence_tokens(self, interval: str):
        """Return the average number of output tokens per request over *interval*."""
        return self._get_average_metric(
            "output_sequence_tokens", interval, "avg output sequence tokens"
        )
...@@ -245,6 +245,23 @@ RUN printf "[safe]\n directory=/workspace\n" > /root/.gitconfig ...@@ -245,6 +245,23 @@ RUN printf "[safe]\n directory=/workspace\n" > /root/.gitconfig
RUN ln -sf /bin/bash /bin/sh RUN ln -sf /bin/bash /bin/sh
# Install the prometheus server binary into /usr/local/bin.
# PROM_VERSION selects the upstream release to download; override it with
# --build-arg PROM_VERSION=<version>.
ARG PROM_VERSION=3.4.1
# Tools needed for the download step below; the apt package lists are removed
# afterwards so they do not stay in the image layer.
RUN apt-get update && apt-get install -y --no-install-recommends \
curl tar ca-certificates && \
rm -rf /var/lib/apt/lists/*
# Map the dpkg architecture to the matching release platform suffix (the build
# fails for anything other than amd64/arm64), stream the release tarball from
# GitHub into /tmp, keep only the `prometheus` executable, and delete the
# extracted directory.
RUN ARCH=$(dpkg --print-architecture) && \
case "$ARCH" in \
amd64) PLATFORM=linux-amd64 ;; \
arm64) PLATFORM=linux-arm64 ;; \
*) echo "Unsupported architecture: $ARCH" && exit 1 ;; \
esac && \
curl -fsSL https://github.com/prometheus/prometheus/releases/download/v${PROM_VERSION}/prometheus-${PROM_VERSION}.${PLATFORM}.tar.gz \
| tar -xz -C /tmp && \
mv /tmp/prometheus-${PROM_VERSION}.${PLATFORM}/prometheus /usr/local/bin/ && \
chmod +x /usr/local/bin/prometheus && \
rm -rf /tmp/prometheus-${PROM_VERSION}.${PLATFORM}
### BUILDS ### ### BUILDS ###
# Rust build/dev dependencies # Rust build/dev dependencies
......
...@@ -23,13 +23,17 @@ kubernetes==32.0.1 ...@@ -23,13 +23,17 @@ kubernetes==32.0.1
matplotlib matplotlib
msgspec msgspec
mypy mypy
numpy numpy==1.26.4 # pmdarima is not compatible with numpy 2
opentelemetry-api opentelemetry-api
opentelemetry-sdk opentelemetry-sdk
pip==25.0.1 pip==25.0.1
pmdarima
pre-commit pre-commit
prometheus-api-client
prophet
protobuf==5.27.3 protobuf==5.27.3
pydantic==2.7.1 pydantic==2.7.1
pynvml
pyright pyright
PyYAML PyYAML
scikit-learn scikit-learn
......
...@@ -31,7 +31,7 @@ import typer ...@@ -31,7 +31,7 @@ import typer
import yaml import yaml
from rich.console import Console from rich.console import Console
from dynamo.planner.defaults import PlannerDefaults # type: ignore[attr-defined] from dynamo.planner.defaults import BasePlannerDefaults # type: ignore[attr-defined]
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk.core.protocol.interface import ComponentType from dynamo.sdk.core.protocol.interface import ComponentType
from dynamo.sdk.core.runner import TargetEnum from dynamo.sdk.core.runner import TargetEnum
...@@ -344,7 +344,7 @@ def is_local_planner_enabled(svc: Any, service_configs: dict) -> bool: ...@@ -344,7 +344,7 @@ def is_local_planner_enabled(svc: Any, service_configs: dict) -> bool:
if planners: if planners:
# Get the config for the planner and check environment # Get the config for the planner and check environment
planner_config = service_configs.get(PLANNER_SERVICE_NAME, {}) planner_config = service_configs.get(PLANNER_SERVICE_NAME, {})
environment = planner_config.get("environment", PlannerDefaults.environment) environment = planner_config.get("environment", BasePlannerDefaults.environment)
return environment == "local" return environment == "local"
return False return False
...@@ -360,7 +360,7 @@ def raise_local_planner_warning(svc: Any, service_configs: dict) -> None: ...@@ -360,7 +360,7 @@ def raise_local_planner_warning(svc: Any, service_configs: dict) -> None:
planner_config = service_configs.get(PLANNER_SERVICE_NAME, {}) planner_config = service_configs.get(PLANNER_SERVICE_NAME, {})
# Resolve no-op setting # Resolve no-op setting
no_op = planner_config.get("no-operation", PlannerDefaults.no_operation) no_op = planner_config.get("no-operation", BasePlannerDefaults.no_operation)
# Check worker counts across nodes # Check worker counts across nodes
nodes = [dep for dep in svc.all_services().values()] nodes = [dep for dep in svc.all_services().values()]
......
# Load-based Planner
This document covers load-based planner in `examples/llm/components/planner.py`.
## Load-based Scaling Up/Down Prefill/Decode Workers
To adjust the number of prefill/decode workers, planner monitors the following metrics:
* Prefill worker: planner monitors the number of requests pending in the prefill queue to estimate the prefill workload.
* Decode/aggregated worker: planner monitors the average KV cache utilization rate to estimate the decode/aggregated workload.
Every `metric-pulling-interval`, planner gathers the aforementioned metrics. Every `adjustment-interval`, planner compares the aggregated metrics in this interval with pre-set thresholds and decides whether to scale up/down prefill/decode workers. To avoid over-compensation, planner only changes the number of workers by 1 in one adjustment interval. In addition, when the number of workers is being adjusted, the planner blocks the metric pulling and adjustment.
To scale up a prefill/decode worker, planner just needs to launch the worker in the correct namespace. The auto-discovery mechanism picks up the workers and adds them to the routers. To scale down a prefill worker, planner sends a SIGTERM signal to the prefill worker. The prefill worker stores the signal and exits when it finishes the current request pulled from the prefill queue. This ensures that no remote prefill request is dropped. To scale down a decode worker, planner revokes the etcd lease of the decode worker. When the etcd lease is revoked, the corresponding decode worker is immediately removed from the router and won't get any new requests. The decode worker then finishes all the current requests in their original stream and exits gracefully.
There are two additional rules set by planner to prevent over-compensation:
1. After a new decode worker is added, since it needs time to populate the kv cache, planner doesn't scale down the number of decode workers in the next `NEW_DECODE_WORKER_GRACE_PERIOD=3` adjustment intervals.
1. We do not scale up prefill worker if the prefill queue size is estimated to reduce below the `--prefill-queue-scale-up-threshold` within the next `NEW_PREFILL_WORKER_QUEUE_BUFFER_PERIOD=3` adjustment intervals following the trend observed in the current adjustment interval.
## Comply with SLA using Load-based Planner
To ensure dynamo serve complies with the SLA, we provide a pre-deployment script to profile the model performance with different parallelization mappings and recommend the parallelization mapping for prefill and decode workers and planner configurations. To use this script, the user needs to provide the target ISL, OSL, TTFT SLA, and ITL SLA.
> [!NOTE]
> The script considers a fixed ISL/OSL without KV cache reuse. If the real ISL/OSL has a large variance or a significant amount of KV cache can be reused, the result might be inaccurate.
We assume there are no piggy-backed prefill requests in the decode engine. Even if there are some short piggy-backed prefill requests in the decode engine, they should not affect the ITL too much in most conditions. However, if there are too many piggy-backed prefill requests, the ITL might be inaccurate.
```bash
cd $DYNAMO_HOME/benchmarks/profiler/
python -m utils.profile_sla \
--config <path-to-dynamo-config-file> \
--output-dir <path-to-profile-results-dir> \
--isl <target-isl> \
--osl <target-osl> \
--ttft <target-ttft-(ms)> \
--itl <target-itl-(ms)>
```
The script will first detect the number of available GPUs on the current nodes (multi-node engine not supported yet). Then, it will profile the prefill and decode performance with different TP sizes. For prefill, since there is no in-flight batching (assuming the ISL is long enough to saturate the GPU), the script directly measures the TTFT for a request with the given ISL without kv reuse. For decode, since the ITL (or iteration time) depends on how many requests are in flight, the script will measure the ITL under different numbers of in-flight requests. The number of in-flight requests ranges from 1 to the maximum number of requests that the kv cache of the engine can hold. To measure the ITL without being affected by piggy-backed prefill requests, the script will enable kv reuse and warm up the engine by issuing the same prompts before measuring the ITL. Since the kv cache is sufficient for all the requests, it can hold the kv cache of the pre-computed prompts and skip the prefill phase when measuring the ITL.
After the profiling finishes, two plots will be generated in the `output-dir`. For example, here are the profiling results for `examples/llm/configs/disagg.yaml`:
![Prefill Performance](../images/h100_prefill_performance.png)
![Decode Performance](../images/h100_decode_performance.png)
For the prefill performance, the script will plot the TTFT for different TP sizes and select the best TP size that meets the target TTFT SLA and delivers the best throughput per GPU. Based on how close the TTFT of the selected TP size is to the SLA, the script will also recommend the upper and lower bounds of the prefill queue size to be used in planner.
For the decode performance, the script will plot the ITL for different TP sizes and different in-flight requests. Similarly, it will select the best point that satisfies the ITL SLA and delivers the best throughput per GPU and recommend the upper and lower bounds of the kv cache utilization rate to be used in planner.
The following information will be printed out in the terminal:
```
2025-05-16 15:20:24 - __main__ - INFO - Analyzing results and generate recommendations...
2025-05-16 15:20:24 - __main__ - INFO - Suggested prefill TP:4 (TTFT 48.37 ms, throughput 15505.23 tokens/s/GPU)
2025-05-16 15:20:24 - __main__ - INFO - Suggested planner upper/lower bound for prefill queue size: 0.24/0.10
2025-05-16 15:20:24 - __main__ - INFO - Suggested decode TP:4 (ITL 4.83 ms, throughput 51.22 tokens/s/GPU)
2025-05-16 15:20:24 - __main__ - INFO - Suggested planner upper/lower bound for decode kv cache utilization: 0.20/0.10
```
After finding the best TP size for prefill and decode, the script will then interpolate the TTFT with ISL and the ITL with active KV cache and decode context length. This provides a more accurate estimation of the performance when ISL and OSL change. The results will be saved to `<output_dir>/<decode/prefill>_tp<best_tp>_interpolation`.
## Usage
`dynamo serve` automatically starts the planner. Configure it through YAML files or command-line arguments:
```bash
# YAML configuration
dynamo serve graphs.disagg:Frontend -f disagg.yaml
# disagg.yaml
Planner:
environment: local
no-operation: false
log-dir: log/planner
# Command-line configuration
dynamo serve graphs.disagg:Frontend -f disagg.yaml --Planner.environment=local --Planner.no-operation=false --Planner.log-dir=log/planner
```
Configuration options:
* `namespace` (str, default: "dynamo"): Target namespace for planner operations
* `environment` (str, default: "local"): Target environment (local, kubernetes)
* `no-operation` (bool, default: false): Run in observation mode only
* `log-dir` (str, default: None): Tensorboard log directory
* `adjustment-interval` (int, default: 30): Seconds between adjustments
* `metric-pulling-interval` (int, default: 1): Seconds between metric pulls
* `max-gpu-budget` (int, default: 8): Maximum GPUs for all workers
* `min-gpu-budget` (int, default: 1): Minimum GPUs per worker type
* `decode-kv-scale-up-threshold` (float, default: 0.9): KV cache threshold for scale-up
* `decode-kv-scale-down-threshold` (float, default: 0.5): KV cache threshold for scale-down
* `prefill-queue-scale-up-threshold` (float, default: 0.5): Queue threshold for scale-up
* `prefill-queue-scale-down-threshold` (float, default: 0.2): Queue threshold for scale-down
* `decode-engine-num-gpu` (int, default: 1): GPUs per decode engine
* `prefill-engine-num-gpu` (int, default: 1): GPUs per prefill engine
Run as standalone process:
```bash
PYTHONPATH=/workspace/examples/llm python components/planner.py --namespace=dynamo --served-model-name=vllm --no-operation --log-dir=log/planner
```
Monitor metrics with Tensorboard:
```bash
tensorboard --logdir=<path-to-tensorboard-log-dir>
```
...@@ -36,187 +36,9 @@ Currently, the planner can scale the number of vllm workers up and down based on ...@@ -36,187 +36,9 @@ Currently, the planner can scale the number of vllm workers up and down based on
**<sup>[1]</sup>** Supported with some limitations. **<sup>[1]</sup>** Supported with some limitations.
We currently provide two reference planner designs:
## Load-based Scaling Up/Down Prefill/Decode Workers 1. Load-based planner: [Load-based planner docs](load_planner.md)
2. SLA-based planner: [SLA-based planner docs](sla_planner.md)
To adjust the number of prefill/decode workers, planner monitors the following metrics:
* Prefill worker: planner monitors the number of requests pending in the prefill queue to estimate the prefill workload.
* Decode/aggregated worker: planner monitors the average KV cache utilization rate to estimate the decode/aggregated workload.
Every `metric-pulling-interval`, planner gathers the aforementioned metrics.
Every `adjustment-interval`, planner compares the aggregated metrics in this interval with pre-set thresholds and decide to scale up/down prefill/decode workers.
To avoid over-compensation, planner only changes the number of workers by 1 in one adjustment interval.
In addition, when the number of workers is being adjusted, the planner blocks the metric pulling and adjustment.
To scale up a prefill/decode worker, planner just need to launch the worker in the correct namespace.
The auto-discovery mechanism picks up the workers and add them to the routers.
To scale down a prefill worker, planner send a SIGTERM signal to the prefill worker.
The prefill worker store the signal and exit when it finishes the current request pulled from the prefill queue.
This ensures that no remote prefill request is dropped.
To scale down a decode worker, planner revokes the etcd lease of the decode worker.
When the etcd lease is revoked, the corresponding decode worker is immediately removed from the router and won't get any new requests.
The decode worker then finishes all the current requests in their original stream and exits gracefully.
There are two additional rules set by planner to prevent over-compensation:
1. After a new decode worker is added, since it needs time to populate the kv cache,
planner doesn't scale down the number of decode workers in the next `NEW_DECODE_WORKER_GRACE_PERIOD=3` adjustment intervals.
2. We do not scale up prefill worker if the prefill queue size is estimated to reduce below the `--prefill-queue-scale-up-threshold` within the next `NEW_PREFILL_WORKER_QUEUE_BUFFER_PERIOD=3` adjustment intervals following the trend observed in the current adjustment interval.
For benchmarking recommendations, see the [Planner benchmark example](../../docs/guides/planner_benchmark/benchmark_planner.md).
## Comply with SLA
To ensure dynamo serve complies with the SLA, we provide a pre-deployment script to profile the model performance with different parallelization mappings, and recommend the parallelization mapping for prefill and decode workers and planner configurations.
To use this script, the user needs to provide the target ISL, OSL, TTFT SLA, and ITL SLA.
> [!Note]
> The script considers a fixed ISL/OSL without KV cache reuse.
> If the real ISL/OSL has a large variance or a significant amount of KV cache can be reused, the result might be inaccurate.
>
> We assume there are no piggybacked prefill requests in the decode engine.
> Even if there are some short piggybacked prefill requests in the decode engine, it should not affect the ITL in most cases.
> However, if the piggybacked prefill requests are too much, the ITL might be inaccurate.
```bash
python -m utils.profile_sla \
--config <path-to-dynamo-config-file> \
--output-dir <path-to-profile-results-dir> \
--isl <target-isl> \
--osl <target-osl> \
--ttft <target-ttft-(ms)> \
--itl <target-itl-(ms)>
```
The script first detects the number of available GPUs on the current nodes (multi-node engine not supported yet).
Then, it profiles the prefill and decode performance with different TP sizes.
For prefill, since there is no in-flight batching (assume isl is long enough to saturate the GPU), the script directly measures the TTFT for a request with given isl without kv-reuse.
For decode, since the ITL (or iteration time) is relevant to how many requests are in-flight, the script measures the ITL under a different number of in-flight requests.
The range of the number of in-flight requests is from 1 to the maximum number of requests that the kv cache of the engine can hold.
To measure the ITL without being affected by piggybacked prefill requests, the script enables kv-reuse and warm up the engine by issuing the same prompts before measuring the ITL.
Since the kv cache is sufficient for all the requests, it can hold the kv cache of the pre-computed prompts and skip the prefill phase when measuring the ITL.
After the profiling finishes, two plots are generated in the `output-dir`.
For example, here are the profiling results for `examples/llm/configs/disagg.yaml`:
![Prefill Performance](../images/h100_prefill_performance.png)
![Decode Performance](../images/h100_decode_performance.png)
For the prefill performance, the script plots the TTFT for different TP sizes and selects the best TP size that meets the target TTFT SLA and delivers the best throughput per GPU.
Based on how close the TTFT of the selected TP size is to the SLA, the script also recommends the upper and lower bounds of the prefill queue size to be used in planner.
For the decode performance, the script plots the ITL for different TP sizes and different in-flight requests.
Similarly, it selects the best point that satisfies the ITL SLA and delivers the best throughput per GPU
and recommends the upper and lower bounds of the kv cache utilization rate to be used in planner.
The following information is printed out in the terminal:
```text
2025-05-16 15:20:24 - __main__ - INFO - Analyzing results and generate recommendations...
2025-05-16 15:20:24 - __main__ - INFO - Suggested prefill TP:4 (TTFT 48.37 ms, throughput 15505.23 tokens/s/GPU)
2025-05-16 15:20:24 - __main__ - INFO - Suggested planner upper/lower bound for prefill queue size: 0.24/0.10
2025-05-16 15:20:24 - __main__ - INFO - Suggested decode TP:4 (ITL 4.83 ms, throughput 51.22 tokens/s/GPU)
2025-05-16 15:20:24 - __main__ - INFO - Suggested planner upper/lower bound for decode kv cache utilization: 0.20/0.10
```
After finding the best TP size for prefill and decode, the script interpolates the TTFT with ISL and ITL with active KV cache and decode context length.
This is to provide a more accurate estimation of the performance when ISL and OSL changes.
The results are saved to `<output_dir>/<decode/prefill>_tp<best_tp>_interpolation`.
## Usage
`dynamo serve` automatically starts the planner.
Configure it through YAML files or command-line arguments:
Usage:
```bash
# YAML configuration
dynamo serve graphs.disagg:Frontend -f disagg.yaml
# disagg.yaml
Planner:
environment: local
no-operation: false
log-dir: log/planner
# Configure the planner through CLI arguments
dynamo serve graphs.disagg:Frontend \
-f disagg.yaml \
--Planner.environment=local \
--Planner.no-operation=false \
--Planner.log-dir=log/planner
```
The planner accepts the following options:
- `namespace` (str, default: "dynamo"):
Namespace planner will look at
- `environment` (str, default: "local"):
Environment to run the planner in (local, kubernetes)
- `no-operation` (bool, default: false):
Do not make any adjustments, just observe the metrics and log to tensorboard
- `log-dir` (str, default: None):
Tensorboard logging directory
- `adjustment-interval` (int, default: 30):
Interval in seconds between scaling adjustments
- `metric-pulling-interval` (int, default: 1):
Interval in seconds between metric pulls
- `max-gpu-budget` (int, default: 8):
Maximum number of GPUs to use, planner will not scale up more than this number of GPUs for prefill plus decode workers
- `min-gpu-budget` (int, default: 1):
Minimum number of GPUs to use, planner will not scale down below this number of GPUs for prefill or decode workers
- `decode-kv-scale-up-threshold` (float, default: 0.9):
KV cache utilization threshold to scale up decode workers
- `decode-kv-scale-down-threshold` (float, default: 0.5):
KV cache utilization threshold to scale down decode workers
- `prefill-queue-scale-up-threshold` (float, default: 0.5):
Queue utilization threshold to scale up prefill workers
- `prefill-queue-scale-down-threshold` (float, default: 0.2):
Queue utilization threshold to scale down prefill workers
- `decode-engine-num-gpu` (int, default: 1):
Number of GPUs per decode engine
- `prefill-engine-num-gpu` (int, default: 1):
Number of GPUs per prefill engine
Run as standalone process:
```bash
PYTHONPATH=/workspace/examples/llm python components/planner.py \
--namespace=dynamo \
--served-model-name=vllm \
--no-operation \
--log-dir=log/planner
```
Monitor metrics with Tensorboard:
### Tensorboard
Planner logs to tensorboard to visualize the metrics and the scaling actions.
You can start tensorboard with the following command:
```bash
tensorboard --logdir=<path-to-tensorboard-log-dir>
```
## Backends ## Backends
...@@ -277,7 +99,5 @@ The Kubernetes backend scales workers by updating DynamoGraphDeployment replica ...@@ -277,7 +99,5 @@ The Kubernetes backend scales workers by updating DynamoGraphDeployment replica
When scaling needs change, the planner: When scaling needs change, the planner:
1. Updates the deployment's replica count 1. Updates the deployment's replica count
2. Lets the Kubernetes operator create/remove pods 2. Lets the Kubernetes operator create/remove pods
3. Maintains seamless scaling without manual intervention 3. Maintains seamless scaling without manual intervention
# SLA-based Planner
This document covers SLA-based planner in `examples/common/utils/planner_core.py`.
The SLA (Service Level Agreement)-based planner is an intelligent autoscaling system that monitors system performance and adjusts the number of prefill and decode workers to meet specified TTFT and ITL targets. Unlike the load-based planner that scales based on resource utilization thresholds, the SLA planner uses predictive modeling and performance interpolation to proactively scale the workers.
> [!NOTE]
> Currently, SLA-based planner only supports disaggregated setup.
## Features
* **SLA-driven scaling**: Automatically scales prefill/decode workers to meet TTFT and ITL targets
* **Predictive load forecasting**: Uses ARIMA, Prophet, or constant predictors to forecast future load
* **Performance interpolation**: Leverages profiling results data from pre-deployment profiling for accurate scaling decisions
* **Correction factors**: Adapts to real-world performance deviations from profiled data
## Architecture
The SLA planner consists of several key components:
1. **Load Predictors**: Forecast future request patterns (number of requests, input/output sequence lengths)
2. **Performance Interpolators**: Estimate TTFT and ITL based on profiled performance data
3. **Correction Factors**: Adjust predictions based on observed vs. expected performance
4. **Scaling Logic**: Calculate optimal number of prefill/decode replicas to meet SLA targets
## Pre-Deployment Profiling
Before using the SLA planner, you must profile the performance of the selected model and GPU to generate interpolation data:
```bash
cd $DYNAMO_HOME/benchmarks/profiler/
python -m utils.profile_sla \
--config <path-to-dynamo-config-file> \
--output-dir <path-to-profile-results-dir> \
--isl <target-input-sequence-length> \
--osl <target-output-sequence-length> \
--ttft <target-ttft-ms> \
--itl <target-itl-ms>
```
This script will:
- Profile prefill performance across different tensor parallelism (TP) sizes
- Profile decode performance under various concurrency levels
- Recommend optimal TP configurations and scaling thresholds
- Generate interpolation data for the recommended TP configuration
### Prefill Interpolation Data
In the prefill engine, prefills are usually done with batch size 1, and only the ISL (excluding prefix cache hits) affects the iteration time. The script profiles the selected prefill TP configuration across different ISLs and records the TTFT and prefill throughput per GPU under those ISLs.
### Decode Interpolation Data
In the decode engine, decode requests are added in flight, and the iteration time (or ITL) depends on both the context length and the real-time load of the engine. We capture the real-time load of the engine with the active kv usage and the average context length. The active kv usage determines the complexity of the memory-bound attention kernel, while the active kv usage divided by the average context length determines the complexity of the compute-bound MLP kernel. For example, the figure below shows the ITL of the DS-Distilled Llama 8b model on H100 TP4. The ITL grows near-linearly with active kv usage under a fixed context length, and the slope increases as the context length decreases.
![images](../images/itl_interpolation.png)
The script profiles the selected decode TP configuration across different active kv blocks and average context length.
## Load Prediction
The SLA planner uses a load predictor to predict the number of requests, ISL, and OSL in the next adjustment interval. Currently, three load prediction models are supported:
### Constant Predictor
- **Use case**: Stable and long prediction interval
- **Behavior**: Assumes next load equals current load
- **Configuration**: `load-predictor: "constant"`
### ARIMA Predictor
- **Use case**: Time-series data with trends and seasonality
- **Behavior**: Uses auto-ARIMA to fit optimal model parameters
- **Configuration**: `load-predictor: "arima"`
### Prophet Predictor
- **Use case**: Complex seasonal patterns and trend changes
- **Behavior**: Facebook's [Prophet](https://facebook.github.io/prophet/) model for time-series forecasting
- **Configuration**: `load-predictor: "prophet"`
## Scaling Algorithm
SLA planner uses a sophisticated scaling algorithm. At each adjustment interval, SLA planner performs the following operations:
### 1. Metric Collection
Every adjustment interval, collect:
- Average Time to First Token (TTFT)
- Average Inter-Token Latency (ITL)
- Request count and duration
- Input/Output sequence lengths
### 2. Correction Factor Calculation
Using the collected metrics, SLA planner applies the interpolator to find the expected TTFT/ITL and calibrates the interpolation model. This step is important because the actual TTFT/ITL often differs from the idealized, profiled values:
- **TTFT**: the actual TTFT heavily depends on request queueing and the prefix cache hit rate (if kv reuse is enabled). For example, if all requests arrive at the beginning of the adjustment interval, they queue heavily and the TTFT will be significantly higher. If the prefix cache hit rate is very high, the actual number of tokens in the prefill will be very low and the TTFT will be significantly lower.
- **ITL**: the actual ITL may be affected by small chunked prefill requests in the decode engine.
- **Metric variances**: large variances in request rate, ISL, and OSL may lead to inaccurate estimation of the TTFT/ITL since the SLA planner only considers the averages when interpolating.
SLA planner calculates the correction factors as follows:
- **Prefill correction**: `actual_ttft / expected_ttft`
- **Decode correction**: `actual_itl / expected_itl`
### 3. Load Prediction
SLA planner forecasts the following metrics for the next interval using the load predictor:
- Number of requests
- Input sequence length
- Output sequence length
### 4. Calculating Number of Replicas
**Prefill replicas**: SLA planner assumes the prefill correction factor has a linear effect on the prefill throughput per GPU, as prefill is single-batched.
```
predicted_load = next_requests * next_isl / interval * min(1, prefill_correction)
prefill_replicas = ceil(predicted_load / interpolated_throughput / gpus_per_engine)
```
**Decode replicas**:
```
# 1. apply d_correction_factor to the ITL SLA
corrected_itl = self.args.itl / self.d_correction_factor
# 2. reversely find out what is best throughput/gpu that can achieve corrected_itl under the predicted context length
pred_decode_thpt_per_gpu = self.decode_interpolator.find_best_throughput_per_gpu(
itl=corrected_itl,
context_length=next_isl + next_osl / 2
)
# 3. compute number of decode replicas needed
next_num_d = math.ceil(next_num_req * next_osl / self.args.adjustment_interval / pred_decode_thpt_per_gpu / self.args.decode_engine_num_gpu)
```
### 5. Scaling
Finally, SLA planner applies the change by scaling the number of prefill and decode workers up/down to the calculated number of replicas for the next interval.
> [!NOTE]
> SLA-planner scales the P/D engines up/down without blocking. If `adjustment-interval` is too short, the previous scaling operations may not finish before new scaling operations are issued. Make sure to set a large enough `adjustment-interval`.
## Deploying
To deploy SLA-planner, use the rust frontend (`dynamo-run`) that reports metrics at `/metrics` HTTP endpoint. You can also use your own frontend, but it must report number of requests, ISL, OSL, TTFT, ITL in the same format.
SLA-planner and prometheus server are provided as common components that can be directly imported from `dynamo` package. The following changes are needed:
- Add `Planner` and `Prometheus` components' dependency in `Frontend`.
- Link `Planner` and `Prometheus` in the graph.
- Add `Planner` and `Prometheus` configurations in the config file.
A `vllm_v0` example is available for reference:
```bash
cd $DYNAMO_HOME/examples/vllm_v0
dynamo serve graphs.disagg_planner:Frontend -f ./configs/disagg_planner.yaml
```
\ No newline at end of file
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
...@@ -31,7 +31,7 @@ from utils.prefill_queue import PrefillQueue ...@@ -31,7 +31,7 @@ from utils.prefill_queue import PrefillQueue
from dynamo.llm import KvMetricsAggregator from dynamo.llm import KvMetricsAggregator
from dynamo.planner import KubernetesConnector, LocalConnector from dynamo.planner import KubernetesConnector, LocalConnector
from dynamo.planner.defaults import PlannerDefaults from dynamo.planner.defaults import LoadPlannerDefaults
from dynamo.runtime import DistributedRuntime, dynamo_worker from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
...@@ -405,89 +405,91 @@ async def start_planner(runtime: DistributedRuntime, args: argparse.Namespace): ...@@ -405,89 +405,91 @@ async def start_planner(runtime: DistributedRuntime, args: argparse.Namespace):
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
# Common planner arguments
parser.add_argument( parser.add_argument(
"--namespace", "--namespace",
type=str, type=str,
default=PlannerDefaults.namespace, default=LoadPlannerDefaults.namespace,
help="Namespace planner will look at", help="Namespace planner will look at",
) )
parser.add_argument(
"--environment",
type=str,
default=LoadPlannerDefaults.environment,
help="Environment to run the planner in (local, kubernetes)",
)
parser.add_argument( parser.add_argument(
"--no-operation", "--no-operation",
action="store_true", action="store_true",
default=PlannerDefaults.no_operation, default=LoadPlannerDefaults.no_operation,
help="Do not make any adjustments, just observe the metrics", help="Do not make any adjustments, just observe the metrics",
) )
parser.add_argument( parser.add_argument(
"--log-dir", "--log-dir",
type=str, type=str,
default=PlannerDefaults.log_dir, default=LoadPlannerDefaults.log_dir,
help="Tensorboard logging directory", help="Tensorboard logging directory",
) )
parser.add_argument( parser.add_argument(
"--adjustment-interval", "--adjustment-interval",
type=int, type=int,
default=PlannerDefaults.adjustment_interval, default=LoadPlannerDefaults.adjustment_interval,
help="Interval in seconds between scaling adjustments", help="Interval in seconds between scaling adjustments",
) )
parser.add_argument(
"--max-gpu-budget",
type=int,
default=LoadPlannerDefaults.max_gpu_budget,
help="Maximum number of GPUs to use",
)
parser.add_argument(
"--min-endpoint",
type=int,
default=LoadPlannerDefaults.min_endpoint,
help="Minimum number of endpoints to keep for prefill/decode workers",
)
parser.add_argument( parser.add_argument(
"--metric-pulling-interval", "--metric-pulling-interval",
type=int, type=int,
default=PlannerDefaults.metric_pulling_interval, default=LoadPlannerDefaults.metric_pulling_interval,
help="Interval in seconds between metric pulls", help="Interval in seconds between metric pulls",
) )
parser.add_argument( parser.add_argument(
"--max-gpu-budget", "--decode-engine-num-gpu",
type=int, type=int,
default=PlannerDefaults.max_gpu_budget, default=LoadPlannerDefaults.decode_engine_num_gpu,
help="Maximum number of GPUs to use", help="Number of GPUs per decode engine",
) )
parser.add_argument( parser.add_argument(
"--min-endpoint", "--prefill-engine-num-gpu",
type=int, type=int,
default=PlannerDefaults.min_endpoint, default=LoadPlannerDefaults.prefill_engine_num_gpu,
help="Minimum number of endpoints to keep for prefill/decode workers", help="Number of GPUs per prefill engine",
) )
# Load-planner specific arguments
parser.add_argument( parser.add_argument(
"--decode-kv-scale-up-threshold", "--decode-kv-scale-up-threshold",
type=float, type=float,
default=PlannerDefaults.decode_kv_scale_up_threshold, default=LoadPlannerDefaults.decode_kv_scale_up_threshold,
help="KV cache utilization threshold to scale up decode workers", help="KV cache utilization threshold to scale up decode workers",
) )
parser.add_argument( parser.add_argument(
"--decode-kv-scale-down-threshold", "--decode-kv-scale-down-threshold",
type=float, type=float,
default=PlannerDefaults.decode_kv_scale_down_threshold, default=LoadPlannerDefaults.decode_kv_scale_down_threshold,
help="KV cache utilization threshold to scale down decode workers", help="KV cache utilization threshold to scale down decode workers",
) )
parser.add_argument( parser.add_argument(
"--prefill-queue-scale-up-threshold", "--prefill-queue-scale-up-threshold",
type=float, type=float,
default=PlannerDefaults.prefill_queue_scale_up_threshold, default=LoadPlannerDefaults.prefill_queue_scale_up_threshold,
help="Queue utilization threshold to scale up prefill workers, this threshold is per prefill worker", help="Queue utilization threshold to scale up prefill workers, this threshold is per prefill worker",
) )
parser.add_argument( parser.add_argument(
"--prefill-queue-scale-down-threshold", "--prefill-queue-scale-down-threshold",
type=float, type=float,
default=PlannerDefaults.prefill_queue_scale_down_threshold, default=LoadPlannerDefaults.prefill_queue_scale_down_threshold,
help="Queue utilization threshold to scale down prefill workers, this threshold is per prefill worker", help="Queue utilization threshold to scale down prefill workers, this threshold is per prefill worker",
) )
parser.add_argument(
"--decode-engine-num-gpu",
type=int,
default=PlannerDefaults.decode_engine_num_gpu,
help="Number of GPUs per decode engine",
)
parser.add_argument(
"--prefill-engine-num-gpu",
type=int,
default=PlannerDefaults.prefill_engine_num_gpu,
help="Number of GPUs per prefill engine",
)
parser.add_argument(
"--environment",
type=str,
default=PlannerDefaults.environment,
help="Environment to run the planner in (local, kubernetes)",
)
args = parser.parse_args() args = parser.parse_args()
asyncio.run(dynamo_worker()(start_planner)(args)) asyncio.run(dynamo_worker()(start_planner)(args))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment