Unverified Commit 2176c431 authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

feat(planner): Derive prefill/decode GPU counts from DGD (#5919)

parent 8daacbd7
......@@ -183,6 +183,41 @@ class Service(BaseModel):
return None
def get_gpu_count(self) -> int:
"""Get the GPU count from the service's resource specification.
GPU count is read from spec.services.[ServiceName].resources.limits.gpu,
falling back to requests.gpu if limits is not specified.
Returns:
The number of GPUs configured for this service
Raises:
ValueError: If GPU count is not specified or invalid
"""
resources = self.service.get("resources", {})
limits = resources.get("limits", {})
requests = resources.get("requests", {})
# Prefer limits, fall back to requests. For GPUs, Kubernetes device plugins
# typically treat requests and limits as equivalent since GPUs are
# non-compressible and allocated exclusively (no fractional sharing).
gpu_str = limits.get("gpu") or requests.get("gpu")
if gpu_str is None:
raise ValueError(
f"No GPU count specified for service '{self.name}'. "
f"Please set resources.limits.gpu or resources.requests.gpu in the DGD."
)
try:
return int(gpu_str)
except (ValueError, TypeError):
raise ValueError(
f"Invalid GPU count '{gpu_str}' for service '{self.name}'. "
f"GPU count must be an integer."
)
# TODO: still supporting framework component names for backwards compatibility
# Should be deprecated in favor of service subComponentType
......
......@@ -227,6 +227,57 @@ class KubernetesConnector(PlannerConnector):
return model_name
def get_gpu_counts(
self,
deployment: Optional[dict] = None,
require_prefill: bool = True,
require_decode: bool = True,
) -> tuple[int, int]:
"""Get the GPU counts for prefill and decode services from the deployment.
Args:
deployment: Optional deployment dict, fetched if not provided
require_prefill: Whether to require prefill service
require_decode: Whether to require decode service
Returns:
Tuple of (prefill_gpu_count, decode_gpu_count)
Raises:
DeploymentValidationError: If GPU counts cannot be determined from DGD
"""
if deployment is None:
deployment = self.kube_api.get_graph_deployment(self.graph_deployment_name)
prefill_gpu_count = 0
decode_gpu_count = 0
errors = []
if require_prefill:
try:
prefill_service = get_service_from_sub_component_type_or_name(
deployment,
SubComponentType.PREFILL,
)
prefill_gpu_count = prefill_service.get_gpu_count()
except (PlannerError, ValueError) as e:
errors.append(f"Failed to get prefill GPU count: {e}")
if require_decode:
try:
decode_service = get_service_from_sub_component_type_or_name(
deployment,
SubComponentType.DECODE,
)
decode_gpu_count = decode_service.get_gpu_count()
except (PlannerError, ValueError) as e:
errors.append(f"Failed to get decode GPU count: {e}")
if errors:
raise DeploymentValidationError(errors)
return prefill_gpu_count, decode_gpu_count
async def wait_for_deployment_ready(self):
"""Wait for the deployment to be ready"""
await self.kube_api.wait_for_graph_deployment_ready(
......
......@@ -16,6 +16,12 @@ from dynamo.planner.utils.trace_data_extractor import extract_metrics_from_moonc
def run_sla_planner_dryrun(args: argparse.Namespace) -> None:
# Dryrun mode: use defaults if GPU counts not provided (no DGD available)
if args.prefill_engine_num_gpu is None:
args.prefill_engine_num_gpu = 1
if args.decode_engine_num_gpu is None:
args.decode_engine_num_gpu = 1
warmup_metrics = None
if getattr(args, "load_predictor_warmup_trace", None):
warmup_metrics = extract_metrics_from_mooncake(
......
......@@ -78,14 +78,16 @@ def create_sla_planner_parser() -> argparse.ArgumentParser:
parser.add_argument(
"--decode-engine-num-gpu",
type=int,
default=SLAPlannerDefaults.decode_engine_num_gpu,
help="Number of GPUs for decode engine",
default=None,
help="Number of GPUs per decode engine. In Kubernetes mode, this is auto-detected "
"from DGD resources but can be overridden (e.g., for mockers without GPU resources).",
)
parser.add_argument(
"--prefill-engine-num-gpu",
type=int,
default=SLAPlannerDefaults.prefill_engine_num_gpu,
help="Number of GPUs for prefill engine",
default=None,
help="Number of GPUs per prefill engine. In Kubernetes mode, this is auto-detected "
"from DGD resources but can be overridden (e.g., for mockers without GPU resources).",
)
parser.add_argument(
"--profile-results-dir",
......
......@@ -18,6 +18,7 @@ from dynamo.planner import (
VirtualConnector,
)
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES
from dynamo.planner.utils.exceptions import DeploymentValidationError
from dynamo.planner.utils.load_predictor import LOAD_PREDICTORS
from dynamo.planner.utils.perf_interpolation import (
DecodeInterpolator,
......@@ -206,6 +207,54 @@ def _apply_component_gpu_budget(
return next_num
def _initialize_gpu_counts(
args: argparse.Namespace,
connector,
require_prefill: bool,
require_decode: bool,
) -> None:
"""Initialize GPU counts from DGD (Kubernetes) or CLI args (virtual).
In Kubernetes mode: reads from DGD, falls back to CLI flags if not found
(useful for mockers that don't specify GPU resources).
In virtual mode: requires CLI flags, errors if not provided.
Raises:
DeploymentValidationError: If GPU counts cannot be determined
"""
# Try to read from DGD in Kubernetes mode
if hasattr(connector, "get_gpu_counts"):
try:
prefill_gpu, decode_gpu = connector.get_gpu_counts(
require_prefill=require_prefill,
require_decode=require_decode,
)
args.prefill_engine_num_gpu = prefill_gpu
args.decode_engine_num_gpu = decode_gpu
logger.info(
f"Detected GPU counts from DGD: prefill={prefill_gpu}, decode={decode_gpu}"
)
return
except Exception as e:
# Fall back to CLI flags (e.g., for mockers without GPU resources in DGD)
logger.warning(
f"Could not read GPU counts from DGD ({e}), falling back to CLI flags"
)
# Use CLI flags (virtual mode, or K8s fallback when DGD lacks GPU resources)
errors = []
if require_prefill and args.prefill_engine_num_gpu is None:
errors.append("Missing --prefill-engine-num-gpu flag")
if require_decode and args.decode_engine_num_gpu is None:
errors.append("Missing --decode-engine-num-gpu flag")
if errors:
raise DeploymentValidationError(errors)
logger.info(
f"Using GPU counts from CLI: prefill={args.prefill_engine_num_gpu}, "
f"decode={args.decode_engine_num_gpu}"
)
class BasePlanner:
component_type: SubComponentType
......@@ -636,6 +685,14 @@ class BasePlanner:
)
logger.info("Successfully validated the deployment")
# Initialize GPU counts
_initialize_gpu_counts(
self.args,
self.connector,
require_prefill=require_prefill,
require_decode=require_decode,
)
await self.connector.wait_for_deployment_ready()
model_name = await self._get_model_name(
......@@ -808,6 +865,14 @@ class DisaggPlanner:
)
logger.info("Successfully validated the deployment")
# Initialize GPU counts
_initialize_gpu_counts(
self.args,
self.prefill_planner.connector,
require_prefill=True,
require_decode=True,
)
await self.prefill_planner.connector.wait_for_deployment_ready()
model_name = await self.prefill_planner._get_model_name(
......
......@@ -19,6 +19,7 @@ from unittest.mock import AsyncMock, Mock, call, patch
import pytest
from dynamo.planner.defaults import (
Service,
SubComponentType,
get_service_from_sub_component_type_or_name,
)
......@@ -681,3 +682,209 @@ def test_get_model_name_agree_returns_model_name(kubernetes_connector, mock_kube
# Assert
assert result == "agreed-model"
# Tests for Service.get_gpu_count()
def test_service_get_gpu_count_valid():
"""Test that get_gpu_count returns correct GPU count from resources.limits.gpu"""
service = Service(
name="test-service",
service={
"replicas": 1,
"resources": {"limits": {"gpu": "4"}},
},
)
assert service.get_gpu_count() == 4
def test_service_get_gpu_count_from_requests_fallback():
"""Test that get_gpu_count falls back to requests.gpu when limits.gpu is missing"""
service = Service(
name="test-service",
service={
"replicas": 1,
"resources": {"requests": {"gpu": "2"}},
},
)
assert service.get_gpu_count() == 2
def test_service_get_gpu_count_limits_preferred_over_requests():
"""Test that limits.gpu is preferred over requests.gpu when both are present"""
service = Service(
name="test-service",
service={
"replicas": 1,
"resources": {
"limits": {"gpu": "4"},
"requests": {"gpu": "2"},
},
},
)
assert service.get_gpu_count() == 4
def test_service_get_gpu_count_integer_value():
"""Test that get_gpu_count works with integer GPU values"""
service = Service(
name="test-service",
service={
"replicas": 1,
"resources": {"limits": {"gpu": 2}},
},
)
assert service.get_gpu_count() == 2
def test_service_get_gpu_count_missing_raises_error():
"""Test that get_gpu_count raises ValueError when GPU count is missing"""
service = Service(
name="test-service",
service={"replicas": 1},
)
with pytest.raises(ValueError) as exc_info:
service.get_gpu_count()
assert "No GPU count specified" in str(exc_info.value)
assert "test-service" in str(exc_info.value)
def test_service_get_gpu_count_invalid_raises_error():
"""Test that get_gpu_count raises ValueError for invalid GPU count"""
service = Service(
name="test-service",
service={
"replicas": 1,
"resources": {"limits": {"gpu": "invalid"}},
},
)
with pytest.raises(ValueError) as exc_info:
service.get_gpu_count()
assert "Invalid GPU count" in str(exc_info.value)
# Tests for KubernetesConnector.get_gpu_counts()
def test_get_gpu_counts_both_services(kubernetes_connector, mock_kube_api):
"""Test get_gpu_counts returns correct counts for both prefill and decode"""
mock_deployment = {
"metadata": {"name": "test-graph"},
"spec": {
"services": {
"prefill-worker": {
"replicas": 1,
"subComponentType": "prefill",
"resources": {"limits": {"gpu": "2"}},
},
"decode-worker": {
"replicas": 1,
"subComponentType": "decode",
"resources": {"limits": {"gpu": "4"}},
},
}
},
}
mock_kube_api.get_graph_deployment.return_value = mock_deployment
prefill_gpu, decode_gpu = kubernetes_connector.get_gpu_counts()
assert prefill_gpu == 2
assert decode_gpu == 4
def test_get_gpu_counts_prefill_only(kubernetes_connector, mock_kube_api):
"""Test get_gpu_counts with require_decode=False"""
mock_deployment = {
"metadata": {"name": "test-graph"},
"spec": {
"services": {
"prefill-worker": {
"replicas": 1,
"subComponentType": "prefill",
"resources": {"limits": {"gpu": "2"}},
},
}
},
}
mock_kube_api.get_graph_deployment.return_value = mock_deployment
prefill_gpu, decode_gpu = kubernetes_connector.get_gpu_counts(
require_prefill=True, require_decode=False
)
assert prefill_gpu == 2
assert decode_gpu == 0
def test_get_gpu_counts_decode_only(kubernetes_connector, mock_kube_api):
"""Test get_gpu_counts with require_prefill=False"""
mock_deployment = {
"metadata": {"name": "test-graph"},
"spec": {
"services": {
"decode-worker": {
"replicas": 1,
"subComponentType": "decode",
"resources": {"limits": {"gpu": "4"}},
},
}
},
}
mock_kube_api.get_graph_deployment.return_value = mock_deployment
prefill_gpu, decode_gpu = kubernetes_connector.get_gpu_counts(
require_prefill=False, require_decode=True
)
assert prefill_gpu == 0
assert decode_gpu == 4
def test_get_gpu_counts_missing_gpu_raises_error(kubernetes_connector, mock_kube_api):
"""Test get_gpu_counts raises DeploymentValidationError when GPU count missing"""
mock_deployment = {
"metadata": {"name": "test-graph"},
"spec": {
"services": {
"prefill-worker": {
"replicas": 1,
"subComponentType": "prefill",
# No resources.limits.gpu
},
"decode-worker": {
"replicas": 1,
"subComponentType": "decode",
"resources": {"limits": {"gpu": "4"}},
},
}
},
}
mock_kube_api.get_graph_deployment.return_value = mock_deployment
with pytest.raises(DeploymentValidationError) as exc_info:
kubernetes_connector.get_gpu_counts()
assert "prefill GPU count" in str(exc_info.value)
def test_get_gpu_counts_service_not_found_raises_error(
kubernetes_connector, mock_kube_api
):
"""Test get_gpu_counts raises DeploymentValidationError when service not found"""
mock_deployment = {
"metadata": {"name": "test-graph"},
"spec": {
"services": {
"prefill-worker": {
"replicas": 1,
"subComponentType": "prefill",
"resources": {"limits": {"gpu": "2"}},
},
# No decode service
}
},
}
mock_kube_api.get_graph_deployment.return_value = mock_deployment
with pytest.raises(DeploymentValidationError) as exc_info:
kubernetes_connector.get_gpu_counts()
assert "decode GPU count" in str(exc_info.value)
......@@ -9,10 +9,12 @@ from unittest.mock import Mock, patch
import pytest
from dynamo.planner.utils.exceptions import DeploymentValidationError
from dynamo.planner.utils.planner_core import (
DecodePlanner,
PlannerSharedState,
PrefillPlanner,
_initialize_gpu_counts,
)
pytestmark = [
......@@ -201,3 +203,214 @@ def test_disagg_scale_down():
assert low_d == _expected_decode(args, decode_planner, samples[1])
assert low_p < high_p
assert low_d < high_d
# Tests for _initialize_gpu_counts
class TestInitializeGpuCounts:
def test_kubernetes_mode_reads_from_dgd(self):
"""Test that GPU counts are read from DGD in Kubernetes mode"""
args = argparse.Namespace()
args.prefill_engine_num_gpu = None
args.decode_engine_num_gpu = None
connector = Mock()
connector.get_gpu_counts = Mock(return_value=(2, 4))
_initialize_gpu_counts(
args, connector, require_prefill=True, require_decode=True
)
assert args.prefill_engine_num_gpu == 2
assert args.decode_engine_num_gpu == 4
connector.get_gpu_counts.assert_called_once_with(
require_prefill=True, require_decode=True
)
def test_kubernetes_mode_prefill_only(self):
"""Test GPU count initialization for prefill-only mode"""
args = argparse.Namespace()
args.prefill_engine_num_gpu = None
args.decode_engine_num_gpu = None
connector = Mock()
connector.get_gpu_counts = Mock(return_value=(2, 0))
_initialize_gpu_counts(
args, connector, require_prefill=True, require_decode=False
)
assert args.prefill_engine_num_gpu == 2
assert args.decode_engine_num_gpu == 0
connector.get_gpu_counts.assert_called_once_with(
require_prefill=True, require_decode=False
)
def test_virtual_mode_uses_cli_args(self):
"""Test that GPU counts come from CLI args in virtual mode"""
args = argparse.Namespace()
args.prefill_engine_num_gpu = 2
args.decode_engine_num_gpu = 4
# Virtual connector doesn't have get_gpu_counts method
connector = Mock(spec=[])
_initialize_gpu_counts(
args, connector, require_prefill=True, require_decode=True
)
# Values should remain unchanged
assert args.prefill_engine_num_gpu == 2
assert args.decode_engine_num_gpu == 4
def test_virtual_mode_missing_prefill_raises_error(self):
"""Test that missing prefill GPU flag raises error in virtual mode"""
args = argparse.Namespace()
args.prefill_engine_num_gpu = None
args.decode_engine_num_gpu = 4
connector = Mock(spec=[])
with pytest.raises(DeploymentValidationError) as exc_info:
_initialize_gpu_counts(
args, connector, require_prefill=True, require_decode=True
)
assert "prefill-engine-num-gpu" in str(exc_info.value)
def test_virtual_mode_missing_decode_raises_error(self):
"""Test that missing decode GPU flag raises error in virtual mode"""
args = argparse.Namespace()
args.prefill_engine_num_gpu = 2
args.decode_engine_num_gpu = None
connector = Mock(spec=[])
with pytest.raises(DeploymentValidationError) as exc_info:
_initialize_gpu_counts(
args, connector, require_prefill=True, require_decode=True
)
assert "decode-engine-num-gpu" in str(exc_info.value)
def test_virtual_mode_missing_both_raises_error_with_both_messages(self):
"""Test that missing both GPU flags shows both error messages"""
args = argparse.Namespace()
args.prefill_engine_num_gpu = None
args.decode_engine_num_gpu = None
connector = Mock(spec=[])
with pytest.raises(DeploymentValidationError) as exc_info:
_initialize_gpu_counts(
args, connector, require_prefill=True, require_decode=True
)
assert len(exc_info.value.errors) == 2
def test_virtual_mode_decode_only_no_prefill_error(self):
"""Test decode-only mode doesn't require prefill GPU flag"""
args = argparse.Namespace()
args.prefill_engine_num_gpu = None
args.decode_engine_num_gpu = 4
connector = Mock(spec=[])
# Should not raise - prefill not required
_initialize_gpu_counts(
args, connector, require_prefill=False, require_decode=True
)
assert args.decode_engine_num_gpu == 4
def test_kubernetes_mode_fallback_to_cli_on_dgd_error(self):
"""Test that K8s mode falls back to CLI flags when DGD parsing fails"""
args = argparse.Namespace()
args.prefill_engine_num_gpu = 2
args.decode_engine_num_gpu = 4
connector = Mock()
connector.get_gpu_counts = Mock(
side_effect=ValueError("No GPU count specified")
)
_initialize_gpu_counts(
args, connector, require_prefill=True, require_decode=True
)
# Should use CLI flag values after fallback
assert args.prefill_engine_num_gpu == 2
assert args.decode_engine_num_gpu == 4
def test_kubernetes_mode_fallback_missing_cli_flags_raises_error(self):
"""Test that K8s fallback raises error when CLI flags are also missing"""
args = argparse.Namespace()
args.prefill_engine_num_gpu = None
args.decode_engine_num_gpu = None
connector = Mock()
connector.get_gpu_counts = Mock(
side_effect=ValueError("No GPU count specified")
)
with pytest.raises(DeploymentValidationError) as exc_info:
_initialize_gpu_counts(
args, connector, require_prefill=True, require_decode=True
)
assert len(exc_info.value.errors) == 2
def test_kubernetes_mode_fallback_partial_cli_flags(self):
"""Test K8s fallback with only one CLI flag provided"""
args = argparse.Namespace()
args.prefill_engine_num_gpu = 2
args.decode_engine_num_gpu = None
connector = Mock()
connector.get_gpu_counts = Mock(
side_effect=ValueError("No GPU count specified")
)
with pytest.raises(DeploymentValidationError) as exc_info:
_initialize_gpu_counts(
args, connector, require_prefill=True, require_decode=True
)
assert "decode-engine-num-gpu" in str(exc_info.value)
# Tests for dryrun GPU defaults
class TestDryrunGpuDefaults:
def test_dryrun_defaults_gpu_counts_when_none(self):
"""Test that dryrun sets default GPU counts of 1 when None"""
from dynamo.planner.utils.dryrun import run_sla_planner_dryrun
args = _build_args()
args.prefill_engine_num_gpu = None
args.decode_engine_num_gpu = None
args.dataset = "nonexistent.jsonl" # Will fail but we check args first
# The function will set defaults before trying to load dataset
try:
run_sla_planner_dryrun(args)
except (FileNotFoundError, ValueError):
pass # Expected - dataset doesn't exist
assert args.prefill_engine_num_gpu == 1
assert args.decode_engine_num_gpu == 1
def test_dryrun_preserves_cli_gpu_counts(self):
"""Test that dryrun preserves GPU counts provided via CLI"""
from dynamo.planner.utils.dryrun import run_sla_planner_dryrun
args = _build_args()
args.prefill_engine_num_gpu = 2
args.decode_engine_num_gpu = 4
args.dataset = "nonexistent.jsonl"
try:
run_sla_planner_dryrun(args)
except (FileNotFoundError, ValueError):
pass
assert args.prefill_engine_num_gpu == 2
assert args.decode_engine_num_gpu == 4
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment