# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Unit tests for profile_sla.py private helper functions.

These tests exercise each helper in isolation, without running the full
profiling pipeline.  External I/O (DGD generation, deployment) is mocked
where needed.
"""

import os
import sys
from pathlib import Path
from unittest.mock import patch

import pytest
import yaml

project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

# Project imports are guarded: if any of them cannot be imported, the whole
# module is skipped instead of failing at collection time.
try:
    from dynamo.planner.utils.planner_config import (
        PlannerConfig,
        PlannerPreDeploymentSweepMode,
    )
    from dynamo.profiler.profile_sla import (
        _assemble_final_config,
        _extract_profiler_params,
        _write_final_output,
    )
    from dynamo.profiler.utils.config_modifiers.parallelization_mapping import (
        PickedParallelConfig,
    )
    from dynamo.profiler.utils.defaults import SearchStrategy
    from dynamo.profiler.utils.dgdr_v1beta1_types import (
        DynamoGraphDeploymentRequestSpec,
        FeaturesSpec,
        HardwareSpec,
        MockerSpec,
        SLASpec,
        WorkloadSpec,
    )
    from dynamo.profiler.utils.dgdr_validate import (
        valid_dgdr_spec,
        validate_dgdr_dynamo_features,
    )
    from dynamo.profiler.utils.profile_common import ProfilerOperationalConfig
except ImportError as e:
    pytest.skip(f"Skip (missing dependency): {e}", allow_module_level=True)


# ---------------------------------------------------------------------------
# Shared fixtures
# ---------------------------------------------------------------------------


def _make_dgdr(**overrides) -> DynamoGraphDeploymentRequestSpec:
    """Construct a small DGDR spec for use in tests.

    Each keyword in ``overrides`` replaces the default field of the same
    name (or adds a new field) before the spec is instantiated.
    """
    fields = {
        "model": "Qwen/Qwen3-32B",
        "backend": "trtllm",
        "image": "nvcr.io/nvidia/ai-dynamo/dynamo-frontend:latest",
        "hardware": HardwareSpec(gpuSku="h200_sxm", totalGpus=8, numGpusPerNode=8),
        "workload": WorkloadSpec(isl=4000, osl=1000),
        "sla": SLASpec(ttft=2000.0, itl=50.0),
    }
    # Caller-supplied values win over the defaults above.
    return DynamoGraphDeploymentRequestSpec(**{**fields, **overrides})


def _make_planner(**overrides) -> PlannerConfig:
    """Construct a planner config for tests; ``overrides`` replace the defaults.

    Defaults: throughput scaling on, load scaling off, rapid pre-deployment
    sweep, ``disagg`` mode, ``trtllm`` backend.
    """
    settings = {
        "enable_throughput_scaling": True,
        "enable_load_scaling": False,
        "pre_deployment_sweeping_mode": PlannerPreDeploymentSweepMode.Rapid,
        "mode": "disagg",
        "backend": "trtllm",
    }
    return PlannerConfig(**{**settings, **overrides})


def _make_ops(tmp_path, **kwargs) -> ProfilerOperationalConfig:
    """Create an operational config whose ``output_dir`` is ``<tmp_path>/out``.

    Remaining keyword arguments are forwarded to ``ProfilerOperationalConfig``
    as given.  The directory is not created here; tests that need it on disk
    create it themselves.
    """
    out_dir = tmp_path / "out"
    return ProfilerOperationalConfig(output_dir=str(out_dir), **kwargs)


# ---------------------------------------------------------------------------
# _extract_profiler_params
# ---------------------------------------------------------------------------


class TestExtractProfilerParams:
    """Checks on the 11-element tuple returned by ``_extract_profiler_params``."""

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_basic_ttft_itl(self):
        """A ttft/itl SLA is reported as-is, with no request-latency target."""
        extracted = _extract_profiler_params(_make_dgdr())
        (
            model,
            backend,
            system,
            total_gpus,
            isl,
            osl,
            req_lat,
            ttft,
            tpot,
            strategy,
            picking,
        ) = extracted

        # Deployment identity.
        assert model == "Qwen/Qwen3-32B"
        assert backend == "trtllm"
        assert system == "h200_sxm"
        assert total_gpus == 8
        # Workload shape.
        assert isl == 4000
        assert osl == 1000
        # Latency targets.
        assert req_lat is None
        assert ttft == 2000.0
        assert tpot == 50.0
        # Search behaviour.
        assert strategy == SearchStrategy.RAPID
        assert picking == "default"

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_e2e_latency_sets_both_targets(self):
        """With only e2eLatency set, ttft and tpot both take its value."""
        e2e_only = SLASpec(ttft=None, itl=None, e2eLatency=35000.0)
        (
            _model,
            _backend,
            _system,
            _total_gpus,
            _isl,
            _osl,
            req_lat,
            ttft,
            tpot,
            _strategy,
            _picking,
        ) = _extract_profiler_params(_make_dgdr(sla=e2e_only))

        assert req_lat == 35000.0
        assert ttft == 35000.0
        assert tpot == 35000.0

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_request_rate_yields_load_match_picking(self):
        """A workload carrying requestRate selects the 'load_match' picking mode."""
        rate_workload = WorkloadSpec(isl=4000, osl=1000, requestRate=5.0)
        (
            _model,
            _backend,
            _system,
            _total_gpus,
            _isl,
            _osl,
            _req_lat,
            _ttft,
            _tpot,
            _strategy,
            picking,
        ) = _extract_profiler_params(_make_dgdr(workload=rate_workload))

        assert picking == "load_match"

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_backend_lowercased(self):
        """The returned backend string is lower-case."""
        # NOTE(review): the input is already lower-case, so this does not show
        # that a mixed-case backend gets normalised — confirm whether the spec
        # accepts such a value before strengthening the test.
        (
            _model,
            backend,
            _system,
            _total_gpus,
            _isl,
            _osl,
            _req_lat,
            _ttft,
            _tpot,
            _strategy,
            _picking,
        ) = _extract_profiler_params(_make_dgdr(backend="trtllm"))

        assert backend == "trtllm"
        assert backend.lower() == backend

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_thorough_strategy_extracted(self):
        """searchStrategy 'thorough' comes back as SearchStrategy.THOROUGH."""
        (
            _model,
            _backend,
            _system,
            _total_gpus,
            _isl,
            _osl,
            _req_lat,
            _ttft,
            _tpot,
            strategy,
            _picking,
        ) = _extract_profiler_params(_make_dgdr(searchStrategy="thorough"))

        assert strategy == SearchStrategy.THOROUGH


# ---------------------------------------------------------------------------
# valid_dgdr_spec
# ---------------------------------------------------------------------------


class TestValidDgdrSpec:
    """Accept/reject cases for ``valid_dgdr_spec``.

    Rejections are expected as ``ValueError`` with a message matching the
    given pattern; accepted specs must simply not raise.
    """

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_thorough_auto_backend_raises(self):
        """THOROUGH + 'auto' backend is rejected."""
        dgdr = _make_dgdr(searchStrategy="thorough", backend="auto")
        with pytest.raises(ValueError, match="does not support 'auto' backend"):
            valid_dgdr_spec(dgdr)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_thorough_concrete_backend_passes(self):
        """THOROUGH + concrete backend is fine."""
        dgdr = _make_dgdr(searchStrategy="thorough", backend="trtllm")
        valid_dgdr_spec(dgdr)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_rapid_auto_backend_passes(self):
        """RAPID allows 'auto' backend."""
        dgdr = _make_dgdr(backend="auto")
        valid_dgdr_spec(dgdr)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_missing_image_raises(self):
        """image is required."""
        dgdr = _make_dgdr(image="")
        with pytest.raises(ValueError, match="image.*required"):
            valid_dgdr_spec(dgdr)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_missing_hardware_raises(self):
        """hardware is required."""
        dgdr = _make_dgdr(hardware=None)
        with pytest.raises(ValueError, match="hardware.*required"):
            valid_dgdr_spec(dgdr)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_missing_gpu_sku_raises(self):
        """hardware.gpuSku is required."""
        dgdr = _make_dgdr(hardware=HardwareSpec(gpuSku="", numGpusPerNode=8))
        with pytest.raises(ValueError, match="gpuSku.*required"):
            valid_dgdr_spec(dgdr)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_zero_gpus_per_node_raises(self):
        """hardware.numGpusPerNode must be positive."""
        dgdr = _make_dgdr(hardware=HardwareSpec(gpuSku="h200_sxm", numGpusPerNode=0))
        with pytest.raises(ValueError, match="numGpusPerNode.*positive"):
            valid_dgdr_spec(dgdr)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_none_workload_gets_default(self):
        """None workload is populated with a default WorkloadSpec."""
        dgdr = _make_dgdr(workload=None)
        valid_dgdr_spec(dgdr)
        assert dgdr.workload is not None

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_none_sla_gets_default(self):
        """None sla is populated with a default SLASpec."""
        dgdr = _make_dgdr(sla=None)
        valid_dgdr_spec(dgdr)
        assert dgdr.sla is not None

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_both_concurrency_and_rate_raises(self):
        """concurrency and requestRate are mutually exclusive."""
        dgdr = _make_dgdr(
            workload=WorkloadSpec(isl=4000, osl=1000, concurrency=10, requestRate=5.0)
        )
        with pytest.raises(ValueError, match="concurrency.*requestRate"):
            valid_dgdr_spec(dgdr)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_negative_sla_ttft_raises(self):
        """Negative SLA ttft must be rejected."""
        dgdr = _make_dgdr(sla=SLASpec(ttft=-1.0, itl=30.0))
        with pytest.raises(ValueError, match="ttft.*positive"):
            valid_dgdr_spec(dgdr)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_e2e_latency_clears_ttft_itl(self):
        """e2eLatency takes precedence and nulls out ttft/itl."""
        # NOTE(review): ttft and itl are already None on input, so this only
        # shows they are still None after validation — confirm whether a case
        # starting from non-None ttft/itl is wanted.
        dgdr = _make_dgdr(sla=SLASpec(ttft=None, itl=None, e2eLatency=35000.0))
        valid_dgdr_spec(dgdr)
        assert dgdr.sla.ttft is None
        assert dgdr.sla.itl is None
        assert dgdr.sla.e2eLatency == 35000.0

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_missing_ttft_and_itl_and_e2e_raises(self):
        """At least ttft+itl or e2eLatency must be provided."""
        dgdr = _make_dgdr(sla=SLASpec(ttft=None, itl=None, e2eLatency=None))
        with pytest.raises(ValueError, match="ttft.*itl.*e2eLatency"):
            valid_dgdr_spec(dgdr)


# ---------------------------------------------------------------------------
# validate_dgdr_dynamo_features
# ---------------------------------------------------------------------------


class TestValidateDgdrDynamoFeatures:
    """Accept/reject cases for ``validate_dgdr_dynamo_features``.

    Each test builds a DGDR with a particular planner/mocker feature set and
    calls the validator with an explicit ``aic_supported`` flag.
    """

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_no_features_passes(self):
        """No features → no error."""
        dgdr = _make_dgdr()
        validate_dgdr_dynamo_features(dgdr, aic_supported=False)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_planner_throughput_scaling_aic_unsupported_rapid_sweep_raises(self):
        """Throughput scaling + rapid sweep + AIC unsupported is rejected."""
        dgdr = _make_dgdr(
            features=FeaturesSpec(
                planner=_make_planner(
                    enable_throughput_scaling=True,
                    pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
                    backend="vllm",
                )
            )
        )
        with pytest.raises(ValueError, match="AIC does not support"):
            validate_dgdr_dynamo_features(dgdr, aic_supported=False)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_planner_throughput_scaling_aic_supported_passes(self):
        """Throughput scaling + rapid sweep + AIC supported is fine."""
        planner = _make_planner(
            pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
        )
        dgdr = _make_dgdr(features=FeaturesSpec(planner=planner))
        validate_dgdr_dynamo_features(dgdr, aic_supported=True)
        # The sweep mode must still be Rapid after validation.
        assert (
            dgdr.features.planner.pre_deployment_sweeping_mode
            == PlannerPreDeploymentSweepMode.Rapid
        )

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_planner_load_scaling_only_aic_unsupported_passes(self):
        """Load scaling only (no throughput scaling) + AIC unsupported passes."""
        planner = _make_planner(
            enable_throughput_scaling=False,
            enable_load_scaling=True,
            pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
            backend="vllm",
        )
        dgdr = _make_dgdr(features=FeaturesSpec(planner=planner))
        validate_dgdr_dynamo_features(dgdr, aic_supported=False)
        # The sweep mode must still be Rapid after validation.
        assert (
            dgdr.features.planner.pre_deployment_sweeping_mode
            == PlannerPreDeploymentSweepMode.Rapid
        )

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_mocker_enabled_sweep_none_raises(self):
        """Mocker enabled + sweep mode None_ is rejected."""
        dgdr = _make_dgdr(
            features=FeaturesSpec(
                planner=_make_planner(
                    enable_throughput_scaling=False,
                    enable_load_scaling=True,
                    pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.None_,
                ),
                mocker=MockerSpec(enabled=True),
            )
        )
        with pytest.raises(ValueError, match="cannot be 'none'.*mocker"):
            validate_dgdr_dynamo_features(dgdr, aic_supported=True)

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_mocker_enabled_sweep_rapid_passes(self):
        """Mocker enabled + sweep mode Rapid is fine."""
        dgdr = _make_dgdr(
            features=FeaturesSpec(
                planner=_make_planner(
                    enable_throughput_scaling=False,
                    enable_load_scaling=True,
                    pre_deployment_sweeping_mode=PlannerPreDeploymentSweepMode.Rapid,
                ),
                mocker=MockerSpec(enabled=True),
            )
        )
        validate_dgdr_dynamo_features(dgdr, aic_supported=True)


# ---------------------------------------------------------------------------
# _write_final_output
# ---------------------------------------------------------------------------


class TestWriteFinalOutput:
    """Return value and on-disk result of ``_write_final_output``."""

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_normal_config_writes_file_and_returns_true(self, tmp_path):
        """A mapping config round-trips through final_config.yaml; returns True."""
        ops = _make_ops(tmp_path)
        Path(ops.output_dir).mkdir(parents=True, exist_ok=True)
        expected = {"apiVersion": "v1", "kind": "Deployment"}

        wrote = _write_final_output(ops, expected)
        written_path = Path(ops.output_dir) / "final_config.yaml"

        assert wrote is True
        assert written_path.exists()
        assert yaml.safe_load(written_path.read_text()) == expected

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_list_config_writes_multi_doc_yaml(self, tmp_path):
        """A two-item list config yields two YAML documents in the output file."""
        ops = _make_ops(tmp_path)
        Path(ops.output_dir).mkdir(parents=True, exist_ok=True)

        wrote = _write_final_output(ops, [{"kind": "A"}, {"kind": "B"}])

        assert wrote is True
        yaml_text = (Path(ops.output_dir) / "final_config.yaml").read_text()
        documents = [doc for doc in yaml.safe_load_all(yaml_text)]
        assert len(documents) == 2

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_none_config_not_dry_run_returns_false(self, tmp_path):
        """A None config outside dry-run mode makes the call return False."""
        ops = _make_ops(tmp_path, dry_run=False)
        Path(ops.output_dir).mkdir(parents=True, exist_ok=True)

        assert _write_final_output(ops, None) is False

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_none_config_dry_run_writes_empty_yaml_and_returns_true(self, tmp_path):
        """In dry-run mode a None config still produces a file and returns True."""
        ops = _make_ops(tmp_path, dry_run=True)
        Path(ops.output_dir).mkdir(parents=True, exist_ok=True)

        wrote = _write_final_output(ops, None)
        written_path = Path(ops.output_dir) / "final_config.yaml"

        assert wrote is True
        assert written_path.exists()
        # An empty YAML file loads as None.
        assert yaml.safe_load(written_path.read_text()) is None


# ---------------------------------------------------------------------------
# _assemble_final_config
# ---------------------------------------------------------------------------


class TestAssembleFinalConfig:
    """Which object ``_assemble_final_config`` returns for each feature set.

    Where a planner is configured, ``generate_dgd_config_with_planner`` is
    patched to return a known ``(real, mocker)`` pair so the tests can check
    identity of the returned object.
    """

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_no_planner_no_mocker_returns_dgd_config_unchanged(self, tmp_path):
        """With no features configured, the same dgd_config object comes back."""
        base_config = {"kind": "DynamoGraphDeployment"}
        first_pick = PickedParallelConfig(tp=1)
        second_pick = PickedParallelConfig(tp=1)

        assembled = _assemble_final_config(
            _make_dgdr(), _make_ops(tmp_path), base_config, first_pick, second_pick
        )

        assert assembled is base_config

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_none_dgd_config_passes_through_as_none(self, tmp_path):
        """A None dgd_config with no features configured is returned as None."""
        first_pick = PickedParallelConfig(tp=1)
        second_pick = PickedParallelConfig(tp=1)

        assembled = _assemble_final_config(
            _make_dgdr(), _make_ops(tmp_path), None, first_pick, second_pick
        )

        assert assembled is None

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_planner_no_mocker_returns_real_config(self, tmp_path):
        """Planner without mocker: the first element of the generated pair wins."""
        spec = _make_dgdr(features=FeaturesSpec(planner=_make_planner()))
        ops = _make_ops(tmp_path)
        os.makedirs(ops.output_dir, exist_ok=True)
        real_cfg, mocker_cfg = {"kind": "real"}, {"kind": "mocker"}
        generator = "dynamo.profiler.profile_sla.generate_dgd_config_with_planner"

        with patch(generator, return_value=(real_cfg, mocker_cfg)):
            assembled = _assemble_final_config(
                spec,
                ops,
                {"kind": "DGD"},
                PickedParallelConfig(tp=1),
                PickedParallelConfig(tp=1),
            )

        assert assembled is real_cfg

    @pytest.mark.pre_merge
    @pytest.mark.gpu_0
    def test_mocker_enabled_returns_mocker_config(self, tmp_path):
        """Planner with mocker enabled: the second element of the pair wins."""
        features = FeaturesSpec(
            planner=_make_planner(),
            mocker=MockerSpec(enabled=True),
        )
        spec = _make_dgdr(features=features)
        ops = _make_ops(tmp_path)
        os.makedirs(ops.output_dir, exist_ok=True)
        real_cfg, mocker_cfg = {"kind": "real"}, {"kind": "mocker"}
        generator = "dynamo.profiler.profile_sla.generate_dgd_config_with_planner"

        with patch(generator, return_value=(real_cfg, mocker_cfg)):
            assembled = _assemble_final_config(
                spec,
                ops,
                {"kind": "DGD"},
                PickedParallelConfig(tp=1),
                PickedParallelConfig(tp=1),
            )

        assert assembled is mocker_cfg