profile_sla.py 35.1 KB
Newer Older
1
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
import asyncio
17
18
19
import logging
import math
import os
20
from dataclasses import dataclass, field
21
22
23

import numpy as np
import yaml
24

25
26
27
28
from benchmarks.profiler.utils.aiperf import (
    get_decode_itl_and_thpt_per_gpu,
    get_prefill_ttft,
)
29
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
30
31
32
33
34
35
from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import (
    ParallelizationMapping,
    apply_parallel_mapping_to_config,
    get_candidate_parallel_mappings,
)
from benchmarks.profiler.utils.defaults import EngineType
36
from benchmarks.profiler.utils.dgd_generation import generate_dgd_config_with_planner
37
from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
38
39
from benchmarks.profiler.utils.plot import (
    plot_decode_performance,
40
    plot_pd_joint_results,
41
42
    plot_prefill_performance,
)
43
from benchmarks.profiler.utils.profile_decode import (
44
    get_num_request_range,
45
46
47
48
49
50
51
    profile_decode,
    profile_decode_aiconfigurator,
)
from benchmarks.profiler.utils.profile_prefill import (
    profile_prefill,
    profile_prefill_aiconfigurator,
)
52
from benchmarks.profiler.utils.profiler_argparse import create_profiler_parser
53
54
55
56
from benchmarks.profiler.utils.profiler_status import (
    ProfilerStatus,
    write_profiler_status,
)
57
58
59
60
61
from benchmarks.profiler.webui.select_config import (
    add_profiling_error,
    clear_profiling_errors,
    pick_config_with_webui,
)
62
63
64
65
from deploy.utils.dynamo_deployment import (
    DynamoDeploymentClient,
    cleanup_remaining_deployments,
)
66
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES, SubComponentType
67

68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126

@dataclass
class PrefillProfileData:
    """Column-oriented store of prefill sweep measurements.

    Every profiled prefill configuration contributes exactly one entry to
    each list, so position ``i`` across all fields describes one data point.
    """

    # GPU count of the profiled prefill engine.
    num_gpus: list[int] = field(default_factory=list)
    # Time-to-first-token per data point (logged in ms by the profiler).
    ttft: list[float] = field(default_factory=list)
    # Throughput normalised per GPU (logged as tokens/s/GPU by the profiler).
    thpt_per_gpu: list[float] = field(default_factory=list)
    # Display label of the parallel mapping used for the data point.
    parallel_mapping_labels: list[str] = field(default_factory=list)
    # The parallel mapping object used for the data point.
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        ttft: float,
        thpt_per_gpu: float,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record one prefill data point.

        Appends one value to each of the five columns, in field order, so the
        columns stay index-aligned.
        """
        columns_and_values = (
            (self.num_gpus, num_gpus),
            (self.ttft, ttft),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in columns_and_values:
            column.append(value)


@dataclass
class DecodeProfileData:
    """Column-oriented store of decode sweep measurements.

    Each (parallel mapping, request concurrency) point measured during the
    decode sweep adds one entry to every list; index ``i`` across all fields
    refers to the same data point.
    """

    # GPU count of the profiled decode engine.
    num_gpus: list[int] = field(default_factory=list)
    # Inter-token latency per data point (logged in ms by the profiler).
    itl: list[float] = field(default_factory=list)
    # Throughput normalised per GPU (logged as tokens/s/GPU by the profiler).
    thpt_per_gpu: list[float] = field(default_factory=list)
    # Number of concurrent requests used for the data point.
    concurrency: list[int] = field(default_factory=list)
    # KV-cache capacity (in tokens) associated with the profiled engine.
    kv_cache_size: list[int] = field(default_factory=list)
    # Display label of the parallel mapping used for the data point.
    parallel_mapping_labels: list[str] = field(default_factory=list)
    # The parallel mapping object used for the data point.
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        itl: float,
        thpt_per_gpu: float,
        concurrency: int,
        kv_cache_size: int,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record one decode data point.

        Appends one value to each of the seven columns, in field order, so the
        columns stay index-aligned.
        """
        columns_and_values = (
            (self.num_gpus, num_gpus),
            (self.itl, itl),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.concurrency, concurrency),
            (self.kv_cache_size, kv_cache_size),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in columns_and_values:
            column.append(value)


127
128
129
130
131
132
133
134
135
136
137
# Module logger: INFO-level console output with timestamped records.
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(console_handler)


138
139
140
async def run_profile(args):
    # List to track all created deployment clients for cleanup in case of failure
    deployment_clients = []
141

142
143
144
    # Clear any errors from previous profiling runs
    clear_profiling_errors()

145
146
147
148
    # Inherit aic_backend from backend if not explicitly set
    if not args.aic_backend:
        args.aic_backend = args.backend

149
150
151
152
153
154
155
156
    # Write initial status for external jobs to monitor
    os.makedirs(args.output_dir, exist_ok=True)
    write_profiler_status(
        args.output_dir,
        status=ProfilerStatus.RUNNING,
        message="Profiler job started",
    )

157
158
159
160
161
    try:
        config_modifier = CONFIG_MODIFIERS[args.backend]

        with open(args.config, "r") as f:
            config = yaml.safe_load(f)
162

163
164
        if args.dgd_image:
            config = config_modifier.update_image(config, args.dgd_image)
165
            logger.debug(f"Using DGD image: {args.dgd_image}")
166

167
168
169
170
171
        profile_num_gpus = [
            2**i
            for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1)
            if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine
        ]
172
        logger.info(f"Profiling GPU counts: {profile_num_gpus}")
173
        os.makedirs(args.output_dir, exist_ok=True)
174

175
        model_name, model_path = config_modifier.get_model_name(config)
176

177
178
179
180
181
182
183
184
185
        # Determine sweep max context length: allow user-provided cap to override model's if smaller
        use_specified_max_context_len = getattr(args, "max_context_length", None)
        model_max_context_len = args.model_info.max_context_length
        if not use_specified_max_context_len and not model_max_context_len:
            raise ValueError(
                "No max_context_length available from args.max_context_length or model_info from HF config"
            )
        elif not use_specified_max_context_len:
            sweep_max_context_length = model_max_context_len
186
            logger.info(
187
                f"Using model's maximum context length: {model_max_context_len}"
188
            )
189
190
        elif not model_max_context_len:
            sweep_max_context_length = use_specified_max_context_len
191
            logger.info(
192
                f"Using user-provided max_context_length: {use_specified_max_context_len}"
193
194
            )
        else:
195
196
197
198
199
200
            sweep_max_context_length = min(
                use_specified_max_context_len, model_max_context_len
            )
            logger.info(
                f"Using minimum of user-provided and model's maximum context length: {sweep_max_context_length}"
            )
201

202
203
204
205
206
        if args.use_ai_configurator:
            if not args.aic_system:
                raise ValueError(
                    "Must provide --aic-system when using --use-ai-configurator."
                )
207
208
209
210
211
212
213
214
215
216
217
218

            # Fallback to args.model if aic_hf_id is not provided
            if not args.aic_hf_id:
                if args.model:
                    logger.info(
                        f"--aic-hf-id not provided, using --model ({args.model}) as HuggingFace ID for AI configurator"
                    )
                    args.aic_hf_id = args.model
                else:
                    raise ValueError(
                        "Must provide --aic-hf-id or --model when using --use-ai-configurator."
                    )
219

220
            logger.info("Using aiconfigurator to estimate performance...")
221
            ai_configurator_perf_estimator = AIConfiguratorPerfEstimator(
222
                args.aic_hf_id,
223
                args.aic_system.lower(),
224
225
                args.aic_backend,
                args.aic_backend_version,
226
227
            )
        else:
228
            if args.aic_system or args.aic_hf_id or args.aic_backend_version:
229
                logger.warning(
230
                    "Ignoring --aic-system, --aic-hf-id, and/or --backend-version "
231
232
233
                    "when not using --use-ai-configurator."
                )

234
        # first profile prefill
235
        prefill_data = PrefillProfileData()
236
        logger.info("Profiling prefill...")
237
238
        base_prefill_config = config_modifier.convert_config(
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
239
        )
240
        frontend_port = config_modifier.get_port(config)
241
242
        itl: float | None = None
        thpt_per_gpu: float | None = None
243
244
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling prefill with {num_gpus} GPUs...")
245
246
247
            candidate_mappings = get_candidate_parallel_mappings(
                num_gpus, args.model_info, EngineType.PREFILL
            )
248

249
250
251
252
253
            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                prefill_config = apply_parallel_mapping_to_config(
                    base_prefill_config,
                    mapping,
254
                    SubComponentType.PREFILL,
255
256
                    config_modifier,
                    args.num_gpus_per_node,
257
                )
258
                logger.debug(f"Dynamo config: {prefill_config}")
259

260
261
262
                # Work dir includes mapping label (safe chars only)
                parallel_mapping_tag = (
                    mapping.label().replace("=", "").replace("/", "_")
263
                )
264
265
                work_dir = (
                    f"{args.output_dir}/prefill_{num_gpus}gpus_{parallel_mapping_tag}"
266
                )
267
268
269
270
271
272
273
274
275
276
                os.makedirs(work_dir, exist_ok=True)

                prefill_config_fn = f"{work_dir}/config.yaml"
                with open(prefill_config_fn, "w") as f:
                    yaml.dump(prefill_config, f)

                ttft = None
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
                elif args.use_ai_configurator:
277
                    logger.info("Using ai-configurator to estimate prefill latency")
278
279
                    perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf(
                        args.isl,
280
                        tp_size=mapping.get_tp_size(),
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
                    )
                    ttft = perf_dict["context_latency"]
                    logger.info(f"Estimated prefill TTFT: {ttft:.2f}ms")
                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=prefill_config["metadata"]["name"],
                    )
                    logger.info(
                        f"Created client with service_name: {client.service_name}"
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(prefill_config_fn)
                    logger.info("Waiting for deployment to be ready...")
299
300
301
                    await client.wait_for_deployment_ready(
                        timeout=getattr(args, "deployment_timeout", 1800)
                    )
302
303
304
305
306
307
308
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )
309

310
311
312
                    # run ai-perf
                    base_url = client.get_service_url()
                    ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{args.isl}"
313
                    ttft = get_prefill_ttft(
314
315
316
                        args.isl,
                        ai_perf_artifact_dir,
                        model_name,
317
                        model_path,
318
319
                        base_url,
                        attention_dp_size=mapping.get_attn_dp_size(),
320
321
322
323
324
325
326
327
328
329
330
                    )

                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")

                if ttft is not None:
                    prefill_data.add_data(
                        num_gpus=num_gpus,
                        ttft=ttft,
331
332
333
334
335
                        thpt_per_gpu=args.isl
                        / ttft
                        / num_gpus
                        * 1000
                        * mapping.get_attn_dp_size(),
336
337
338
                        parallel_mapping_label=mapping.label(),
                        parallel_mapping=mapping,
                    )
339

340
        # Plot the results as a 2D scatter plot
341
342
        if prefill_data.num_gpus and prefill_data.ttft and prefill_data.thpt_per_gpu:
            plot_prefill_performance(prefill_data, args.ttft, args.output_dir)
343

344
        # then profile decode
345
        decode_data = DecodeProfileData()
346
        logger.info("Profiling decode...")
347
348
        base_decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
349
350
351
        )
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling decode with {num_gpus} GPUs...")
352
353
354
            candidate_mappings = get_candidate_parallel_mappings(
                num_gpus, args.model_info, EngineType.DECODE
            )
355

356
357
358
359
360
            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                decode_config = apply_parallel_mapping_to_config(
                    base_decode_config,
                    mapping,
361
                    SubComponentType.DECODE,
362
363
                    config_modifier,
                    args.num_gpus_per_node,
364
                )
365
                logger.debug(f"Dynamo config: {decode_config}")
366

367
368
369
370
                parallel_mapping_tag = (
                    mapping.label()
                    .replace("=", "")
                    .replace("/", "_")  # safe chars for directory
371
                )
372
373
                work_dir = (
                    f"{args.output_dir}/decode_{num_gpus}gpus_{parallel_mapping_tag}"
374
                )
375
                os.makedirs(work_dir, exist_ok=True)
376

377
378
379
                decode_config_fn = f"{work_dir}/config.yaml"
                with open(decode_config_fn, "w") as f:
                    yaml.dump(decode_config, f)
380

381
382
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
383

384
385
386
387
                elif args.use_ai_configurator:
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
                    max_concurrency = ai_configurator_perf_estimator.get_max_batch_size(
388
                        args.isl, args.osl, tp_size=mapping.get_tp_size()
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
                    )
                    max_kv_tokens = max_concurrency * (args.isl + args.osl)

                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=decode_config["metadata"]["name"],
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(decode_config_fn)
                    logger.info("Waiting for deployment to be ready...")
404
405
406
                    await client.wait_for_deployment_ready(
                        timeout=getattr(args, "deployment_timeout", 1800)
                    )
407
408
409
410
411
412
413
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )
414

415
416
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
417
                    attention_dp_size = mapping.get_attn_dp_size()
418
419
420
421
422
423
424
                    max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
                        f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                        attention_dp_size=attention_dp_size,
                    )
                    max_concurrency = max_kv_tokens // (args.isl + args.osl)

                if not args.dry_run:
425
                    attention_dp_size = mapping.get_attn_dp_size()
426
427
428
429
430
431
432
433
                    sweep_num_request = get_num_request_range(
                        attention_dp_size,
                        max_concurrency,
                        args.decode_interpolation_granularity,
                    )
                    logger.info(
                        f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}"
                    )
434

435
436
437
438
439
440
441
442
443
444
445
                    for num_request in sweep_num_request:
                        itl = thpt_per_gpu = None
                        if args.use_ai_configurator:
                            logger.info(
                                "Using ai-configurator to estimate decode latency."
                            )
                            perf_dict = ai_configurator_perf_estimator.estimate_perf(
                                args.isl,
                                args.osl,
                                num_request,
                                mode=EngineType.DECODE,
446
                                tp_size=mapping.get_tp_size(),
447
448
                            )

449
450
451
452
453
454
455
456
457
                            itl = perf_dict["tpot"]
                            thpt_per_gpu = perf_dict["tokens/s/gpu"]
                            logger.info(f"Estimated decode ITL: {itl:.2f}ms")
                            logger.info(
                                f"Estimated decode throughput per GPU: {thpt_per_gpu:.2f} tokens/s/GPU"
                            )
                        else:
                            base_url = client.get_service_url()
                            ai_perf_artifact_dir = f"{work_dir}/aiperf_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}"
458
                            itl, thpt_per_gpu = get_decode_itl_and_thpt_per_gpu(
459
460
461
462
463
                                args.isl,
                                args.osl,
                                num_request,
                                ai_perf_artifact_dir,
                                model_name,
464
                                model_path,
465
                                base_url=base_url,
466
                                num_gpus=num_gpus,
467
                                attention_dp_size=mapping.get_attn_dp_size(),
468
469
470
471
472
473
474
475
476
477
478
479
                            )

                        if itl is not None and thpt_per_gpu is not None:
                            decode_data.add_data(
                                num_gpus=num_gpus,
                                itl=itl,
                                thpt_per_gpu=thpt_per_gpu,
                                concurrency=num_request,
                                kv_cache_size=max_kv_tokens,
                                parallel_mapping_label=mapping.label(),
                                parallel_mapping=mapping,
                            )
480

481
482
483
484
485
                if not args.dry_run and not args.use_ai_configurator:
                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")
486

487
        # Plot all decode results after profiling is complete
488
489
        if decode_data.num_gpus:
            plot_decode_performance(decode_data, args.itl, args.output_dir)
490

491
        if prefill_data.num_gpus and decode_data.num_gpus:
492
            plot_pd_joint_results(
493
                args.isl, args.osl, prefill_data, decode_data, args.output_dir
494
495
            )

496
497
498
499
500
        if args.dry_run:
            logger.info("Skipping recommendations in dry run mode")
        else:
            logger.info("Analyzing results and generate recommendations...")
            # Safety guards: no results → exit early with a clear message
501
            if not prefill_data.num_gpus:
502
503
504
                error_msg = "No prefill results produced; skipping recommendations."
                logger.error(error_msg)
                add_profiling_error(error_msg)
505
506
507
508
509
510
                write_profiler_status(
                    args.output_dir,
                    status=ProfilerStatus.FAILED,
                    error=error_msg,
                    message="Profiler failed: no prefill results produced",
                )
511
                return
512

513
514
515
516
            if args.pick_with_webui:
                # select best P/D config in webUI
                selected_prefill_idx, selected_decode_idx = pick_config_with_webui(
                    prefill_data, decode_data, args
517
                )
518
519
520
                # update TTFT/ITL SLA based on selected config
                args.ttft = prefill_data.ttft[selected_prefill_idx]
                args.itl = decode_data.itl[selected_decode_idx]
521
            else:
522
523
524
                # automatically select P/D config within SLA with the highest throughput/GPU
                # select best parallel mapping for prefill
                if min(prefill_data.ttft) > args.ttft:
525
526
527
                    warning_msg = "No engine configuration satisfies the TTFT requirement, please try a smaller model or more powerful hardware"
                    logger.warning(warning_msg)
                    add_profiling_error(warning_msg)
528
529
530
531
532
533
534
535
536
537
538
539
540
541
                    selected_prefill_idx = int(np.argmin(np.array(prefill_data.ttft)))
                else:
                    valid_indices = [
                        i
                        for i, ttft in enumerate(prefill_data.ttft)
                        if ttft <= args.ttft
                    ]
                    # Among valid TP sizes, select the one with highest throughput per GPU
                    valid_thpts = [prefill_data.thpt_per_gpu[i] for i in valid_indices]
                    max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                    selected_prefill_idx = max_thpt_idx
                logger.info(
                    f"Suggested prefill parallel mapping: {prefill_data.parallel_mapping_labels[selected_prefill_idx]} on {prefill_data.num_gpus[selected_prefill_idx]} GPU(s) (TTFT {prefill_data.ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_data.thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)"
                )
542

543
544
                # select best parallel mapping for decode
                if not decode_data.num_gpus:
545
546
547
                    error_msg = "No decode results produced; skipping recommendations."
                    logger.error(error_msg)
                    add_profiling_error(error_msg)
548
549
550
551
552
553
                    write_profiler_status(
                        args.output_dir,
                        status=ProfilerStatus.FAILED,
                        error=error_msg,
                        message="Profiler failed: no decode results produced",
                    )
554
555
                    return
                if min(decode_data.itl) > args.itl:
556
557
558
                    warning_msg = "No engine configuration satisfies the ITL requirement, please try a smaller model or more powerful hardware"
                    logger.warning(warning_msg)
                    add_profiling_error(warning_msg)
559
560
561
562
563
564
565
566
567
568
569
                    selected_decode_idx = int(np.argmin(np.array(decode_data.itl)))
                else:
                    valid_indices = [
                        i for i, itl in enumerate(decode_data.itl) if itl <= args.itl
                    ]
                    # Among valid TP sizes, select the one with highest throughput per GPU
                    valid_thpts = [decode_data.thpt_per_gpu[i] for i in valid_indices]
                    max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                    selected_decode_idx = max_thpt_idx
                logger.info(
                    f"Suggested decode parallel mapping: {decode_data.parallel_mapping_labels[selected_decode_idx]} on {decode_data.num_gpus[selected_decode_idx]} GPU(s) (ITL {decode_data.itl[selected_decode_idx]:.2f} ms, throughput {decode_data.thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)"
570
                )
571

572
        if args.dry_run:
573
            # use min value for prefill and decode GPU counts
574
575
576
577
578
579
580
581
            prefill_data.num_gpus = [args.min_num_gpus_per_engine]
            decode_data.num_gpus = [args.min_num_gpus_per_engine]
            prefill_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
            decode_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
582
583
            selected_prefill_idx = 0
            selected_decode_idx = 0
584

585
586
587
        # interpolate ISL - TTFT with best prefill parallel mapping
        best_prefill_gpus = prefill_data.num_gpus[selected_prefill_idx]
        best_prefill_mapping = prefill_data.parallel_mappings[selected_prefill_idx]
588
        logger.info(
589
            f"Profiling prefill under best {best_prefill_gpus} GPU(s) with parallel mapping [{best_prefill_mapping.label()}] with different ISL..."
590
        )
591
        prefill_config = config_modifier.convert_config(
592
593
594
595
596
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
        )
        prefill_config = apply_parallel_mapping_to_config(
            prefill_config,
            best_prefill_mapping,
597
            SubComponentType.PREFILL,
598
599
            config_modifier,
            args.num_gpus_per_node,
600
        )
601
        logger.debug(f"Dynamo config: {prefill_config}")
602

603
604
        work_dir = f"{args.output_dir}/selected_prefill_interpolation"
        os.makedirs(work_dir, exist_ok=True)
605

606
607
608
609
        prefill_config_fn = f"{work_dir}/config.yaml"
        with open(prefill_config_fn, "w") as f:
            yaml.dump(prefill_config, f)

610
611
        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
612
613
614
        elif args.use_ai_configurator:
            profile_prefill_aiconfigurator(
                work_dir,
615
                best_prefill_gpus,  # num_gpus
616
                sweep_max_context_length,
617
618
                args.prefill_interpolation_granularity,
                ai_configurator_perf_estimator,
619
                tp_size=best_prefill_mapping.get_tp_size(),
620
            )
621
622
623
624
625
626
627
628
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=prefill_config["metadata"]["name"],
629
            )
630
631
632
633
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(prefill_config_fn)
            logger.info("Waiting for deployment to be ready...")
            try:
634
635
636
                await client.wait_for_deployment_ready(
                    timeout=getattr(args, "deployment_timeout", 1800)
                )
637
                logger.info("Deployment is ready")
638

639
640
641
                skip_profile = False
            except TimeoutError:
                logger.error(
642
                    "Deployment or model failed to become ready within timeout, skipping profiling"
643
644
                )
                skip_profile = True
645

646
647
648
649
650
651
            if not skip_profile:
                logger.info("Getting deployment logs...")
                await client.get_deployment_logs()
                logger.info(
                    f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                )
652

653
            base_url = client.get_service_url()
654

655
656
657
            profile_prefill(
                work_dir,
                model_name,
658
                model_path,
659
                base_url,
660
                best_prefill_gpus,
661
                sweep_max_context_length,
662
                args.prefill_interpolation_granularity,
663
                attention_dp_size=best_prefill_mapping.get_attn_dp_size(),
664
            )
665

666
667
668
669
            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")
670

671
672
673
674
675
676
677
678
679
680
681
682
        # interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode parallel mapping
        best_decode_gpus = decode_data.num_gpus[selected_decode_idx]
        best_decode_mapping = decode_data.parallel_mappings[selected_decode_idx]
        logger.info(
            f"Profiling decode with {best_decode_gpus} GPUs with parallel mapping [{best_decode_mapping.label()}]..."
        )
        decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
        )
        decode_config = apply_parallel_mapping_to_config(
            decode_config,
            best_decode_mapping,
683
            SubComponentType.DECODE,
684
685
686
            config_modifier,
            args.num_gpus_per_node,
        )
687
        logger.debug(f"Dynamo config: {decode_config}")
688

689
690
691
692
693
694
695
        work_dir = f"{args.output_dir}/selected_decode_interpolation"
        os.makedirs(work_dir, exist_ok=True)

        decode_config_fn = f"{work_dir}/config.yaml"
        with open(decode_config_fn, "w") as f:
            yaml.dump(decode_config, f)

696
697
        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
698
        elif args.use_ai_configurator:
699
            attention_dp_size = best_decode_mapping.get_attn_dp_size()
700
            max_kv_tokens = ai_configurator_perf_estimator.get_max_kv_tokens(
701
                args.isl, args.osl, tp_size=best_decode_mapping.get_tp_size()
702
703
704
            )
            profile_decode_aiconfigurator(
                work_dir,
705
                best_decode_gpus,  # num_gpus
706
                max_kv_tokens,
707
                sweep_max_context_length,
708
709
                args.decode_interpolation_granularity,
                ai_configurator_perf_estimator,
710
                attention_dp_size,
711
                tp_size=best_decode_mapping.get_tp_size(),
712
            )
713
714
715
716
717
718
719
720
721
722
723
724
725
726
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=decode_config["metadata"]["name"],
            )
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(decode_config_fn)
            logger.info("Waiting for deployment to be ready...")
            await client.wait_for_deployment_ready()
            logger.info("Deployment is ready")
727

728
729
730
731
732
            logger.info("Getting deployment logs...")
            await client.get_deployment_logs()
            logger.info(
                f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
            )
733

734
            attention_dp_size = best_decode_mapping.get_attn_dp_size()
735
            max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
736
737
                f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                attention_dp_size=attention_dp_size,
738
739
740
741
742
743
744
            )

            base_url = client.get_service_url()

            profile_decode(
                work_dir,
                model_name,
745
                model_path,
746
                base_url,
747
                best_decode_gpus,
748
                max_kv_tokens,
749
                sweep_max_context_length,
750
                args.decode_interpolation_granularity,
751
                attention_dp_size,
752
            )
753

754
755
756
757
            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")
758

759
        # generate DGD with planner based on profiling results
760
        config, mocker_config = generate_dgd_config_with_planner(
761
762
763
764
            config_path=args.config,
            config_modifier=config_modifier,
            output_dir=args.output_dir,
            args=args,
765
766
            best_prefill_mapping=best_prefill_mapping,
            best_decode_mapping=best_decode_mapping,
767
768
            num_gpus_per_node=args.num_gpus_per_node,
        )
769
        logger.debug(f"Final DGD config with planner: {config}")
770

771
        # save DGD config with planner; support multi-document output when a ConfigMap is included
772
        with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
773
            if isinstance(config, list):
774
                yaml.safe_dump_all(config, f, sort_keys=False)
775
            else:
776
                yaml.safe_dump(config, f, sort_keys=False)
777

778
779
780
781
        # save mocker config with planner for testing purposes
        logger.debug(f"Mocker config with planner: {mocker_config}")
        with open(f"{args.output_dir}/mocker_config_with_planner.yaml", "w") as f:
            if isinstance(mocker_config, list):
782
                yaml.safe_dump_all(mocker_config, f, sort_keys=False)
783
            else:
784
                yaml.safe_dump(mocker_config, f, sort_keys=False)
785

786
787
788
789
790
791
792
793
794
795
796
797
        # Write success status with output files
        write_profiler_status(
            args.output_dir,
            status=ProfilerStatus.SUCCESS,
            message="Profiler completed successfully",
            outputs={
                "config_with_planner": "config_with_planner.yaml",
                "mocker_config_with_planner": "mocker_config_with_planner.yaml",
                "disagg_config": "disagg_config.yaml",
            },
        )

798
    except Exception as e:
799
800
801
802
803
804
805
        logger.exception("Profile job failed with error")
        write_profiler_status(
            args.output_dir,
            status=ProfilerStatus.FAILED,
            error=str(e),
            message=f"Profiler failed with exception: {type(e).__name__}",
        )
806
807
808
809
810
811
812
813
814
        raise
    finally:
        # Always clean up any remaining deployments, even if the job failed
        logger.info("Performing final cleanup of any remaining deployments...")
        await cleanup_remaining_deployments(deployment_clients, args.namespace)
        logger.info("Final cleanup completed.")


if __name__ == "__main__":
    # Script entry point: parse CLI options, attach a file log handler, then
    # drive the async profiling job to completion on a fresh event loop.
    # NOTE(review): create_profiler_parser() is used here as if it returns the
    # parsed argument namespace (args.output_dir is read below) — confirm.
    args = create_profiler_parser()

    # Persist log records to <output_dir>/profile_sla.log next to the other
    # profiler outputs. The directory must exist before FileHandler opens the
    # file. The handler is attached only to this module's `logger`.
    os.makedirs(args.output_dir, exist_ok=True)
    log_file_handler = logging.FileHandler(
        os.path.join(args.output_dir, "profile_sla.log"), encoding="utf-8"
    )
    log_file_handler.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
    )
    log_file_handler.setFormatter(formatter)
    logger.addHandler(log_file_handler)

    asyncio.run(run_profile(args))