Unverified Commit 6d44b904 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

fix(planner): DYN-2776 multi-DGD + GlobalPlanner scaling and readiness (#8482)


Signed-off-by: default avatarhongkuanz <hongkuanz@nvidia.com>
Co-authored-by: default avatarClaude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 77a0051a
...@@ -102,7 +102,13 @@ async def main(runtime: DistributedRuntime, args): ...@@ -102,7 +102,13 @@ async def main(runtime: DistributedRuntime, args):
await scale_endpoint.serve_endpoint(handler.scale_request) await scale_endpoint.serve_endpoint(handler.scale_request)
logger.info(" ✓ scale_request - Receives scaling requests from Planners") logger.info(" ✓ scale_request - Receives scaling requests from Planners")
# Serve health check endpoint # Serve health check endpoint.
# Passing health_check_payload registers this endpoint as a health-check
# target so the runtime's system status server flips to HealthStatus::Ready
# once the handler is registered. Without it, the operator-injected HTTP
# probes on :system/live and :system/health return 503 forever and the pod
# never becomes Ready. Mirrors the pool-Planner entrypoint pattern at
# components/src/dynamo/planner/__main__.py.
async def health_check(request: HealthCheckRequest): async def health_check(request: HealthCheckRequest):
"""Health check endpoint for monitoring""" """Health check endpoint for monitoring"""
yield { yield {
...@@ -113,7 +119,10 @@ async def main(runtime: DistributedRuntime, args): ...@@ -113,7 +119,10 @@ async def main(runtime: DistributedRuntime, args):
} }
health_endpoint = runtime.endpoint(f"{namespace}.GlobalPlanner.health") health_endpoint = runtime.endpoint(f"{namespace}.GlobalPlanner.health")
await health_endpoint.serve_endpoint(health_check) await health_endpoint.serve_endpoint(
health_check,
health_check_payload={"text": "health"},
)
logger.info(" ✓ health - Health check endpoint") logger.info(" ✓ health - Health check endpoint")
logger.info("=" * 60) logger.info("=" * 60)
......
...@@ -8,11 +8,25 @@ import os ...@@ -8,11 +8,25 @@ import os
import time import time
from typing import Optional from typing import Optional
from kubernetes.client import ApiException
from kubernetes.config.config_exception import ConfigException
from dynamo.planner.config.defaults import SubComponentType, TargetReplica from dynamo.planner.config.defaults import SubComponentType, TargetReplica
from dynamo.planner.connectors.base import PlannerConnector from dynamo.planner.connectors.base import PlannerConnector
from dynamo.planner.connectors.kubernetes import KubernetesConnector
from dynamo.planner.connectors.protocol import ScaleRequest, ScaleStatus from dynamo.planner.connectors.protocol import ScaleRequest, ScaleStatus
from dynamo.planner.connectors.remote_client import RemotePlannerClient from dynamo.planner.connectors.remote_client import RemotePlannerClient
from dynamo.planner.errors import EmptyTargetReplicasError from dynamo.planner.errors import (
DeploymentModelNameMismatchError,
DeploymentValidationError,
EmptyTargetReplicasError,
ModelNameNotFoundError,
UserProvidedModelNameMismatchError,
)
from dynamo.planner.monitoring.worker_info import (
WorkerInfo,
build_worker_info_from_defaults,
)
from dynamo.runtime import DistributedRuntime from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
...@@ -57,6 +71,13 @@ class GlobalPlannerConnector(PlannerConnector): ...@@ -57,6 +71,13 @@ class GlobalPlannerConnector(PlannerConnector):
# Cache for predicted load (will be set by planner before scaling) # Cache for predicted load (will be set by planner before scaling)
self.last_predicted_load: Optional[dict] = None self.last_predicted_load: Optional[dict] = None
# Lazily-initialized KubernetesConnector scoped to the pool's own DGD.
# Used only to read pool-local MDC / DGD args for capability discovery
# (get_worker_info, get_model_name). Scaling actions still go through
# the RemotePlannerClient.
self._local_k8s_connector: Optional[KubernetesConnector] = None
self._local_k8s_init_attempted: bool = False
async def _async_init(self): async def _async_init(self):
"""Async initialization - creates RemotePlannerClient""" """Async initialization - creates RemotePlannerClient"""
self.remote_client = RemotePlannerClient( self.remote_client = RemotePlannerClient(
...@@ -207,13 +228,71 @@ class GlobalPlannerConnector(PlannerConnector): ...@@ -207,13 +228,71 @@ class GlobalPlannerConnector(PlannerConnector):
"(GlobalPlanner manages deployment state)" "(GlobalPlanner manages deployment state)"
) )
def _get_local_k8s_connector(self) -> Optional[KubernetesConnector]:
"""Lazily build a KubernetesConnector scoped to the pool's own DGD.
The pool's Planner pod has access to its own DGD and the
DynamoWorkerMetadata CRs of its own workers, even under
``environment: global-planner``. Querying them directly is the
simplest way to populate per-engine capabilities (context_length,
max_kv_tokens, ...) that load-scaling needs. Returns ``None`` if
the connector can't be created (e.g. running outside a cluster).
"""
if self._local_k8s_init_attempted:
return self._local_k8s_connector
self._local_k8s_init_attempted = True
try:
self._local_k8s_connector = KubernetesConnector(
dynamo_namespace=self.dynamo_namespace,
model_name=self.model_name,
)
except (DeploymentValidationError, ConfigException, ApiException) as e:
logger.warning(
"GlobalPlannerConnector: could not initialize local "
f"KubernetesConnector for MDC capability lookup: {e}. "
"Falling back to hard-coded worker defaults; easy-mode "
"load scaling will be disabled."
)
return self._local_k8s_connector
def get_worker_info(
self,
sub_component_type: SubComponentType,
backend: str = "vllm",
) -> WorkerInfo:
"""Resolve per-worker capabilities from the pool's own MDC/DGD.
Without this, ``resolve_worker_info`` falls through to
``build_worker_info_from_defaults`` which leaves ``context_length``
and ``max_kv_tokens`` unset, and load_scaling's easy-mode decisions
bail out every tick — so the pool Planner silently sends no
ScaleRequests.
"""
local = self._get_local_k8s_connector()
if local is not None:
return local.get_worker_info(sub_component_type, backend)
return build_worker_info_from_defaults(backend, sub_component_type)
def get_model_name(self, **kwargs) -> str: def get_model_name(self, **kwargs) -> str:
""" """
Get model name. Get model name.
Returns the model name if provided during initialization, otherwise Prefers the value provided at init time, then the pool's own DGD
returns a placeholder indicating the model is managed remotely. container args (via the local KubernetesConnector), and finally
falls back to a placeholder indicating the model is managed
remotely.
""" """
if self.model_name: if self.model_name:
return self.model_name return self.model_name
local = self._get_local_k8s_connector()
if local is not None:
try:
return local.get_model_name(**kwargs)
except (
ModelNameNotFoundError,
DeploymentModelNameMismatchError,
UserProvidedModelNameMismatchError,
ApiException,
) as e:
logger.warning(f"Could not resolve model name from local DGD args: {e}")
return "managed-remotely" return "managed-remotely"
...@@ -16,7 +16,8 @@ from dynamo.planner import SubComponentType, TargetReplica ...@@ -16,7 +16,8 @@ from dynamo.planner import SubComponentType, TargetReplica
from dynamo.planner.connectors.global_planner import GlobalPlannerConnector from dynamo.planner.connectors.global_planner import GlobalPlannerConnector
from dynamo.planner.connectors.protocol import ScaleRequest, ScaleResponse, ScaleStatus from dynamo.planner.connectors.protocol import ScaleRequest, ScaleResponse, ScaleStatus
from dynamo.planner.connectors.remote_client import RemotePlannerClient from dynamo.planner.connectors.remote_client import RemotePlannerClient
from dynamo.planner.errors import EmptyTargetReplicasError from dynamo.planner.errors import DeploymentValidationError, EmptyTargetReplicasError
from dynamo.planner.monitoring.worker_info import WorkerInfo
async def _async_responses(*items): async def _async_responses(*items):
...@@ -330,15 +331,80 @@ async def test_connector_unsupported_and_noop_operations(connector): ...@@ -330,15 +331,80 @@ async def test_connector_unsupported_and_noop_operations(connector):
def test_connector_model_name_and_predicted_load(connector_runtime): def test_connector_model_name_and_predicted_load(connector_runtime):
"""Test GlobalPlannerConnector model name and predicted load tracking""" """Test GlobalPlannerConnector model name and predicted load tracking.
# With model name
The ``model_name=None`` branch forces KubernetesConnector init to raise
so the connector falls back to the "managed-remotely" placeholder
deterministically, independent of the caller's kube config.
"""
# With model name — local connector is never consulted.
c1 = GlobalPlannerConnector(connector_runtime, "ns", "gns", "GP", model_name="test") c1 = GlobalPlannerConnector(connector_runtime, "ns", "gns", "GP", model_name="test")
assert c1.get_model_name() == "test" assert c1.get_model_name() == "test"
# Without model name # Without model name — force the local connector init to fail so we
c2 = GlobalPlannerConnector(connector_runtime, "ns", "gns", "GP", model_name=None) # exercise the fallback deterministically.
with patch(
"dynamo.planner.connectors.global_planner.KubernetesConnector",
side_effect=DeploymentValidationError(["forced test failure"]),
):
c2 = GlobalPlannerConnector(
connector_runtime, "ns", "gns", "GP", model_name=None
)
assert c2.get_model_name() == "managed-remotely" assert c2.get_model_name() == "managed-remotely"
# Predicted load # Predicted load
c1.set_predicted_load(42.0, 256.0, 128.0) c1.set_predicted_load(42.0, 256.0, 128.0)
assert c1.last_predicted_load == {"num_requests": 42.0, "isl": 256.0, "osl": 128.0} assert c1.last_predicted_load == {"num_requests": 42.0, "isl": 256.0, "osl": 128.0}
def test_connector_get_worker_info_delegates_to_local_k8s(connector_runtime):
"""get_worker_info should delegate to a pool-local KubernetesConnector
so that MDC-populated capabilities (context_length, max_kv_tokens, ...)
reach load-scaling under environment=global-planner.
"""
c = GlobalPlannerConnector(connector_runtime, "ns", "gns", "GP", model_name="test")
mdc_info = WorkerInfo(
k8s_name="VllmPrefillWorker",
component_name="prefill",
endpoint="generate",
context_length=32768,
total_kv_blocks=1000,
kv_cache_block_size=16,
)
fake_local = MagicMock()
fake_local.get_worker_info = MagicMock(return_value=mdc_info)
fake_local.get_model_name = MagicMock(return_value="Qwen/Qwen3-8B")
c._local_k8s_connector = fake_local
c._local_k8s_init_attempted = True
info = c.get_worker_info(SubComponentType.PREFILL, backend="vllm")
assert info.context_length == 32768
assert info.max_kv_tokens == 16000
fake_local.get_worker_info.assert_called_once_with(SubComponentType.PREFILL, "vllm")
# get_model_name should prefer the init-time value and not call the
# local connector when one was provided.
assert c.get_model_name() == "test"
fake_local.get_model_name.assert_not_called()
def test_connector_get_worker_info_falls_back_on_local_init_failure(connector_runtime):
"""If the pool-local KubernetesConnector can't be created (e.g. outside
a cluster), get_worker_info should fall back to hard-coded defaults
rather than raising. Forces the init failure explicitly so the test
doesn't depend on the caller's kube config.
"""
with patch(
"dynamo.planner.connectors.global_planner.KubernetesConnector",
side_effect=DeploymentValidationError(["forced test failure"]),
):
c = GlobalPlannerConnector(
connector_runtime, "ns", "gns", "GP", model_name="test"
)
info = c.get_worker_info(SubComponentType.PREFILL, backend="vllm")
# Defaults populate component identifiers but leave capability fields
# unset — callers use this to detect "no MDC" without crashing.
assert info.context_length is None
assert info.max_kv_tokens is None
assert info.component_name is not None
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment