Unverified Commit 135bce43 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

chore: clearer prometheus port handling in planner (#4707)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
parent 7e499b5c
...@@ -139,6 +139,8 @@ def generate_dgd_config_with_planner( ...@@ -139,6 +139,8 @@ def generate_dgd_config_with_planner(
# Override profiling-specific arguments with results from profiling # Override profiling-specific arguments with results from profiling
# Remove and re-add to ensure correct values from profiling context # Remove and re-add to ensure correct values from profiling context
# Note: --namespace is NOT added here; planner gets it from DYN_NAMESPACE env var
# which is automatically injected by the operator based on dynamoNamespace
planner_args = [ planner_args = [
arg arg
for arg in planner_args for arg in planner_args
...@@ -154,11 +156,9 @@ def generate_dgd_config_with_planner( ...@@ -154,11 +156,9 @@ def generate_dgd_config_with_planner(
] ]
# Add arguments determined by profiling results # Add arguments determined by profiling results
frontend_namespace = getattr(config.spec.services["Frontend"], "dynamoNamespace", "dynamo") # type: ignore[attr-defined]
cm_mount_path = f"{get_workspace_dir()}/profiling_results" cm_mount_path = f"{get_workspace_dir()}/profiling_results"
planner_args.extend( planner_args.extend(
[ [
f"--namespace={frontend_namespace}",
f"--prefill-engine-num-gpu={best_prefill_mapping.get_num_gpus()}", f"--prefill-engine-num-gpu={best_prefill_mapping.get_num_gpus()}",
f"--decode-engine-num-gpu={best_decode_mapping.get_num_gpus()}", f"--decode-engine-num-gpu={best_decode_mapping.get_num_gpus()}",
f"--profile-results-dir={cm_mount_path}", f"--profile-results-dir={cm_mount_path}",
...@@ -375,7 +375,6 @@ def _generate_mocker_config_with_planner( ...@@ -375,7 +375,6 @@ def _generate_mocker_config_with_planner(
# Update planner's dynamoNamespace to match mocker's namespace # Update planner's dynamoNamespace to match mocker's namespace
mocker_planner_dict["dynamoNamespace"] = mocker_namespace mocker_planner_dict["dynamoNamespace"] = mocker_namespace
# Override --backend to mocker and --namespace to match mocker's dynamoNamespace
# Planner args use --key=value format, so we need to find and replace # Planner args use --key=value format, so we need to find and replace
planner_main_container = mocker_planner_dict.get("extraPodSpec", {}).get( planner_main_container = mocker_planner_dict.get("extraPodSpec", {}).get(
"mainContainer", {} "mainContainer", {}
...@@ -385,8 +384,6 @@ def _generate_mocker_config_with_planner( ...@@ -385,8 +384,6 @@ def _generate_mocker_config_with_planner(
for arg in planner_args: for arg in planner_args:
if arg.startswith("--backend="): if arg.startswith("--backend="):
updated_planner_args.append("--backend=mocker") updated_planner_args.append("--backend=mocker")
elif arg.startswith("--namespace="):
updated_planner_args.append(f"--namespace={mocker_namespace}")
else: else:
updated_planner_args.append(arg) updated_planner_args.append(arg)
planner_main_container["args"] = updated_planner_args planner_main_container["args"] = updated_planner_args
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
import argparse import argparse
from typing import Any
from dynamo.planner.utils.planner_argparse import create_sla_planner_parser from dynamo.planner.utils.planner_argparse import create_sla_planner_parser
...@@ -79,13 +80,35 @@ def _build_action_kwargs( ...@@ -79,13 +80,35 @@ def _build_action_kwargs(
return kwargs return kwargs
def _format_arg_for_command_line(arg_name: str, value) -> list[str]: def _get_planner_defaults() -> dict[str, Any]:
"""
Get default values for all planner arguments from the planner parser.
Returns:
Dictionary mapping argument names (with dashes) to their default values
"""
planner_parser = create_sla_planner_parser()
defaults = {}
for action in planner_parser._actions:
if action.dest == "help" or not action.option_strings:
continue
# Convert dest (underscores) to arg name (dashes)
arg_name = action.dest.replace("_", "-")
defaults[arg_name] = action.default
return defaults
def _format_arg_for_command_line(
arg_name: str, value, defaults: dict[str, Any] | None = None
) -> list[str]:
""" """
Format an argument name and value for command line usage. Format an argument name and value for command line usage.
Args: Args:
arg_name: The argument name (without dashes) arg_name: The argument name (without dashes)
value: The argument value value: The argument value
defaults: Optional dict of default values. If provided and value matches
the default, the arg is skipped (allows operator env vars to take effect)
Returns: Returns:
List of command-line argument strings (empty list if value is None or False bool) List of command-line argument strings (empty list if value is None or False bool)
...@@ -93,6 +116,13 @@ def _format_arg_for_command_line(arg_name: str, value) -> list[str]: ...@@ -93,6 +116,13 @@ def _format_arg_for_command_line(arg_name: str, value) -> list[str]:
if value is None: if value is None:
return [] return []
# Skip args that match their default values
# This allows the operator's injected env vars to take effect
# (e.g., PLANNER_PROMETHEUS_PORT=9085 won't be overridden by --prometheus-port=0)
if defaults is not None and arg_name in defaults:
if value == defaults[arg_name]:
return []
if isinstance(value, bool): if isinstance(value, bool):
# For boolean flags, only add if True # For boolean flags, only add if True
if value: if value:
...@@ -104,7 +134,10 @@ def _format_arg_for_command_line(arg_name: str, value) -> list[str]: ...@@ -104,7 +134,10 @@ def _format_arg_for_command_line(arg_name: str, value) -> list[str]:
def _collect_args_from_namespace( def _collect_args_from_namespace(
args: argparse.Namespace, arg_names: list[str], prefix_to_strip: str = "" args: argparse.Namespace,
arg_names: list[str],
prefix_to_strip: str = "",
defaults: dict[str, Any] | None = None,
) -> list[str]: ) -> list[str]:
""" """
Collect and format command-line arguments from a namespace for given attribute names. Collect and format command-line arguments from a namespace for given attribute names.
...@@ -113,6 +146,7 @@ def _collect_args_from_namespace( ...@@ -113,6 +146,7 @@ def _collect_args_from_namespace(
args: The argparse Namespace containing parsed arguments args: The argparse Namespace containing parsed arguments
arg_names: List of attribute names to collect from the namespace arg_names: List of attribute names to collect from the namespace
prefix_to_strip: Optional prefix to remove from attribute names before formatting prefix_to_strip: Optional prefix to remove from attribute names before formatting
defaults: Optional dict of default values. Args matching defaults are skipped.
Returns: Returns:
List of formatted command-line argument strings List of formatted command-line argument strings
...@@ -125,7 +159,7 @@ def _collect_args_from_namespace( ...@@ -125,7 +159,7 @@ def _collect_args_from_namespace(
arg_name = attr_name[len(prefix_to_strip) :].replace("_", "-") arg_name = attr_name[len(prefix_to_strip) :].replace("_", "-")
else: else:
arg_name = attr_name.replace("_", "-") arg_name = attr_name.replace("_", "-")
result.extend(_format_arg_for_command_line(arg_name, value)) result.extend(_format_arg_for_command_line(arg_name, value, defaults))
return result return result
...@@ -182,6 +216,9 @@ def build_planner_args_from_namespace( ...@@ -182,6 +216,9 @@ def build_planner_args_from_namespace(
Automatically detects shared arguments between profile_sla and planner, Automatically detects shared arguments between profile_sla and planner,
and uses profile_sla values for those. and uses profile_sla values for those.
Args that match their default values are skipped, allowing the operator's
injected environment variables to take effect (e.g., PLANNER_PROMETHEUS_PORT).
Args: Args:
args: Parsed arguments namespace args: Parsed arguments namespace
prefix: Prefix used for planner arguments prefix: Prefix used for planner arguments
...@@ -191,6 +228,10 @@ def build_planner_args_from_namespace( ...@@ -191,6 +228,10 @@ def build_planner_args_from_namespace(
""" """
planner_args = [] planner_args = []
# Get default values to skip args that match defaults
# This allows operator-injected env vars to take effect
defaults = _get_planner_defaults()
# Auto-detect shared arguments by comparing planner parser with args namespace # Auto-detect shared arguments by comparing planner parser with args namespace
planner_parser = create_sla_planner_parser() planner_parser = create_sla_planner_parser()
planner_arg_dests = { planner_arg_dests = {
...@@ -204,12 +245,16 @@ def build_planner_args_from_namespace( ...@@ -204,12 +245,16 @@ def build_planner_args_from_namespace(
shared_arg_dests = {dest for dest in planner_arg_dests if hasattr(args, dest)} shared_arg_dests = {dest for dest in planner_arg_dests if hasattr(args, dest)}
# Add shared arguments from profile_sla (without prefix) # Add shared arguments from profile_sla (without prefix)
planner_args.extend(_collect_args_from_namespace(args, list(shared_arg_dests))) planner_args.extend(
_collect_args_from_namespace(args, list(shared_arg_dests), defaults=defaults)
)
# Get all planner-prefixed attributes from args (planner-specific only) # Get all planner-prefixed attributes from args (planner-specific only)
prefixed_attrs = [attr for attr in dir(args) if attr.startswith(prefix)] prefixed_attrs = [attr for attr in dir(args) if attr.startswith(prefix)]
planner_args.extend( planner_args.extend(
_collect_args_from_namespace(args, prefixed_attrs, prefix_to_strip=prefix) _collect_args_from_namespace(
args, prefixed_attrs, prefix_to_strip=prefix, defaults=defaults
)
) )
return planner_args return planner_args
...@@ -21,7 +21,6 @@ from typing import Optional ...@@ -21,7 +21,6 @@ from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
from dynamo.planner.kube import get_current_k8s_namespace
from dynamo.planner.utils.exceptions import ( from dynamo.planner.utils.exceptions import (
DuplicateSubComponentError, DuplicateSubComponentError,
SubComponentNotFoundError, SubComponentNotFoundError,
...@@ -32,17 +31,10 @@ configure_dynamo_logging() ...@@ -32,17 +31,10 @@ configure_dynamo_logging()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _get_prometheus_port_from_env():
"""
Get prometheus port from environment variables if set.
Otherwise, return 0, which means not reporting metrics using prometheus.
"""
return os.environ.get("PLANNER_PROMETHEUS_PORT", 0)
# Source of truth for planner defaults # Source of truth for planner defaults
class BasePlannerDefaults: class BasePlannerDefaults:
namespace = "dynamo" # Namespace from DYN_NAMESPACE env var (injected by operator as "{k8s_namespace}-{dgd_name}")
namespace = os.environ.get("DYN_NAMESPACE", "dynamo")
environment = "kubernetes" environment = "kubernetes"
backend = "vllm" backend = "vllm"
no_operation = False no_operation = False
...@@ -52,7 +44,8 @@ class BasePlannerDefaults: ...@@ -52,7 +44,8 @@ class BasePlannerDefaults:
min_endpoint = 1 # applies to both decode and prefill min_endpoint = 1 # applies to both decode and prefill
decode_engine_num_gpu = 1 decode_engine_num_gpu = 1
prefill_engine_num_gpu = 1 prefill_engine_num_gpu = 1
prometheus_port = _get_prometheus_port_from_env() # Port for exposing planner's own metrics (0 means disabled)
metric_reporting_prometheus_port = int(os.environ.get("PLANNER_PROMETHEUS_PORT", 0))
class LoadPlannerDefaults(BasePlannerDefaults): class LoadPlannerDefaults(BasePlannerDefaults):
...@@ -63,29 +56,12 @@ class LoadPlannerDefaults(BasePlannerDefaults): ...@@ -63,29 +56,12 @@ class LoadPlannerDefaults(BasePlannerDefaults):
prefill_queue_scale_down_threshold = 0.2 prefill_queue_scale_down_threshold = 0.2
def _get_default_prometheus_endpoint(port: str, namespace: str):
"""Compute default prometheus endpoint using environment variables and Kubernetes service discovery"""
prometheus_endpoint = os.environ.get("PROMETHEUS_ENDPOINT", "").strip()
if prometheus_endpoint:
logger.debug("Using PROMETHEUS_ENDPOINT override: %s", prometheus_endpoint)
return prometheus_endpoint
k8s_namespace = get_current_k8s_namespace()
if k8s_namespace and k8s_namespace != "default":
prometheus_service = f"{namespace}-prometheus"
return f"http://{prometheus_service}.{k8s_namespace}.svc.cluster.local:{port}"
else:
logger.warning(
f"Cannot determine Prometheus endpoint. Running in namespace '{k8s_namespace}'. "
"Ensure the planner is deployed in a Kubernetes cluster with proper namespace configuration."
)
return f"{namespace}-prometheus"
class SLAPlannerDefaults(BasePlannerDefaults): class SLAPlannerDefaults(BasePlannerDefaults):
port = os.environ.get("PROMETHEUS_PORT", "9090") # Prometheus endpoint URL for pulling/querying metrics
namespace = os.environ.get("DYN_NAMESPACE", "vllm-disagg-planner") metric_pulling_prometheus_endpoint = os.environ.get(
prometheus_endpoint = _get_default_prometheus_endpoint(port, namespace) "PROMETHEUS_ENDPOINT",
"http://prometheus-kube-prometheus-prometheus.monitoring.svc.cluster.local:9090",
)
profile_results_dir = "profiling_results" profile_results_dir = "profiling_results"
isl = 3000 # in number of tokens isl = 3000 # in number of tokens
osl = 150 # in number of tokens osl = 150 # in number of tokens
......
...@@ -34,7 +34,7 @@ def create_sla_planner_parser() -> argparse.ArgumentParser: ...@@ -34,7 +34,7 @@ def create_sla_planner_parser() -> argparse.ArgumentParser:
parser.add_argument( parser.add_argument(
"--namespace", "--namespace",
default=SLAPlannerDefaults.namespace, default=SLAPlannerDefaults.namespace,
help="Namespace", help="Dynamo namespace",
) )
parser.add_argument( parser.add_argument(
"--backend", "--backend",
...@@ -110,10 +110,16 @@ def create_sla_planner_parser() -> argparse.ArgumentParser: ...@@ -110,10 +110,16 @@ def create_sla_planner_parser() -> argparse.ArgumentParser:
help="Load prediction window size", help="Load prediction window size",
) )
parser.add_argument( parser.add_argument(
"--prometheus-port", "--metric-pulling-prometheus-endpoint",
type=str,
default=SLAPlannerDefaults.metric_pulling_prometheus_endpoint,
help="Prometheus endpoint URL for pulling dynamo deployment metrics",
)
parser.add_argument(
"--metric-reporting-prometheus-port",
type=int, type=int,
default=SLAPlannerDefaults.prometheus_port, default=SLAPlannerDefaults.metric_reporting_prometheus_port,
help="Prometheus port", help="Port for exposing planner's own metrics to Prometheus",
) )
parser.add_argument( parser.add_argument(
"--no-correction", "--no-correction",
......
...@@ -17,7 +17,7 @@ from dynamo.planner import ( ...@@ -17,7 +17,7 @@ from dynamo.planner import (
TargetReplica, TargetReplica,
VirtualConnector, VirtualConnector,
) )
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES, SLAPlannerDefaults from dynamo.planner.defaults import WORKER_COMPONENT_NAMES
from dynamo.planner.utils.load_predictor import LOAD_PREDICTORS from dynamo.planner.utils.load_predictor import LOAD_PREDICTORS
from dynamo.planner.utils.perf_interpolation import ( from dynamo.planner.utils.perf_interpolation import (
DecodeInterpolator, DecodeInterpolator,
...@@ -90,7 +90,7 @@ class Planner: ...@@ -90,7 +90,7 @@ class Planner:
raise ValueError(f"Invalid environment: {args.environment}") raise ValueError(f"Invalid environment: {args.environment}")
self.prometheus_api_client = PrometheusAPIClient( self.prometheus_api_client = PrometheusAPIClient(
SLAPlannerDefaults.prometheus_endpoint, args.metric_pulling_prometheus_endpoint,
args.namespace, args.namespace,
) )
...@@ -150,7 +150,7 @@ class Planner: ...@@ -150,7 +150,7 @@ class Planner:
self.last_adjustment_time = time.time() self.last_adjustment_time = time.time()
self.last_metrics = Metrics() self.last_metrics = Metrics()
self.prometheus_port = args.prometheus_port self.prometheus_port = args.metric_reporting_prometheus_port
# Initialize Prometheus metrics # Initialize Prometheus metrics
# TODO: use proper naming # TODO: use proper naming
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment