Unverified Commit 651ef5b5 authored by daiyaanarfeen's avatar daiyaanarfeen Committed by GitHub
Browse files

feat: throughput-metrics-source for SLA planner + GlobalPlanner disagg scaling (#6500)


Signed-off-by: default avatarDaiyaan <darfeen@nvidia.com>
Co-authored-by: default avatarClaude Sonnet 4.6 <noreply@anthropic.com>
parent c18b4758
...@@ -69,6 +69,13 @@ async def main(runtime: DistributedRuntime, args): ...@@ -69,6 +69,13 @@ async def main(runtime: DistributedRuntime, args):
else: else:
logger.info("Authorization: DISABLED (accepting all namespaces)") logger.info("Authorization: DISABLED (accepting all namespaces)")
if args.no_operation:
logger.info(
"No-operation mode: ENABLED (scale requests will be logged, not executed)"
)
else:
logger.info("No-operation mode: DISABLED")
logger.info("=" * 60) logger.info("=" * 60)
# Get K8s namespace (where GlobalPlanner pod is running) # Get K8s namespace (where GlobalPlanner pod is running)
...@@ -80,6 +87,7 @@ async def main(runtime: DistributedRuntime, args): ...@@ -80,6 +87,7 @@ async def main(runtime: DistributedRuntime, args):
runtime=runtime, runtime=runtime,
managed_namespaces=args.managed_namespaces, managed_namespaces=args.managed_namespaces,
k8s_namespace=k8s_namespace, k8s_namespace=k8s_namespace,
no_operation=args.no_operation,
) )
# Serve scale_request endpoint # Serve scale_request endpoint
......
...@@ -45,4 +45,12 @@ Examples: ...@@ -45,4 +45,12 @@ Examples:
help="Environment type (currently only kubernetes supported)", help="Environment type (currently only kubernetes supported)",
) )
parser.add_argument(
"--no-operation",
action="store_true",
default=False,
dest="no_operation",
help="Log incoming scale requests without executing them (useful for testing the e2e flow without actual K8s scaling)",
)
return parser return parser
...@@ -27,7 +27,11 @@ class ScaleRequestHandler: ...@@ -27,7 +27,11 @@ class ScaleRequestHandler:
""" """
def __init__( def __init__(
self, runtime: DistributedRuntime, managed_namespaces: list, k8s_namespace: str self,
runtime: DistributedRuntime,
managed_namespaces: list,
k8s_namespace: str,
no_operation: bool = False,
): ):
"""Initialize the scale request handler. """Initialize the scale request handler.
...@@ -35,6 +39,7 @@ class ScaleRequestHandler: ...@@ -35,6 +39,7 @@ class ScaleRequestHandler:
runtime: Dynamo runtime instance runtime: Dynamo runtime instance
managed_namespaces: List of authorized namespaces (None = accept all) managed_namespaces: List of authorized namespaces (None = accept all)
k8s_namespace: Kubernetes namespace where GlobalPlanner is running k8s_namespace: Kubernetes namespace where GlobalPlanner is running
no_operation: If True, log scale requests without executing K8s scaling
""" """
self.runtime = runtime self.runtime = runtime
# If managed_namespaces is None, accept all namespaces # If managed_namespaces is None, accept all namespaces
...@@ -42,6 +47,7 @@ class ScaleRequestHandler: ...@@ -42,6 +47,7 @@ class ScaleRequestHandler:
set(managed_namespaces) if managed_namespaces else None set(managed_namespaces) if managed_namespaces else None
) )
self.k8s_namespace = k8s_namespace self.k8s_namespace = k8s_namespace
self.no_operation = no_operation
self.connectors = {} # Cache of KubernetesConnector per DGD self.connectors = {} # Cache of KubernetesConnector per DGD
if self.managed_namespaces: if self.managed_namespaces:
...@@ -51,6 +57,12 @@ class ScaleRequestHandler: ...@@ -51,6 +57,12 @@ class ScaleRequestHandler:
else: else:
logger.info("ScaleRequestHandler initialized (accepting all namespaces)") logger.info("ScaleRequestHandler initialized (accepting all namespaces)")
if self.no_operation:
logger.info(
"ScaleRequestHandler running in NO-OPERATION mode: "
"scale requests will be logged but not executed"
)
@dynamo_endpoint(ScaleRequest, ScaleResponse) @dynamo_endpoint(ScaleRequest, ScaleResponse)
async def scale_request(self, request: ScaleRequest): async def scale_request(self, request: ScaleRequest):
"""Process scaling request from a Planner. """Process scaling request from a Planner.
...@@ -74,6 +86,24 @@ class ScaleRequestHandler: ...@@ -74,6 +86,24 @@ class ScaleRequestHandler:
} }
return return
# No-operation mode: log and return success without touching K8s
if self.no_operation:
replicas_summary = {
r.sub_component_type.value: r.desired_replicas
for r in request.target_replicas
}
logger.info(
f"[NO-OP] Scale request from {request.caller_namespace} "
f"for DGD {request.graph_deployment_name} "
f"in K8s namespace {request.k8s_namespace}: {replicas_summary}"
)
yield {
"status": ScaleStatus.SUCCESS.value,
"message": "[no-operation] Scale request received and logged (not executed)",
"current_replicas": {},
}
return
logger.info( logger.info(
f"Processing scale request from {request.caller_namespace} " f"Processing scale request from {request.caller_namespace} "
f"for DGD {request.graph_deployment_name} " f"for DGD {request.graph_deployment_name} "
...@@ -89,7 +119,6 @@ class ScaleRequestHandler: ...@@ -89,7 +119,6 @@ class ScaleRequestHandler:
k8s_namespace=request.k8s_namespace, k8s_namespace=request.k8s_namespace,
parent_dgd_name=request.graph_deployment_name, parent_dgd_name=request.graph_deployment_name,
) )
await connector._async_init()
self.connectors[connector_key] = connector self.connectors[connector_key] = connector
logger.debug(f"Created new connector for {connector_key}") logger.debug(f"Created new connector for {connector_key}")
else: else:
......
...@@ -73,6 +73,8 @@ class SLAPlannerDefaults(BasePlannerDefaults): ...@@ -73,6 +73,8 @@ class SLAPlannerDefaults(BasePlannerDefaults):
no_correction = False # disable correction factor, might be useful under some conditions like long cold start time no_correction = False # disable correction factor, might be useful under some conditions like long cold start time
mode: Literal["disagg", "prefill", "decode", "agg"] = "disagg" mode: Literal["disagg", "prefill", "decode", "agg"] = "disagg"
throughput_metrics_source = "frontend" # "frontend" | "router"
# Scaling mode flags # Scaling mode flags
enable_throughput_scaling = True enable_throughput_scaling = True
enable_load_scaling = False enable_load_scaling = False
......
...@@ -183,6 +183,7 @@ class GlobalPlannerConnector(PlannerConnector): ...@@ -183,6 +183,7 @@ class GlobalPlannerConnector(PlannerConnector):
self, self,
prefill_component_name: Optional[str] = None, prefill_component_name: Optional[str] = None,
decode_component_name: Optional[str] = None, decode_component_name: Optional[str] = None,
**kwargs,
): ):
""" """
Validate deployment (no-op for GlobalPlanner). Validate deployment (no-op for GlobalPlanner).
...@@ -207,7 +208,7 @@ class GlobalPlannerConnector(PlannerConnector): ...@@ -207,7 +208,7 @@ class GlobalPlannerConnector(PlannerConnector):
"(GlobalPlanner manages deployment state)" "(GlobalPlanner manages deployment state)"
) )
def get_model_name(self) -> str: def get_model_name(self, **kwargs) -> str:
""" """
Get model name. Get model name.
......
...@@ -98,10 +98,15 @@ class RemotePlannerClient: ...@@ -98,10 +98,15 @@ class RemotePlannerClient:
f"decode={[r.desired_replicas for r in request.target_replicas if r.sub_component_type == SubComponentType.DECODE]}" f"decode={[r.desired_replicas for r in request.target_replicas if r.sub_component_type == SubComponentType.DECODE]}"
) )
# Send request to single endpoint # Send request via the runtime client's generate method (the correct API for
# calling any dynamo endpoint, regardless of its registered name)
request_json = request.model_dump_json() request_json = request.model_dump_json()
stream = await self._client.generate(request_json)
response_data = await self._client.scale_request(request_json) response_data = None
async for output in stream:
response_data = output.data() if hasattr(output, "data") else output
break # scale_request yields a single response
if response_data is None: if response_data is None:
raise RuntimeError("No response from centralized planner") raise RuntimeError("No response from centralized planner")
......
...@@ -93,6 +93,9 @@ class PlannerConfig(BaseModel): ...@@ -93,6 +93,9 @@ class PlannerConfig(BaseModel):
metric_reporting_prometheus_port: int = Field( metric_reporting_prometheus_port: int = Field(
default_factory=lambda: int(os.environ.get("PLANNER_PROMETHEUS_PORT", 0)) default_factory=lambda: int(os.environ.get("PLANNER_PROMETHEUS_PORT", 0))
) )
throughput_metrics_source: Literal[
"frontend", "router"
] = SLAPlannerDefaults.throughput_metrics_source
no_correction: bool = SLAPlannerDefaults.no_correction no_correction: bool = SLAPlannerDefaults.no_correction
model_name: Optional[str] = None model_name: Optional[str] = None
...@@ -167,7 +170,12 @@ class PlannerConfig(BaseModel): ...@@ -167,7 +170,12 @@ class PlannerConfig(BaseModel):
inline JSON string, loads it, and validates. inline JSON string, loads it, and validates.
""" """
path = Path(config_arg) path = Path(config_arg)
if path.is_file(): try:
is_file = path.is_file()
except OSError:
# Path component too long (e.g. inline JSON string passed as config arg)
is_file = False
if is_file:
return cls._load_from_file(path) return cls._load_from_file(path)
# Try parsing as inline JSON # Try parsing as inline JSON
......
...@@ -298,8 +298,11 @@ class BasePlanner: ...@@ -298,8 +298,11 @@ class BasePlanner:
or PrometheusAPIClient( or PrometheusAPIClient(
config.metric_pulling_prometheus_endpoint, config.metric_pulling_prometheus_endpoint,
config.namespace, config.namespace,
metrics_source=config.throughput_metrics_source,
) )
) )
if config.throughput_metrics_source == "router":
self.prometheus_traffic_client.warn_if_router_not_scraped()
predictor_cls = LOAD_PREDICTORS[config.load_predictor] predictor_cls = LOAD_PREDICTORS[config.load_predictor]
self.num_req_predictor = predictor_cls(config) self.num_req_predictor = predictor_cls(config)
......
...@@ -91,63 +91,99 @@ class FrontendMetricContainer(BaseModel): ...@@ -91,63 +91,99 @@ class FrontendMetricContainer(BaseModel):
class PrometheusAPIClient: class PrometheusAPIClient:
def __init__(self, url: str, dynamo_namespace: str): def __init__(
self, url: str, dynamo_namespace: str, metrics_source: str = "frontend"
):
self.prom = PrometheusConnect(url=url, disable_ssl=True) self.prom = PrometheusConnect(url=url, disable_ssl=True)
self.dynamo_namespace = dynamo_namespace self.dynamo_namespace = dynamo_namespace
self.metrics_source = metrics_source # "frontend" | "router"
def _get_average_metric( def _get_average_metric(
self, full_metric_name: str, interval: str, operation_name: str, model_name: str self,
full_metric_name: str,
interval: str,
operation_name: str,
model_name: Optional[str] = None,
) -> float: ) -> float:
""" """Query average histogram metric.
Helper method to get average metrics using the pattern:
increase(metric_sum[interval])/increase(metric_count[interval])
Args: When model_name is None (router source): queries aggregate metrics via
full_metric_name: Full metric name (e.g., 'dynamo_frontend_inter_token_latency_seconds') sum(increase(metric_sum[interval])) / sum(increase(metric_count[interval])),
interval: Time interval for the query (e.g., '60s') filtered by dynamo_namespace. DYN_NAMESPACE uses dashes but Prometheus labels
operation_name: Human-readable name for error logging use underscores, so dashes are normalized before building the PromQL filter.
When model_name is provided (frontend source): queries per-model metrics
via increase(metric_sum)/increase(metric_count), filtered by model and
dynamo_namespace labels. The dynamo_frontend_ prefix is prepended
automatically if absent.
Returns: Returns:
Average metric value or 0 if no data/error Average metric value, or 0 if no data/error.
""" """
try: try:
# Prepend the frontend metric prefix if not already present if model_name is None:
if not full_metric_name.startswith(prometheus_names.name_prefix.FRONTEND): # Router aggregate path: filter by dynamo_namespace so each pool
full_metric_name = ( # planner only reads its own LocalRouter's metrics.
f"{prometheus_names.name_prefix.FRONTEND}_{full_metric_name}" # dynamo_component_router_* metrics are registered via MetricsHierarchy
) # which auto-injects dynamo_namespace with underscores (e.g.
query = f"increase({full_metric_name}_sum[{interval}])/increase({full_metric_name}_count[{interval}])" # "darfeen_dynamo_cloud_gp_prefill_1"). DYN_NAMESPACE uses dashes, so
result = self.prom.custom_query(query=query) # normalize before building the PromQL filter.
if not result: ns = self.dynamo_namespace.replace("-", "_")
# No data available yet (no requests made) - return 0 silently ns_filter = f'{prometheus_names.labels.NAMESPACE}="{ns}"'
logger.warning( query = (
f"No prometheus metric data available for {full_metric_name}, use 0 instead" f"sum(increase({full_metric_name}_sum{{{ns_filter}}}[{interval}])) / "
f"sum(increase({full_metric_name}_count{{{ns_filter}}}[{interval}]))"
) )
return 0 result = self.prom.custom_query(query=query)
metrics_containers = parse_frontend_metric_containers(result) if not result:
logger.warning(
values = [] f"No prometheus metric data available for {full_metric_name}, use 0 instead"
for container in metrics_containers: )
# Frontend lowercases model names for Prometheus labels so we need to do case-insensitive comparison return 0
if ( value = float(result[0]["value"][1])
container.metric.model return 0 if math.isnan(value) else value
and container.metric.model.lower() == model_name.lower() else:
and container.metric.dynamo_namespace == self.dynamo_namespace # Frontend per-model path: filter by model and dynamo_namespace labels
if not full_metric_name.startswith(
prometheus_names.name_prefix.FRONTEND
): ):
values.append(container.value[1]) full_metric_name = (
f"{prometheus_names.name_prefix.FRONTEND}_{full_metric_name}"
if not values: )
logger.warning( query = f"increase({full_metric_name}_sum[{interval}])/increase({full_metric_name}_count[{interval}])"
f"No prometheus metric data available for {full_metric_name} with model {model_name} and dynamo namespace {self.dynamo_namespace}, use 0 instead" result = self.prom.custom_query(query=query)
) if not result:
return 0 logger.warning(
return sum(values) / len(values) f"No prometheus metric data available for {full_metric_name}, use 0 instead"
)
return 0
metrics_containers = parse_frontend_metric_containers(result)
values = []
for container in metrics_containers:
# Frontend lowercases model names for Prometheus labels so we need to do case-insensitive comparison
if (
container.metric.model
and container.metric.model.lower() == model_name.lower()
and container.metric.dynamo_namespace == self.dynamo_namespace
):
values.append(container.value[1])
if not values:
logger.warning(
f"No prometheus metric data available for {full_metric_name} with model {model_name} and dynamo namespace {self.dynamo_namespace}, use 0 instead"
)
return 0
return sum(values) / len(values)
except Exception as e: except Exception as e:
logger.error(f"Error getting {operation_name}: {e}") logger.error(f"Error getting {operation_name}: {e}")
return 0 return 0
def get_avg_inter_token_latency(self, interval: str, model_name: str): def get_avg_inter_token_latency(self, interval: str, model_name: str):
if self.metrics_source == "router":
return self._get_average_metric(
f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.INTER_TOKEN_LATENCY_SECONDS}",
interval,
"avg inter token latency",
)
return self._get_average_metric( return self._get_average_metric(
prometheus_names.frontend_service.INTER_TOKEN_LATENCY_SECONDS, prometheus_names.frontend_service.INTER_TOKEN_LATENCY_SECONDS,
interval, interval,
...@@ -156,6 +192,12 @@ class PrometheusAPIClient: ...@@ -156,6 +192,12 @@ class PrometheusAPIClient:
) )
def get_avg_time_to_first_token(self, interval: str, model_name: str): def get_avg_time_to_first_token(self, interval: str, model_name: str):
if self.metrics_source == "router":
return self._get_average_metric(
f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.TIME_TO_FIRST_TOKEN_SECONDS}",
interval,
"avg time to first token",
)
return self._get_average_metric( return self._get_average_metric(
prometheus_names.frontend_service.TIME_TO_FIRST_TOKEN_SECONDS, prometheus_names.frontend_service.TIME_TO_FIRST_TOKEN_SECONDS,
interval, interval,
...@@ -164,6 +206,19 @@ class PrometheusAPIClient: ...@@ -164,6 +206,19 @@ class PrometheusAPIClient:
) )
def get_avg_request_duration(self, interval: str, model_name: str): def get_avg_request_duration(self, interval: str, model_name: str):
if self.metrics_source == "router":
# TODO: Replace work_handler.REQUEST_DURATION_SECONDS with
# prometheus_names.router.REQUEST_DURATION_SECONDS once
# RouterRequestMetrics in lib/llm/src/kv_router/metrics.rs
# registers dynamo_component_router_request_duration_seconds.
# Until then this queries a non-existent metric and returns 0,
# which causes the decode planner correction factor to use
# concurrency=0 (under-estimated), inflating replica recommendations.
return self._get_average_metric(
f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.work_handler.REQUEST_DURATION_SECONDS}",
interval,
"avg request duration",
)
return self._get_average_metric( return self._get_average_metric(
prometheus_names.frontend_service.REQUEST_DURATION_SECONDS, prometheus_names.frontend_service.REQUEST_DURATION_SECONDS,
interval, interval,
...@@ -172,6 +227,24 @@ class PrometheusAPIClient: ...@@ -172,6 +227,24 @@ class PrometheusAPIClient:
) )
def get_avg_request_count(self, interval: str, model_name: str): def get_avg_request_count(self, interval: str, model_name: str):
if self.metrics_source == "router":
try:
router_req_total = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.REQUESTS_TOTAL}"
ns = self.dynamo_namespace.replace("-", "_")
ns_filter = f'{prometheus_names.labels.NAMESPACE}="{ns}"'
query = f"sum(increase({router_req_total}{{{ns_filter}}}[{interval}]))"
result = self.prom.custom_query(query=query)
if not result:
logger.warning(
f"No prometheus metric data available for "
f"{router_req_total}, use 0 instead"
)
return 0
value = float(result[0]["value"][1])
return 0 if math.isnan(value) else value
except Exception as e:
logger.error(f"Error getting avg request count: {e}")
return 0
# This function follows a different query pattern than the other metrics # This function follows a different query pattern than the other metrics
try: try:
requests_total_metric = prometheus_names.frontend_service.REQUESTS_TOTAL requests_total_metric = prometheus_names.frontend_service.REQUESTS_TOTAL
...@@ -201,6 +274,12 @@ class PrometheusAPIClient: ...@@ -201,6 +274,12 @@ class PrometheusAPIClient:
return 0 return 0
def get_avg_input_sequence_tokens(self, interval: str, model_name: str): def get_avg_input_sequence_tokens(self, interval: str, model_name: str):
if self.metrics_source == "router":
return self._get_average_metric(
f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.INPUT_SEQUENCE_TOKENS}",
interval,
"avg input sequence tokens",
)
return self._get_average_metric( return self._get_average_metric(
prometheus_names.frontend_service.INPUT_SEQUENCE_TOKENS, prometheus_names.frontend_service.INPUT_SEQUENCE_TOKENS,
interval, interval,
...@@ -209,6 +288,12 @@ class PrometheusAPIClient: ...@@ -209,6 +288,12 @@ class PrometheusAPIClient:
) )
def get_avg_output_sequence_tokens(self, interval: str, model_name: str): def get_avg_output_sequence_tokens(self, interval: str, model_name: str):
if self.metrics_source == "router":
return self._get_average_metric(
f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.OUTPUT_SEQUENCE_TOKENS}",
interval,
"avg output sequence tokens",
)
return self._get_average_metric( return self._get_average_metric(
prometheus_names.frontend_service.OUTPUT_SEQUENCE_TOKENS, prometheus_names.frontend_service.OUTPUT_SEQUENCE_TOKENS,
interval, interval,
...@@ -216,6 +301,34 @@ class PrometheusAPIClient: ...@@ -216,6 +301,34 @@ class PrometheusAPIClient:
model_name, model_name,
) )
def warn_if_router_not_scraped(self) -> None:
"""Warn if Prometheus is not scraping any dynamo_component_router_* series.
Called once at planner startup when throughput_metrics_source="router".
Detects a missing or misconfigured PodMonitor early so the operator
sees a clear warning rather than silent zero metrics.
Uses absent() to check whether any dynamo_component_router_requests_total
series exist for this namespace. MetricsHierarchy injects dynamo_namespace
with underscores, so DYN_NAMESPACE dashes are normalized before the query.
"""
try:
metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.REQUESTS_TOTAL}"
ns = self.dynamo_namespace.replace("-", "_")
ns_filter = f'{prometheus_names.labels.NAMESPACE}="{ns}"'
result = self.prom.custom_query(query=f"absent({metric}{{{ns_filter}}})")
if result:
logger.warning(
f"[throughput_metrics_source=router] No '{metric}' series found "
f"for namespace '{ns}' in Prometheus. "
"Router metrics will read as zero until scraping is working. "
"Check: (1) PodMonitor 'dynamo-router' is installed in the operator namespace, "
"(2) LocalRouter pods have DYN_SYSTEM_PORT=9090, "
"(3) pods have label nvidia.com/metrics-enabled=true."
)
except Exception as e:
logger.warning(f"Could not check router scraping status: {e}")
def parse_frontend_metric_containers( def parse_frontend_metric_containers(
result: list[dict], result: list[dict],
......
...@@ -30,6 +30,9 @@ spec: ...@@ -30,6 +30,9 @@ spec:
- interval: 5s - interval: 5s
path: /metrics path: /metrics
port: http port: http
# honorLabels ensures dynamo_namespace and other labels set by the dynamo
# runtime (including router metrics embedded in the frontend) are preserved.
honorLabels: true
relabelings: relabelings:
- action: replace - action: replace
sourceLabels: sourceLabels:
...@@ -86,4 +89,46 @@ spec: ...@@ -86,4 +89,46 @@ spec:
matchLabels: matchLabels:
nvidia.com/dynamo-component-type: planner nvidia.com/dynamo-component-type: planner
nvidia.com/metrics-enabled: "true" nvidia.com/metrics-enabled: "true"
---
# TODO: Once a first-class 'router' componentType is added to the dynamo operator,
# replace this PodMonitor with one that selects on
# nvidia.com/dynamo-component-type: router instead of 'default'.
# That would also eliminate the port-override relabeling below, since router
# pods could declare containerPort: 9090 (DYN_SYSTEM_PORT) explicitly and
# the standard targetPort selector would work without the address rewrite hack.
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
name: dynamo-router
spec:
{{- if .Values.namespaceRestriction.enabled }}
namespaceSelector:
matchNames:
- {{ .Release.Namespace }}
{{- else }}
namespaceSelector:
any: true
{{- end }}
podMetricsEndpoints:
- interval: 5s
path: /metrics
honorLabels: true
relabelings:
# Force port 9090 (DYN_SYSTEM_PORT) without requiring containerPort in the pod spec.
# targetPort: 9090 would generate a keep rule on __meta_kubernetes_pod_container_port_number
# that drops pods without an explicit containerPort: 9090 declaration.
- action: replace
sourceLabels:
- __address__
regex: '([^:]+)(?::\d+)?'
replacement: '${1}:9090'
targetLabel: __address__
- action: replace
sourceLabels:
- __meta_kubernetes_pod_label_nvidia_com_dynamo_namespace
targetLabel: dynamo_namespace
selector:
matchLabels:
nvidia.com/dynamo-component-type: default
nvidia.com/metrics-enabled: "true"
{{- end }} {{- end }}
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Mocker-based GlobalPlanner test: 2 prefill pools + 2 decode pools.
# Each pool SLA Planner reads router histogram metrics from cluster Prometheus
# and delegates scaling decisions to the GlobalPlanner (no-op mode).
#
# Architecture:
# DGD gp-ctrl: Frontend + GlobalRouter + GlobalPlanner (no-op)
# DGD gp-prefill-0: LocalRouter + MockerPrefill + Planner
# DGD gp-prefill-1: LocalRouter + MockerPrefill + Planner
# DGD gp-decode-0: LocalRouter + MockerDecode + Planner
# DGD gp-decode-1: LocalRouter + MockerDecode + Planner
#
# Usage:
# envsubst < hplanner-mocker-test.yaml | kubectl apply -n ${K8S_NAMESPACE} -f -
# envsubst < hplanner-mocker-test.yaml | kubectl delete -n ${K8S_NAMESPACE} -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: gp-global-router-config
data:
global_router_config.json: |
{
"num_prefill_pools": 2,
"num_decode_pools": 2,
"prefill_pool_dynamo_namespaces": [
"${K8S_NAMESPACE}-gp-prefill-0",
"${K8S_NAMESPACE}-gp-prefill-1"
],
"decode_pool_dynamo_namespaces": [
"${K8S_NAMESPACE}-gp-decode-0",
"${K8S_NAMESPACE}-gp-decode-1"
],
"prefill_pool_selection_strategy": {
"ttft_min": 10, "ttft_max": 3000, "ttft_resolution": 2,
"isl_min": 0, "isl_max": 32000, "isl_resolution": 2,
"prefill_pool_mapping": [[0,1],[0,1]]
},
"decode_pool_selection_strategy": {
"itl_min": 10, "itl_max": 500, "itl_resolution": 2,
"context_length_min": 0, "context_length_max": 32000, "context_length_resolution": 2,
"decode_pool_mapping": [[0,1],[0,1]]
}
}
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: gp-ctrl
spec:
services:
Frontend:
componentType: frontend
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
command:
- python3
- -m
- dynamo.frontend
args:
- --router-mode
- round-robin
- --namespace
- ${K8S_NAMESPACE}-gp-ctrl
- --model-name
- nvidia/Llama-3.1-8B-Instruct-FP8
GlobalRouter:
componentType: default
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
volumes:
- name: global-router-config
configMap:
name: gp-global-router-config
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
command:
- python3
- -m
- dynamo.global_router
args:
- --config
- /config/global_router_config.json
- --model-name
- nvidia/Llama-3.1-8B-Instruct-FP8
- --namespace
- ${K8S_NAMESPACE}-gp-ctrl
volumeMounts:
- name: global-router-config
mountPath: /config
readOnly: true
GlobalPlanner:
componentType: default
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
command:
- python3
- -m
- dynamo.global_planner
args:
- --no-operation
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: gp-prefill-0
spec:
services:
LocalRouter:
componentType: default
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
env:
- name: DYN_SYSTEM_PORT
value: "9090"
command:
- python3
- -m
- dynamo.router
args:
- --endpoint
- ${K8S_NAMESPACE}-gp-prefill-0.prefill.generate
- --no-router-kv-events
MockerPrefill:
componentType: worker
subComponentType: prefill
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
command:
- python3
- -m
- dynamo.mocker
args:
- --model-path
- nvidia/Llama-3.1-8B-Instruct-FP8
- --model-name
- nvidia/Llama-3.1-8B-Instruct-FP8
- --speedup-ratio
- "5.0"
- --planner-profile-data
- /workspace/tests/planner/profiling_results/H200_TP1P_TP1D
- --is-prefill-worker
Planner:
componentType: planner
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
command:
- python3
- -m
- dynamo.planner
args:
- --config
- '{"environment":"global-planner","global_planner_namespace":"${K8S_NAMESPACE}-gp-ctrl","backend":"mocker","mode":"prefill","throughput_metrics_source":"router","throughput_adjustment_interval":30,"ttft":2000,"max_gpu_budget":-1,"prefill_engine_num_gpu":1,"no_correction":true,"profile_results_dir":"/workspace/tests/planner/profiling_results/H200_TP1P_TP1D","model_name":"nvidia/Llama-3.1-8B-Instruct-FP8"}'
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: gp-prefill-1
spec:
services:
LocalRouter:
componentType: default
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
env:
- name: DYN_SYSTEM_PORT
value: "9090"
command:
- python3
- -m
- dynamo.router
args:
- --endpoint
- ${K8S_NAMESPACE}-gp-prefill-1.prefill.generate
- --no-router-kv-events
MockerPrefill:
componentType: worker
subComponentType: prefill
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
command:
- python3
- -m
- dynamo.mocker
args:
- --model-path
- nvidia/Llama-3.1-8B-Instruct-FP8
- --model-name
- nvidia/Llama-3.1-8B-Instruct-FP8
- --speedup-ratio
- "5.0"
- --planner-profile-data
- /workspace/tests/planner/profiling_results/H200_TP1P_TP1D
- --is-prefill-worker
Planner:
componentType: planner
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
command:
- python3
- -m
- dynamo.planner
args:
- --config
- '{"environment":"global-planner","global_planner_namespace":"${K8S_NAMESPACE}-gp-ctrl","backend":"mocker","mode":"prefill","throughput_metrics_source":"router","throughput_adjustment_interval":30,"ttft":2000,"max_gpu_budget":-1,"prefill_engine_num_gpu":1,"no_correction":true,"profile_results_dir":"/workspace/tests/planner/profiling_results/H200_TP1P_TP1D","model_name":"nvidia/Llama-3.1-8B-Instruct-FP8"}'
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: gp-decode-0
spec:
services:
LocalRouter:
componentType: default
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
env:
- name: DYN_SYSTEM_PORT
value: "9090"
command:
- python3
- -m
- dynamo.router
args:
- --endpoint
- ${K8S_NAMESPACE}-gp-decode-0.backend.generate
- --no-router-kv-events
- --router-kv-overlap-score-weight=0
MockerDecode:
componentType: worker
subComponentType: decode
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
command:
- python3
- -m
- dynamo.mocker
args:
- --model-path
- nvidia/Llama-3.1-8B-Instruct-FP8
- --model-name
- nvidia/Llama-3.1-8B-Instruct-FP8
- --speedup-ratio
- "5.0"
- --planner-profile-data
- /workspace/tests/planner/profiling_results/H200_TP1P_TP1D
Planner:
componentType: planner
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
command:
- python3
- -m
- dynamo.planner
args:
- --config
- '{"environment":"global-planner","global_planner_namespace":"${K8S_NAMESPACE}-gp-ctrl","backend":"mocker","mode":"decode","throughput_metrics_source":"router","throughput_adjustment_interval":30,"itl":200,"max_gpu_budget":-1,"decode_engine_num_gpu":1,"no_correction":true,"profile_results_dir":"/workspace/tests/planner/profiling_results/H200_TP1P_TP1D","model_name":"nvidia/Llama-3.1-8B-Instruct-FP8"}'
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: gp-decode-1
spec:
services:
LocalRouter:
componentType: default
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
env:
- name: DYN_SYSTEM_PORT
value: "9090"
command:
- python3
- -m
- dynamo.router
args:
- --endpoint
- ${K8S_NAMESPACE}-gp-decode-1.backend.generate
- --no-router-kv-events
- --router-kv-overlap-score-weight=0
MockerDecode:
componentType: worker
subComponentType: decode
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
command:
- python3
- -m
- dynamo.mocker
args:
- --model-path
- nvidia/Llama-3.1-8B-Instruct-FP8
- --model-name
- nvidia/Llama-3.1-8B-Instruct-FP8
- --speedup-ratio
- "5.0"
- --planner-profile-data
- /workspace/tests/planner/profiling_results/H200_TP1P_TP1D
Planner:
componentType: planner
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
command:
- python3
- -m
- dynamo.planner
args:
- --config
- '{"environment":"global-planner","global_planner_namespace":"${K8S_NAMESPACE}-gp-ctrl","backend":"mocker","mode":"decode","throughput_metrics_source":"router","throughput_adjustment_interval":30,"itl":200,"max_gpu_budget":-1,"decode_engine_num_gpu":1,"no_correction":true,"profile_results_dir":"/workspace/tests/planner/profiling_results/H200_TP1P_TP1D","model_name":"nvidia/Llama-3.1-8B-Instruct-FP8"}'
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# vLLM-based GlobalPlanner test: 2 prefill pools + 1 decode pool.
# Each pool SLA Planner reads router histogram metrics from cluster Prometheus
# and delegates scaling decisions to the GlobalPlanner.
#
# Architecture:
# DGD gp-ctrl: Frontend + GlobalRouter + GlobalPlanner
# DGD gp-prefill-0: LocalRouter + VllmPrefillWorker (TP1) + Planner
# DGD gp-prefill-1: LocalRouter + VllmPrefillWorker (TP2) + Planner
# DGD gp-decode-0: LocalRouter + VllmDecodeWorker (TP1) + Planner
#
# Prerequisites:
# - Cluster Prometheus deployed and scraping LocalRouter pods via PodMonitor
# - HuggingFace token secret: kubectl create secret generic hf-token-secret \
# --from-literal=HF_TOKEN=<your-token> -n ${K8S_NAMESPACE}
#
# Usage:
# export K8S_NAMESPACE=... DYNAMO_IMAGE=... DYNAMO_VLLM_IMAGE=... MODEL_NAME=... STORAGE_CLASS_NAME=...
# envsubst < hplanner-vllm-test.yaml | kubectl apply -n ${K8S_NAMESPACE} -f -
# envsubst < hplanner-vllm-test.yaml | kubectl delete -n ${K8S_NAMESPACE} -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: ${K8S_NAMESPACE}-planner
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: dynamo-platform-dynamo-operator-planner
subjects:
- kind: ServiceAccount
name: default
namespace: ${K8S_NAMESPACE}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: gp-global-router-config
data:
global_router_config.json: |
{
"num_prefill_pools": 2,
"num_decode_pools": 1,
"prefill_pool_dynamo_namespaces": [
"${K8S_NAMESPACE}-gp-prefill-0",
"${K8S_NAMESPACE}-gp-prefill-1"
],
"decode_pool_dynamo_namespaces": [
"${K8S_NAMESPACE}-gp-decode-0"
],
"prefill_pool_selection_strategy": {
"ttft_min": 10, "ttft_max": 3000, "ttft_resolution": 2,
"isl_min": 0, "isl_max": 32000, "isl_resolution": 2,
"prefill_pool_mapping": [[0,1],[0,1]]
},
"decode_pool_selection_strategy": {
"itl_min": 10, "itl_max": 500, "itl_resolution": 2,
"context_length_min": 0, "context_length_max": 32000, "context_length_resolution": 2,
"decode_pool_mapping": [[0,0],[0,0]]
}
}
---
# Shared model cache — ReadWriteMany PVC mounted into all vLLM worker pods.
# The model is downloaded once and reused across pods and restarts.
# Set storageClassName to a RWX-capable storage class available in your cluster
# (e.g. azurefile-csi-premium on AKS, nfs-csi on Nebius, efs-sc on EKS).
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: hf-model-cache
spec:
accessModes:
- ReadWriteMany
storageClassName: ${STORAGE_CLASS_NAME}
resources:
requests:
storage: 50Gi
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: gp-ctrl
spec:
services:
Frontend:
componentType: frontend
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
command:
- python3
- -m
- dynamo.frontend
args:
- --router-mode
- round-robin
- --namespace
- ${K8S_NAMESPACE}-gp-ctrl
- --model-name
- ${MODEL_NAME}
GlobalRouter:
componentType: default
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
tolerations:
- key: "karpenter.sh/disrupted"
operator: "Exists"
effect: "NoSchedule"
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
preference:
matchExpressions:
- key: karpenter.sh/nodepool
operator: In
values:
- general-medium-storage
volumes:
- name: global-router-config
configMap:
name: gp-global-router-config
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
command:
- python3
- -m
- dynamo.global_router
args:
- --config
- /config/global_router_config.json
- --model-name
- ${MODEL_NAME}
- --namespace
- ${K8S_NAMESPACE}-gp-ctrl
volumeMounts:
- name: global-router-config
mountPath: /config
readOnly: true
GlobalPlanner:
componentType: default
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
command:
- python3
- -m
- dynamo.global_planner
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: gp-prefill-0
spec:
services:
LocalRouter:
componentType: default
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
env:
- name: DYN_SYSTEM_PORT
value: "9090"
command:
- python3
- -m
- dynamo.router
args:
- --endpoint
- ${K8S_NAMESPACE}-gp-prefill-0.prefill.generate
- --router-block-size
- "16"
- --no-router-track-active-blocks
VllmPrefillWorker:
envFromSecret: hf-token-secret
componentType: worker
subComponentType: prefill
replicas: 1
resources:
limits:
gpu: "1"
extraPodSpec:
volumes:
- name: hf-model-cache
persistentVolumeClaim:
claimName: hf-model-cache
mainContainer:
image: ${DYNAMO_VLLM_IMAGE}
workingDir: /workspace/examples/backends/vllm
command:
- python3
- -m
- dynamo.vllm
args:
- --model
- ${MODEL_NAME}
- --tensor-parallel-size
- "1"
- --is-prefill-worker
volumeMounts:
- name: hf-model-cache
mountPath: /home/dynamo/.cache/huggingface/hub
Planner:
componentType: planner
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
command:
- python3
- -m
- dynamo.planner
args:
- --config
- '{"environment":"global-planner","global_planner_namespace":"${K8S_NAMESPACE}-gp-ctrl","backend":"vllm","mode":"prefill","enable_load_scaling":false,"enable_throughput_scaling":true,"throughput_metrics_source":"router","ttft":2000,"max_gpu_budget":-1,"prefill_engine_num_gpu":1,"model_name":"${MODEL_NAME}","profile_results_dir":"/workspace/tests/planner/profiling_results/H200_TP1P_TP1D"}'
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: gp-prefill-1
spec:
services:
LocalRouter:
componentType: default
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
env:
- name: DYN_SYSTEM_PORT
value: "9090"
command:
- python3
- -m
- dynamo.router
args:
- --endpoint
- ${K8S_NAMESPACE}-gp-prefill-1.prefill.generate
- --router-block-size
- "16"
- --no-router-track-active-blocks
VllmPrefillWorker:
envFromSecret: hf-token-secret
componentType: worker
subComponentType: prefill
replicas: 1
resources:
limits:
gpu: "2"
extraPodSpec:
volumes:
- name: hf-model-cache
persistentVolumeClaim:
claimName: hf-model-cache
mainContainer:
image: ${DYNAMO_VLLM_IMAGE}
workingDir: /workspace/examples/backends/vllm
command:
- python3
- -m
- dynamo.vllm
args:
- --model
- ${MODEL_NAME}
- --tensor-parallel-size
- "2"
- --is-prefill-worker
volumeMounts:
- name: hf-model-cache
mountPath: /home/dynamo/.cache/huggingface/hub
Planner:
componentType: planner
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
command:
- python3
- -m
- dynamo.planner
args:
- --config
- '{"environment":"global-planner","global_planner_namespace":"${K8S_NAMESPACE}-gp-ctrl","backend":"vllm","mode":"prefill","enable_load_scaling":false,"enable_throughput_scaling":true,"throughput_metrics_source":"router","ttft":2000,"max_gpu_budget":-1,"prefill_engine_num_gpu":2,"model_name":"${MODEL_NAME}","profile_results_dir":"/workspace/tests/planner/profiling_results/H200_TP1P_TP1D"}'
---
apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: gp-decode-0
spec:
services:
LocalRouter:
componentType: default
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
workingDir: /workspace
env:
- name: DYN_SYSTEM_PORT
value: "9090"
command:
- python3
- -m
- dynamo.router
args:
- --endpoint
- ${K8S_NAMESPACE}-gp-decode-0.backend.generate
- --router-block-size
- "16"
- --router-kv-overlap-score-weight
- "0"
VllmDecodeWorker:
envFromSecret: hf-token-secret
componentType: worker
subComponentType: decode
replicas: 1
resources:
limits:
gpu: "1"
extraPodSpec:
volumes:
- name: hf-model-cache
persistentVolumeClaim:
claimName: hf-model-cache
mainContainer:
image: ${DYNAMO_VLLM_IMAGE}
workingDir: /workspace/examples/backends/vllm
command:
- python3
- -m
- dynamo.vllm
args:
- --model
- ${MODEL_NAME}
- --tensor-parallel-size
- "1"
volumeMounts:
- name: hf-model-cache
mountPath: /home/dynamo/.cache/huggingface/hub
Planner:
componentType: planner
replicas: 1
extraPodSpec:
imagePullSecrets:
- name: docker-imagepullsecret
mainContainer:
image: ${DYNAMO_IMAGE}
command:
- python3
- -m
- dynamo.planner
args:
- --config
- '{"environment":"global-planner","global_planner_namespace":"${K8S_NAMESPACE}-gp-ctrl","backend":"vllm","mode":"decode","enable_load_scaling":false,"enable_throughput_scaling":true,"throughput_metrics_source":"router","itl":200,"max_gpu_budget":-1,"decode_engine_num_gpu":1,"model_name":"${MODEL_NAME}","profile_results_dir":"/workspace/tests/planner/profiling_results/H200_TP1P_TP1D"}'
...@@ -158,3 +158,86 @@ Example: To always route to pool 0 regardless of request characteristics: ...@@ -158,3 +158,86 @@ Example: To always route to pool 0 regardless of request characteristics:
```json ```json
"prefill_pool_mapping": [[0, 0], [0, 0]] "prefill_pool_mapping": [[0, 0], [0, 0]]
``` ```
## SLA Planner with GlobalPlanner
Each pool can run an SLA Planner that reads throughput metrics and delegates autoscaling decisions
to a central **GlobalPlanner** service. The GlobalPlanner arbitrates across pools and executes
scaling via the Dynamo operator.
### Architecture with SLA Planners
```
Frontend (round-robin)
|
v
Global Router ─── GlobalPlanner ◄─── scale decisions from pool planners
|
+──────────────────────────────────────+
| | |
Prefill Pool 0 Prefill Pool 1 Decode Pool 0
LocalRouter LocalRouter LocalRouter
Worker Worker Worker
Planner ──────► Planner ──────► Planner ──────► (all → GlobalPlanner)
```
### SLA Planner configuration
The SLA Planner is configured via a JSON blob passed to `--config`. Key fields for the
global-planner environment:
| Field | Description |
|---|---|
| `environment` | `"global-planner"` to delegate scaling to GlobalPlanner |
| `global_planner_namespace` | Dynamo namespace of the DGD running GlobalPlanner |
| `mode` | `"prefill"` or `"decode"` |
| `throughput_metrics_source` | `"frontend"` (default) or `"router"` — see below |
### `throughput_metrics_source`
Controls where the SLA Planner reads aggregate throughput metrics (TTFT, ITL, request rate):
- **`frontend`** (default): reads `dynamo_frontend_*` histograms from the frontend service. Works
for single-DGD disagg deployments where the planner and frontend share a namespace.
- **`router`**: reads `dynamo_component_router_*` histograms emitted by LocalRouter pods and
scraped by cluster Prometheus. Required for hierarchical (multi-DGD) disagg deployments where
the SLA Planner runs in a pool DGD namespace that is different from the frontend DGD namespace.
Use `throughput_metrics_source: "router"` whenever the planner is co-located with a pool
(not the frontend), i.e. in any GlobalPlanner setup.
### Prometheus scraping for router metrics
The Dynamo operator Helm chart includes a PodMonitor that scrapes LocalRouter pods on port 9090.
LocalRouter pods must expose metrics on that port via:
```yaml
env:
- name: DYN_SYSTEM_PORT
value: "9090"
```
No standalone Prometheus is needed — the cluster-wide Prometheus picks up the PodMonitor
automatically.
### GlobalPlanner `--no-operation` mode
Pass `--no-operation` to GlobalPlanner to receive and log scale requests without executing them.
Useful for observing planner behaviour before enabling live scaling:
```yaml
command: [python3, -m, dynamo.global_planner]
args: [--no-operation]
```
### Example deployments
Complete end-to-end examples are in `examples/backends/`:
| File | Description |
|---|---|
| `mocker/deploy/hplanner-mocker-test.yaml` | 2 prefill + 2 decode pools with Mocker workers; GlobalPlanner in no-op mode |
| `vllm/deploy/hplanner-vllm-test.yaml` | 2 prefill (TP1, TP2) + 1 decode pool with real vLLM workers |
Both use `envsubst` for substituting `${K8S_NAMESPACE}`, `${DYNAMO_IMAGE}`, etc.
...@@ -214,6 +214,35 @@ class router_request: ...@@ -214,6 +214,35 @@ class router_request:
METRIC_PREFIX = "router_" METRIC_PREFIX = "router_"
class router:
"""Router request metrics (dynamo_component_router_* with dynamo_namespace label).
These constants are the full suffix portions combined with name_prefix.COMPONENT
("dynamo_component") to form the complete metric name, e.g.
dynamo_component_router_requests_total.
Registered via MetricsHierarchy (from_component()) which auto-injects
dynamo_namespace (underscores) and dynamo_component labels and registers
with the component's registry (port 9090).
"""
# Total number of requests processed by the router
REQUESTS_TOTAL = "router_requests_total"
# Time to first token observed at the router (seconds)
TIME_TO_FIRST_TOKEN_SECONDS = "router_time_to_first_token_seconds"
# Average inter-token latency observed at the router (seconds)
INTER_TOKEN_LATENCY_SECONDS = "router_inter_token_latency_seconds"
# Input sequence length in tokens observed at the router
INPUT_SEQUENCE_TOKENS = "router_input_sequence_tokens"
# Output sequence length in tokens observed at the router
OUTPUT_SEQUENCE_TOKENS = "router_output_sequence_tokens"
# TODO: Add REQUEST_DURATION_SECONDS = "router_request_duration_seconds" once
# RouterRequestMetrics in lib/llm/src/kv_router/metrics.rs registers a
# dynamo_component_router_request_duration_seconds histogram. Until then,
# get_avg_request_duration (router path) falls back to the work_handler
# constant and queries a non-existent metric, silently returning 0.
class routing_overhead: class routing_overhead:
"""Routing overhead phase latency histogram suffixes.""" """Routing overhead phase latency histogram suffixes."""
......
...@@ -451,6 +451,30 @@ pub mod routing_overhead { ...@@ -451,6 +451,30 @@ pub mod routing_overhead {
pub const TOTAL_MS: &str = "overhead_total_ms"; pub const TOTAL_MS: &str = "overhead_total_ms";
} }
/// Router request metrics (component-scoped aggregate histograms + counter)
///
/// These constants are the suffix portions of full metric names, combined with
/// [`name_prefix::COMPONENT`] to form the complete name, e.g.
/// `dynamo_component_router_requests_total`.
///
/// ⚠️ Python codegen: Run gen-python-prometheus-names after changes
pub mod router {
/// Total number of requests processed by the router
pub const REQUESTS_TOTAL: &str = "router_requests_total";
/// Time to first token observed at the router (seconds)
pub const TIME_TO_FIRST_TOKEN_SECONDS: &str = "router_time_to_first_token_seconds";
/// Average inter-token latency observed at the router (seconds)
pub const INTER_TOKEN_LATENCY_SECONDS: &str = "router_inter_token_latency_seconds";
/// Input sequence length in tokens observed at the router
pub const INPUT_SEQUENCE_TOKENS: &str = "router_input_sequence_tokens";
/// Output sequence length in tokens observed at the router
pub const OUTPUT_SEQUENCE_TOKENS: &str = "router_output_sequence_tokens";
}
// KvRouter (including KvInexer) Prometheus metric names // KvRouter (including KvInexer) Prometheus metric names
pub mod kvrouter { pub mod kvrouter {
/// Number of KV cache events applied to the index (including status) /// Number of KV cache events applied to the index (including status)
......
...@@ -63,3 +63,27 @@ def test_all_fields_work(): ...@@ -63,3 +63,27 @@ def test_all_fields_work():
assert config.itl == 50 assert config.itl == 50
assert config.max_gpu_budget == 16 assert config.max_gpu_budget == 16
assert config.throughput_adjustment_interval == 60 assert config.throughput_adjustment_interval == 60
def test_throughput_metrics_source_default():
"""throughput_metrics_source defaults to 'frontend'."""
config = PlannerConfig(namespace="test-ns")
assert config.throughput_metrics_source == "frontend"
def test_throughput_metrics_source_frontend():
"""throughput_metrics_source accepts 'frontend'."""
config = PlannerConfig(namespace="test-ns", throughput_metrics_source="frontend")
assert config.throughput_metrics_source == "frontend"
def test_throughput_metrics_source_router():
"""throughput_metrics_source accepts 'router'."""
config = PlannerConfig(namespace="test-ns", throughput_metrics_source="router")
assert config.throughput_metrics_source == "router"
def test_throughput_metrics_source_invalid():
"""throughput_metrics_source rejects invalid values."""
with pytest.raises(ValidationError):
PlannerConfig(namespace="test-ns", throughput_metrics_source="invalid")
...@@ -13,11 +13,13 @@ ...@@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
import math import math
from unittest.mock import patch from unittest.mock import MagicMock, patch
import pytest import pytest
from dynamo import prometheus_names
from dynamo.planner.utils.prometheus import ( from dynamo.planner.utils.prometheus import (
FrontendMetric, FrontendMetric,
FrontendMetricContainer, FrontendMetricContainer,
...@@ -135,6 +137,8 @@ def test_frontend_metric_with_partial_data(): ...@@ -135,6 +137,8 @@ def test_frontend_metric_with_partial_data():
def test_get_average_metric_none_result(): def test_get_average_metric_none_result():
"""Test _get_average_metric when prometheus returns None""" """Test _get_average_metric when prometheus returns None"""
# TODO: Replace hardcoded port with allocate_port() from tests.utils.port_utils
# for xdist-safe parallel execution.
client = PrometheusAPIClient("http://localhost:9090", "test_namespace") client = PrometheusAPIClient("http://localhost:9090", "test_namespace")
with patch.object(client.prom, "custom_query") as mock_query: with patch.object(client.prom, "custom_query") as mock_query:
...@@ -255,3 +259,148 @@ def test_get_average_metric_multiple_matching_containers(mock_prometheus_result) ...@@ -255,3 +259,148 @@ def test_get_average_metric_multiple_matching_containers(mock_prometheus_result)
# Average of 42.7, 35.5, and 15.5 (using value[1] from each container) # Average of 42.7, 35.5, and 15.5 (using value[1] from each container)
expected = (42.7 + 35.5 + 15.5) / 3 expected = (42.7 + 35.5 + 15.5) / 3
assert result == expected assert result == expected
# ---------------------------------------------------------------------------
# Router metrics source tests
# ---------------------------------------------------------------------------
@pytest.fixture
def router_client():
"""PrometheusAPIClient configured with metrics_source='router'."""
# TODO: Replace hardcoded port with allocate_port() from tests.utils.port_utils
# for xdist-safe parallel execution.
client = PrometheusAPIClient(
"http://localhost:9090", "test-fe-namespace", metrics_source="router"
)
client.prom = MagicMock()
client.prom.custom_query.return_value = [{"value": [0, "42.0"]}]
return client
class TestPrometheusAPIClientRouterSource:
"""Tests for PrometheusAPIClient when metrics_source='router'."""
def test_get_avg_inter_token_latency_dispatches_to_router_histogram(
self, router_client
):
"""get_avg_inter_token_latency with router source queries dynamo_component_router_* metric."""
result = router_client.get_avg_inter_token_latency("60s", "mymodel")
assert result == 42.0
call_args = str(router_client.prom.custom_query.call_args)
expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.INTER_TOKEN_LATENCY_SECONDS}"
assert expected_metric in call_args
def test_get_avg_time_to_first_token_dispatches_to_router_histogram(
self, router_client
):
"""get_avg_time_to_first_token with router source queries dynamo_component_router_* metric."""
result = router_client.get_avg_time_to_first_token("60s", "mymodel")
assert result == 42.0
call_args = str(router_client.prom.custom_query.call_args)
expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.TIME_TO_FIRST_TOKEN_SECONDS}"
assert expected_metric in call_args
def test_get_avg_input_sequence_tokens_dispatches_to_router_histogram(
self, router_client
):
"""get_avg_input_sequence_tokens with router source queries dynamo_component_router_* metric."""
result = router_client.get_avg_input_sequence_tokens("60s", "mymodel")
assert result == 42.0
call_args = str(router_client.prom.custom_query.call_args)
expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.INPUT_SEQUENCE_TOKENS}"
assert expected_metric in call_args
def test_get_avg_output_sequence_tokens_dispatches_to_router_histogram(
self, router_client
):
"""get_avg_output_sequence_tokens with router source queries dynamo_component_router_* metric."""
result = router_client.get_avg_output_sequence_tokens("60s", "mymodel")
assert result == 42.0
call_args = str(router_client.prom.custom_query.call_args)
expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.OUTPUT_SEQUENCE_TOKENS}"
assert expected_metric in call_args
def test_get_avg_request_count_uses_router_requests_total(self, router_client):
"""get_avg_request_count with router source queries dynamo_component_router_requests_total."""
result = router_client.get_avg_request_count("60s", "mymodel")
assert result == 42.0
call_args = str(router_client.prom.custom_query.call_args)
expected_metric = f"{prometheus_names.name_prefix.COMPONENT}_{prometheus_names.router.REQUESTS_TOTAL}"
assert expected_metric in call_args
def test_dynamo_namespace_filter_in_router_histogram_query(self, router_client):
"""Router histogram query must filter by dynamo_namespace so each pool planner
only reads its own LocalRouter's metrics, not the cluster-wide aggregate.
dynamo_component_router_* metrics use MetricsHierarchy which injects dynamo_namespace
with underscores. DYN_NAMESPACE dashes are normalized to underscores for the PromQL filter.
"""
router_client.get_avg_inter_token_latency("60s", "mymodel")
call_args = str(router_client.prom.custom_query.call_args)
assert "dynamo_namespace" in call_args, (
"dynamo_namespace filter missing from router histogram query — "
"without it, all pool planners read the same cluster-wide aggregate"
)
# MetricsHierarchy injects underscores; DYN_NAMESPACE dashes are normalized
assert "test_fe_namespace" in call_args
def test_dynamo_namespace_filter_in_router_request_count_query(self, router_client):
"""Router request count query must filter by dynamo_namespace.
dynamo_component_router_* get dynamo_namespace from MetricsHierarchy (underscores).
"""
router_client.get_avg_request_count("60s", "mymodel")
call_args = str(router_client.prom.custom_query.call_args)
assert "dynamo_namespace" in call_args, (
"dynamo_namespace filter missing from router request count query — "
"without it, all pool planners read the same cluster-wide aggregate"
)
# MetricsHierarchy injects underscores; DYN_NAMESPACE dashes are normalized
assert "test_fe_namespace" in call_args
def test_router_histogram_returns_zero_on_empty_result(self, router_client):
"""_get_router_average_histogram returns 0 when Prometheus has no data."""
router_client.prom.custom_query.return_value = []
result = router_client.get_avg_inter_token_latency("60s", "mymodel")
assert result == 0
def test_router_request_count_returns_zero_on_empty_result(self, router_client):
"""get_avg_request_count (router) returns 0 when Prometheus has no data."""
router_client.prom.custom_query.return_value = []
result = router_client.get_avg_request_count("60s", "mymodel")
assert result == 0
def test_router_histogram_returns_zero_on_nan(self, router_client):
"""_get_router_average_histogram returns 0 when value is NaN."""
router_client.prom.custom_query.return_value = [{"value": [0, "NaN"]}]
result = router_client.get_avg_inter_token_latency("60s", "mymodel")
assert result == 0
def test_warn_if_router_not_scraped_logs_warning_when_absent(
self, router_client, caplog
):
"""warn_if_router_not_scraped logs a warning when absent() returns a result."""
router_client.prom.custom_query.return_value = [{"value": [0, "1"]}]
with caplog.at_level(logging.WARNING):
router_client.warn_if_router_not_scraped()
assert any(
"No 'dynamo_component_router_requests_total'" in r.message
for r in caplog.records
)
def test_warn_if_router_not_scraped_silent_when_present(
self, router_client, caplog
):
"""warn_if_router_not_scraped is silent when the metric exists (absent() returns empty)."""
router_client.prom.custom_query.return_value = []
with caplog.at_level(logging.WARNING):
router_client.warn_if_router_not_scraped()
assert not any(
"dynamo_component_router_requests_total" in r.message
for r in caplog.records
)
...@@ -18,6 +18,13 @@ from dynamo.planner.remote_planner_client import RemotePlannerClient ...@@ -18,6 +18,13 @@ from dynamo.planner.remote_planner_client import RemotePlannerClient
from dynamo.planner.scale_protocol import ScaleRequest, ScaleResponse, ScaleStatus from dynamo.planner.scale_protocol import ScaleRequest, ScaleResponse, ScaleStatus
from dynamo.planner.utils.exceptions import EmptyTargetReplicasError from dynamo.planner.utils.exceptions import EmptyTargetReplicasError
async def _async_responses(*items):
"""Async generator helper: yields each item in sequence, simulating a stream."""
for item in items:
yield item
pytestmark = [ pytestmark = [
pytest.mark.gpu_0, pytest.mark.gpu_0,
pytest.mark.pre_merge, pytest.mark.pre_merge,
...@@ -37,13 +44,14 @@ def mock_runtime(): ...@@ -37,13 +44,14 @@ def mock_runtime():
endpoint_mock.client = AsyncMock(return_value=client_mock) endpoint_mock.client = AsyncMock(return_value=client_mock)
client_mock.wait_for_instances = AsyncMock() client_mock.wait_for_instances = AsyncMock()
# Mock scale_request to return a response # Mock generate to return a single-item async stream with the response dict
client_mock.scale_request = AsyncMock( response_data = {
return_value={ "status": "success",
"status": "success", "message": "Scaled successfully",
"message": "Scaled successfully", "current_replicas": {"prefill": 3, "decode": 5},
"current_replicas": {"prefill": 3, "decode": 5}, }
} client_mock.generate = AsyncMock(
side_effect=lambda _: _async_responses(response_data)
) )
return runtime, client_mock return runtime, client_mock
...@@ -92,13 +100,15 @@ async def test_send_scale_request_error(): ...@@ -92,13 +100,15 @@ async def test_send_scale_request_error():
endpoint_mock.client = AsyncMock(return_value=client_mock) endpoint_mock.client = AsyncMock(return_value=client_mock)
client_mock.wait_for_instances = AsyncMock() client_mock.wait_for_instances = AsyncMock()
# Mock scale_request to return error response # Mock generate to return a single-item async stream with the error response dict
client_mock.scale_request = AsyncMock( client_mock.generate = AsyncMock(
return_value={ side_effect=lambda _: _async_responses(
"status": "error", {
"message": "Namespace not authorized", "status": "error",
"current_replicas": {}, "message": "Namespace not authorized",
} "current_replicas": {},
}
)
) )
client = RemotePlannerClient(runtime, "central-ns", "Planner") client = RemotePlannerClient(runtime, "central-ns", "Planner")
...@@ -131,8 +141,8 @@ async def test_send_scale_request_no_response(): ...@@ -131,8 +141,8 @@ async def test_send_scale_request_no_response():
endpoint_mock.client = AsyncMock(return_value=client_mock) endpoint_mock.client = AsyncMock(return_value=client_mock)
client_mock.wait_for_instances = AsyncMock() client_mock.wait_for_instances = AsyncMock()
# Mock scale_request to return None # Mock generate to return an empty async stream (no items → no response)
client_mock.scale_request = AsyncMock(return_value=None) client_mock.generate = AsyncMock(side_effect=lambda _: _async_responses())
client = RemotePlannerClient(runtime, "central-ns", "Planner") client = RemotePlannerClient(runtime, "central-ns", "Planner")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment