profile_sla.py 32.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
import asyncio
17
18
19
import logging
import math
import os
20
from dataclasses import dataclass, field
21
22
23

import numpy as np
import yaml
24

25
from benchmarks.profiler.utils.aiperf import benchmark_decode, benchmark_prefill
26
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
27
28
29
30
31
32
from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import (
    ParallelizationMapping,
    apply_parallel_mapping_to_config,
    get_candidate_parallel_mappings,
)
from benchmarks.profiler.utils.defaults import EngineType
33
from benchmarks.profiler.utils.dgd_generation import generate_dgd_config_with_planner
34
from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
35
36
from benchmarks.profiler.utils.plot import (
    plot_decode_performance,
37
    plot_pd_joint_results,
38
39
    plot_prefill_performance,
)
40
from benchmarks.profiler.utils.profile_decode import (
41
    get_num_request_range,
42
43
44
45
46
47
48
    profile_decode,
    profile_decode_aiconfigurator,
)
from benchmarks.profiler.utils.profile_prefill import (
    profile_prefill,
    profile_prefill_aiconfigurator,
)
49
from benchmarks.profiler.utils.profiler_argparse import create_profiler_parser
50
51
52
53
from deploy.utils.dynamo_deployment import (
    DynamoDeploymentClient,
    cleanup_remaining_deployments,
)
54
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES
55

56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114

@dataclass
class PrefillProfileData:
    """Column-oriented store for prefill profiling measurements.

    Every attribute is a list, and ``add_data`` appends exactly one value to
    each of them, so index ``i`` across all lists describes the same data
    point.
    """

    # Number of GPUs used for each recorded data point.
    num_gpus: list[int] = field(default_factory=list)
    # Time-to-first-token recorded for each data point.
    ttft: list[float] = field(default_factory=list)
    # Throughput per GPU recorded for each data point.
    thpt_per_gpu: list[float] = field(default_factory=list)
    # Human-readable label of the parallel mapping for each data point.
    parallel_mapping_labels: list[str] = field(default_factory=list)
    # The parallel mapping object itself for each data point.
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        ttft: float,
        thpt_per_gpu: float,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record one prefill data point by extending every column by one entry.

        Args:
            num_gpus: GPU count of the profiled configuration.
            ttft: Time-to-first-token measured or estimated for it.
            thpt_per_gpu: Throughput per GPU for it.
            parallel_mapping_label: Display label of the parallel mapping.
            parallel_mapping: The parallel mapping that was profiled.
        """
        # Pair each column with its new value so all lists grow in lockstep.
        column_updates = (
            (self.num_gpus, num_gpus),
            (self.ttft, ttft),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in column_updates:
            column.append(value)


@dataclass
class DecodeProfileData:
    """Column-oriented store for decode profiling measurements.

    Every attribute is a list, and ``add_data`` appends exactly one value to
    each of them, so index ``i`` across all lists describes the same data
    point.
    """

    # Number of GPUs used for each recorded data point.
    num_gpus: list[int] = field(default_factory=list)
    # Inter-token latency recorded for each data point.
    itl: list[float] = field(default_factory=list)
    # Throughput per GPU recorded for each data point.
    thpt_per_gpu: list[float] = field(default_factory=list)
    # Concurrency value associated with each data point.
    concurrency: list[int] = field(default_factory=list)
    # KV cache size associated with each data point.
    kv_cache_size: list[int] = field(default_factory=list)
    # Human-readable label of the parallel mapping for each data point.
    parallel_mapping_labels: list[str] = field(default_factory=list)
    # The parallel mapping object itself for each data point.
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        itl: float,
        thpt_per_gpu: float,
        concurrency: int,
        kv_cache_size: int,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record one decode data point by extending every column by one entry.

        Args:
            num_gpus: GPU count of the profiled configuration.
            itl: Inter-token latency measured or estimated for it.
            thpt_per_gpu: Throughput per GPU for it.
            concurrency: Concurrency value the data point was taken at.
            kv_cache_size: KV cache size associated with the configuration.
            parallel_mapping_label: Display label of the parallel mapping.
            parallel_mapping: The parallel mapping that was profiled.
        """
        # Pair each column with its new value so all lists grow in lockstep.
        column_updates = (
            (self.num_gpus, num_gpus),
            (self.itl, itl),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.concurrency, concurrency),
            (self.kv_cache_size, kv_cache_size),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in column_updates:
            column.append(value)


115
116
117
118
119
120
121
122
123
124
125
# Module-level logging setup: this module's logger emits INFO-and-above
# records through a dedicated stream handler using a timestamped line format.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Line layout: "<timestamp> - <logger name> - <level> - <message>".
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)

logger.addHandler(console_handler)


126
127
128
async def run_profile(args):
    # List to track all created deployment clients for cleanup in case of failure
    deployment_clients = []
129

130
131
132
133
    # Inherit aic_backend from backend if not explicitly set
    if not args.aic_backend:
        args.aic_backend = args.backend

134
    try:
135
        # Log MoE model support
136
        if args.model_info.is_moe:
137
138
139
140
141
142
143
144
145
146
147
148
149
150
            logger.info(
                "MoE (Mixture of Experts) model profiling, sweeping TEP size for prefill and DEP size for decode"
            )
            assert args.backend in [
                "sglang"
            ], "MoE model support is only available for SGLang"
            assert (
                not args.use_ai_configurator
            ), "MoE model is not supported in ai-configurator"
        else:
            logger.info(
                "Standard dense model profiling, sweeping TP size for both prefill and decode"
            )

151
152
153
154
        config_modifier = CONFIG_MODIFIERS[args.backend]

        with open(args.config, "r") as f:
            config = yaml.safe_load(f)
155

156
157
158
159
        if args.dgd_image:
            config = config_modifier.update_image(config, args.dgd_image)
            logger.info(f"Using DGD image: {args.dgd_image}")

160
161
162
163
164
        profile_num_gpus = [
            2**i
            for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1)
            if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine
        ]
165
        if args.model_info.is_moe:
166
167
168
            logger.info(f"Profiling MoE GPU counts (TEP/DEP): {profile_num_gpus}")
        else:
            logger.info(f"Profiling dense model GPU counts (TP): {profile_num_gpus}")
169

170
        os.makedirs(args.output_dir, exist_ok=True)
171

172
        model_name = config_modifier.get_model_name(config)
173

174
175
176
177
178
179
180
181
182
        # Determine sweep max context length: allow user-provided cap to override model's if smaller
        use_specified_max_context_len = getattr(args, "max_context_length", None)
        model_max_context_len = args.model_info.max_context_length
        if not use_specified_max_context_len and not model_max_context_len:
            raise ValueError(
                "No max_context_length available from args.max_context_length or model_info from HF config"
            )
        elif not use_specified_max_context_len:
            sweep_max_context_length = model_max_context_len
183
            logger.info(
184
                f"Using model's maximum context length: {model_max_context_len}"
185
            )
186
187
        elif not model_max_context_len:
            sweep_max_context_length = use_specified_max_context_len
188
            logger.info(
189
                f"Using user-provided max_context_length: {use_specified_max_context_len}"
190
191
            )
        else:
192
193
194
195
196
197
            sweep_max_context_length = min(
                use_specified_max_context_len, model_max_context_len
            )
            logger.info(
                f"Using minimum of user-provided and model's maximum context length: {sweep_max_context_length}"
            )
198

199
200
201
202
203
204
205
206
207
        if args.use_ai_configurator:
            if not args.aic_system:
                raise ValueError(
                    "Must provide --aic-system when using --use-ai-configurator."
                )
            if not args.aic_model_name:
                raise ValueError(
                    "Must provide --aic-model-name when using --use-ai-configurator."
                )
208
            if not args.aic_backend_version:
209
                raise ValueError(
210
                    "Must provide --aic-backend-version when using --use-ai-configurator."
211
212
213
214
215
216
                )

            logger.info("Will use aiconfigurator to estimate perf.")
            ai_configurator_perf_estimator = AIConfiguratorPerfEstimator(
                args.aic_model_name,
                args.aic_system.lower(),
217
218
                args.aic_backend,
                args.aic_backend_version,
219
220
            )
        else:
221
            if args.aic_system or args.aic_model_name or args.aic_backend_version:
222
223
224
225
226
                logger.warning(
                    "Will ignore --aic-system, --aic-model-name, and/or --backend-version "
                    "when not using --use-ai-configurator."
                )

227
        # first profile prefill
228
        prefill_data = PrefillProfileData()
229
        logger.info("Profiling prefill...")
230
231
        base_prefill_config = config_modifier.convert_config(
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
232
        )
233
        frontend_port = config_modifier.get_port(config)
234
235
        itl: float | None = None
        thpt_per_gpu: float | None = None
236
237
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling prefill with {num_gpus} GPUs...")
238
239
240
            candidate_mappings = get_candidate_parallel_mappings(
                num_gpus, args.model_info, EngineType.PREFILL
            )
241

242
243
244
245
246
247
248
249
            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                prefill_config = apply_parallel_mapping_to_config(
                    base_prefill_config,
                    mapping,
                    EngineType.PREFILL,
                    config_modifier,
                    args.num_gpus_per_node,
250
                )
251
                logger.info(f"Dynamo config: {prefill_config}")
252

253
254
255
                # Work dir includes mapping label (safe chars only)
                parallel_mapping_tag = (
                    mapping.label().replace("=", "").replace("/", "_")
256
                )
257
258
                work_dir = (
                    f"{args.output_dir}/prefill_{num_gpus}gpus_{parallel_mapping_tag}"
259
                )
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
                os.makedirs(work_dir, exist_ok=True)

                prefill_config_fn = f"{work_dir}/config.yaml"
                with open(prefill_config_fn, "w") as f:
                    yaml.dump(prefill_config, f)

                ttft = None
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
                elif args.use_ai_configurator:
                    logger.info("Using ai-configurator to estimate prefill latency.")
                    perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf(
                        args.isl,
                        tp_size=(mapping.tp or num_gpus),
                    )
                    ttft = perf_dict["context_latency"]
                    logger.info(f"Estimated prefill TTFT: {ttft:.2f}ms")
                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=prefill_config["metadata"]["name"],
                    )
                    logger.info(
                        f"Created client with service_name: {client.service_name}"
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(prefill_config_fn)
                    logger.info("Waiting for deployment to be ready...")
                    await client.wait_for_deployment_ready()
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )
300

301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
                    # run ai-perf
                    base_url = client.get_service_url()
                    ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{args.isl}"
                    aiperf_result = benchmark_prefill(
                        args.isl,
                        ai_perf_artifact_dir,
                        model_name,
                        model_name,
                        base_url=base_url,
                    )
                    if aiperf_result is not None:
                        ttft = aiperf_result["time_to_first_token"]["avg"]

                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")

                if ttft is not None:
                    prefill_data.add_data(
                        num_gpus=num_gpus,
                        ttft=ttft,
                        thpt_per_gpu=args.isl / ttft / num_gpus * 1000,
                        parallel_mapping_label=mapping.label(),
                        parallel_mapping=mapping,
                    )
327

328
        # Plot the results as a 2D scatter plot
329
330
        if prefill_data.num_gpus and prefill_data.ttft and prefill_data.thpt_per_gpu:
            plot_prefill_performance(prefill_data, args.ttft, args.output_dir)
331

332
        # then profile decode
333
        decode_data = DecodeProfileData()
334
        logger.info("Profiling decode...")
335
336
        base_decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
337
338
339
        )
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling decode with {num_gpus} GPUs...")
340
341
342
            candidate_mappings = get_candidate_parallel_mappings(
                num_gpus, args.model_info, EngineType.DECODE
            )
343

344
345
346
347
348
349
350
351
            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                decode_config = apply_parallel_mapping_to_config(
                    base_decode_config,
                    mapping,
                    EngineType.DECODE,
                    config_modifier,
                    args.num_gpus_per_node,
352
                )
353
                logger.info(f"Dynamo config: {decode_config}")
354

355
356
357
358
                parallel_mapping_tag = (
                    mapping.label()
                    .replace("=", "")
                    .replace("/", "_")  # safe chars for directory
359
                )
360
361
                work_dir = (
                    f"{args.output_dir}/decode_{num_gpus}gpus_{parallel_mapping_tag}"
362
                )
363
                os.makedirs(work_dir, exist_ok=True)
364

365
366
367
                decode_config_fn = f"{work_dir}/config.yaml"
                with open(decode_config_fn, "w") as f:
                    yaml.dump(decode_config, f)
368

369
370
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
371

372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
                elif args.use_ai_configurator:
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
                    max_concurrency = ai_configurator_perf_estimator.get_max_batch_size(
                        args.isl, args.osl, tp_size=(mapping.tp or num_gpus)
                    )
                    max_kv_tokens = max_concurrency * (args.isl + args.osl)

                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=decode_config["metadata"]["name"],
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(decode_config_fn)
                    logger.info("Waiting for deployment to be ready...")
                    await client.wait_for_deployment_ready()
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )
400

401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
                    attention_dp_size = mapping.get_attn_dp_size(num_gpus)
                    max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
                        f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                        attention_dp_size=attention_dp_size,
                    )
                    max_concurrency = max_kv_tokens // (args.isl + args.osl)

                if not args.dry_run:
                    attention_dp_size = mapping.get_attn_dp_size(num_gpus)
                    sweep_num_request = get_num_request_range(
                        attention_dp_size,
                        max_concurrency,
                        args.decode_interpolation_granularity,
                    )
                    logger.info(
                        f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}"
                    )
420

421
422
423
424
425
426
427
428
429
430
431
432
                    for num_request in sweep_num_request:
                        itl = thpt_per_gpu = None
                        if args.use_ai_configurator:
                            logger.info(
                                "Using ai-configurator to estimate decode latency."
                            )
                            perf_dict = ai_configurator_perf_estimator.estimate_perf(
                                args.isl,
                                args.osl,
                                num_request,
                                mode=EngineType.DECODE,
                                tp_size=(mapping.tp or num_gpus),
433
434
                            )

435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
                            itl = perf_dict["tpot"]
                            thpt_per_gpu = perf_dict["tokens/s/gpu"]
                            logger.info(f"Estimated decode ITL: {itl:.2f}ms")
                            logger.info(
                                f"Estimated decode throughput per GPU: {thpt_per_gpu:.2f} tokens/s/GPU"
                            )
                        else:
                            base_url = client.get_service_url()
                            ai_perf_artifact_dir = f"{work_dir}/aiperf_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}"
                            aiperf_result = benchmark_decode(
                                args.isl,
                                args.osl,
                                num_request,
                                ai_perf_artifact_dir,
                                model_name,
                                model_name,
                                base_url=base_url,
                            )
                            if aiperf_result is not None:
                                itl = aiperf_result["inter_token_latency"]["avg"]
                                thpt_per_gpu = (
                                    aiperf_result["output_token_throughput"]["avg"]
                                    / num_gpus
                                )

                        if itl is not None and thpt_per_gpu is not None:
                            decode_data.add_data(
                                num_gpus=num_gpus,
                                itl=itl,
                                thpt_per_gpu=thpt_per_gpu,
                                concurrency=num_request,
                                kv_cache_size=max_kv_tokens,
                                parallel_mapping_label=mapping.label(),
                                parallel_mapping=mapping,
                            )
470

471
472
473
474
475
                if not args.dry_run and not args.use_ai_configurator:
                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")
476

477
        # Plot all decode results after profiling is complete
478
479
        if decode_data.num_gpus:
            plot_decode_performance(decode_data, args.itl, args.output_dir)
480

481
        if prefill_data.num_gpus and decode_data.num_gpus:
482
            plot_pd_joint_results(
483
                args.isl, args.osl, prefill_data, decode_data, args.output_dir
484
485
            )

486
487
488
489
490
        if args.dry_run:
            logger.info("Skipping recommendations in dry run mode")
        else:
            logger.info("Analyzing results and generate recommendations...")
            # Safety guards: no results → exit early with a clear message
491
            if not prefill_data.num_gpus:
492
493
                logger.error("No prefill results produced; skipping recommendations.")

494
495
            # select best parallel mapping for prefill
            if min(prefill_data.ttft) > args.ttft:
496
497
498
                logger.info(
                    "No TP size satisfies the TTFT requirement, please try a smaller model or a more powerful GPU SKU"
                )
499
                selected_prefill_idx = int(np.argmin(np.array(prefill_data.ttft)))
500
501
            else:
                valid_indices = [
502
                    i for i, ttft in enumerate(prefill_data.ttft) if ttft <= args.ttft
503
504
                ]
                # Among valid TP sizes, select the one with highest throughput per GPU
505
                valid_thpts = [prefill_data.thpt_per_gpu[i] for i in valid_indices]
506
507
508
                max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                selected_prefill_idx = max_thpt_idx
            logger.info(
509
                f"Suggested prefill parallel mapping: {prefill_data.parallel_mapping_labels[selected_prefill_idx]} on {prefill_data.num_gpus[selected_prefill_idx]} GPU(s) (TTFT {prefill_data.ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_data.thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)"
510
            )
511

512
513
            # select best parallel mapping for decode
            if not decode_data.num_gpus:
514
515
                logger.error("No decode results produced; skipping recommendations.")
                return
516
            if min(decode_data.itl) > args.itl:
517
518
519
                logger.info(
                    "No TP size satisfies the ITL requirement, please try a smaller model or a more powerful GPU SKU"
                )
520
                selected_decode_idx = int(np.argmin(np.array(decode_data.itl)))
521
522
            else:
                valid_indices = [
523
                    i for i, itl in enumerate(decode_data.itl) if itl <= args.itl
524
525
                ]
                # Among valid TP sizes, select the one with highest throughput per GPU
526
                valid_thpts = [decode_data.thpt_per_gpu[i] for i in valid_indices]
527
528
529
                max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                selected_decode_idx = max_thpt_idx
            logger.info(
530
                f"Suggested decode parallel mapping: {decode_data.parallel_mapping_labels[selected_decode_idx]} on {decode_data.num_gpus[selected_decode_idx]} GPU(s) (ITL {decode_data.itl[selected_decode_idx]:.2f} ms, throughput {decode_data.thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)"
531
532
            )

533
        if args.dry_run:
534
            # use min value for prefill and decode GPU counts
535
536
537
538
539
540
541
542
            prefill_data.num_gpus = [args.min_num_gpus_per_engine]
            decode_data.num_gpus = [args.min_num_gpus_per_engine]
            prefill_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
            decode_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
543
544
            selected_prefill_idx = 0
            selected_decode_idx = 0
545

546
547
548
        # interpolate ISL - TTFT with best prefill parallel mapping
        best_prefill_gpus = prefill_data.num_gpus[selected_prefill_idx]
        best_prefill_mapping = prefill_data.parallel_mappings[selected_prefill_idx]
549
        logger.info(
550
            f"Profiling prefill under best {best_prefill_gpus} GPU(s) with parallel mapping [{best_prefill_mapping.label()}] with different ISL..."
551
        )
552
        prefill_config = config_modifier.convert_config(
553
554
555
556
557
558
559
560
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
        )
        prefill_config = apply_parallel_mapping_to_config(
            prefill_config,
            best_prefill_mapping,
            EngineType.PREFILL,
            config_modifier,
            args.num_gpus_per_node,
561
562
        )
        logger.info(f"Dynamo config: {prefill_config}")
563

564
565
        work_dir = f"{args.output_dir}/selected_prefill_interpolation"
        os.makedirs(work_dir, exist_ok=True)
566

567
568
569
570
        prefill_config_fn = f"{work_dir}/config.yaml"
        with open(prefill_config_fn, "w") as f:
            yaml.dump(prefill_config, f)

571
572
        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
573
574
575
        elif args.use_ai_configurator:
            profile_prefill_aiconfigurator(
                work_dir,
576
                best_prefill_gpus,  # num_gpus
577
                sweep_max_context_length,
578
579
                args.prefill_interpolation_granularity,
                ai_configurator_perf_estimator,
580
                tp_size=(best_prefill_mapping.tp or best_prefill_gpus),
581
            )
582
583
584
585
586
587
588
589
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=prefill_config["metadata"]["name"],
590
            )
591
592
593
594
595
596
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(prefill_config_fn)
            logger.info("Waiting for deployment to be ready...")
            try:
                await client.wait_for_deployment_ready()
                logger.info("Deployment is ready")
597

598
599
600
                skip_profile = False
            except TimeoutError:
                logger.error(
601
                    "Deployment or model failed to become ready within timeout, skipping profiling"
602
603
                )
                skip_profile = True
604

605
606
607
608
609
610
            if not skip_profile:
                logger.info("Getting deployment logs...")
                await client.get_deployment_logs()
                logger.info(
                    f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                )
611

612
            base_url = client.get_service_url()
613

614
615
616
617
618
            profile_prefill(
                work_dir,
                model_name,
                model_name,
                base_url,
619
                best_prefill_gpus,
620
                sweep_max_context_length,
621
622
                args.prefill_interpolation_granularity,
            )
623

624
625
626
627
            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")
628

629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
        # interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode parallel mapping
        best_decode_gpus = decode_data.num_gpus[selected_decode_idx]
        best_decode_mapping = decode_data.parallel_mappings[selected_decode_idx]
        logger.info(
            f"Profiling decode with {best_decode_gpus} GPUs with parallel mapping [{best_decode_mapping.label()}]..."
        )
        decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
        )
        decode_config = apply_parallel_mapping_to_config(
            decode_config,
            best_decode_mapping,
            EngineType.DECODE,
            config_modifier,
            args.num_gpus_per_node,
        )
645
        logger.info(f"Dynamo config: {decode_config}")
646

647
648
649
650
651
652
653
        work_dir = f"{args.output_dir}/selected_decode_interpolation"
        os.makedirs(work_dir, exist_ok=True)

        decode_config_fn = f"{work_dir}/config.yaml"
        with open(decode_config_fn, "w") as f:
            yaml.dump(decode_config, f)

654
655
        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
656
        elif args.use_ai_configurator:
657
            attention_dp_size = best_decode_mapping.get_attn_dp_size(best_decode_gpus)
658
            max_kv_tokens = ai_configurator_perf_estimator.get_max_kv_tokens(
659
                args.isl, args.osl, tp_size=(best_decode_mapping.tp or best_decode_gpus)
660
661
662
            )
            profile_decode_aiconfigurator(
                work_dir,
663
                best_decode_gpus,  # num_gpus
664
                max_kv_tokens,
665
                sweep_max_context_length,
666
667
                args.decode_interpolation_granularity,
                ai_configurator_perf_estimator,
668
                attention_dp_size,
669
                tp_size=(best_decode_mapping.tp or best_decode_gpus),
670
            )
671
672
673
674
675
676
677
678
679
680
681
682
683
684
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=decode_config["metadata"]["name"],
            )
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(decode_config_fn)
            logger.info("Waiting for deployment to be ready...")
            await client.wait_for_deployment_ready()
            logger.info("Deployment is ready")
685

686
687
688
689
690
            logger.info("Getting deployment logs...")
            await client.get_deployment_logs()
            logger.info(
                f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
            )
691

692
            attention_dp_size = best_decode_mapping.get_attn_dp_size(best_decode_gpus)
693
            max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
694
695
                f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                attention_dp_size=attention_dp_size,
696
697
698
699
700
701
702
703
704
            )

            base_url = client.get_service_url()

            profile_decode(
                work_dir,
                model_name,
                model_name,
                base_url,
705
                best_decode_gpus,
706
                max_kv_tokens,
707
                sweep_max_context_length,
708
                args.decode_interpolation_granularity,
709
                attention_dp_size,
710
            )
711

712
713
714
715
            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")
716

717
718
719
720
721
722
723
724
        # generate DGD with planner based on profiling results
        config = generate_dgd_config_with_planner(
            config_path=args.config,
            config_modifier=config_modifier,
            best_prefill_gpus=best_prefill_gpus,
            best_decode_gpus=best_decode_gpus,
            output_dir=args.output_dir,
            args=args,
725
            is_moe_model=args.model_info.is_moe,
726
727
728
729
            num_gpus_per_node=args.num_gpus_per_node,
        )
        logger.info(f"Final DGD config with planner: {config}")

730
        # save DGD config with planner; support multi-document output when a ConfigMap is included
731
        with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
732
733
734
735
            if isinstance(config, list):
                yaml.dump_all(config, f)
            else:
                yaml.dump(config, f)
736

737
738
739
740
741
742
743
744
745
746
747
    except Exception as e:
        logger.error(f"Profile job failed with error: {e}")
        raise
    finally:
        # Always clean up any remaining deployments, even if the job failed
        logger.info("Performing final cleanup of any remaining deployments...")
        await cleanup_remaining_deployments(deployment_clients, args.namespace)
        logger.info("Final cleanup completed.")


if __name__ == "__main__":
    # Script entry point: build the run arguments, attach a file log handler,
    # then drive the async profiling job to completion.
    args = create_profiler_parser()

    # Set up file logging. The output directory is created first because
    # logging.FileHandler opens the log file immediately and would fail if
    # the directory did not exist yet.
    os.makedirs(args.output_dir, exist_ok=True)
    log_file_handler = logging.FileHandler(f"{args.output_dir}/profile_sla.log")
    log_file_handler.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
    )
    log_file_handler.setFormatter(formatter)
    # Attach to this file's `logger` so its records are also written to the
    # log file under the output directory.
    logger.addHandler(log_file_handler)

    # run_profile is a coroutine function; asyncio.run creates an event loop,
    # runs it until the coroutine finishes, and closes the loop afterwards.
    asyncio.run(run_profile(args))