profile_sla.py 32.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
import asyncio
17
18
19
import logging
import math
import os
20
from dataclasses import dataclass, field
21
22
23

import numpy as np
import yaml
24

25
26
27
28
from benchmarks.profiler.utils.aiperf import (
    get_decode_itl_and_thpt_per_gpu,
    get_prefill_ttft,
)
29
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
30
31
32
33
34
35
from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import (
    ParallelizationMapping,
    apply_parallel_mapping_to_config,
    get_candidate_parallel_mappings,
)
from benchmarks.profiler.utils.defaults import EngineType
36
from benchmarks.profiler.utils.dgd_generation import generate_dgd_config_with_planner
37
from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
38
39
from benchmarks.profiler.utils.plot import (
    plot_decode_performance,
40
    plot_pd_joint_results,
41
42
    plot_prefill_performance,
)
43
from benchmarks.profiler.utils.profile_decode import (
44
    get_num_request_range,
45
46
47
48
49
50
51
    profile_decode,
    profile_decode_aiconfigurator,
)
from benchmarks.profiler.utils.profile_prefill import (
    profile_prefill,
    profile_prefill_aiconfigurator,
)
52
from benchmarks.profiler.utils.profiler_argparse import create_profiler_parser
53
from benchmarks.profiler.webui.select_config import pick_config_with_webui
54
55
56
57
from deploy.utils.dynamo_deployment import (
    DynamoDeploymentClient,
    cleanup_remaining_deployments,
)
58
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES
59

60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118

@dataclass
class PrefillProfileData:
    """Column-oriented store of prefill profiling measurements.

    Every recorded data point appends exactly one entry to each list below,
    so index ``i`` across all lists describes the same measurement.
    """

    # Number of GPUs used by the engine for this data point.
    num_gpus: list[int] = field(default_factory=list)
    # Time to first token (run_profile logs this value in ms).
    ttft: list[float] = field(default_factory=list)
    # Prefill throughput normalized per GPU (logged as tokens/s/GPU).
    thpt_per_gpu: list[float] = field(default_factory=list)
    # Human-readable label of the parallel mapping that was profiled.
    parallel_mapping_labels: list[str] = field(default_factory=list)
    # The parallel mapping object itself, kept for later config generation.
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        ttft: float,
        thpt_per_gpu: float,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record one measurement by extending every column with one value."""
        # Pair each column with its incoming value; the order mirrors the
        # field declaration order above.
        row = (
            (self.num_gpus, num_gpus),
            (self.ttft, ttft),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in row:
            column.append(value)


@dataclass
class DecodeProfileData:
    """Column-oriented store of decode profiling measurements.

    Every recorded data point appends exactly one entry to each list below,
    so index ``i`` across all lists describes the same measurement.
    """

    # Number of GPUs used by the engine for this data point.
    num_gpus: list[int] = field(default_factory=list)
    # Inter-token latency (run_profile logs this value in ms).
    itl: list[float] = field(default_factory=list)
    # Decode throughput normalized per GPU (logged as tokens/s/GPU).
    thpt_per_gpu: list[float] = field(default_factory=list)
    # Number of concurrent requests used for this measurement.
    concurrency: list[int] = field(default_factory=list)
    # KV-cache capacity (in tokens) of the engine that produced the point.
    kv_cache_size: list[int] = field(default_factory=list)
    # Human-readable label of the parallel mapping that was profiled.
    parallel_mapping_labels: list[str] = field(default_factory=list)
    # The parallel mapping object itself, kept for later config generation.
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        itl: float,
        thpt_per_gpu: float,
        concurrency: int,
        kv_cache_size: int,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record one measurement by extending every column with one value."""
        # Pair each column with its incoming value; the order mirrors the
        # field declaration order above.
        row = (
            (self.num_gpus, num_gpus),
            (self.itl, itl),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.concurrency, concurrency),
            (self.kv_cache_size, kv_cache_size),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in row:
            column.append(value)


119
120
121
122
123
124
125
126
127
128
129
# Module logger: INFO level, echoed through a console StreamHandler with a
# timestamped "time - name - level - message" format.
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(console_handler)


130
131
132
async def run_profile(args):
    # List to track all created deployment clients for cleanup in case of failure
    deployment_clients = []
133

134
135
136
137
    # Inherit aic_backend from backend if not explicitly set
    if not args.aic_backend:
        args.aic_backend = args.backend

138
139
140
141
142
    try:
        config_modifier = CONFIG_MODIFIERS[args.backend]

        with open(args.config, "r") as f:
            config = yaml.safe_load(f)
143

144
145
        if args.dgd_image:
            config = config_modifier.update_image(config, args.dgd_image)
146
            logger.debug(f"Using DGD image: {args.dgd_image}")
147

148
149
150
151
152
        profile_num_gpus = [
            2**i
            for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1)
            if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine
        ]
153
        logger.info(f"Profiling GPU counts: {profile_num_gpus}")
154
        os.makedirs(args.output_dir, exist_ok=True)
155

156
        model_name = config_modifier.get_model_name(config)
157

158
159
160
161
162
163
164
165
166
        # Determine sweep max context length: allow user-provided cap to override model's if smaller
        use_specified_max_context_len = getattr(args, "max_context_length", None)
        model_max_context_len = args.model_info.max_context_length
        if not use_specified_max_context_len and not model_max_context_len:
            raise ValueError(
                "No max_context_length available from args.max_context_length or model_info from HF config"
            )
        elif not use_specified_max_context_len:
            sweep_max_context_length = model_max_context_len
167
            logger.info(
168
                f"Using model's maximum context length: {model_max_context_len}"
169
            )
170
171
        elif not model_max_context_len:
            sweep_max_context_length = use_specified_max_context_len
172
            logger.info(
173
                f"Using user-provided max_context_length: {use_specified_max_context_len}"
174
175
            )
        else:
176
177
178
179
180
181
            sweep_max_context_length = min(
                use_specified_max_context_len, model_max_context_len
            )
            logger.info(
                f"Using minimum of user-provided and model's maximum context length: {sweep_max_context_length}"
            )
182

183
184
185
186
187
        if args.use_ai_configurator:
            if not args.aic_system:
                raise ValueError(
                    "Must provide --aic-system when using --use-ai-configurator."
                )
188
189
190
191
192
193
194
195
196
197
198
199

            # Fallback to args.model if aic_hf_id is not provided
            if not args.aic_hf_id:
                if args.model:
                    logger.info(
                        f"--aic-hf-id not provided, using --model ({args.model}) as HuggingFace ID for AI configurator"
                    )
                    args.aic_hf_id = args.model
                else:
                    raise ValueError(
                        "Must provide --aic-hf-id or --model when using --use-ai-configurator."
                    )
200

201
            logger.info("Using aiconfigurator to estimate performance...")
202
            ai_configurator_perf_estimator = AIConfiguratorPerfEstimator(
203
                args.aic_hf_id,
204
                args.aic_system.lower(),
205
206
                args.aic_backend,
                args.aic_backend_version,
207
208
            )
        else:
209
            if args.aic_system or args.aic_hf_id or args.aic_backend_version:
210
                logger.warning(
211
                    "Ignoring --aic-system, --aic-hf-id, and/or --backend-version "
212
213
214
                    "when not using --use-ai-configurator."
                )

215
        # first profile prefill
216
        prefill_data = PrefillProfileData()
217
        logger.info("Profiling prefill...")
218
219
        base_prefill_config = config_modifier.convert_config(
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
220
        )
221
        frontend_port = config_modifier.get_port(config)
222
223
        itl: float | None = None
        thpt_per_gpu: float | None = None
224
225
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling prefill with {num_gpus} GPUs...")
226
227
228
            candidate_mappings = get_candidate_parallel_mappings(
                num_gpus, args.model_info, EngineType.PREFILL
            )
229

230
231
232
233
234
235
236
237
            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                prefill_config = apply_parallel_mapping_to_config(
                    base_prefill_config,
                    mapping,
                    EngineType.PREFILL,
                    config_modifier,
                    args.num_gpus_per_node,
238
                )
239
                logger.debug(f"Dynamo config: {prefill_config}")
240

241
242
243
                # Work dir includes mapping label (safe chars only)
                parallel_mapping_tag = (
                    mapping.label().replace("=", "").replace("/", "_")
244
                )
245
246
                work_dir = (
                    f"{args.output_dir}/prefill_{num_gpus}gpus_{parallel_mapping_tag}"
247
                )
248
249
250
251
252
253
254
255
256
257
                os.makedirs(work_dir, exist_ok=True)

                prefill_config_fn = f"{work_dir}/config.yaml"
                with open(prefill_config_fn, "w") as f:
                    yaml.dump(prefill_config, f)

                ttft = None
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
                elif args.use_ai_configurator:
258
                    logger.info("Using ai-configurator to estimate prefill latency")
259
260
                    perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf(
                        args.isl,
261
                        tp_size=mapping.get_tp_size(),
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
                    )
                    ttft = perf_dict["context_latency"]
                    logger.info(f"Estimated prefill TTFT: {ttft:.2f}ms")
                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=prefill_config["metadata"]["name"],
                    )
                    logger.info(
                        f"Created client with service_name: {client.service_name}"
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(prefill_config_fn)
                    logger.info("Waiting for deployment to be ready...")
                    await client.wait_for_deployment_ready()
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )
288

289
290
291
                    # run ai-perf
                    base_url = client.get_service_url()
                    ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{args.isl}"
292
                    ttft = get_prefill_ttft(
293
294
295
296
                        args.isl,
                        ai_perf_artifact_dir,
                        model_name,
                        model_name,
297
298
                        base_url,
                        attention_dp_size=mapping.get_attn_dp_size(),
299
300
301
302
303
304
305
306
307
308
309
                    )

                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")

                if ttft is not None:
                    prefill_data.add_data(
                        num_gpus=num_gpus,
                        ttft=ttft,
310
311
312
313
314
                        thpt_per_gpu=args.isl
                        / ttft
                        / num_gpus
                        * 1000
                        * mapping.get_attn_dp_size(),
315
316
317
                        parallel_mapping_label=mapping.label(),
                        parallel_mapping=mapping,
                    )
318

319
        # Plot the results as a 2D scatter plot
320
321
        if prefill_data.num_gpus and prefill_data.ttft and prefill_data.thpt_per_gpu:
            plot_prefill_performance(prefill_data, args.ttft, args.output_dir)
322

323
        # then profile decode
324
        decode_data = DecodeProfileData()
325
        logger.info("Profiling decode...")
326
327
        base_decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
328
329
330
        )
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling decode with {num_gpus} GPUs...")
331
332
333
            candidate_mappings = get_candidate_parallel_mappings(
                num_gpus, args.model_info, EngineType.DECODE
            )
334

335
336
337
338
339
340
341
342
            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                decode_config = apply_parallel_mapping_to_config(
                    base_decode_config,
                    mapping,
                    EngineType.DECODE,
                    config_modifier,
                    args.num_gpus_per_node,
343
                )
344
                logger.debug(f"Dynamo config: {decode_config}")
345

346
347
348
349
                parallel_mapping_tag = (
                    mapping.label()
                    .replace("=", "")
                    .replace("/", "_")  # safe chars for directory
350
                )
351
352
                work_dir = (
                    f"{args.output_dir}/decode_{num_gpus}gpus_{parallel_mapping_tag}"
353
                )
354
                os.makedirs(work_dir, exist_ok=True)
355

356
357
358
                decode_config_fn = f"{work_dir}/config.yaml"
                with open(decode_config_fn, "w") as f:
                    yaml.dump(decode_config, f)
359

360
361
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
362

363
364
365
366
                elif args.use_ai_configurator:
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
                    max_concurrency = ai_configurator_perf_estimator.get_max_batch_size(
367
                        args.isl, args.osl, tp_size=mapping.get_tp_size()
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
                    )
                    max_kv_tokens = max_concurrency * (args.isl + args.osl)

                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=decode_config["metadata"]["name"],
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(decode_config_fn)
                    logger.info("Waiting for deployment to be ready...")
                    await client.wait_for_deployment_ready()
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )
391

392
393
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
394
                    attention_dp_size = mapping.get_attn_dp_size()
395
396
397
398
399
400
401
                    max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
                        f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                        attention_dp_size=attention_dp_size,
                    )
                    max_concurrency = max_kv_tokens // (args.isl + args.osl)

                if not args.dry_run:
402
                    attention_dp_size = mapping.get_attn_dp_size()
403
404
405
406
407
408
409
410
                    sweep_num_request = get_num_request_range(
                        attention_dp_size,
                        max_concurrency,
                        args.decode_interpolation_granularity,
                    )
                    logger.info(
                        f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}"
                    )
411

412
413
414
415
416
417
418
419
420
421
422
                    for num_request in sweep_num_request:
                        itl = thpt_per_gpu = None
                        if args.use_ai_configurator:
                            logger.info(
                                "Using ai-configurator to estimate decode latency."
                            )
                            perf_dict = ai_configurator_perf_estimator.estimate_perf(
                                args.isl,
                                args.osl,
                                num_request,
                                mode=EngineType.DECODE,
423
                                tp_size=mapping.get_tp_size(),
424
425
                            )

426
427
428
429
430
431
432
433
434
                            itl = perf_dict["tpot"]
                            thpt_per_gpu = perf_dict["tokens/s/gpu"]
                            logger.info(f"Estimated decode ITL: {itl:.2f}ms")
                            logger.info(
                                f"Estimated decode throughput per GPU: {thpt_per_gpu:.2f} tokens/s/GPU"
                            )
                        else:
                            base_url = client.get_service_url()
                            ai_perf_artifact_dir = f"{work_dir}/aiperf_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}"
435
                            itl, thpt_per_gpu = get_decode_itl_and_thpt_per_gpu(
436
437
438
439
440
441
442
                                args.isl,
                                args.osl,
                                num_request,
                                ai_perf_artifact_dir,
                                model_name,
                                model_name,
                                base_url=base_url,
443
                                num_gpus=num_gpus,
444
                                attention_dp_size=mapping.get_attn_dp_size(),
445
446
447
448
449
450
451
452
453
454
455
456
                            )

                        if itl is not None and thpt_per_gpu is not None:
                            decode_data.add_data(
                                num_gpus=num_gpus,
                                itl=itl,
                                thpt_per_gpu=thpt_per_gpu,
                                concurrency=num_request,
                                kv_cache_size=max_kv_tokens,
                                parallel_mapping_label=mapping.label(),
                                parallel_mapping=mapping,
                            )
457

458
459
460
461
462
                if not args.dry_run and not args.use_ai_configurator:
                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")
463

464
        # Plot all decode results after profiling is complete
465
466
        if decode_data.num_gpus:
            plot_decode_performance(decode_data, args.itl, args.output_dir)
467

468
        if prefill_data.num_gpus and decode_data.num_gpus:
469
            plot_pd_joint_results(
470
                args.isl, args.osl, prefill_data, decode_data, args.output_dir
471
472
            )

473
474
475
476
477
        if args.dry_run:
            logger.info("Skipping recommendations in dry run mode")
        else:
            logger.info("Analyzing results and generate recommendations...")
            # Safety guards: no results → exit early with a clear message
478
            if not prefill_data.num_gpus:
479
                logger.error("No prefill results produced; skipping recommendations.")
480
                return
481

482
483
484
485
            if args.pick_with_webui:
                # select best P/D config in webUI
                selected_prefill_idx, selected_decode_idx = pick_config_with_webui(
                    prefill_data, decode_data, args
486
487
                )
            else:
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
                # automatically select P/D config within SLA with the highest throughput/GPU
                # select best parallel mapping for prefill
                if min(prefill_data.ttft) > args.ttft:
                    logger.warning(
                        "No engine configuration satisfies the TTFT requirement, please try a smaller model or more powerful hardware"
                    )
                    selected_prefill_idx = int(np.argmin(np.array(prefill_data.ttft)))
                else:
                    valid_indices = [
                        i
                        for i, ttft in enumerate(prefill_data.ttft)
                        if ttft <= args.ttft
                    ]
                    # Among valid TP sizes, select the one with highest throughput per GPU
                    valid_thpts = [prefill_data.thpt_per_gpu[i] for i in valid_indices]
                    max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                    selected_prefill_idx = max_thpt_idx
                logger.info(
                    f"Suggested prefill parallel mapping: {prefill_data.parallel_mapping_labels[selected_prefill_idx]} on {prefill_data.num_gpus[selected_prefill_idx]} GPU(s) (TTFT {prefill_data.ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_data.thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)"
                )
508

509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
                # select best parallel mapping for decode
                if not decode_data.num_gpus:
                    logger.error(
                        "No decode results produced; skipping recommendations."
                    )
                    return
                if min(decode_data.itl) > args.itl:
                    logger.warning(
                        "No engine configuration satisfies the ITL requirement, please try a smaller model or more powerful hardware"
                    )
                    selected_decode_idx = int(np.argmin(np.array(decode_data.itl)))
                else:
                    valid_indices = [
                        i for i, itl in enumerate(decode_data.itl) if itl <= args.itl
                    ]
                    # Among valid TP sizes, select the one with highest throughput per GPU
                    valid_thpts = [decode_data.thpt_per_gpu[i] for i in valid_indices]
                    max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                    selected_decode_idx = max_thpt_idx
                logger.info(
                    f"Suggested decode parallel mapping: {decode_data.parallel_mapping_labels[selected_decode_idx]} on {decode_data.num_gpus[selected_decode_idx]} GPU(s) (ITL {decode_data.itl[selected_decode_idx]:.2f} ms, throughput {decode_data.thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)"
530
                )
531

532
        if args.dry_run:
533
            # use min value for prefill and decode GPU counts
534
535
536
537
538
539
540
541
            prefill_data.num_gpus = [args.min_num_gpus_per_engine]
            decode_data.num_gpus = [args.min_num_gpus_per_engine]
            prefill_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
            decode_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
542
543
            selected_prefill_idx = 0
            selected_decode_idx = 0
544

545
546
547
        # interpolate ISL - TTFT with best prefill parallel mapping
        best_prefill_gpus = prefill_data.num_gpus[selected_prefill_idx]
        best_prefill_mapping = prefill_data.parallel_mappings[selected_prefill_idx]
548
        logger.info(
549
            f"Profiling prefill under best {best_prefill_gpus} GPU(s) with parallel mapping [{best_prefill_mapping.label()}] with different ISL..."
550
        )
551
        prefill_config = config_modifier.convert_config(
552
553
554
555
556
557
558
559
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
        )
        prefill_config = apply_parallel_mapping_to_config(
            prefill_config,
            best_prefill_mapping,
            EngineType.PREFILL,
            config_modifier,
            args.num_gpus_per_node,
560
        )
561
        logger.debug(f"Dynamo config: {prefill_config}")
562

563
564
        work_dir = f"{args.output_dir}/selected_prefill_interpolation"
        os.makedirs(work_dir, exist_ok=True)
565

566
567
568
569
        prefill_config_fn = f"{work_dir}/config.yaml"
        with open(prefill_config_fn, "w") as f:
            yaml.dump(prefill_config, f)

570
571
        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
572
573
574
        elif args.use_ai_configurator:
            profile_prefill_aiconfigurator(
                work_dir,
575
                best_prefill_gpus,  # num_gpus
576
                sweep_max_context_length,
577
578
                args.prefill_interpolation_granularity,
                ai_configurator_perf_estimator,
579
                tp_size=best_prefill_mapping.get_tp_size(),
580
            )
581
582
583
584
585
586
587
588
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=prefill_config["metadata"]["name"],
589
            )
590
591
592
593
594
595
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(prefill_config_fn)
            logger.info("Waiting for deployment to be ready...")
            try:
                await client.wait_for_deployment_ready()
                logger.info("Deployment is ready")
596

597
598
599
                skip_profile = False
            except TimeoutError:
                logger.error(
600
                    "Deployment or model failed to become ready within timeout, skipping profiling"
601
602
                )
                skip_profile = True
603

604
605
606
607
608
609
            if not skip_profile:
                logger.info("Getting deployment logs...")
                await client.get_deployment_logs()
                logger.info(
                    f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                )
610

611
            base_url = client.get_service_url()
612

613
614
615
616
617
            profile_prefill(
                work_dir,
                model_name,
                model_name,
                base_url,
618
                best_prefill_gpus,
619
                sweep_max_context_length,
620
                args.prefill_interpolation_granularity,
621
                attention_dp_size=best_prefill_mapping.get_attn_dp_size(),
622
            )
623

624
625
626
627
            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")
628

629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
        # interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode parallel mapping
        best_decode_gpus = decode_data.num_gpus[selected_decode_idx]
        best_decode_mapping = decode_data.parallel_mappings[selected_decode_idx]
        logger.info(
            f"Profiling decode with {best_decode_gpus} GPUs with parallel mapping [{best_decode_mapping.label()}]..."
        )
        decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
        )
        decode_config = apply_parallel_mapping_to_config(
            decode_config,
            best_decode_mapping,
            EngineType.DECODE,
            config_modifier,
            args.num_gpus_per_node,
        )
645
        logger.debug(f"Dynamo config: {decode_config}")
646

647
648
649
650
651
652
653
        work_dir = f"{args.output_dir}/selected_decode_interpolation"
        os.makedirs(work_dir, exist_ok=True)

        decode_config_fn = f"{work_dir}/config.yaml"
        with open(decode_config_fn, "w") as f:
            yaml.dump(decode_config, f)

654
655
        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
656
        elif args.use_ai_configurator:
657
            attention_dp_size = best_decode_mapping.get_attn_dp_size()
658
            max_kv_tokens = ai_configurator_perf_estimator.get_max_kv_tokens(
659
                args.isl, args.osl, tp_size=best_decode_mapping.get_tp_size()
660
661
662
            )
            profile_decode_aiconfigurator(
                work_dir,
663
                best_decode_gpus,  # num_gpus
664
                max_kv_tokens,
665
                sweep_max_context_length,
666
667
                args.decode_interpolation_granularity,
                ai_configurator_perf_estimator,
668
                attention_dp_size,
669
                tp_size=best_decode_mapping.get_tp_size(),
670
            )
671
672
673
674
675
676
677
678
679
680
681
682
683
684
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=decode_config["metadata"]["name"],
            )
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(decode_config_fn)
            logger.info("Waiting for deployment to be ready...")
            await client.wait_for_deployment_ready()
            logger.info("Deployment is ready")
685

686
687
688
689
690
            logger.info("Getting deployment logs...")
            await client.get_deployment_logs()
            logger.info(
                f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
            )
691

692
            attention_dp_size = best_decode_mapping.get_attn_dp_size()
693
            max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
694
695
                f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                attention_dp_size=attention_dp_size,
696
697
698
699
700
701
702
703
704
            )

            base_url = client.get_service_url()

            profile_decode(
                work_dir,
                model_name,
                model_name,
                base_url,
705
                best_decode_gpus,
706
                max_kv_tokens,
707
                sweep_max_context_length,
708
                args.decode_interpolation_granularity,
709
                attention_dp_size,
710
            )
711

712
713
714
715
            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")
716

717
        # generate DGD with planner based on profiling results
718
        config, mocker_config = generate_dgd_config_with_planner(
719
720
721
722
            config_path=args.config,
            config_modifier=config_modifier,
            output_dir=args.output_dir,
            args=args,
723
724
            best_prefill_mapping=best_prefill_mapping,
            best_decode_mapping=best_decode_mapping,
725
726
            num_gpus_per_node=args.num_gpus_per_node,
        )
727
        logger.debug(f"Final DGD config with planner: {config}")
728

729
        # save DGD config with planner; support multi-document output when a ConfigMap is included
730
        with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
731
732
733
734
            if isinstance(config, list):
                yaml.dump_all(config, f)
            else:
                yaml.dump(config, f)
735

736
737
738
739
740
741
742
743
        # save mocker config with planner for testing purposes
        logger.debug(f"Mocker config with planner: {mocker_config}")
        with open(f"{args.output_dir}/mocker_config_with_planner.yaml", "w") as f:
            if isinstance(mocker_config, list):
                yaml.dump_all(mocker_config, f)
            else:
                yaml.dump(mocker_config, f)

744
745
746
747
748
749
750
751
752
753
754
    except Exception as e:
        logger.error(f"Profile job failed with error: {e}")
        raise
    finally:
        # Always clean up any remaining deployments, even if the job failed
        logger.info("Performing final cleanup of any remaining deployments...")
        await cleanup_remaining_deployments(deployment_clients, args.namespace)
        logger.info("Final cleanup completed.")


if __name__ == "__main__":
    # Build the profiler's command-line configuration.
    args = create_profiler_parser()

    # Make sure the output directory exists, then attach a file handler so that
    # INFO-and-above records emitted through `logger` are also written to
    # <output_dir>/profile_sla.log.
    os.makedirs(args.output_dir, exist_ok=True)
    file_handler = logging.FileHandler(f"{args.output_dir}/profile_sla.log")
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            "%Y-%m-%d %H:%M:%S",
        )
    )
    logger.addHandler(file_handler)

    # Run the async profiling job to completion on a fresh event loop.
    asyncio.run(run_profile(args))