# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
import asyncio
17
18
19
import logging
import math
import os
20
from dataclasses import dataclass, field
21
22
23

import numpy as np
import yaml
24

25
26
27
28
from benchmarks.profiler.utils.aiperf import (
    get_decode_itl_and_thpt_per_gpu,
    get_prefill_ttft,
)
29
from benchmarks.profiler.utils.config import Config, get_service_name_by_type
30
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
31
32
33
34
35
36
from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import (
    ParallelizationMapping,
    apply_parallel_mapping_to_config,
    get_candidate_parallel_mappings,
)
from benchmarks.profiler.utils.defaults import EngineType
37
from benchmarks.profiler.utils.dgd_generation import generate_dgd_config_with_planner
38
from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
39
40
from benchmarks.profiler.utils.plot import (
    plot_decode_performance,
41
    plot_pd_joint_results,
42
43
    plot_prefill_performance,
)
44
from benchmarks.profiler.utils.profile_decode import (
45
    get_num_request_range,
46
47
48
49
50
51
52
    profile_decode,
    profile_decode_aiconfigurator,
)
from benchmarks.profiler.utils.profile_prefill import (
    profile_prefill,
    profile_prefill_aiconfigurator,
)
53
from benchmarks.profiler.utils.profiler_argparse import create_profiler_parser
54
55
56
57
from benchmarks.profiler.utils.profiler_status import (
    ProfilerStatus,
    write_profiler_status,
)
58
59
60
61
62
from benchmarks.profiler.webui.select_config import (
    add_profiling_error,
    clear_profiling_errors,
    pick_config_with_webui,
)
63
64
65
66
from deploy.utils.dynamo_deployment import (
    DynamoDeploymentClient,
    cleanup_remaining_deployments,
)
67
from dynamo.planner.defaults import SubComponentType
68

69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127

@dataclass
class PrefillProfileData:
    """Column-oriented store of prefill profiling measurements.

    Each list holds one entry per profiled configuration. Entries sharing an
    index across the lists describe the same measurement. ``add_data``
    appends to every list at once, which keeps the columns aligned.
    """

    # Number of GPUs the profiled prefill engine ran on.
    num_gpus: list[int] = field(default_factory=list)
    # Time to first token, in milliseconds.
    ttft: list[float] = field(default_factory=list)
    # Prefill throughput normalised per GPU, in tokens/s/GPU.
    thpt_per_gpu: list[float] = field(default_factory=list)
    # Printable label of the parallelization mapping that was profiled.
    parallel_mapping_labels: list[str] = field(default_factory=list)
    # The parallelization mapping objects themselves.
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        ttft: float,
        thpt_per_gpu: float,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record a single prefill measurement across all columns.

        Args:
            num_gpus: GPU count of the profiled engine.
            ttft: Measured or estimated time to first token (ms).
            thpt_per_gpu: Throughput per GPU (tokens/s/GPU).
            parallel_mapping_label: Printable label for ``parallel_mapping``.
            parallel_mapping: Parallelization mapping that was profiled.
        """
        # Pair each column with its new value; appending in this fixed order
        # keeps every list the same length.
        pending = (
            (self.num_gpus, num_gpus),
            (self.ttft, ttft),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in pending:
            column.append(value)


@dataclass
class DecodeProfileData:
    """Column-oriented store of decode profiling measurements.

    Each list holds one entry per (configuration, concurrency) point.
    Entries sharing an index across the lists describe the same measurement.
    ``add_data`` appends to every list at once, which keeps the columns
    aligned.
    """

    # Number of GPUs the profiled decode engine ran on.
    num_gpus: list[int] = field(default_factory=list)
    # Inter-token latency, in milliseconds.
    itl: list[float] = field(default_factory=list)
    # Decode throughput normalised per GPU, in tokens/s/GPU.
    thpt_per_gpu: list[float] = field(default_factory=list)
    # Number of concurrent requests used for this sweep point.
    concurrency: list[int] = field(default_factory=list)
    # KV-cache capacity (in tokens) reported or estimated for the engine.
    kv_cache_size: list[int] = field(default_factory=list)
    # Printable label of the parallelization mapping that was profiled.
    parallel_mapping_labels: list[str] = field(default_factory=list)
    # The parallelization mapping objects themselves.
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        itl: float,
        thpt_per_gpu: float,
        concurrency: int,
        kv_cache_size: int,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record a single decode measurement across all columns.

        Args:
            num_gpus: GPU count of the profiled engine.
            itl: Measured or estimated inter-token latency (ms).
            thpt_per_gpu: Throughput per GPU (tokens/s/GPU).
            concurrency: Number of concurrent requests for this point.
            kv_cache_size: KV-cache token capacity of the engine.
            parallel_mapping_label: Printable label for ``parallel_mapping``.
            parallel_mapping: Parallelization mapping that was profiled.
        """
        # Pair each column with its new value; appending in this fixed order
        # keeps every list the same length.
        pending = (
            (self.num_gpus, num_gpus),
            (self.itl, itl),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.concurrency, concurrency),
            (self.kv_cache_size, kv_cache_size),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in pending:
            column.append(value)


128
129
130
131
132
133
134
135
136
137
138
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Attach a stream handler that prints INFO-and-above records in a
# "<timestamp> - <logger name> - <level> - <message>" layout.
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)


139
140
141
async def run_profile(args):
    # List to track all created deployment clients for cleanup in case of failure
    deployment_clients = []
142

143
144
145
    # Clear any errors from previous profiling runs
    clear_profiling_errors()

146
147
148
149
    # Inherit aic_backend from backend if not explicitly set
    if not args.aic_backend:
        args.aic_backend = args.backend

150
151
152
153
154
155
156
157
    # Write initial status for external jobs to monitor
    os.makedirs(args.output_dir, exist_ok=True)
    write_profiler_status(
        args.output_dir,
        status=ProfilerStatus.RUNNING,
        message="Profiler job started",
    )

158
159
160
161
162
    try:
        config_modifier = CONFIG_MODIFIERS[args.backend]

        with open(args.config, "r") as f:
            config = yaml.safe_load(f)
163

164
165
        if args.dgd_image:
            config = config_modifier.update_image(config, args.dgd_image)
166
            logger.debug(f"Using DGD image: {args.dgd_image}")
167

168
169
170
171
172
        profile_num_gpus = [
            2**i
            for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1)
            if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine
        ]
173
        logger.info(f"Profiling GPU counts: {profile_num_gpus}")
174
        os.makedirs(args.output_dir, exist_ok=True)
175

176
        model_name, model_path = config_modifier.get_model_name(config)
177

178
179
180
181
182
183
184
185
186
        # Determine sweep max context length: allow user-provided cap to override model's if smaller
        use_specified_max_context_len = getattr(args, "max_context_length", None)
        model_max_context_len = args.model_info.max_context_length
        if not use_specified_max_context_len and not model_max_context_len:
            raise ValueError(
                "No max_context_length available from args.max_context_length or model_info from HF config"
            )
        elif not use_specified_max_context_len:
            sweep_max_context_length = model_max_context_len
187
            logger.info(
188
                f"Using model's maximum context length: {model_max_context_len}"
189
            )
190
191
        elif not model_max_context_len:
            sweep_max_context_length = use_specified_max_context_len
192
            logger.info(
193
                f"Using user-provided max_context_length: {use_specified_max_context_len}"
194
195
            )
        else:
196
197
198
199
200
201
            sweep_max_context_length = min(
                use_specified_max_context_len, model_max_context_len
            )
            logger.info(
                f"Using minimum of user-provided and model's maximum context length: {sweep_max_context_length}"
            )
202

203
204
205
206
207
        if args.use_ai_configurator:
            if not args.aic_system:
                raise ValueError(
                    "Must provide --aic-system when using --use-ai-configurator."
                )
208
209
210
211
212
213
214
215
216
217
218
219

            # Fallback to args.model if aic_hf_id is not provided
            if not args.aic_hf_id:
                if args.model:
                    logger.info(
                        f"--aic-hf-id not provided, using --model ({args.model}) as HuggingFace ID for AI configurator"
                    )
                    args.aic_hf_id = args.model
                else:
                    raise ValueError(
                        "Must provide --aic-hf-id or --model when using --use-ai-configurator."
                    )
220

221
            logger.info("Using aiconfigurator to estimate performance...")
222
            ai_configurator_perf_estimator = AIConfiguratorPerfEstimator(
223
                args.aic_hf_id,
224
                args.aic_system.lower(),
225
226
                args.aic_backend,
                args.aic_backend_version,
227
228
            )
        else:
229
            if args.aic_system or args.aic_hf_id or args.aic_backend_version:
230
                logger.warning(
231
                    "Ignoring --aic-system, --aic-hf-id, and/or --backend-version "
232
233
234
                    "when not using --use-ai-configurator."
                )

235
        # first profile prefill
236
        prefill_data = PrefillProfileData()
237
        logger.info("Profiling prefill...")
238
239
        base_prefill_config = config_modifier.convert_config(
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
240
        )
241
        frontend_port = config_modifier.get_port(config)
242
243
        itl: float | None = None
        thpt_per_gpu: float | None = None
244
245
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling prefill with {num_gpus} GPUs...")
246
            candidate_mappings = get_candidate_parallel_mappings(
247
248
                num_gpus,
                args.model_info,
249
            )
250

251
252
253
254
255
            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                prefill_config = apply_parallel_mapping_to_config(
                    base_prefill_config,
                    mapping,
256
                    SubComponentType.PREFILL,
257
258
                    config_modifier,
                    args.num_gpus_per_node,
259
                )
260
                logger.debug(f"Dynamo config: {prefill_config}")
261

262
263
264
                # Work dir includes mapping label (safe chars only)
                parallel_mapping_tag = (
                    mapping.label().replace("=", "").replace("/", "_")
265
                )
266
267
                work_dir = (
                    f"{args.output_dir}/prefill_{num_gpus}gpus_{parallel_mapping_tag}"
268
                )
269
270
271
272
273
274
275
276
277
278
                os.makedirs(work_dir, exist_ok=True)

                prefill_config_fn = f"{work_dir}/config.yaml"
                with open(prefill_config_fn, "w") as f:
                    yaml.dump(prefill_config, f)

                ttft = None
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
                elif args.use_ai_configurator:
279
                    logger.info("Using ai-configurator to estimate prefill latency")
280
281
                    perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf(
                        args.isl,
282
                        tp_size=mapping.get_tp_size(),
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
                    )
                    ttft = perf_dict["context_latency"]
                    logger.info(f"Estimated prefill TTFT: {ttft:.2f}ms")
                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=prefill_config["metadata"]["name"],
                    )
                    logger.info(
                        f"Created client with service_name: {client.service_name}"
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(prefill_config_fn)
                    logger.info("Waiting for deployment to be ready...")
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
                    try:
                        await client.wait_for_deployment_ready(
                            timeout=getattr(args, "deployment_timeout", 1800)
                        )
                    except TimeoutError:
                        logger.error(
                            f"Deployment for mapping {mapping.label()} with {num_gpus} GPUs "
                            f"failed to become ready within timeout during prefill profiling, skipping"
                        )
                        add_profiling_error(
                            f"Mapping {mapping.label()} with {num_gpus} GPUs timed out "
                            f"during prefill profiling"
                        )
                        logger.info("Cleaning up timed-out deployment...")
                        await client.delete_deployment()
                        deployment_clients.remove(client)
                        continue
318
319
320
321
322
323
324
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )
325

326
327
328
                    # run ai-perf
                    base_url = client.get_service_url()
                    ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{args.isl}"
329
                    ttft = get_prefill_ttft(
330
331
332
                        args.isl,
                        ai_perf_artifact_dir,
                        model_name,
333
                        model_path,
334
335
                        base_url,
                        attention_dp_size=mapping.get_attn_dp_size(),
336
337
338
339
340
341
342
343
344
345
346
                    )

                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")

                if ttft is not None:
                    prefill_data.add_data(
                        num_gpus=num_gpus,
                        ttft=ttft,
347
348
349
350
351
                        thpt_per_gpu=args.isl
                        / ttft
                        / num_gpus
                        * 1000
                        * mapping.get_attn_dp_size(),
352
353
354
                        parallel_mapping_label=mapping.label(),
                        parallel_mapping=mapping,
                    )
355

356
        # Plot the results as a 2D scatter plot
357
358
        if prefill_data.num_gpus and prefill_data.ttft and prefill_data.thpt_per_gpu:
            plot_prefill_performance(prefill_data, args.ttft, args.output_dir)
359

360
        # then profile decode
361
        decode_data = DecodeProfileData()
362
        logger.info("Profiling decode...")
363
364
        base_decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
365
366
367
        )
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling decode with {num_gpus} GPUs...")
368
            candidate_mappings = get_candidate_parallel_mappings(
369
370
                num_gpus,
                args.model_info,
371
            )
372

373
374
375
376
377
            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                decode_config = apply_parallel_mapping_to_config(
                    base_decode_config,
                    mapping,
378
                    SubComponentType.DECODE,
379
380
                    config_modifier,
                    args.num_gpus_per_node,
381
                )
382
                logger.debug(f"Dynamo config: {decode_config}")
383

384
385
386
387
                parallel_mapping_tag = (
                    mapping.label()
                    .replace("=", "")
                    .replace("/", "_")  # safe chars for directory
388
                )
389
390
                work_dir = (
                    f"{args.output_dir}/decode_{num_gpus}gpus_{parallel_mapping_tag}"
391
                )
392
                os.makedirs(work_dir, exist_ok=True)
393

394
395
396
                decode_config_fn = f"{work_dir}/config.yaml"
                with open(decode_config_fn, "w") as f:
                    yaml.dump(decode_config, f)
397

398
399
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
400

401
402
403
404
                elif args.use_ai_configurator:
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
                    max_concurrency = ai_configurator_perf_estimator.get_max_batch_size(
405
                        args.isl, args.osl, tp_size=mapping.get_tp_size()
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
                    )
                    max_kv_tokens = max_concurrency * (args.isl + args.osl)

                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=decode_config["metadata"]["name"],
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(decode_config_fn)
                    logger.info("Waiting for deployment to be ready...")
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
                    try:
                        await client.wait_for_deployment_ready(
                            timeout=getattr(args, "deployment_timeout", 1800)
                        )
                    except TimeoutError:
                        logger.error(
                            f"Deployment for mapping {mapping.label()} with {num_gpus} GPUs "
                            f"failed to become ready within timeout during decode profiling, skipping"
                        )
                        add_profiling_error(
                            f"Mapping {mapping.label()} with {num_gpus} GPUs timed out "
                            f"during decode profiling"
                        )
                        logger.info("Cleaning up timed-out deployment...")
                        await client.delete_deployment()
                        deployment_clients.remove(client)
                        continue
438
439
440
441
442
443
444
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )
445

446
447
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
448
                    attention_dp_size = mapping.get_attn_dp_size()
449
450
451
452
453
                    # Get the actual decode service name from the config
                    decode_cfg = Config.model_validate(decode_config)
                    decode_service_name = get_service_name_by_type(
                        decode_cfg, args.backend, SubComponentType.DECODE
                    ).lower()
454
                    max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
455
                        f"{work_dir}/{client.deployment_name}/{decode_service_name}/0.log",
456
457
458
459
460
                        attention_dp_size=attention_dp_size,
                    )
                    max_concurrency = max_kv_tokens // (args.isl + args.osl)

                if not args.dry_run:
461
                    attention_dp_size = mapping.get_attn_dp_size()
462
463
464
465
466
467
468
469
                    sweep_num_request = get_num_request_range(
                        attention_dp_size,
                        max_concurrency,
                        args.decode_interpolation_granularity,
                    )
                    logger.info(
                        f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}"
                    )
470

471
472
473
474
475
476
477
478
479
480
481
                    for num_request in sweep_num_request:
                        itl = thpt_per_gpu = None
                        if args.use_ai_configurator:
                            logger.info(
                                "Using ai-configurator to estimate decode latency."
                            )
                            perf_dict = ai_configurator_perf_estimator.estimate_perf(
                                args.isl,
                                args.osl,
                                num_request,
                                mode=EngineType.DECODE,
482
                                tp_size=mapping.get_tp_size(),
483
484
                            )

485
486
487
488
489
490
491
492
493
                            itl = perf_dict["tpot"]
                            thpt_per_gpu = perf_dict["tokens/s/gpu"]
                            logger.info(f"Estimated decode ITL: {itl:.2f}ms")
                            logger.info(
                                f"Estimated decode throughput per GPU: {thpt_per_gpu:.2f} tokens/s/GPU"
                            )
                        else:
                            base_url = client.get_service_url()
                            ai_perf_artifact_dir = f"{work_dir}/aiperf_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}"
494
                            itl, thpt_per_gpu = get_decode_itl_and_thpt_per_gpu(
495
496
497
498
499
                                args.isl,
                                args.osl,
                                num_request,
                                ai_perf_artifact_dir,
                                model_name,
500
                                model_path,
501
                                base_url=base_url,
502
                                num_gpus=num_gpus,
503
                                attention_dp_size=mapping.get_attn_dp_size(),
504
505
506
507
508
509
510
511
512
513
514
515
                            )

                        if itl is not None and thpt_per_gpu is not None:
                            decode_data.add_data(
                                num_gpus=num_gpus,
                                itl=itl,
                                thpt_per_gpu=thpt_per_gpu,
                                concurrency=num_request,
                                kv_cache_size=max_kv_tokens,
                                parallel_mapping_label=mapping.label(),
                                parallel_mapping=mapping,
                            )
516

517
518
519
520
521
                if not args.dry_run and not args.use_ai_configurator:
                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")
522

523
        # Plot all decode results after profiling is complete
524
525
        if decode_data.num_gpus:
            plot_decode_performance(decode_data, args.itl, args.output_dir)
526

527
        if prefill_data.num_gpus and decode_data.num_gpus:
528
            plot_pd_joint_results(
529
                args.isl, args.osl, prefill_data, decode_data, args.output_dir
530
531
            )

532
533
534
535
536
        if args.dry_run:
            logger.info("Skipping recommendations in dry run mode")
        else:
            logger.info("Analyzing results and generate recommendations...")
            # Safety guards: no results → exit early with a clear message
537
            if not prefill_data.num_gpus:
538
539
540
                error_msg = "No prefill results produced; skipping recommendations."
                logger.error(error_msg)
                add_profiling_error(error_msg)
541
542
543
544
545
546
                write_profiler_status(
                    args.output_dir,
                    status=ProfilerStatus.FAILED,
                    error=error_msg,
                    message="Profiler failed: no prefill results produced",
                )
547
                return
548

549
550
551
552
            if args.pick_with_webui:
                # select best P/D config in webUI
                selected_prefill_idx, selected_decode_idx = pick_config_with_webui(
                    prefill_data, decode_data, args
553
                )
554
555
556
                # update TTFT/ITL SLA based on selected config
                args.ttft = prefill_data.ttft[selected_prefill_idx]
                args.itl = decode_data.itl[selected_decode_idx]
557
            else:
558
559
560
                # automatically select P/D config within SLA with the highest throughput/GPU
                # select best parallel mapping for prefill
                if min(prefill_data.ttft) > args.ttft:
561
562
563
                    warning_msg = "No engine configuration satisfies the TTFT requirement, please try a smaller model or more powerful hardware"
                    logger.warning(warning_msg)
                    add_profiling_error(warning_msg)
564
565
566
567
568
569
570
571
572
573
574
575
576
577
                    selected_prefill_idx = int(np.argmin(np.array(prefill_data.ttft)))
                else:
                    valid_indices = [
                        i
                        for i, ttft in enumerate(prefill_data.ttft)
                        if ttft <= args.ttft
                    ]
                    # Among valid TP sizes, select the one with highest throughput per GPU
                    valid_thpts = [prefill_data.thpt_per_gpu[i] for i in valid_indices]
                    max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                    selected_prefill_idx = max_thpt_idx
                logger.info(
                    f"Suggested prefill parallel mapping: {prefill_data.parallel_mapping_labels[selected_prefill_idx]} on {prefill_data.num_gpus[selected_prefill_idx]} GPU(s) (TTFT {prefill_data.ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_data.thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)"
                )
578

579
580
                # select best parallel mapping for decode
                if not decode_data.num_gpus:
581
582
583
                    error_msg = "No decode results produced; skipping recommendations."
                    logger.error(error_msg)
                    add_profiling_error(error_msg)
584
585
586
587
588
589
                    write_profiler_status(
                        args.output_dir,
                        status=ProfilerStatus.FAILED,
                        error=error_msg,
                        message="Profiler failed: no decode results produced",
                    )
590
591
                    return
                if min(decode_data.itl) > args.itl:
592
593
594
                    warning_msg = "No engine configuration satisfies the ITL requirement, please try a smaller model or more powerful hardware"
                    logger.warning(warning_msg)
                    add_profiling_error(warning_msg)
595
596
597
598
599
600
601
602
603
604
605
                    selected_decode_idx = int(np.argmin(np.array(decode_data.itl)))
                else:
                    valid_indices = [
                        i for i, itl in enumerate(decode_data.itl) if itl <= args.itl
                    ]
                    # Among valid TP sizes, select the one with highest throughput per GPU
                    valid_thpts = [decode_data.thpt_per_gpu[i] for i in valid_indices]
                    max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                    selected_decode_idx = max_thpt_idx
                logger.info(
                    f"Suggested decode parallel mapping: {decode_data.parallel_mapping_labels[selected_decode_idx]} on {decode_data.num_gpus[selected_decode_idx]} GPU(s) (ITL {decode_data.itl[selected_decode_idx]:.2f} ms, throughput {decode_data.thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)"
606
                )
607

608
        if args.dry_run:
609
            # use min value for prefill and decode GPU counts
610
611
612
613
614
615
616
617
            prefill_data.num_gpus = [args.min_num_gpus_per_engine]
            decode_data.num_gpus = [args.min_num_gpus_per_engine]
            prefill_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
            decode_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
618
619
            selected_prefill_idx = 0
            selected_decode_idx = 0
620

621
622
623
        # interpolate ISL - TTFT with best prefill parallel mapping
        best_prefill_gpus = prefill_data.num_gpus[selected_prefill_idx]
        best_prefill_mapping = prefill_data.parallel_mappings[selected_prefill_idx]
624
        logger.info(
625
            f"Profiling prefill under best {best_prefill_gpus} GPU(s) with parallel mapping [{best_prefill_mapping.label()}] with different ISL..."
626
        )
627
        prefill_config = config_modifier.convert_config(
628
629
630
631
632
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
        )
        prefill_config = apply_parallel_mapping_to_config(
            prefill_config,
            best_prefill_mapping,
633
            SubComponentType.PREFILL,
634
635
            config_modifier,
            args.num_gpus_per_node,
636
        )
637
        logger.debug(f"Dynamo config: {prefill_config}")
638

639
640
        work_dir = f"{args.output_dir}/selected_prefill_interpolation"
        os.makedirs(work_dir, exist_ok=True)
641

642
643
644
645
        prefill_config_fn = f"{work_dir}/config.yaml"
        with open(prefill_config_fn, "w") as f:
            yaml.dump(prefill_config, f)

646
647
        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
648
649
650
        elif args.use_ai_configurator:
            profile_prefill_aiconfigurator(
                work_dir,
651
                best_prefill_gpus,  # num_gpus
652
                sweep_max_context_length,
653
654
                args.prefill_interpolation_granularity,
                ai_configurator_perf_estimator,
655
                tp_size=best_prefill_mapping.get_tp_size(),
656
            )
657
658
659
660
661
662
663
664
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=prefill_config["metadata"]["name"],
665
            )
666
667
668
669
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(prefill_config_fn)
            logger.info("Waiting for deployment to be ready...")
            try:
670
671
672
                await client.wait_for_deployment_ready(
                    timeout=getattr(args, "deployment_timeout", 1800)
                )
673
                logger.info("Deployment is ready")
674

675
676
677
                skip_profile = False
            except TimeoutError:
                logger.error(
678
                    "Deployment or model failed to become ready within timeout, skipping profiling"
679
680
                )
                skip_profile = True
681

682
683
684
685
686
687
            if not skip_profile:
                logger.info("Getting deployment logs...")
                await client.get_deployment_logs()
                logger.info(
                    f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                )
688

689
            base_url = client.get_service_url()
690

691
692
693
            profile_prefill(
                work_dir,
                model_name,
694
                model_path,
695
                base_url,
696
                best_prefill_gpus,
697
                sweep_max_context_length,
698
                args.prefill_interpolation_granularity,
699
                attention_dp_size=best_prefill_mapping.get_attn_dp_size(),
700
            )
701

702
703
704
705
            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")
706

707
708
709
710
711
712
713
714
715
716
717
718
        # interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode parallel mapping
        best_decode_gpus = decode_data.num_gpus[selected_decode_idx]
        best_decode_mapping = decode_data.parallel_mappings[selected_decode_idx]
        logger.info(
            f"Profiling decode with {best_decode_gpus} GPUs with parallel mapping [{best_decode_mapping.label()}]..."
        )
        decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
        )
        decode_config = apply_parallel_mapping_to_config(
            decode_config,
            best_decode_mapping,
719
            SubComponentType.DECODE,
720
721
722
            config_modifier,
            args.num_gpus_per_node,
        )
723
        logger.debug(f"Dynamo config: {decode_config}")
724

725
726
727
728
729
730
731
        work_dir = f"{args.output_dir}/selected_decode_interpolation"
        os.makedirs(work_dir, exist_ok=True)

        decode_config_fn = f"{work_dir}/config.yaml"
        with open(decode_config_fn, "w") as f:
            yaml.dump(decode_config, f)

732
733
        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
734
        elif args.use_ai_configurator:
735
            attention_dp_size = best_decode_mapping.get_attn_dp_size()
736
            max_kv_tokens = ai_configurator_perf_estimator.get_max_kv_tokens(
737
                args.isl, args.osl, tp_size=best_decode_mapping.get_tp_size()
738
739
740
            )
            profile_decode_aiconfigurator(
                work_dir,
741
                best_decode_gpus,  # num_gpus
742
                max_kv_tokens,
743
                sweep_max_context_length,
744
745
                args.decode_interpolation_granularity,
                ai_configurator_perf_estimator,
746
                attention_dp_size,
747
                tp_size=best_decode_mapping.get_tp_size(),
748
            )
749
750
751
752
753
754
755
756
757
758
759
760
761
762
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=decode_config["metadata"]["name"],
            )
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(decode_config_fn)
            logger.info("Waiting for deployment to be ready...")
            await client.wait_for_deployment_ready()
            logger.info("Deployment is ready")
763

764
765
766
767
768
            logger.info("Getting deployment logs...")
            await client.get_deployment_logs()
            logger.info(
                f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
            )
769

770
            attention_dp_size = best_decode_mapping.get_attn_dp_size()
771
772
773
774
775
            # Get the actual decode service name from the config
            decode_cfg = Config.model_validate(decode_config)
            decode_service_name = get_service_name_by_type(
                decode_cfg, args.backend, SubComponentType.DECODE
            ).lower()
776
            max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
777
                f"{work_dir}/{client.deployment_name}/{decode_service_name}/0.log",
778
                attention_dp_size=attention_dp_size,
779
780
781
782
783
784
785
            )

            base_url = client.get_service_url()

            profile_decode(
                work_dir,
                model_name,
786
                model_path,
787
                base_url,
788
                best_decode_gpus,
789
                max_kv_tokens,
790
                sweep_max_context_length,
791
                args.decode_interpolation_granularity,
792
                attention_dp_size,
793
            )
794

795
796
797
798
            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")
799

800
        # generate DGD with planner based on profiling results
801
        config, mocker_config = generate_dgd_config_with_planner(
802
803
804
805
            config_path=args.config,
            config_modifier=config_modifier,
            output_dir=args.output_dir,
            args=args,
806
807
            best_prefill_mapping=best_prefill_mapping,
            best_decode_mapping=best_decode_mapping,
808
809
            num_gpus_per_node=args.num_gpus_per_node,
        )
810
        logger.debug(f"Final DGD config with planner: {config}")
811

812
        # save DGD config with planner; support multi-document output when a ConfigMap is included
813
        with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
814
            if isinstance(config, list):
815
                yaml.safe_dump_all(config, f, sort_keys=False)
816
            else:
817
                yaml.safe_dump(config, f, sort_keys=False)
818

819
820
821
822
        # save mocker config with planner for testing purposes
        logger.debug(f"Mocker config with planner: {mocker_config}")
        with open(f"{args.output_dir}/mocker_config_with_planner.yaml", "w") as f:
            if isinstance(mocker_config, list):
823
                yaml.safe_dump_all(mocker_config, f, sort_keys=False)
824
            else:
825
                yaml.safe_dump(mocker_config, f, sort_keys=False)
826

827
828
829
830
831
832
833
834
835
836
837
838
        # Write success status with output files
        write_profiler_status(
            args.output_dir,
            status=ProfilerStatus.SUCCESS,
            message="Profiler completed successfully",
            outputs={
                "config_with_planner": "config_with_planner.yaml",
                "mocker_config_with_planner": "mocker_config_with_planner.yaml",
                "disagg_config": "disagg_config.yaml",
            },
        )

839
    except Exception as e:
840
841
842
843
844
845
846
        logger.exception("Profile job failed with error")
        write_profiler_status(
            args.output_dir,
            status=ProfilerStatus.FAILED,
            error=str(e),
            message=f"Profiler failed with exception: {type(e).__name__}",
        )
847
848
849
850
851
852
853
854
855
        raise
    finally:
        # Always clean up any remaining deployments, even if the job failed
        logger.info("Performing final cleanup of any remaining deployments...")
        await cleanup_remaining_deployments(deployment_clients, args.namespace)
        logger.info("Final cleanup completed.")


if __name__ == "__main__":
    # Script entry point: parse CLI arguments, attach a file log handler,
    # then run the async profiling job to completion.
    args = create_profiler_parser()

    # Setup file logging: records of INFO level and above emitted through
    # `logger` are additionally written to <output_dir>/profile_sla.log.
    # The output directory is created first so the handler can open the file.
    # An explicit UTF-8 encoding keeps non-ASCII log text from failing under
    # a non-UTF-8 locale.
    os.makedirs(args.output_dir, exist_ok=True)
    log_file_handler = logging.FileHandler(
        f"{args.output_dir}/profile_sla.log", encoding="utf-8"
    )
    log_file_handler.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
    )
    log_file_handler.setFormatter(formatter)
    logger.addHandler(log_file_handler)

    asyncio.run(run_profile(args))