Unverified Commit 597b7249 authored by Ashna Mehrotra's avatar Ashna Mehrotra Committed by GitHub
Browse files

feat: add DGDR test suite (#7343)


Signed-off-by: default avatarashnamehrotra <ashnamehrotra@gmail.com>
parent 8dfed173
# DGDR v1beta1 End-to-End Test Suite
This directory contains the end-to-end test suite for **DynamoGraphDeploymentRequest
(DGDR) v1beta1** — the high-level, SLA-driven Kubernetes API for deploying
inference models with Dynamo.
## What's tested
| Test group | Marker(s) | GPU req | Mocker OK? | What it covers |
|---|---|---|---|---|
| `TestDGDRValidation` | `gpu_0`, `pre_merge` | None | ✅ | Webhook validation: rejected/accepted specs, value enforcement, storage version, shortname |
| `TestDGDRVersionConversion` | `gpu_0`, `pre_merge` | None | ✅ | v1alpha1 → v1beta1 conversion webhook |
| `TestDGDRMinimalDeployment` | `gpu_1`, `pre_merge`, `e2e` | 1+ | ⚠️ see note | Full Pending → Profiling → Ready → Deploying → Deployed lifecycle |
| `TestDGDRBackendSelection` | `gpu_1`, `nightly`, `e2e` | 1+ | ⚠️ vllm+trtllm only | vllm and trtllm pass; sglang **skipped** (no AIC silicon data for sglang on the mocker GPU SKU) |
| `TestDGDRSearchStrategies` | `gpu_1`/`gpu_8`, `e2e` | 1 or 8 | ⚠️ rapid only | `rapid` uses AIC and works; `thorough` requires real GPU sweeps |
| `TestDGDRSLATargets` | `gpu_1`, `nightly`, `e2e` | 1+ | ✅ | ttft+itl, e2eLatency, optimizationType (latency/throughput) |
| `TestDGDRWorkloadPickingModes` | `gpu_1`, `nightly`, `e2e` | 1+ | ✅ | requestRate, concurrency, isl/osl |
| `TestDGDRFeatures` | `gpu_1`, `nightly`, `e2e` | 1+ | ⚠️ see note | planner (rapid/none sweep), mocker |
| `TestDGDRModelCache` | `gpu_1`, `nightly`, `e2e` | 1+ | ✅ | PVC-backed model cache, cache propagated to DGD |
| `TestDGDRHardwareOverride` | `gpu_1`, `pre_merge`, `e2e` | ✅ | ✅ | Manual gpuSku/numGpusPerNode/totalGpus/vramMb |
| `TestDGDRAutoApply` | `gpu_1`, `pre_merge`, `e2e` | 1+ | ⚠️ see note | autoApply=true **skipped** in mocker (operator race); autoApply=false keeps Ready |
| `TestDGDROverrides` | `gpu_1`, `nightly`, `e2e` | 1+ | ✅ | Profiling job tolerations; DGD metadata label merging **xfail** (operator gap) |
| `TestDGDRStatusAndConditions` | `gpu_1`, `pre_merge`, `e2e` | 1+ | ⚠️ see note | All conditions set correctly, sub-phases tracked, Pareto configs; all-conditions **xfail** in mocker; pareto **skipped** in mocker |
| `TestDGDRImmutability` | mixed | 0–1 | ⚠️ see note | Spec rejected in Profiling/Deployed, metadata always allowed |
| `TestDGDRCleanup` | `gpu_1`, `pre_merge`, `e2e` | 1+ | ⚠️ see note | Job deleted with DGDR; DGD preserved; ConfigMap cleanup **xfail** (operator gap); DGD-persistence test **skipped** in mocker |
| `TestDGDRMoEModels` | `gpu_8`, `nightly`, `e2e` | 8 | ❌ | DeepSeek-R1 MoE on SGLang — requires real 8-GPU node |
## Prerequisites
1. A running Kubernetes cluster with GPU nodes (or see [GPU-free mode](#gpu-free-mocker-mode) below)
2. The Dynamo operator installed (including CRDs and webhooks)
3. `kubectl` configured and pointing at the cluster
4. Python 3.10+ with `pytest` and `pyyaml` installed:
```bash
pip install pytest pyyaml
# or, from the repo root:
pip install -e ".[test]"
```
## One-time cluster setup
Before running any tests, ensure the following are in place in your cluster.
These are required even for GPU-free (mocker) mode.
### 1. Install the Dynamo operator
```bash
cd deploy/operator
helm install dynamo-operator helm/dynamo-operator -n dynamo-system --create-namespace
```
### 2. Deploy NATS
Mocker workers (and real workers) connect to NATS for inter-component messaging.
The operator expects NATS at `nats://dynamo-operator-nats.dynamo-system.svc.cluster.local:4222`.
```bash
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm repo update
helm install dynamo-operator-nats nats/nats -n dynamo-system --create-namespace
```
### 3. Create the HuggingFace token secret
The profiling job reads the HF token from a secret named `hf-token-secret` using the
key `HF_TOKEN` (not `HUGGING_FACE_HUB_TOKEN`).
```bash
kubectl create secret generic hf-token-secret \
--from-literal=HF_TOKEN=<your-hf-token> \
-n default
# If running in a non-default namespace, adjust -n accordingly
```
> **Important:** The key must be `HF_TOKEN`. The secret name must be `hf-token-secret`.
> Using a different key name will cause the profiling job to fail silently.
## Running the tests
There are two main ways to run the suite depending on whether you have GPU hardware.
---
### GPU-free (mocker mode) — recommended for local development and CI
No GPU nodes required. Uses AIC simulation for profiling and mock inference workers
for deployment. Covers all `gpu_0` and `gpu_1` tests (~45 tests); `gpu_8` tests are
excluded because they require a real 8-GPU node even in mocker mode.
```bash
python3 -m pytest tests/dgdr/ -m "gpu_0 or gpu_1" -v \
--dgdr-namespace=default \
--dgdr-image=<your-image>
```
Expect: 37 passed, 6 skipped (2 model-cache PVC; sglang backend; pareto in mocker; DGD-persistence in mocker; auto-apply-true in mocker), 4 xfail (DGD label merging; all-conditions requires Deployed; dry-run immutability requires Deployed; ConfigMap cleanup on deletion).
`test_backend[sglang]` is one of the 6 skips (no AIC silicon data for sglang in mocker mode).
---
### Full suite with real GPUs — for production/nightly validation
Requires a Kubernetes cluster with GPU nodes. Set `--dgdr-no-mocker` to disable mocker
injection and run against real hardware. `gpu_8` tests additionally require an 8-GPU node.
```bash
# gpu_0 + gpu_1 tests on real GPUs (single-GPU node sufficient)
python3 -m pytest tests/dgdr/ -m "gpu_0 or gpu_1" -v \
--dgdr-namespace=dynamo-test \
--dgdr-image=<your-image> \
--dgdr-no-mocker \
--dgdr-profiling-timeout=3600 \
--dgdr-deploy-timeout=1800
# Full nightly suite including 8-GPU tests
python3 -m pytest tests/dgdr/ -v \
--dgdr-namespace=dynamo-test \
--dgdr-image=<your-image> \
--dgdr-no-mocker \
--dgdr-pvc-name=model-cache \
--dgdr-profiling-timeout=14400 \
--dgdr-deploy-timeout=3600
```
Expect (gpu_0 + gpu_1, with `--dgdr-pvc-name`): **~43 passed, 0 skipped, 2 xfail** (DGD label-merging operator gap; ConfigMap cleanup operator gap).
Without `--dgdr-pvc-name`: 2 additional skips for the model-cache tests.
> **Note:** Two xfails are **permanent operator gaps** that persist in both mocker and GPU mode:
> - `test_dgd_override_injects_custom_labels` — the operator does not yet merge `spec.overrides.dgd.metadata.labels` onto the created DGD.
> - `test_deletion_removes_output_configmap` — the operator's `FinalizeResource` is a no-op and does not delete the output ConfigMap on DGDR deletion.
> All other mocker-mode xfails/skips disappear in GPU mode and are expected to pass.
---
### Other useful invocations
```bash
# Validation + conversion tests only (no cluster setup required beyond CRDs)
python3 -m pytest tests/dgdr/ -m "gpu_0" -v \
--dgdr-namespace=default \
--dgdr-image=<your-image>
# Pre-merge gate (GPU-free)
python3 -m pytest tests/dgdr/ -m "pre_merge" -v \
--dgdr-namespace=default \
--dgdr-image=<your-image>
# Single test class
python3 -m pytest tests/dgdr/test_dgdr_v1beta1.py::TestDGDRAutoApply -v \
--dgdr-namespace=default \
--dgdr-image=<your-image>
```
## CLI options
| Option | Default | Description |
|---|---|---|
| `--dgdr-namespace` | _(required)_ | Kubernetes namespace for test resources |
| `--dgdr-image` | _(required)_ | Container image for profiling and inference workers |
| `--dgdr-model` | `Qwen/Qwen3-0.6B` | HuggingFace model ID used by most tests |
| `--dgdr-backend` | `vllm` | Default backend for DGDR tests |
| `--dgdr-pvc-name` | _(empty)_ | PVC name holding pre-downloaded model weights (PVC tests are skipped if unset) |
| `--dgdr-profiling-timeout` | `3600` | Seconds to wait for profiling to complete |
| `--dgdr-deploy-timeout` | `600` | Seconds to wait for DGD to reach Deployed phase |
| `--dgdr-no-mocker` | `false` | Disable mocker mode (require real GPU nodes) |
## DGDR v1beta1 feature coverage matrix
The following spec fields are exercised by at least one test:
| Field | Tests that exercise it |
|---|---|
| `spec.model` | All tests |
| `spec.backend` (auto/vllm/sglang/trtllm) | `TestDGDRBackendSelection`, `TestDGDRValidation` |
| `spec.image` | All tests |
| `spec.searchStrategy` (rapid/thorough) | `TestDGDRSearchStrategies` |
| `spec.sla.ttft` + `spec.sla.itl` | `TestDGDRSLATargets::test_sla_ttft_and_itl` |
| `spec.sla.e2eLatency` | `TestDGDRSLATargets::test_sla_e2e_latency` |
| `spec.sla.optimizationType` | `TestDGDRSLATargets::test_sla_optimization_type_*` |
| `spec.workload.isl` + `spec.workload.osl` | `TestDGDRWorkloadPickingModes` |
| `spec.workload.requestRate` | `TestDGDRWorkloadPickingModes::test_request_rate_picking` |
| `spec.workload.concurrency` | `TestDGDRWorkloadPickingModes::test_concurrency_picking` |
| `spec.features.planner` (opaque config) | `TestDGDRFeatures::test_planner_enabled_*` |
| `spec.features.mocker.enabled` | `TestDGDRFeatures::test_mocker_enabled` |
| `spec.modelCache.pvcName` | `TestDGDRModelCache` |
| `spec.hardware.gpuSku` | `TestDGDRHardwareOverride::test_hardware_manual_override` |
| `spec.hardware.numGpusPerNode` | `TestDGDRHardwareOverride` |
| `spec.hardware.totalGpus` / `spec.hardware.vramMb` | `TestDGDRHardwareOverride::test_hardware_total_gpus_and_vram` |
| `spec.autoApply` | `TestDGDRAutoApply` |
| `spec.overrides.profilingJob` | `TestDGDROverrides::test_profiling_job_toleration_override` |
| `spec.overrides.dgd` | `TestDGDROverrides::test_dgd_override_injects_custom_labels` |
| `status.phase` | All lifecycle tests |
| `status.profilingPhase` | `TestDGDRStatusAndConditions::test_profiling_sub_phase_tracked` |
| `status.profilingJobName` | `TestDGDRStatusAndConditions::test_profiling_job_name_populated` |
| `status.dgdName` | `TestDGDRAutoApply`, `TestDGDRMinimalDeployment` |
| `status.profilingResults.selectedConfig` | Multiple |
| `status.profilingResults.pareto` | `TestDGDRStatusAndConditions::test_pareto_configs_in_profiling_results` |
| `status.deploymentInfo` | `TestDGDRMinimalDeployment` |
| `status.conditions` (all types) | `TestDGDRStatusAndConditions` |
| `status.observedGeneration` | `TestDGDRStatusAndConditions::test_observed_generation_tracks_spec` |
## GPU-free mode (default)
By default, the test suite runs the full DGDR lifecycle **without any GPU nodes**
by combining two simulation features:
| Feature | How it's enabled | Which phase it affects |
|---|---|---|
| **AIC (AI Configurator)** | `searchStrategy: rapid` (the default) | **Profiling** — profiler runs CPU-only simulation instead of online GPU sweep |
| **Mocker** | Enabled by default (disable with `--dgdr-no-mocker`) | **Deployment** — DGD uses mock inference workers (no GPU resources requested) |
**How it works:**
- `searchStrategy: rapid` is the default for v1beta1 DGDRs. The profiler automatically
uses AI Configurator (AIC) simulation when rapid is set — no additional config needed.
- Mocker mode is **enabled by default**. The `dgdr_factory` fixture automatically injects
`spec.features.mocker.enabled: true` and a default `spec.hardware` config into every DGDR.
- AIC profiling creates a Kubernetes Job that runs CPU-only (job prefix: `profile-aic-`).
The profiling pod does not request GPU resources.
- Mocker deployment selects the profiler's `mocker_config_with_planner.yaml` output
instead of the real deployment config, resulting in DGD pods that don't request GPUs.
- Pass `--dgdr-no-mocker` to disable mocker mode and run against real GPU hardware.
> **Note:** Some test assertions (e.g., status.deploymentInfo.gpuCount, pareto configs)
> may produce different values under mocker than under real GPU profiling.
> The tests are written to validate structure and phase transitions, not exact
> profiling output values, so they work correctly in both modes.
> **Note:** `searchStrategy: thorough` requires online (GPU) profiling even with mocker,
> since thorough performs real benchmark measurements. Use rapid for GPU-free testing.
> **Note:** `TestDGDRFeatures::test_planner_enabled_with_rapid_sweep` runs with
> `auto_apply=False` in mocker mode (same root cause as the note below — the operator
> pre-sets `Status.DGDName` from the profiling output and then immediately fires
> `handleDGDDeleted` when the DGD cannot be found). In mocker mode the test only
> validates that spec generation succeeds (waits for `PHASE_READY` and checks `dgdName`
> + `selectedConfig`). Full deployment with rapid sweeping is verified outside mocker
> mode. `test_planner_enabled_no_pre_deployment_sweep` and `test_mocker_enabled` are
> likewise restricted to `PHASE_READY` in mocker mode.
> **Note:** `auto_apply=True` consistently hits `handleDGDDeleted` in mocker mode. The
> operator's `generateDGDSpec` pre-populates `Status.DGDName` from the profiling output
> (e.g. `mocker-disagg`) _before_ the DGD is actually created. When `handleDeployingPhase`
> then runs it checks `DGDName != ""` and immediately tries to GET that DGD; since it does
> not exist yet it fires `handleDGDDeleted` and the DGDR transitions to Failed.
> All tests that would enter the Deploying phase in mocker mode therefore use
> `auto_apply=False`/`PHASE_READY` instead (minimal lifecycle, backend selection,
> mocker feature, planner-no-sweep, planner-rapid-sweep, DGD label override).
> Tests whose sole purpose is to verify `auto_apply=True` DGD creation are skipped in
> mocker mode (`test_auto_apply_true_creates_dgd_automatically`,
> `test_deletion_does_not_remove_created_dgd`).
> Non-mocker mode (real GPU cluster) is unaffected.
> **Note:** `TestDGDRImmutability::test_spec_immutable_in_deployed_via_dry_run` is **xfail**
> in mocker mode. The test relies on the session `deployed_dgdr` fixture which, in mocker
> mode, stops at `PHASE_READY` instead of `PHASE_DEPLOYED`. The webhook's
> `ValidateUpdate` immutability enforcement only activates when the DGDR is in `Deployed`
> phase, so the server-dry-run mutation is accepted rather than rejected.
> **Note:** `gpu_8` tests cannot be run with mocker and require a real 8-GPU node.
> `TestDGDRSearchStrategies::test_thorough_strategy_completes` uses `searchStrategy: thorough`
> which performs real GPU benchmark sweeps. `TestDGDRMoEModels` (DeepSeek-R1) requires 8 GPUs
> for the real inference workload. Exclude them from GPU-free runs with `-m "gpu_0 or gpu_1"`.
### AIC silicon data availability
AIC operates in **silicon mode**: it looks up pre-recorded per-op performance data
files shipped inside the `aiconfigurator` Python package. These files are organised
by `{gpu_sku}/{backend}/{backend_version}/`. The mocker fixture injects
`gpuSku: a100_sxm` into every DGDR — but the package only ships vllm data for that SKU:
| Backend | a100_sxm data? | Mocker result |
|---|---|---|
| `vllm` | ✅ present | Profiling succeeds |
| `trtllm` | ✅ present | Profiling succeeds |
| `sglang` | ❌ missing | Test **skipped** automatically (no `sglang/0.5.8` perf data for `a100_sxm`) |
To test sglang/trtllm, run against a real GPU cluster (`--dgdr-no-mocker`) where AIC
can use a GPU SKU for which those data files are present.
## Cleanup
Tests clean up their own DGDRs via the `dgdr_factory` fixture. If a test is
interrupted, resources can be cleaned up manually:
```bash
# Delete all DGDRs created by the test suite (they are labelled automatically)
kubectl delete dgdr -n default -l "test.dynamo/managed=true"
# If you used a custom namespace:
kubectl delete dgdr -n <namespace> -l "test.dynamo/managed=true"
```
## Architecture notes
- All tests interact with the cluster **exclusively via `kubectl`** subprocess calls,
consistent with the rest of the Dynamo test suite.
- The `dgdr_factory` fixture ensures DGDR cleanup via `yield` regardless of test
outcome.
- Tests that require an optional PVC (`--dgdr-pvc-name`) skip automatically when the
option is not provided.
- Timeout values are configurable to accommodate clusters with varying profiling speeds.
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Pytest fixtures and helpers for DGDR v1beta1 e2e tests.
These tests exercise the DynamoGraphDeploymentRequest CRD directly on a live
Kubernetes cluster running the Dynamo operator. A GPU cluster is assumed to be
available (GPU nodes reachable from the cluster).
"""
from __future__ import annotations
import asyncio
import logging
import subprocess
import uuid
from typing import Any, Dict, Generator, List, Optional
import pytest
from tests.utils.managed_deployment import ManagedDGDR
logger = logging.getLogger(__name__)
DGDR_API_VERSION = "nvidia.com/v1beta1"
DGDR_KIND = "DynamoGraphDeploymentRequest"
DGDR_SHORT_NAME = "dgdr"
# Default timeout values (seconds)
DEFAULT_PROFILING_TIMEOUT = 3600 # 1h for rapid, up to 4h for thorough
DEFAULT_DEPLOY_TIMEOUT = 600 # 10 minutes for DGD rollout
# Label applied to all test-managed DGDRs so they can be bulk-deleted on cleanup
DGDR_TEST_LABEL_KEY = "test.dynamo/managed"
# DGD kind name and the fixed DGD name that the mocker profiler always generates
DGD_KIND = "DynamoGraphDeployment"
MOCKER_DGD_NAME = "mocker-disagg"
# Phase values mirroring DGDRPhase Go enum
PHASE_PENDING = "Pending"
PHASE_PROFILING = "Profiling"
PHASE_READY = "Ready"
PHASE_DEPLOYING = "Deploying"
PHASE_DEPLOYED = "Deployed"
PHASE_FAILED = "Failed"
# ---------------------------------------------------------------------------
# Pytest option registration
# ---------------------------------------------------------------------------
def pytest_addoption(parser: pytest.Parser) -> None:
"""Register DGDR-specific CLI options for the test session."""
group = parser.getgroup("dgdr", "DynamoGraphDeploymentRequest e2e options")
group.addoption(
"--dgdr-namespace",
default=None,
help="Kubernetes namespace for DGDR resources (required to run tests)",
)
group.addoption(
"--dgdr-image",
default=None,
help="Container image used for profiling and deployment workers (required to run tests)",
)
group.addoption(
"--dgdr-model",
default="Qwen/Qwen3-0.6B",
help="HuggingFace model ID for test DGDRs (default: Qwen/Qwen3-0.6B)",
)
group.addoption(
"--dgdr-backend",
default="vllm",
choices=["auto", "vllm", "sglang", "trtllm"],
help="Default backend for DGDR tests (default: vllm)",
)
group.addoption(
"--dgdr-pvc-name",
default="",
help="Optional PVC name containing pre-downloaded model weights",
)
group.addoption(
"--dgdr-profiling-timeout",
type=int,
default=DEFAULT_PROFILING_TIMEOUT,
help="Max seconds to wait for profiling to complete (default: 3600)",
)
group.addoption(
"--dgdr-deploy-timeout",
type=int,
default=DEFAULT_DEPLOY_TIMEOUT,
help="Max seconds to wait for DGD to reach Deployed phase (default: 600)",
)
group.addoption(
"--dgdr-no-mocker",
action="store_true",
default=False,
help=(
"Disable mocker mode (requires real GPU nodes for deployment). "
"By default, mocker mode is ENABLED: DGD uses mock inference workers "
"and AIC simulation (via searchStrategy=rapid) for GPU-free testing. "
"Pass this flag to run against a real GPU cluster."
),
)
# ---------------------------------------------------------------------------
# Skip DGDR tests gracefully when required CLI args are not provided
# (e.g. when the whole test suite is run by CI without --dgdr-* flags)
# ---------------------------------------------------------------------------
def pytest_collection_modifyitems(
config: pytest.Config, items: list[pytest.Item]
) -> None:
"""Skip all DGDR tests when --dgdr-namespace or --dgdr-image are not supplied.
This prevents a session-aborting failure when the global CI runner collects
and executes ``tests/`` without passing the DGDR-specific CLI options.
"""
missing = []
if not config.getoption("--dgdr-namespace", default=None):
missing.append("--dgdr-namespace")
if not config.getoption("--dgdr-image", default=None):
missing.append("--dgdr-image")
if not missing:
return
reason = f"DGDR tests require: {', '.join(missing)}"
skip = pytest.mark.skip(reason=reason)
for item in items:
if "dgdr" in str(item.fspath):
item.add_marker(skip)
# ---------------------------------------------------------------------------
# Session-scoped fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="session")
def dgdr_namespace(request: pytest.FixtureRequest) -> str:
value = request.config.getoption("--dgdr-namespace")
if not value:
pytest.skip("--dgdr-namespace is required to run DGDR tests")
return value
@pytest.fixture(scope="session")
def dgdr_image(request: pytest.FixtureRequest) -> str:
value = request.config.getoption("--dgdr-image")
if not value:
pytest.skip("--dgdr-image is required to run DGDR tests")
return value
@pytest.fixture(scope="session")
def dgdr_model(request: pytest.FixtureRequest) -> str:
return request.config.getoption("--dgdr-model")
@pytest.fixture(scope="session")
def dgdr_backend(request: pytest.FixtureRequest) -> str:
return request.config.getoption("--dgdr-backend")
@pytest.fixture(scope="session")
def dgdr_pvc_name(request: pytest.FixtureRequest) -> str:
return request.config.getoption("--dgdr-pvc-name")
@pytest.fixture(scope="session")
def dgdr_profiling_timeout(request: pytest.FixtureRequest) -> int:
return request.config.getoption("--dgdr-profiling-timeout")
@pytest.fixture(scope="session")
def dgdr_deploy_timeout(request: pytest.FixtureRequest) -> int:
return request.config.getoption("--dgdr-deploy-timeout")
@pytest.fixture(scope="session")
def dgdr_use_mocker(request: pytest.FixtureRequest) -> bool:
# Mocker is ON by default; --dgdr-no-mocker disables it
return not request.config.getoption("--dgdr-no-mocker")
# ---------------------------------------------------------------------------
# Session-scoped ManagedDGDR client
# ---------------------------------------------------------------------------
@pytest.fixture(scope="session")
def _dgdr_event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]:
"""Session-scoped event loop shared by all DGDR async helpers."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session")
def dgdr_test_label() -> str:
"""Generate a unique label value for this test session to avoid cross-run cleanup races."""
return f"test-{uuid.uuid4().hex[:8]}"
@pytest.fixture(scope="session")
def dgdr_test_label_selector(dgdr_test_label: str) -> str:
return f"{DGDR_TEST_LABEL_KEY}={dgdr_test_label}"
@pytest.fixture(scope="session")
def managed_dgdr(
dgdr_namespace: str, _dgdr_event_loop: asyncio.AbstractEventLoop
) -> Generator[ManagedDGDR, None, None]:
"""Session-scoped async K8s client for DGDR operations."""
mgr = ManagedDGDR(namespace=dgdr_namespace, loop=_dgdr_event_loop)
mgr.run(mgr.init())
yield mgr
mgr.run(mgr.close())
# ---------------------------------------------------------------------------
# Simulation-mode helpers (Mocker)
# ---------------------------------------------------------------------------
# Default hardware config for GPU-free testing (AIC simulation needs hardware metadata)
DEFAULT_MOCKER_HARDWARE = {
"gpuSku": "a100_sxm",
"vramMb": 81920,
"numGpusPerNode": 8,
"totalGpus": 8,
}
def _inject_mocker_config(manifest: Dict[str, Any]) -> None:
"""Mutate *manifest* in-place to enable mocker deployment.
Mocker: sets ``spec.features.mocker.enabled = true`` so the DGD uses mock
inference workers that do not require GPU resources.
Also injects a default ``spec.hardware`` config if not already set, since
AIC simulation needs hardware metadata (GPU model, VRAM) even though it
doesn't actually use GPUs.
Combined with ``searchStrategy: rapid`` (the default), this enables the full
DGDR lifecycle (Pending -> Profiling -> Ready -> Deploying -> Deployed) to
complete without any GPU nodes, because:
- rapid uses AI Configurator (AIC) simulation in the profiler (CPU-only)
- mocker uses mock inference pods (no GPU resources requested)
"""
spec = manifest.setdefault("spec", {})
# Enable mocker for GPU-free deployment
features = spec.setdefault("features", {})
mocker = features.setdefault("mocker", {})
mocker["enabled"] = True
# Inject default hardware if not already set (AIC needs hardware metadata).
# If hardware is partially set (e.g. the test only sets gpuSku/numGpusPerNode),
# fill in any missing fields from DEFAULT_MOCKER_HARDWARE so AIC has the full
# metadata it needs (vramMb, totalGpus) without overriding fields the test set.
if "hardware" not in spec:
spec["hardware"] = DEFAULT_MOCKER_HARDWARE.copy()
logger.info(
"Injected default hardware config for DGDR %s: %s",
manifest.get("metadata", {}).get("name", "?"),
spec["hardware"],
)
else:
merged = False
for k, v in DEFAULT_MOCKER_HARDWARE.items():
if k not in spec["hardware"]:
spec["hardware"][k] = v
merged = True
if merged:
logger.info(
"Merged missing hardware fields for DGDR %s: %s",
manifest.get("metadata", {}).get("name", "?"),
spec["hardware"],
)
logger.info(
"Mocker mode enabled for DGDR %s", manifest.get("metadata", {}).get("name", "?")
)
async def _cleanup_mocker_dgd(mgr: ManagedDGDR) -> None:
"""Delete the shared `mocker-disagg` DGD if it exists.
The mocker profiler always names the generated DGD ``mocker-disagg``. When
multiple DGDRs run sequentially in the same test session (all creating the same
DGD name), the second DGDR's operator finds ``mocker-disagg`` already in the
cluster. If that DGD is in a bad/terminating state from the previous test, the
operator fires ``handleDGDDeleted`` immediately → DGDR reaches Failed. Deleting
the DGD between tests guarantees each DGDR starts from a clean slate.
"""
obj = await mgr.get_dgd(MOCKER_DGD_NAME)
if obj is not None:
logger.info(
"Deleting shared mocker DGD %s/%s to prevent state pollution",
mgr.namespace,
MOCKER_DGD_NAME,
)
await mgr.delete_dgd(MOCKER_DGD_NAME)
# ---------------------------------------------------------------------------
# kubectl helper (kept only for tests that validate CLI behaviour itself)
# ---------------------------------------------------------------------------
def _run_kubectl(
args: List[str], check: bool = True, input: Optional[str] = None
) -> subprocess.CompletedProcess:
"""Run a kubectl command, returning the CompletedProcess.
This is intentionally kept for the small number of tests that validate
kubectl CLI behaviour (short-names, custom-columns, CRD metadata).
All DGDR CRUD and phase-polling should use :class:`ManagedDGDR` instead.
"""
cmd = ["kubectl"] + args
logger.debug("Running: %s", " ".join(cmd))
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False,
input=input,
timeout=60,
)
except subprocess.TimeoutExpired as e:
logger.error("kubectl timed out: %s", e)
pytest.fail(f"kubectl timed out after 60s: {e}")
if check and result.returncode != 0:
raise subprocess.CalledProcessError(
result.returncode, cmd, result.stdout, result.stderr
)
return result
# ---------------------------------------------------------------------------
# DGDR manifest builder
# ---------------------------------------------------------------------------
def build_dgdr_manifest(
name: str,
model: str,
image: str,
*,
backend: str = "vllm",
search_strategy: str = "rapid",
sla: Optional[Dict[str, Any]] = None,
workload: Optional[Dict[str, Any]] = None,
features: Optional[Dict[str, Any]] = None,
hardware: Optional[Dict[str, Any]] = None,
model_cache: Optional[Dict[str, Any]] = None,
overrides: Optional[Dict[str, Any]] = None,
auto_apply: Optional[bool] = None,
labels: Optional[Dict[str, str]] = None,
extra_spec_fields: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Build a v1beta1 DGDR manifest dict.
Only ``name``, ``model``, and ``image`` are required. All other fields
are optional and map 1-to-1 to the v1beta1 spec defined in
``deploy/operator/api/v1beta1/dynamographdeploymentrequest_types.go``.
"""
spec: Dict[str, Any] = {
"model": model,
"backend": backend,
"image": image,
"searchStrategy": search_strategy,
}
if sla is not None:
spec["sla"] = sla
if workload is not None:
spec["workload"] = workload
if features is not None:
spec["features"] = features
if hardware is not None:
spec["hardware"] = hardware
if model_cache is not None:
spec["modelCache"] = model_cache
if overrides is not None:
spec["overrides"] = overrides
if auto_apply is not None:
spec["autoApply"] = auto_apply
if extra_spec_fields:
spec.update(extra_spec_fields)
manifest: Dict[str, Any] = {
"apiVersion": DGDR_API_VERSION,
"kind": DGDR_KIND,
"metadata": {
"name": name,
},
"spec": spec,
}
if labels:
manifest["metadata"]["labels"] = labels
return manifest
def unique_dgdr_name(prefix: str = "test") -> str:
"""Generate a unique DGDR name safe for Kubernetes (lowercase, 63 chars max)."""
uid = uuid.uuid4().hex[:8]
return f"{prefix}-{uid}"
# ---------------------------------------------------------------------------
# Core fixture: managed DGDR lifecycle
# ---------------------------------------------------------------------------
@pytest.fixture
def dgdr_factory(
managed_dgdr: ManagedDGDR,
dgdr_namespace: str,
dgdr_profiling_timeout: int,
dgdr_deploy_timeout: int,
dgdr_use_mocker: bool,
dgdr_test_label: str,
dgdr_test_label_selector: str,
):
"""
A factory fixture that applies a DGDR manifest and ensures cleanup.
When mocker mode is enabled (the default), the factory automatically
injects mocker config into every manifest before applying it. This
makes the injection transparent to individual test functions.
Combined with ``searchStrategy: rapid`` (the default), mocker mode
enables a fully GPU-free lifecycle:
- Profiling uses AIC simulation (CPU-only, no GPU resources needed)
- Deployment uses mock inference pods (no GPU resources requested)
Usage::
def test_something(dgdr_factory, dgdr_image, dgdr_model):
manifest = build_dgdr_manifest("my-test", dgdr_model, dgdr_image)
name = dgdr_factory(manifest)
managed_dgdr.run(managed_dgdr.wait_for_phase(name, PHASE_DEPLOYED, ...))
"""
created: List[str] = []
use_mocker = dgdr_use_mocker
def _cleanup_all_test_dgdrs() -> None:
"""Delete all DGDRs bearing the test-managed label (handles orphans from prior runs)."""
items = managed_dgdr.run(
managed_dgdr.list(label_selector=dgdr_test_label_selector)
)
for item in items:
item_name = item.get("metadata", {}).get("name", "")
if item_name:
logger.info(
"Cleaning up test-managed DGDR %s/%s", dgdr_namespace, item_name
)
managed_dgdr.run(managed_dgdr.delete(item_name))
# Pre-test: remove any orphaned DGDRs left by previously interrupted runs.
_cleanup_all_test_dgdrs()
def _create(manifest: Dict[str, Any]) -> str:
name = manifest["metadata"]["name"]
# Stamp the test-managed label so orphan cleanup can find it
manifest.setdefault("metadata", {})
manifest["metadata"].setdefault("labels", {})
manifest["metadata"]["labels"][DGDR_TEST_LABEL_KEY] = dgdr_test_label
# Inject mocker config if enabled
if use_mocker:
_inject_mocker_config(manifest)
managed_dgdr.run(managed_dgdr.create(manifest))
created.append(name)
logger.info("Created DGDR %s/%s", dgdr_namespace, name)
return name
def _register_for_cleanup(name: str) -> None:
"""Register an externally-created DGDR name for teardown cleanup."""
if name not in created:
created.append(name)
_create.register_for_cleanup = _register_for_cleanup # type: ignore[attr-defined]
yield _create
# Post-test: delete everything we created (plus any label-matching stragglers)
for name in reversed(created):
logger.info("Cleaning up DGDR %s/%s", dgdr_namespace, name)
managed_dgdr.run(managed_dgdr.delete(name))
_cleanup_all_test_dgdrs()
# Clean up the shared mocker DGD so the next test starts fresh
if use_mocker:
managed_dgdr.run(_cleanup_mocker_dgd(managed_dgdr))
# ---------------------------------------------------------------------------
# Session-scoped shared deployment
# ---------------------------------------------------------------------------
@pytest.fixture(scope="session")
def deployed_dgdr(
managed_dgdr: ManagedDGDR,
dgdr_namespace: str,
dgdr_image: str,
dgdr_model: str,
dgdr_use_mocker: bool,
dgdr_profiling_timeout: int,
dgdr_deploy_timeout: int,
dgdr_test_label: str,
) -> Generator[str, None, None]:
"""
Session-scoped fixture: deploys a single DGDR once for the entire test
session and tears it down afterward.
Tests that only need a *Deployed* DGDR to read status from should use this
fixture instead of spinning up their own lifecycle. This avoids repeated
~1-2 minute profiling cycles for tests that are purely asserting status
fields on an already-deployed resource.
"""
name = unique_dgdr_name("session")
# In mocker mode, auto_apply=True hits a consistent "DeploymentDeleted" failure because
# the operator cannot complete DGD creation with the shared mocker-disagg name. Use
# auto_apply=False and target PHASE_READY instead; tests that strictly require the
# Deployed phase are xfailed in mocker mode.
manifest = build_dgdr_manifest(
name,
model=dgdr_model,
image=dgdr_image,
backend="vllm",
search_strategy="rapid",
auto_apply=not dgdr_use_mocker,
)
manifest.setdefault("metadata", {})
manifest["metadata"].setdefault("labels", {})
# Stamp session DGDR with the per-session test label so it is cleaned up
# together with other test-managed resources.
manifest["metadata"]["labels"][DGDR_TEST_LABEL_KEY] = dgdr_test_label
if dgdr_use_mocker:
_inject_mocker_config(manifest)
# Ensure no stale mocker-disagg DGD from a previous test so the session DGDR
# gets a clean mocker-disagg on its first deploy attempt.
managed_dgdr.run(_cleanup_mocker_dgd(managed_dgdr))
managed_dgdr.run(managed_dgdr.create(manifest))
logger.info("Session DGDR %s/%s created", dgdr_namespace, name)
try:
target_phase = PHASE_READY if dgdr_use_mocker else PHASE_DEPLOYED
managed_dgdr.run(
managed_dgdr.wait_for_phase(
name,
target_phase,
timeout=dgdr_profiling_timeout + dgdr_deploy_timeout,
)
)
logger.info("Session DGDR %s/%s reached %s", dgdr_namespace, name, target_phase)
yield name
finally:
managed_dgdr.run(managed_dgdr.delete(name))
if dgdr_use_mocker:
managed_dgdr.run(_cleanup_mocker_dgd(managed_dgdr))
logger.info("Session DGDR %s/%s cleaned up", dgdr_namespace, name)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Webhook validation and version conversion tests for DGDR v1beta1.
These tests verify that:
- The admission webhook correctly accepts/rejects DGDR specs (TestDGDRValidation)
- v1alpha1 resources are transparently converted to v1beta1 (TestDGDRVersionConversion)
No GPU or cluster profiling is required (gpu_0 only). The only prerequisite is a
running Kubernetes cluster with the Dynamo operator CRDs and webhooks installed.
Run:
pytest tests/dgdr/test_dgdr_validation.py -m gpu_0 -v --dgdr-namespace=default --dgdr-image=<image>
Test markers:
gpu_0 No GPU required
nightly Requires live K8s cluster (not run in general pre-merge CI)
integration Integration-level (uses live webhook)
"""
from __future__ import annotations
import json
import logging
import pytest
import yaml
from kubernetes_asyncio.client import exceptions as k8s_exceptions
from tests.dgdr.conftest import (
DGDR_API_VERSION,
DGDR_SHORT_NAME,
_run_kubectl,
build_dgdr_manifest,
unique_dgdr_name,
)
from tests.utils.managed_deployment import ManagedDGDR
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# ── Group 1: Webhook Validation (gpu_0, no profiling required) ──────────────
# ---------------------------------------------------------------------------
@pytest.mark.gpu_0
@pytest.mark.nightly
@pytest.mark.integration
@pytest.mark.k8s
class TestDGDRValidation:
"""
Tests that verify the admission webhook correctly validates DGDR specs
before they are persisted. These tests use server-side dry-run so no
resources are actually created.
"""
def test_missing_model_rejected(
self, managed_dgdr: ManagedDGDR, dgdr_image: str
) -> None:
"""
A DGDR without spec.model must be rejected by the webhook.
The model field is the only hard-required spec field in v1beta1.
"""
manifest = build_dgdr_manifest(
unique_dgdr_name("no-model"),
model="", # intentionally empty
image=dgdr_image,
)
# Clear model so the field is absent
del manifest["spec"]["model"]
with pytest.raises(k8s_exceptions.ApiException):
managed_dgdr.run(managed_dgdr.server_dry_run(manifest))
def test_thorough_with_auto_backend_rejected(
self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
) -> None:
"""
searchStrategy: thorough + backend: auto must be rejected.
'thorough' sweeps real GPU engines and requires a concrete backend.
"""
manifest = build_dgdr_manifest(
unique_dgdr_name("thorough-auto"),
model=dgdr_model,
image=dgdr_image,
backend="auto",
search_strategy="thorough",
)
with pytest.raises(k8s_exceptions.ApiException) as exc_info:
managed_dgdr.run(managed_dgdr.server_dry_run(manifest))
error_body = str(exc_info.value)
assert (
"auto" in error_body.lower()
or "backend" in error_body.lower()
or "thorough" in error_body.lower()
), f"Error message should mention backend/thorough incompatibility. Got: {error_body}"
def test_invalid_backend_rejected(
self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
) -> None:
"""
An unknown backend value must be rejected by the admission webhook.
Valid values: auto, vllm, sglang, trtllm.
"""
manifest = build_dgdr_manifest(
unique_dgdr_name("bad-backend"),
model=dgdr_model,
image=dgdr_image,
backend="unknown_backend",
)
with pytest.raises(k8s_exceptions.ApiException):
managed_dgdr.run(managed_dgdr.server_dry_run(manifest))
def test_invalid_search_strategy_rejected(
self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
) -> None:
"""
An unknown searchStrategy value must be rejected by the admission webhook.
"""
manifest = build_dgdr_manifest(
unique_dgdr_name("bad-strategy"),
model=dgdr_model,
image=dgdr_image,
search_strategy="superfast", # not a valid strategy
)
with pytest.raises(k8s_exceptions.ApiException):
managed_dgdr.run(managed_dgdr.server_dry_run(manifest))
def test_invalid_optimization_type_rejected(
self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
) -> None:
"""
An invalid sla.optimizationType value must be rejected by the
admission webhook. Valid values: latency, throughput.
"""
manifest = build_dgdr_manifest(
unique_dgdr_name("bad-opt-type"),
model=dgdr_model,
image=dgdr_image,
sla={"optimizationType": "cost"}, # not valid
)
with pytest.raises(k8s_exceptions.ApiException):
managed_dgdr.run(managed_dgdr.server_dry_run(manifest))
def test_valid_minimal_dgdr_accepted(
self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
) -> None:
"""
A DGDR with only the required fields (model + image) must pass validation.
All other fields have defaults and are optional.
"""
manifest = build_dgdr_manifest(
unique_dgdr_name("valid-minimal"),
model=dgdr_model,
image=dgdr_image,
)
# Should not raise — accepted by the webhook
managed_dgdr.run(managed_dgdr.server_dry_run(manifest))
def test_valid_full_spec_accepted(
self, managed_dgdr: ManagedDGDR, dgdr_image: str, dgdr_model: str
) -> None:
"""
A fully-specified v1beta1 DGDR should pass webhook validation.
Exercises every top-level optional field.
"""
manifest = build_dgdr_manifest(
unique_dgdr_name("valid-full"),
model=dgdr_model,
image=dgdr_image,
backend="vllm",
search_strategy="rapid",
sla={"ttft": 200.0, "itl": 20.0},
workload={"isl": 3000, "osl": 150},
features={
"planner": {"plannerPreDeploymentSweeping": "rapid"},
"mocker": {"enabled": False},
},
hardware={"numGpusPerNode": 8},
auto_apply=True,
)
# Should not raise — accepted by the webhook
managed_dgdr.run(managed_dgdr.server_dry_run(manifest))
def test_v1beta1_is_storage_version(self, dgdr_namespace: str) -> None:
"""
The CRD's storage version must be v1beta1 (it is the conversion hub).
"""
result = _run_kubectl(
[
"get",
"crd",
"dynamographdeploymentrequests.nvidia.com",
"-o",
"jsonpath={.status.storedVersions}",
],
check=False,
)
assert result.returncode == 0, f"Failed to get CRD: {result.stderr}"
assert (
"v1beta1" in result.stdout
), f"v1beta1 should be the storage version. Got: {result.stdout}"
def test_kubectl_shortname_dgdr_works(self, dgdr_namespace: str) -> None:
"""
kubectl get dgdr must work (tests the shortName 'dgdr' in the CRD).
"""
result = _run_kubectl(
["get", DGDR_SHORT_NAME, "-n", dgdr_namespace, "--ignore-not-found"],
check=False,
)
assert (
result.returncode == 0
), f"kubectl get dgdr failed (shortname may not be registered). stderr: {result.stderr}"
def test_kubectl_get_columns_schema(
self, dgdr_namespace: str, dgdr_image: str, dgdr_model: str, dgdr_factory
) -> None:
"""
kubectl get dgdr should output the columns defined in the CRD:
NAME, MODEL, BACKEND, PHASE, PROFILING, DGD, AGE.
"""
name = unique_dgdr_name("col-test")
manifest = build_dgdr_manifest(name, model=dgdr_model, image=dgdr_image)
dgdr_factory(manifest)
result = _run_kubectl(
["get", DGDR_SHORT_NAME, name, "-n", dgdr_namespace],
check=False,
)
assert result.returncode == 0, f"kubectl get dgdr failed: {result.stderr}"
header = (
result.stdout.splitlines()[0].upper() if result.stdout.splitlines() else ""
)
expected_columns = {"NAME", "MODEL", "BACKEND", "PHASE"}
for col in expected_columns:
assert (
col in header
), f"Expected column {col!r} in kubectl output header. Got: {header}"
# ---------------------------------------------------------------------------
# ── Group 2: v1alpha1 → v1beta1 Version Conversion ─────────────────────────
# ---------------------------------------------------------------------------
@pytest.mark.gpu_0
@pytest.mark.nightly
@pytest.mark.integration
@pytest.mark.k8s
class TestDGDRVersionConversion:
"""
Tests that v1alpha1 DGDR resources can be submitted and are stored
transparently as v1beta1 (conversion hub). No profiling required.
"""
def test_v1alpha1_dgdr_can_be_applied(
self, dgdr_namespace: str, dgdr_image: str, dgdr_model: str, dgdr_factory
) -> None:
"""
A v1alpha1 DynamoGraphDeploymentRequest should be accepted and
automatically converted to v1beta1 storage by the conversion webhook.
Note: v1alpha1 manifests use a different spec shape (profilingConfig
instead of image) so we must use kubectl here rather than the
v1beta1-only ManagedDGDR client.
"""
name = unique_dgdr_name("v1a1")
v1alpha1_manifest = {
"apiVersion": "nvidia.com/v1alpha1",
"kind": "DynamoGraphDeploymentRequest",
"metadata": {"name": name},
"spec": {
"model": dgdr_model,
"backend": "vllm",
"profilingConfig": {
"profilerImage": dgdr_image,
},
},
}
yaml_str = yaml.dump(v1alpha1_manifest)
result = _run_kubectl(
["apply", "-n", dgdr_namespace, "-f", "-"], input=yaml_str, check=False
)
if result.returncode == 0:
# Register for cleanup without re-creating (resource already exists)
dgdr_factory.register_for_cleanup(name)
# Either accepted (0) or rejected for a known conversion reason – just not a 500
assert result.returncode in (
0,
1,
), f"Unexpected error applying v1alpha1 DGDR: {result.stderr}"
def test_v1beta1_get_on_v1alpha1_object(
self,
managed_dgdr: ManagedDGDR,
dgdr_namespace: str,
dgdr_image: str,
dgdr_model: str,
dgdr_factory,
) -> None:
"""
A resource stored as v1beta1 must be retrievable as v1alpha1 via conversion.
"""
name = unique_dgdr_name("conv-get")
manifest = build_dgdr_manifest(name, model=dgdr_model, image=dgdr_image)
dgdr_factory(manifest)
# Retrieve as v1beta1 (storage version) via ManagedDGDR
obj_v1beta1 = managed_dgdr.run(managed_dgdr.get(name))
assert obj_v1beta1 is not None
assert obj_v1beta1["apiVersion"] == DGDR_API_VERSION
# Retrieve as v1alpha1 (should trigger conversion webhook).
# Must use kubectl here since ManagedDGDR targets v1beta1 only.
result = _run_kubectl(
[
"get",
"dynamographdeploymentrequests.v1alpha1.nvidia.com",
name,
"-n",
dgdr_namespace,
"-o",
"json",
],
check=False,
)
# If the conversion webhook is working, we get a 200 with v1alpha1 resource.
# If not registered, we may get a 404 - that is also acceptable here as
# some cluster configs only register v1beta1.
assert result.returncode in (
0,
1,
), f"Unexpected failure getting v1alpha1 DGDR: {result.stderr}"
if result.returncode == 0:
obj_v1alpha1 = json.loads(result.stdout)
assert (
obj_v1alpha1["apiVersion"] == "nvidia.com/v1alpha1"
), "Retrieved object should have v1alpha1 apiVersion"
......@@ -1179,6 +1179,287 @@ class ManagedDeployment:
await self._cleanup()
class ManagedDGDR:
"""Async helper for managing DynamoGraphDeploymentRequest custom resources.
Provides CRUD operations and phase-polling against the DGDR CRD using the
``kubernetes_asyncio`` client, following the same patterns as
``ManagedDeployment`` (shared kubeconfig initialisation, timeout logic,
structured error messages).
Typical usage from a pytest fixture::
dgdr = ManagedDGDR(namespace="default")
await dgdr.init()
await dgdr.create(manifest)
phase = await dgdr.wait_for_phase(name, "Ready", timeout=600)
await dgdr.delete(name)
await dgdr.close()
"""
# CRD coordinates for DGDR
DGDR_GROUP = "nvidia.com"
DGDR_VERSION = "v1beta1"
DGDR_PLURAL = "dynamographdeploymentrequests"
# CRD coordinates for DGD (for mocker cleanup)
DGD_PLURAL = "dynamographdeployments"
DEFAULT_POLL_INTERVAL = 10 # seconds
def __init__(
self,
namespace: str = "default",
loop: Optional[asyncio.AbstractEventLoop] = None,
):
self.namespace = namespace
self._custom_api: Optional[client.CustomObjectsApi] = None
self._api_client: Optional[client.ApiClient] = None
self._logger = logging.getLogger(self.__class__.__name__)
self._loop = loop
def run(self, coro):
"""Run an async coroutine synchronously using the stored event loop.
Convenience for callers that are not themselves async (e.g. pytest
fixtures and synchronous test methods).
"""
if self._loop is None:
raise RuntimeError(
"No event loop set on ManagedDGDR; pass loop= at construction or call init() first"
)
return self._loop.run_until_complete(coro)
async def init(self) -> None:
"""Initialise the kubernetes_asyncio client.
Priority: KUBECONFIG env → in-cluster → ~/.kube/config (same as
ManagedDeployment._init_kubernetes).
"""
kubeconfig_path = os.environ.get("KUBECONFIG")
if kubeconfig_path and os.path.exists(kubeconfig_path):
self._logger.info("Loading kubeconfig from KUBECONFIG: %s", kubeconfig_path)
await config.load_kube_config(config_file=kubeconfig_path)
else:
try:
self._logger.info("Attempting in-cluster kubernetes config")
config.load_incluster_config()
except Exception as e:
self._logger.warning(
"In-cluster config failed (%s: %s), falling back to default kubeconfig",
type(e).__name__,
e,
)
await config.load_kube_config()
self._api_client = client.ApiClient()
self._custom_api = client.CustomObjectsApi(self._api_client)
async def close(self) -> None:
"""Close the underlying API client."""
if self._api_client:
await self._api_client.close()
self._api_client = None
self._custom_api = None
# ----- CRUD -----
async def create(self, manifest: dict) -> str:
"""Create a DGDR custom resource. Returns the resource name."""
assert self._custom_api is not None, "call init() first"
name = manifest["metadata"]["name"]
await self._custom_api.create_namespaced_custom_object(
group=self.DGDR_GROUP,
version=self.DGDR_VERSION,
namespace=self.namespace,
plural=self.DGDR_PLURAL,
body=manifest,
)
self._logger.info("Created DGDR %s/%s", self.namespace, name)
return name
async def get(self, name: str) -> Optional[dict]:
"""Get a DGDR as a dict, or ``None`` if not found."""
assert self._custom_api is not None, "call init() first"
try:
return await self._custom_api.get_namespaced_custom_object(
group=self.DGDR_GROUP,
version=self.DGDR_VERSION,
namespace=self.namespace,
plural=self.DGDR_PLURAL,
name=name,
)
except exceptions.ApiException as e:
if e.status == 404:
return None
raise
async def delete(self, name: str, ignore_not_found: bool = True) -> None:
"""Delete a DGDR."""
assert self._custom_api is not None, "call init() first"
try:
await self._custom_api.delete_namespaced_custom_object(
group=self.DGDR_GROUP,
version=self.DGDR_VERSION,
namespace=self.namespace,
plural=self.DGDR_PLURAL,
name=name,
)
self._logger.info("Deleted DGDR %s/%s", self.namespace, name)
except exceptions.ApiException as e:
if e.status == 404 and ignore_not_found:
return
raise
async def list(self, label_selector: str = "") -> List[dict]:
"""List DGDRs, optionally filtered by label selector. Returns items."""
assert self._custom_api is not None, "call init() first"
resp = await self._custom_api.list_namespaced_custom_object(
group=self.DGDR_GROUP,
version=self.DGDR_VERSION,
namespace=self.namespace,
plural=self.DGDR_PLURAL,
label_selector=label_selector,
)
return resp.get("items", [])
async def server_dry_run(self, manifest: dict) -> dict:
"""Apply with server-side dry-run to validate admission webhooks.
Returns the API response dict. Raises ``ApiException`` on rejection.
"""
assert self._custom_api is not None, "call init() first"
return await self._custom_api.create_namespaced_custom_object(
group=self.DGDR_GROUP,
version=self.DGDR_VERSION,
namespace=self.namespace,
plural=self.DGDR_PLURAL,
body=manifest,
dry_run="All",
)
# ----- Phase helpers -----
async def get_phase(self, name: str) -> Optional[str]:
"""Return ``status.phase`` of the named DGDR, or ``None``."""
obj = await self.get(name)
if obj is None:
return None
return obj.get("status", {}).get("phase")
async def get_condition(self, name: str, condition_type: str) -> Optional[dict]:
"""Return the named condition dict from ``status.conditions``."""
obj = await self.get(name)
if obj is None:
return None
for c in obj.get("status", {}).get("conditions", []):
if c.get("type") == condition_type:
return c
return None
async def wait_for_phase(
self,
name: str,
target_phase: str,
timeout: int = 3600,
fail_fast_phases: Optional[List[str]] = None,
poll_interval: int = DEFAULT_POLL_INTERVAL,
) -> str:
"""Poll until the DGDR reaches *target_phase* or times out.
Returns the final observed phase. Raises ``AssertionError`` on
fail-fast and ``TimeoutError`` on timeout.
"""
if fail_fast_phases is None:
fail_fast_phases = ["Failed"]
deadline = time.monotonic() + timeout
last_phase: Optional[str] = None
while time.monotonic() < deadline:
current = await self.get_phase(name)
if current != last_phase:
self._logger.info("DGDR %s/%s phase: %s", self.namespace, name, current)
last_phase = current
if current == target_phase:
return current
if current in fail_fast_phases:
obj = await self.get(name)
conditions = obj.get("status", {}).get("conditions", []) if obj else []
raise AssertionError(
f"DGDR {self.namespace}/{name} reached fail-fast phase {current!r} "
f"while waiting for {target_phase!r}. conditions={conditions}"
)
await asyncio.sleep(poll_interval)
raise TimeoutError(
f"Timed out after {timeout}s waiting for DGDR {self.namespace}/{name} "
f"to reach phase {target_phase!r}. Last phase: {last_phase!r}"
)
async def wait_for_any_phase(
self,
name: str,
target_phases: List[str],
timeout: int = 3600,
poll_interval: int = DEFAULT_POLL_INTERVAL,
) -> str:
"""Poll until the DGDR reaches any of *target_phases*. Returns matched phase."""
deadline = time.monotonic() + timeout
last_phase: Optional[str] = None
while time.monotonic() < deadline:
current = await self.get_phase(name)
if current != last_phase:
self._logger.info("DGDR %s/%s phase: %s", self.namespace, name, current)
last_phase = current
if current in target_phases:
return current
await asyncio.sleep(poll_interval)
raise TimeoutError(
f"Timed out after {timeout}s waiting for DGDR {self.namespace}/{name} "
f"to reach any of {target_phases!r}. Last phase: {last_phase!r}"
)
# ----- DGD helpers (for mocker cleanup) -----
async def delete_dgd(self, name: str, ignore_not_found: bool = True) -> None:
"""Delete a DynamoGraphDeployment resource."""
assert self._custom_api is not None, "call init() first"
try:
await self._custom_api.delete_namespaced_custom_object(
group=self.DGDR_GROUP,
version="v1alpha1",
namespace=self.namespace,
plural=self.DGD_PLURAL,
name=name,
)
self._logger.info("Deleted DGD %s/%s", self.namespace, name)
except exceptions.ApiException as e:
if e.status == 404 and ignore_not_found:
return
raise
async def get_dgd(self, name: str) -> Optional[dict]:
"""Get a DynamoGraphDeployment, or ``None`` if not found."""
assert self._custom_api is not None, "call init() first"
try:
return await self._custom_api.get_namespaced_custom_object(
group=self.DGDR_GROUP,
version="v1alpha1",
namespace=self.namespace,
plural=self.DGD_PLURAL,
name=name,
)
except exceptions.ApiException as e:
if e.status == 404:
return None
raise
async def main():
LOG_FORMAT = "[TEST] %(asctime)s %(levelname)s %(name)s: %(message)s"
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment