# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
import asyncio
17
18
19
import logging
import math
import os
20
from dataclasses import dataclass, field
21
22
23

import numpy as np
import yaml
24

25
26
27
28
from benchmarks.profiler.utils.aiperf import (
    get_decode_itl_and_thpt_per_gpu,
    get_prefill_ttft,
)
29
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
30
31
32
33
34
35
from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import (
    ParallelizationMapping,
    apply_parallel_mapping_to_config,
    get_candidate_parallel_mappings,
)
from benchmarks.profiler.utils.defaults import EngineType
36
from benchmarks.profiler.utils.dgd_generation import generate_dgd_config_with_planner
37
from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
38
39
from benchmarks.profiler.utils.plot import (
    plot_decode_performance,
40
    plot_pd_joint_results,
41
42
    plot_prefill_performance,
)
43
from benchmarks.profiler.utils.profile_decode import (
44
    get_num_request_range,
45
46
47
48
49
50
51
    profile_decode,
    profile_decode_aiconfigurator,
)
from benchmarks.profiler.utils.profile_prefill import (
    profile_prefill,
    profile_prefill_aiconfigurator,
)
52
from benchmarks.profiler.utils.profiler_argparse import create_profiler_parser
53
54
55
56
from benchmarks.profiler.utils.profiler_status import (
    ProfilerStatus,
    write_profiler_status,
)
57
58
59
60
61
from benchmarks.profiler.webui.select_config import (
    add_profiling_error,
    clear_profiling_errors,
    pick_config_with_webui,
)
62
63
64
65
from deploy.utils.dynamo_deployment import (
    DynamoDeploymentClient,
    cleanup_remaining_deployments,
)
66
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES, SubComponentType
67

68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126

@dataclass
class PrefillProfileData:
    """Column-oriented store of prefill profiling measurements.

    Every profiled (GPU count, parallel mapping) point occupies the same index
    in each list, so the lists stay aligned as long as they are only filled
    through :meth:`add_data`.
    """

    # Number of GPUs used by the profiled prefill engine.
    num_gpus: list[int] = field(default_factory=list)
    # Measured or estimated time to first token, in milliseconds.
    ttft: list[float] = field(default_factory=list)
    # Prefill throughput, in tokens/s/GPU.
    thpt_per_gpu: list[float] = field(default_factory=list)
    # Human-readable label of each parallel mapping.
    parallel_mapping_labels: list[str] = field(default_factory=list)
    # The parallel mapping object that produced each data point.
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        ttft: float,
        thpt_per_gpu: float,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record one prefill measurement by appending a value to every column."""
        # Pair each column with its new value so the columns cannot drift apart.
        row = (
            (self.num_gpus, num_gpus),
            (self.ttft, ttft),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in row:
            column.append(value)

@dataclass
class DecodeProfileData:
    """Column-oriented store of decode profiling measurements.

    Every profiled (GPU count, parallel mapping, concurrency) point occupies
    the same index in each list, so the lists stay aligned as long as they are
    only filled through :meth:`add_data`.
    """

    # Number of GPUs used by the profiled decode engine.
    num_gpus: list[int] = field(default_factory=list)
    # Measured or estimated inter-token latency, in milliseconds.
    itl: list[float] = field(default_factory=list)
    # Decode throughput, in tokens/s/GPU.
    thpt_per_gpu: list[float] = field(default_factory=list)
    # Number of concurrent requests used for the measurement.
    concurrency: list[int] = field(default_factory=list)
    # KV-cache capacity, in tokens, for the engine that was measured.
    kv_cache_size: list[int] = field(default_factory=list)
    # Human-readable label of each parallel mapping.
    parallel_mapping_labels: list[str] = field(default_factory=list)
    # The parallel mapping object that produced each data point.
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        itl: float,
        thpt_per_gpu: float,
        concurrency: int,
        kv_cache_size: int,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record one decode measurement by appending a value to every column."""
        # Pair each column with its new value so the columns cannot drift apart.
        row = (
            (self.num_gpus, num_gpus),
            (self.itl, itl),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.concurrency, concurrency),
            (self.kv_cache_size, kv_cache_size),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in row:
            column.append(value)


127
128
129
130
131
132
133
134
135
136
137
# Module logger: INFO-level records go to a console handler that uses a
# timestamped "time - logger - level - message" layout.
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(console_handler)


138
139
140
async def run_profile(args):
    # List to track all created deployment clients for cleanup in case of failure
    deployment_clients = []
141

142
143
144
    # Clear any errors from previous profiling runs
    clear_profiling_errors()

145
146
147
148
    # Inherit aic_backend from backend if not explicitly set
    if not args.aic_backend:
        args.aic_backend = args.backend

149
150
151
152
153
154
155
156
    # Write initial status for external jobs to monitor
    os.makedirs(args.output_dir, exist_ok=True)
    write_profiler_status(
        args.output_dir,
        status=ProfilerStatus.RUNNING,
        message="Profiler job started",
    )

157
158
159
160
161
    try:
        config_modifier = CONFIG_MODIFIERS[args.backend]

        with open(args.config, "r") as f:
            config = yaml.safe_load(f)
162

163
164
        if args.dgd_image:
            config = config_modifier.update_image(config, args.dgd_image)
165
            logger.debug(f"Using DGD image: {args.dgd_image}")
166

167
168
169
170
171
        profile_num_gpus = [
            2**i
            for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1)
            if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine
        ]
172
        logger.info(f"Profiling GPU counts: {profile_num_gpus}")
173
        os.makedirs(args.output_dir, exist_ok=True)
174

175
        model_name, model_path = config_modifier.get_model_name(config)
176

177
178
179
180
181
182
183
184
185
        # Determine sweep max context length: allow user-provided cap to override model's if smaller
        use_specified_max_context_len = getattr(args, "max_context_length", None)
        model_max_context_len = args.model_info.max_context_length
        if not use_specified_max_context_len and not model_max_context_len:
            raise ValueError(
                "No max_context_length available from args.max_context_length or model_info from HF config"
            )
        elif not use_specified_max_context_len:
            sweep_max_context_length = model_max_context_len
186
            logger.info(
187
                f"Using model's maximum context length: {model_max_context_len}"
188
            )
189
190
        elif not model_max_context_len:
            sweep_max_context_length = use_specified_max_context_len
191
            logger.info(
192
                f"Using user-provided max_context_length: {use_specified_max_context_len}"
193
194
            )
        else:
195
196
197
198
199
200
            sweep_max_context_length = min(
                use_specified_max_context_len, model_max_context_len
            )
            logger.info(
                f"Using minimum of user-provided and model's maximum context length: {sweep_max_context_length}"
            )
201

202
203
204
205
206
        if args.use_ai_configurator:
            if not args.aic_system:
                raise ValueError(
                    "Must provide --aic-system when using --use-ai-configurator."
                )
207
208
209
210
211
212
213
214
215
216
217
218

            # Fallback to args.model if aic_hf_id is not provided
            if not args.aic_hf_id:
                if args.model:
                    logger.info(
                        f"--aic-hf-id not provided, using --model ({args.model}) as HuggingFace ID for AI configurator"
                    )
                    args.aic_hf_id = args.model
                else:
                    raise ValueError(
                        "Must provide --aic-hf-id or --model when using --use-ai-configurator."
                    )
219

220
            logger.info("Using aiconfigurator to estimate performance...")
221
            ai_configurator_perf_estimator = AIConfiguratorPerfEstimator(
222
                args.aic_hf_id,
223
                args.aic_system.lower(),
224
225
                args.aic_backend,
                args.aic_backend_version,
226
227
            )
        else:
228
            if args.aic_system or args.aic_hf_id or args.aic_backend_version:
229
                logger.warning(
230
                    "Ignoring --aic-system, --aic-hf-id, and/or --backend-version "
231
232
233
                    "when not using --use-ai-configurator."
                )

234
        # first profile prefill
235
        prefill_data = PrefillProfileData()
236
        logger.info("Profiling prefill...")
237
238
        base_prefill_config = config_modifier.convert_config(
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
239
        )
240
        frontend_port = config_modifier.get_port(config)
241
242
        itl: float | None = None
        thpt_per_gpu: float | None = None
243
244
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling prefill with {num_gpus} GPUs...")
245
            candidate_mappings = get_candidate_parallel_mappings(
246
247
                num_gpus,
                args.model_info,
248
            )
249

250
251
252
253
254
            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                prefill_config = apply_parallel_mapping_to_config(
                    base_prefill_config,
                    mapping,
255
                    SubComponentType.PREFILL,
256
257
                    config_modifier,
                    args.num_gpus_per_node,
258
                )
259
                logger.debug(f"Dynamo config: {prefill_config}")
260

261
262
263
                # Work dir includes mapping label (safe chars only)
                parallel_mapping_tag = (
                    mapping.label().replace("=", "").replace("/", "_")
264
                )
265
266
                work_dir = (
                    f"{args.output_dir}/prefill_{num_gpus}gpus_{parallel_mapping_tag}"
267
                )
268
269
270
271
272
273
274
275
276
277
                os.makedirs(work_dir, exist_ok=True)

                prefill_config_fn = f"{work_dir}/config.yaml"
                with open(prefill_config_fn, "w") as f:
                    yaml.dump(prefill_config, f)

                ttft = None
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
                elif args.use_ai_configurator:
278
                    logger.info("Using ai-configurator to estimate prefill latency")
279
280
                    perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf(
                        args.isl,
281
                        tp_size=mapping.get_tp_size(),
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
                    )
                    ttft = perf_dict["context_latency"]
                    logger.info(f"Estimated prefill TTFT: {ttft:.2f}ms")
                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=prefill_config["metadata"]["name"],
                    )
                    logger.info(
                        f"Created client with service_name: {client.service_name}"
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(prefill_config_fn)
                    logger.info("Waiting for deployment to be ready...")
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
                    try:
                        await client.wait_for_deployment_ready(
                            timeout=getattr(args, "deployment_timeout", 1800)
                        )
                    except TimeoutError:
                        logger.error(
                            f"Deployment for mapping {mapping.label()} with {num_gpus} GPUs "
                            f"failed to become ready within timeout during prefill profiling, skipping"
                        )
                        add_profiling_error(
                            f"Mapping {mapping.label()} with {num_gpus} GPUs timed out "
                            f"during prefill profiling"
                        )
                        logger.info("Cleaning up timed-out deployment...")
                        await client.delete_deployment()
                        deployment_clients.remove(client)
                        continue
317
318
319
320
321
322
323
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )
324

325
326
327
                    # run ai-perf
                    base_url = client.get_service_url()
                    ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{args.isl}"
328
                    ttft = get_prefill_ttft(
329
330
331
                        args.isl,
                        ai_perf_artifact_dir,
                        model_name,
332
                        model_path,
333
334
                        base_url,
                        attention_dp_size=mapping.get_attn_dp_size(),
335
336
337
338
339
340
341
342
343
344
345
                    )

                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")

                if ttft is not None:
                    prefill_data.add_data(
                        num_gpus=num_gpus,
                        ttft=ttft,
346
347
348
349
350
                        thpt_per_gpu=args.isl
                        / ttft
                        / num_gpus
                        * 1000
                        * mapping.get_attn_dp_size(),
351
352
353
                        parallel_mapping_label=mapping.label(),
                        parallel_mapping=mapping,
                    )
354

355
        # Plot the results as a 2D scatter plot
356
357
        if prefill_data.num_gpus and prefill_data.ttft and prefill_data.thpt_per_gpu:
            plot_prefill_performance(prefill_data, args.ttft, args.output_dir)
358

359
        # then profile decode
360
        decode_data = DecodeProfileData()
361
        logger.info("Profiling decode...")
362
363
        base_decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
364
365
366
        )
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling decode with {num_gpus} GPUs...")
367
            candidate_mappings = get_candidate_parallel_mappings(
368
369
                num_gpus,
                args.model_info,
370
            )
371

372
373
374
375
376
            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                decode_config = apply_parallel_mapping_to_config(
                    base_decode_config,
                    mapping,
377
                    SubComponentType.DECODE,
378
379
                    config_modifier,
                    args.num_gpus_per_node,
380
                )
381
                logger.debug(f"Dynamo config: {decode_config}")
382

383
384
385
386
                parallel_mapping_tag = (
                    mapping.label()
                    .replace("=", "")
                    .replace("/", "_")  # safe chars for directory
387
                )
388
389
                work_dir = (
                    f"{args.output_dir}/decode_{num_gpus}gpus_{parallel_mapping_tag}"
390
                )
391
                os.makedirs(work_dir, exist_ok=True)
392

393
394
395
                decode_config_fn = f"{work_dir}/config.yaml"
                with open(decode_config_fn, "w") as f:
                    yaml.dump(decode_config, f)
396

397
398
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
399

400
401
402
403
                elif args.use_ai_configurator:
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
                    max_concurrency = ai_configurator_perf_estimator.get_max_batch_size(
404
                        args.isl, args.osl, tp_size=mapping.get_tp_size()
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
                    )
                    max_kv_tokens = max_concurrency * (args.isl + args.osl)

                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=decode_config["metadata"]["name"],
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(decode_config_fn)
                    logger.info("Waiting for deployment to be ready...")
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
                    try:
                        await client.wait_for_deployment_ready(
                            timeout=getattr(args, "deployment_timeout", 1800)
                        )
                    except TimeoutError:
                        logger.error(
                            f"Deployment for mapping {mapping.label()} with {num_gpus} GPUs "
                            f"failed to become ready within timeout during decode profiling, skipping"
                        )
                        add_profiling_error(
                            f"Mapping {mapping.label()} with {num_gpus} GPUs timed out "
                            f"during decode profiling"
                        )
                        logger.info("Cleaning up timed-out deployment...")
                        await client.delete_deployment()
                        deployment_clients.remove(client)
                        continue
437
438
439
440
441
442
443
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )
444

445
446
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
447
                    attention_dp_size = mapping.get_attn_dp_size()
448
449
450
451
452
453
454
                    max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
                        f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                        attention_dp_size=attention_dp_size,
                    )
                    max_concurrency = max_kv_tokens // (args.isl + args.osl)

                if not args.dry_run:
455
                    attention_dp_size = mapping.get_attn_dp_size()
456
457
458
459
460
461
462
463
                    sweep_num_request = get_num_request_range(
                        attention_dp_size,
                        max_concurrency,
                        args.decode_interpolation_granularity,
                    )
                    logger.info(
                        f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}"
                    )
464

465
466
467
468
469
470
471
472
473
474
475
                    for num_request in sweep_num_request:
                        itl = thpt_per_gpu = None
                        if args.use_ai_configurator:
                            logger.info(
                                "Using ai-configurator to estimate decode latency."
                            )
                            perf_dict = ai_configurator_perf_estimator.estimate_perf(
                                args.isl,
                                args.osl,
                                num_request,
                                mode=EngineType.DECODE,
476
                                tp_size=mapping.get_tp_size(),
477
478
                            )

479
480
481
482
483
484
485
486
487
                            itl = perf_dict["tpot"]
                            thpt_per_gpu = perf_dict["tokens/s/gpu"]
                            logger.info(f"Estimated decode ITL: {itl:.2f}ms")
                            logger.info(
                                f"Estimated decode throughput per GPU: {thpt_per_gpu:.2f} tokens/s/GPU"
                            )
                        else:
                            base_url = client.get_service_url()
                            ai_perf_artifact_dir = f"{work_dir}/aiperf_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}"
488
                            itl, thpt_per_gpu = get_decode_itl_and_thpt_per_gpu(
489
490
491
492
493
                                args.isl,
                                args.osl,
                                num_request,
                                ai_perf_artifact_dir,
                                model_name,
494
                                model_path,
495
                                base_url=base_url,
496
                                num_gpus=num_gpus,
497
                                attention_dp_size=mapping.get_attn_dp_size(),
498
499
500
501
502
503
504
505
506
507
508
509
                            )

                        if itl is not None and thpt_per_gpu is not None:
                            decode_data.add_data(
                                num_gpus=num_gpus,
                                itl=itl,
                                thpt_per_gpu=thpt_per_gpu,
                                concurrency=num_request,
                                kv_cache_size=max_kv_tokens,
                                parallel_mapping_label=mapping.label(),
                                parallel_mapping=mapping,
                            )
510

511
512
513
514
515
                if not args.dry_run and not args.use_ai_configurator:
                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")
516

517
        # Plot all decode results after profiling is complete
518
519
        if decode_data.num_gpus:
            plot_decode_performance(decode_data, args.itl, args.output_dir)
520

521
        if prefill_data.num_gpus and decode_data.num_gpus:
522
            plot_pd_joint_results(
523
                args.isl, args.osl, prefill_data, decode_data, args.output_dir
524
525
            )

526
527
528
529
530
        if args.dry_run:
            logger.info("Skipping recommendations in dry run mode")
        else:
            logger.info("Analyzing results and generate recommendations...")
            # Safety guards: no results → exit early with a clear message
531
            if not prefill_data.num_gpus:
532
533
534
                error_msg = "No prefill results produced; skipping recommendations."
                logger.error(error_msg)
                add_profiling_error(error_msg)
535
536
537
538
539
540
                write_profiler_status(
                    args.output_dir,
                    status=ProfilerStatus.FAILED,
                    error=error_msg,
                    message="Profiler failed: no prefill results produced",
                )
541
                return
542

543
544
545
546
            if args.pick_with_webui:
                # select best P/D config in webUI
                selected_prefill_idx, selected_decode_idx = pick_config_with_webui(
                    prefill_data, decode_data, args
547
                )
548
549
550
                # update TTFT/ITL SLA based on selected config
                args.ttft = prefill_data.ttft[selected_prefill_idx]
                args.itl = decode_data.itl[selected_decode_idx]
551
            else:
552
553
554
                # automatically select P/D config within SLA with the highest throughput/GPU
                # select best parallel mapping for prefill
                if min(prefill_data.ttft) > args.ttft:
555
556
557
                    warning_msg = "No engine configuration satisfies the TTFT requirement, please try a smaller model or more powerful hardware"
                    logger.warning(warning_msg)
                    add_profiling_error(warning_msg)
558
559
560
561
562
563
564
565
566
567
568
569
570
571
                    selected_prefill_idx = int(np.argmin(np.array(prefill_data.ttft)))
                else:
                    valid_indices = [
                        i
                        for i, ttft in enumerate(prefill_data.ttft)
                        if ttft <= args.ttft
                    ]
                    # Among valid TP sizes, select the one with highest throughput per GPU
                    valid_thpts = [prefill_data.thpt_per_gpu[i] for i in valid_indices]
                    max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                    selected_prefill_idx = max_thpt_idx
                logger.info(
                    f"Suggested prefill parallel mapping: {prefill_data.parallel_mapping_labels[selected_prefill_idx]} on {prefill_data.num_gpus[selected_prefill_idx]} GPU(s) (TTFT {prefill_data.ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_data.thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)"
                )
572

573
574
                # select best parallel mapping for decode
                if not decode_data.num_gpus:
575
576
577
                    error_msg = "No decode results produced; skipping recommendations."
                    logger.error(error_msg)
                    add_profiling_error(error_msg)
578
579
580
581
582
583
                    write_profiler_status(
                        args.output_dir,
                        status=ProfilerStatus.FAILED,
                        error=error_msg,
                        message="Profiler failed: no decode results produced",
                    )
584
585
                    return
                if min(decode_data.itl) > args.itl:
586
587
588
                    warning_msg = "No engine configuration satisfies the ITL requirement, please try a smaller model or more powerful hardware"
                    logger.warning(warning_msg)
                    add_profiling_error(warning_msg)
589
590
591
592
593
594
595
596
597
598
599
                    selected_decode_idx = int(np.argmin(np.array(decode_data.itl)))
                else:
                    valid_indices = [
                        i for i, itl in enumerate(decode_data.itl) if itl <= args.itl
                    ]
                    # Among valid TP sizes, select the one with highest throughput per GPU
                    valid_thpts = [decode_data.thpt_per_gpu[i] for i in valid_indices]
                    max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                    selected_decode_idx = max_thpt_idx
                logger.info(
                    f"Suggested decode parallel mapping: {decode_data.parallel_mapping_labels[selected_decode_idx]} on {decode_data.num_gpus[selected_decode_idx]} GPU(s) (ITL {decode_data.itl[selected_decode_idx]:.2f} ms, throughput {decode_data.thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)"
600
                )
601

602
        if args.dry_run:
603
            # use min value for prefill and decode GPU counts
604
605
606
607
608
609
610
611
            prefill_data.num_gpus = [args.min_num_gpus_per_engine]
            decode_data.num_gpus = [args.min_num_gpus_per_engine]
            prefill_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
            decode_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
612
613
            selected_prefill_idx = 0
            selected_decode_idx = 0
614

615
616
617
        # interpolate ISL - TTFT with best prefill parallel mapping
        best_prefill_gpus = prefill_data.num_gpus[selected_prefill_idx]
        best_prefill_mapping = prefill_data.parallel_mappings[selected_prefill_idx]
618
        logger.info(
619
            f"Profiling prefill under best {best_prefill_gpus} GPU(s) with parallel mapping [{best_prefill_mapping.label()}] with different ISL..."
620
        )
621
        prefill_config = config_modifier.convert_config(
622
623
624
625
626
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
        )
        prefill_config = apply_parallel_mapping_to_config(
            prefill_config,
            best_prefill_mapping,
627
            SubComponentType.PREFILL,
628
629
            config_modifier,
            args.num_gpus_per_node,
630
        )
631
        logger.debug(f"Dynamo config: {prefill_config}")
632

633
634
        work_dir = f"{args.output_dir}/selected_prefill_interpolation"
        os.makedirs(work_dir, exist_ok=True)
635

636
637
638
639
        prefill_config_fn = f"{work_dir}/config.yaml"
        with open(prefill_config_fn, "w") as f:
            yaml.dump(prefill_config, f)

640
641
        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
642
643
644
        elif args.use_ai_configurator:
            profile_prefill_aiconfigurator(
                work_dir,
645
                best_prefill_gpus,  # num_gpus
646
                sweep_max_context_length,
647
648
                args.prefill_interpolation_granularity,
                ai_configurator_perf_estimator,
649
                tp_size=best_prefill_mapping.get_tp_size(),
650
            )
651
652
653
654
655
656
657
658
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=prefill_config["metadata"]["name"],
659
            )
660
661
662
663
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(prefill_config_fn)
            logger.info("Waiting for deployment to be ready...")
            try:
664
665
666
                await client.wait_for_deployment_ready(
                    timeout=getattr(args, "deployment_timeout", 1800)
                )
667
                logger.info("Deployment is ready")
668

669
670
671
                skip_profile = False
            except TimeoutError:
                logger.error(
672
                    "Deployment or model failed to become ready within timeout, skipping profiling"
673
674
                )
                skip_profile = True
675

676
677
678
679
680
681
            if not skip_profile:
                logger.info("Getting deployment logs...")
                await client.get_deployment_logs()
                logger.info(
                    f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                )
682

683
            base_url = client.get_service_url()
684

685
686
687
            profile_prefill(
                work_dir,
                model_name,
688
                model_path,
689
                base_url,
690
                best_prefill_gpus,
691
                sweep_max_context_length,
692
                args.prefill_interpolation_granularity,
693
                attention_dp_size=best_prefill_mapping.get_attn_dp_size(),
694
            )
695

696
697
698
699
            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")
700

701
702
703
704
705
706
707
708
709
710
711
712
        # interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode parallel mapping
        best_decode_gpus = decode_data.num_gpus[selected_decode_idx]
        best_decode_mapping = decode_data.parallel_mappings[selected_decode_idx]
        logger.info(
            f"Profiling decode with {best_decode_gpus} GPUs with parallel mapping [{best_decode_mapping.label()}]..."
        )
        decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
        )
        decode_config = apply_parallel_mapping_to_config(
            decode_config,
            best_decode_mapping,
713
            SubComponentType.DECODE,
714
715
716
            config_modifier,
            args.num_gpus_per_node,
        )
717
        logger.debug(f"Dynamo config: {decode_config}")
718

719
720
721
722
723
724
725
        work_dir = f"{args.output_dir}/selected_decode_interpolation"
        os.makedirs(work_dir, exist_ok=True)

        decode_config_fn = f"{work_dir}/config.yaml"
        with open(decode_config_fn, "w") as f:
            yaml.dump(decode_config, f)

726
727
        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
728
        elif args.use_ai_configurator:
729
            attention_dp_size = best_decode_mapping.get_attn_dp_size()
730
            max_kv_tokens = ai_configurator_perf_estimator.get_max_kv_tokens(
731
                args.isl, args.osl, tp_size=best_decode_mapping.get_tp_size()
732
733
734
            )
            profile_decode_aiconfigurator(
                work_dir,
735
                best_decode_gpus,  # num_gpus
736
                max_kv_tokens,
737
                sweep_max_context_length,
738
739
                args.decode_interpolation_granularity,
                ai_configurator_perf_estimator,
740
                attention_dp_size,
741
                tp_size=best_decode_mapping.get_tp_size(),
742
            )
743
744
745
746
747
748
749
750
751
752
753
754
755
756
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=decode_config["metadata"]["name"],
            )
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(decode_config_fn)
            logger.info("Waiting for deployment to be ready...")
            await client.wait_for_deployment_ready()
            logger.info("Deployment is ready")
757

758
759
760
761
762
            logger.info("Getting deployment logs...")
            await client.get_deployment_logs()
            logger.info(
                f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
            )
763

764
            attention_dp_size = best_decode_mapping.get_attn_dp_size()
765
            max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
766
767
                f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                attention_dp_size=attention_dp_size,
768
769
770
771
772
773
774
            )

            base_url = client.get_service_url()

            profile_decode(
                work_dir,
                model_name,
775
                model_path,
776
                base_url,
777
                best_decode_gpus,
778
                max_kv_tokens,
779
                sweep_max_context_length,
780
                args.decode_interpolation_granularity,
781
                attention_dp_size,
782
            )
783

784
785
786
787
            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")
788

789
        # generate DGD with planner based on profiling results
790
        config, mocker_config = generate_dgd_config_with_planner(
791
792
793
794
            config_path=args.config,
            config_modifier=config_modifier,
            output_dir=args.output_dir,
            args=args,
795
796
            best_prefill_mapping=best_prefill_mapping,
            best_decode_mapping=best_decode_mapping,
797
798
            num_gpus_per_node=args.num_gpus_per_node,
        )
799
        logger.debug(f"Final DGD config with planner: {config}")
800

801
        # save DGD config with planner; support multi-document output when a ConfigMap is included
802
        with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
803
            if isinstance(config, list):
804
                yaml.safe_dump_all(config, f, sort_keys=False)
805
            else:
806
                yaml.safe_dump(config, f, sort_keys=False)
807

808
809
810
811
        # save mocker config with planner for testing purposes
        logger.debug(f"Mocker config with planner: {mocker_config}")
        with open(f"{args.output_dir}/mocker_config_with_planner.yaml", "w") as f:
            if isinstance(mocker_config, list):
812
                yaml.safe_dump_all(mocker_config, f, sort_keys=False)
813
            else:
814
                yaml.safe_dump(mocker_config, f, sort_keys=False)
815

816
817
818
819
820
821
822
823
824
825
826
827
        # Write success status with output files
        write_profiler_status(
            args.output_dir,
            status=ProfilerStatus.SUCCESS,
            message="Profiler completed successfully",
            outputs={
                "config_with_planner": "config_with_planner.yaml",
                "mocker_config_with_planner": "mocker_config_with_planner.yaml",
                "disagg_config": "disagg_config.yaml",
            },
        )

828
    except Exception as e:
829
830
831
832
833
834
835
        logger.exception("Profile job failed with error")
        write_profiler_status(
            args.output_dir,
            status=ProfilerStatus.FAILED,
            error=str(e),
            message=f"Profiler failed with exception: {type(e).__name__}",
        )
836
837
838
839
840
841
842
843
844
        raise
    finally:
        # Always clean up any remaining deployments, even if the job failed
        logger.info("Performing final cleanup of any remaining deployments...")
        await cleanup_remaining_deployments(deployment_clients, args.namespace)
        logger.info("Final cleanup completed.")


if __name__ == "__main__":
    # Build the profiler CLI and obtain its parsed arguments; the returned
    # object is used below for `output_dir` and passed whole to run_profile.
    args = create_profiler_parser()

    # Set up file logging: records sent through `logger` at INFO and above are
    # also written to <output_dir>/profile_sla.log. The directory is created
    # first because FileHandler opens the file immediately.
    os.makedirs(args.output_dir, exist_ok=True)
    # Explicit UTF-8 so non-ASCII log content does not depend on the locale.
    log_file_handler = logging.FileHandler(
        f"{args.output_dir}/profile_sla.log", encoding="utf-8"
    )
    log_file_handler.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
    )
    log_file_handler.setFormatter(formatter)
    logger.addHandler(log_file_handler)

    # run_profile is a coroutine; drive it to completion on a fresh event loop.
    asyncio.run(run_profile(args))