profile_sla.py 31.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
import asyncio
17
18
19
import logging
import math
import os
20
from dataclasses import dataclass, field
21
22
23

import numpy as np
import yaml
24

25
26
27
28
from benchmarks.profiler.utils.aiperf import (
    get_decode_itl_and_thpt_per_gpu,
    get_prefill_ttft,
)
29
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
30
31
32
33
34
35
from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import (
    ParallelizationMapping,
    apply_parallel_mapping_to_config,
    get_candidate_parallel_mappings,
)
from benchmarks.profiler.utils.defaults import EngineType
36
from benchmarks.profiler.utils.dgd_generation import generate_dgd_config_with_planner
37
from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
38
39
from benchmarks.profiler.utils.plot import (
    plot_decode_performance,
40
    plot_pd_joint_results,
41
42
    plot_prefill_performance,
)
43
from benchmarks.profiler.utils.profile_decode import (
44
    get_num_request_range,
45
46
47
48
49
50
51
    profile_decode,
    profile_decode_aiconfigurator,
)
from benchmarks.profiler.utils.profile_prefill import (
    profile_prefill,
    profile_prefill_aiconfigurator,
)
52
from benchmarks.profiler.utils.profiler_argparse import create_profiler_parser
53
54
55
56
from deploy.utils.dynamo_deployment import (
    DynamoDeploymentClient,
    cleanup_remaining_deployments,
)
57
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES
58

59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

@dataclass
class PrefillProfileData:
    """Column-oriented store for prefill sweep measurements.

    Every field is a list; ``add_data`` extends all of them by exactly one
    entry so that values recorded together share the same index.
    """

    num_gpus: list[int] = field(default_factory=list)
    ttft: list[float] = field(default_factory=list)
    thpt_per_gpu: list[float] = field(default_factory=list)
    parallel_mapping_labels: list[str] = field(default_factory=list)
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        ttft: float,
        thpt_per_gpu: float,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record one measurement by appending each value to its column."""
        column_value_pairs = (
            (self.num_gpus, num_gpus),
            (self.ttft, ttft),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in column_value_pairs:
            column.append(value)


@dataclass
class DecodeProfileData:
    """Column-oriented store for decode sweep measurements.

    Every field is a list; ``add_data`` extends all of them by exactly one
    entry so that values recorded together share the same index.
    """

    num_gpus: list[int] = field(default_factory=list)
    itl: list[float] = field(default_factory=list)
    thpt_per_gpu: list[float] = field(default_factory=list)
    concurrency: list[int] = field(default_factory=list)
    kv_cache_size: list[int] = field(default_factory=list)
    parallel_mapping_labels: list[str] = field(default_factory=list)
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        itl: float,
        thpt_per_gpu: float,
        concurrency: int,
        kv_cache_size: int,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Record one measurement by appending each value to its column."""
        column_value_pairs = (
            (self.num_gpus, num_gpus),
            (self.itl, itl),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.concurrency, concurrency),
            (self.kv_cache_size, kv_cache_size),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in column_value_pairs:
            column.append(value)


118
119
120
121
122
123
124
125
126
127
128
# Module logger setup: build the shared record format first, attach it to a
# console handler, then register that handler on this module's logger.
# Both the logger and the handler are set to INFO.
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)

logger = logging.getLogger(__name__)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)


129
130
131
async def run_profile(args):
    """Run the SLA profiling workflow described by ``args``.

    The steps, in order:

    1. Load the deployment config from ``args.config`` and build the list of
       power-of-two GPU counts between ``args.min_num_gpus_per_engine`` and
       ``args.max_num_gpus_per_engine``.
    2. Sweep prefill over every candidate parallel mapping, recording TTFT
       and throughput per GPU.
    3. Sweep decode over every candidate parallel mapping and a range of
       concurrent request counts, recording ITL and throughput per GPU.
    4. Pick the prefill/decode mapping with the highest throughput per GPU
       among those meeting ``args.ttft`` / ``args.itl`` (or the lowest
       latency mapping when none meets the target).
    5. Run the interpolation profiling for the selected mappings.
    6. Generate and save the DGD config with planner and the mocker config.

    Measurements come from real deployments unless ``args.use_ai_configurator``
    is set (estimates are used instead) or ``args.dry_run`` is set (configs
    are written but nothing is deployed or measured).

    Args:
        args: Parsed profiler arguments. ``args.aic_backend`` and
            ``args.aic_hf_id`` may be filled in from ``args.backend`` and
            ``args.model`` respectively.

    Returns:
        None. Returns early, without generating the final configs, when a
        non-dry-run sweep produced no prefill or no decode results.

    Raises:
        ValueError: If no maximum context length is available, or required
            AI configurator options are missing.
        Exception: Any other failure is logged and re-raised. Remaining
            deployments are cleaned up in all cases.
    """
    # List to track all created deployment clients for cleanup in case of failure
    deployment_clients = []

    # Inherit aic_backend from backend if not explicitly set
    if not args.aic_backend:
        args.aic_backend = args.backend

    try:
        config_modifier = CONFIG_MODIFIERS[args.backend]

        with open(args.config, "r") as f:
            config = yaml.safe_load(f)

        if args.dgd_image:
            config = config_modifier.update_image(config, args.dgd_image)
            logger.debug(f"Using DGD image: {args.dgd_image}")

        # Powers of two within [min_num_gpus_per_engine, max_num_gpus_per_engine]
        profile_num_gpus = [
            2**i
            for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1)
            if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine
        ]
        logger.info(f"Profiling GPU counts: {profile_num_gpus}")
        os.makedirs(args.output_dir, exist_ok=True)

        model_name = config_modifier.get_model_name(config)

        # Determine sweep max context length: allow user-provided cap to override model's if smaller
        use_specified_max_context_len = getattr(args, "max_context_length", None)
        model_max_context_len = args.model_info.max_context_length
        if not use_specified_max_context_len and not model_max_context_len:
            raise ValueError(
                "No max_context_length available from args.max_context_length or model_info from HF config"
            )
        elif not use_specified_max_context_len:
            sweep_max_context_length = model_max_context_len
            logger.info(
                f"Using model's maximum context length: {model_max_context_len}"
            )
        elif not model_max_context_len:
            sweep_max_context_length = use_specified_max_context_len
            logger.info(
                f"Using user-provided max_context_length: {use_specified_max_context_len}"
            )
        else:
            sweep_max_context_length = min(
                use_specified_max_context_len, model_max_context_len
            )
            logger.info(
                f"Using minimum of user-provided and model's maximum context length: {sweep_max_context_length}"
            )

        if args.use_ai_configurator:
            if not args.aic_system:
                raise ValueError(
                    "Must provide --aic-system when using --use-ai-configurator."
                )

            # Fallback to args.model if aic_hf_id is not provided
            if not args.aic_hf_id:
                if args.model:
                    logger.info(
                        f"--aic-hf-id not provided, using --model ({args.model}) as HuggingFace ID for AI configurator"
                    )
                    args.aic_hf_id = args.model
                else:
                    raise ValueError(
                        "Must provide --aic-hf-id or --model when using --use-ai-configurator."
                    )

            logger.info("Using aiconfigurator to estimate performance...")
            ai_configurator_perf_estimator = AIConfiguratorPerfEstimator(
                args.aic_hf_id,
                args.aic_system.lower(),
                args.aic_backend,
                args.aic_backend_version,
            )
        else:
            if args.aic_system or args.aic_hf_id or args.aic_backend_version:
                logger.warning(
                    "Ignoring --aic-system, --aic-hf-id, and/or --backend-version "
                    "when not using --use-ai-configurator."
                )

        # first profile prefill
        prefill_data = PrefillProfileData()
        logger.info("Profiling prefill...")
        base_prefill_config = config_modifier.convert_config(
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
        )
        frontend_port = config_modifier.get_port(config)
        itl: float | None = None
        thpt_per_gpu: float | None = None
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling prefill with {num_gpus} GPUs...")
            candidate_mappings = get_candidate_parallel_mappings(
                num_gpus, args.model_info, EngineType.PREFILL
            )

            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                prefill_config = apply_parallel_mapping_to_config(
                    base_prefill_config,
                    mapping,
                    EngineType.PREFILL,
                    config_modifier,
                    args.num_gpus_per_node,
                )
                logger.debug(f"Dynamo config: {prefill_config}")

                # Work dir includes mapping label (safe chars only)
                parallel_mapping_tag = (
                    mapping.label().replace("=", "").replace("/", "_")
                )
                work_dir = (
                    f"{args.output_dir}/prefill_{num_gpus}gpus_{parallel_mapping_tag}"
                )
                os.makedirs(work_dir, exist_ok=True)

                prefill_config_fn = f"{work_dir}/config.yaml"
                with open(prefill_config_fn, "w") as f:
                    yaml.dump(prefill_config, f)

                # ttft stays None in dry-run mode, so no data point is recorded
                ttft = None
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
                elif args.use_ai_configurator:
                    logger.info("Using ai-configurator to estimate prefill latency")
                    perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf(
                        args.isl,
                        tp_size=mapping.get_tp_size(),
                    )
                    ttft = perf_dict["context_latency"]
                    logger.info(f"Estimated prefill TTFT: {ttft:.2f}ms")
                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=prefill_config["metadata"]["name"],
                    )
                    logger.info(
                        f"Created client with service_name: {client.service_name}"
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(prefill_config_fn)
                    logger.info("Waiting for deployment to be ready...")
                    await client.wait_for_deployment_ready()
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )

                    # run ai-perf
                    base_url = client.get_service_url()
                    ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{args.isl}"
                    ttft = get_prefill_ttft(
                        args.isl,
                        ai_perf_artifact_dir,
                        model_name,
                        model_name,
                        base_url,
                        attention_dp_size=mapping.get_attn_dp_size(),
                    )

                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")

                if ttft is not None:
                    prefill_data.add_data(
                        num_gpus=num_gpus,
                        ttft=ttft,
                        thpt_per_gpu=args.isl
                        / ttft
                        / num_gpus
                        * 1000
                        * mapping.get_attn_dp_size(),
                        parallel_mapping_label=mapping.label(),
                        parallel_mapping=mapping,
                    )

        # Plot the results as a 2D scatter plot
        if prefill_data.num_gpus and prefill_data.ttft and prefill_data.thpt_per_gpu:
            plot_prefill_performance(prefill_data, args.ttft, args.output_dir)

        # then profile decode
        decode_data = DecodeProfileData()
        logger.info("Profiling decode...")
        base_decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
        )
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling decode with {num_gpus} GPUs...")
            candidate_mappings = get_candidate_parallel_mappings(
                num_gpus, args.model_info, EngineType.DECODE
            )

            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                decode_config = apply_parallel_mapping_to_config(
                    base_decode_config,
                    mapping,
                    EngineType.DECODE,
                    config_modifier,
                    args.num_gpus_per_node,
                )
                logger.debug(f"Dynamo config: {decode_config}")

                parallel_mapping_tag = (
                    mapping.label()
                    .replace("=", "")
                    .replace("/", "_")  # safe chars for directory
                )
                work_dir = (
                    f"{args.output_dir}/decode_{num_gpus}gpus_{parallel_mapping_tag}"
                )
                os.makedirs(work_dir, exist_ok=True)

                decode_config_fn = f"{work_dir}/config.yaml"
                with open(decode_config_fn, "w") as f:
                    yaml.dump(decode_config, f)

                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")

                elif args.use_ai_configurator:
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
                    max_concurrency = ai_configurator_perf_estimator.get_max_batch_size(
                        args.isl, args.osl, tp_size=mapping.get_tp_size()
                    )
                    max_kv_tokens = max_concurrency * (args.isl + args.osl)

                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=decode_config["metadata"]["name"],
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(decode_config_fn)
                    logger.info("Waiting for deployment to be ready...")
                    await client.wait_for_deployment_ready()
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )

                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
                    attention_dp_size = mapping.get_attn_dp_size()
                    max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
                        f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                        attention_dp_size=attention_dp_size,
                    )
                    max_concurrency = max_kv_tokens // (args.isl + args.osl)

                if not args.dry_run:
                    attention_dp_size = mapping.get_attn_dp_size()
                    sweep_num_request = get_num_request_range(
                        attention_dp_size,
                        max_concurrency,
                        args.decode_interpolation_granularity,
                    )
                    logger.info(
                        f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}"
                    )

                    for num_request in sweep_num_request:
                        itl = thpt_per_gpu = None
                        if args.use_ai_configurator:
                            logger.info(
                                "Using ai-configurator to estimate decode latency."
                            )
                            perf_dict = ai_configurator_perf_estimator.estimate_perf(
                                args.isl,
                                args.osl,
                                num_request,
                                mode=EngineType.DECODE,
                                tp_size=mapping.get_tp_size(),
                            )

                            itl = perf_dict["tpot"]
                            thpt_per_gpu = perf_dict["tokens/s/gpu"]
                            logger.info(f"Estimated decode ITL: {itl:.2f}ms")
                            logger.info(
                                f"Estimated decode throughput per GPU: {thpt_per_gpu:.2f} tokens/s/GPU"
                            )
                        else:
                            base_url = client.get_service_url()
                            ai_perf_artifact_dir = f"{work_dir}/aiperf_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}"
                            itl, thpt_per_gpu = get_decode_itl_and_thpt_per_gpu(
                                args.isl,
                                args.osl,
                                num_request,
                                ai_perf_artifact_dir,
                                model_name,
                                model_name,
                                base_url=base_url,
                                num_gpus=num_gpus,
                                attention_dp_size=mapping.get_attn_dp_size(),
                            )

                        if itl is not None and thpt_per_gpu is not None:
                            decode_data.add_data(
                                num_gpus=num_gpus,
                                itl=itl,
                                thpt_per_gpu=thpt_per_gpu,
                                concurrency=num_request,
                                kv_cache_size=max_kv_tokens,
                                parallel_mapping_label=mapping.label(),
                                parallel_mapping=mapping,
                            )

                if not args.dry_run and not args.use_ai_configurator:
                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")

        # Plot all decode results after profiling is complete
        if decode_data.num_gpus:
            plot_decode_performance(decode_data, args.itl, args.output_dir)

        if prefill_data.num_gpus and decode_data.num_gpus:
            plot_pd_joint_results(
                args.isl, args.osl, prefill_data, decode_data, args.output_dir
            )

        if args.dry_run:
            logger.info("Skipping recommendations in dry run mode")
        else:
            logger.info("Analyzing results and generate recommendations...")
            # Safety guards: no results → exit early with a clear message
            if not prefill_data.num_gpus:
                logger.error("No prefill results produced; skipping recommendations.")
                # Without this return, min() below raises ValueError on the
                # empty TTFT list; the decode guard already returns the same way.
                return

            # select best parallel mapping for prefill
            if min(prefill_data.ttft) > args.ttft:
                logger.warning(
                    "No engine configuration satisfies the TTFT requirement, please try a smaller model or more powerful hardware"
                )
                selected_prefill_idx = int(np.argmin(np.array(prefill_data.ttft)))
            else:
                valid_indices = [
                    i for i, ttft in enumerate(prefill_data.ttft) if ttft <= args.ttft
                ]
                # Among valid TP sizes, select the one with highest throughput per GPU
                valid_thpts = [prefill_data.thpt_per_gpu[i] for i in valid_indices]
                max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                selected_prefill_idx = max_thpt_idx
            logger.info(
                f"Suggested prefill parallel mapping: {prefill_data.parallel_mapping_labels[selected_prefill_idx]} on {prefill_data.num_gpus[selected_prefill_idx]} GPU(s) (TTFT {prefill_data.ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_data.thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)"
            )

            # select best parallel mapping for decode
            if not decode_data.num_gpus:
                logger.error("No decode results produced; skipping recommendations.")
                return
            if min(decode_data.itl) > args.itl:
                logger.warning(
                    "No engine configuration satisfies the ITL requirement, please try a smaller model or more powerful hardware"
                )
                selected_decode_idx = int(np.argmin(np.array(decode_data.itl)))
            else:
                valid_indices = [
                    i for i, itl in enumerate(decode_data.itl) if itl <= args.itl
                ]
                # Among valid TP sizes, select the one with highest throughput per GPU
                valid_thpts = [decode_data.thpt_per_gpu[i] for i in valid_indices]
                max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                selected_decode_idx = max_thpt_idx
            logger.info(
                f"Suggested decode parallel mapping: {decode_data.parallel_mapping_labels[selected_decode_idx]} on {decode_data.num_gpus[selected_decode_idx]} GPU(s) (ITL {decode_data.itl[selected_decode_idx]:.2f} ms, throughput {decode_data.thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)"
            )

        if args.dry_run:
            # use min value for prefill and decode GPU counts
            prefill_data.num_gpus = [args.min_num_gpus_per_engine]
            decode_data.num_gpus = [args.min_num_gpus_per_engine]
            prefill_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
            decode_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
            selected_prefill_idx = 0
            selected_decode_idx = 0

        # interpolate ISL - TTFT with best prefill parallel mapping
        best_prefill_gpus = prefill_data.num_gpus[selected_prefill_idx]
        best_prefill_mapping = prefill_data.parallel_mappings[selected_prefill_idx]
        logger.info(
            f"Profiling prefill under best {best_prefill_gpus} GPU(s) with parallel mapping [{best_prefill_mapping.label()}] with different ISL..."
        )
        prefill_config = config_modifier.convert_config(
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
        )
        prefill_config = apply_parallel_mapping_to_config(
            prefill_config,
            best_prefill_mapping,
            EngineType.PREFILL,
            config_modifier,
            args.num_gpus_per_node,
        )
        logger.debug(f"Dynamo config: {prefill_config}")

        work_dir = f"{args.output_dir}/selected_prefill_interpolation"
        os.makedirs(work_dir, exist_ok=True)

        prefill_config_fn = f"{work_dir}/config.yaml"
        with open(prefill_config_fn, "w") as f:
            yaml.dump(prefill_config, f)

        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
        elif args.use_ai_configurator:
            profile_prefill_aiconfigurator(
                work_dir,
                best_prefill_gpus,  # num_gpus
                sweep_max_context_length,
                args.prefill_interpolation_granularity,
                ai_configurator_perf_estimator,
                tp_size=best_prefill_mapping.get_tp_size(),
            )
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=prefill_config["metadata"]["name"],
            )
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(prefill_config_fn)
            logger.info("Waiting for deployment to be ready...")
            try:
                await client.wait_for_deployment_ready()
                logger.info("Deployment is ready")

                skip_profile = False
            except TimeoutError:
                logger.error(
                    "Deployment or model failed to become ready within timeout, skipping profiling"
                )
                skip_profile = True

            # Only collect logs and profile when the deployment became ready;
            # the deployment is deleted below in either case.
            if not skip_profile:
                logger.info("Getting deployment logs...")
                await client.get_deployment_logs()
                logger.info(
                    f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                )

                base_url = client.get_service_url()

                profile_prefill(
                    work_dir,
                    model_name,
                    model_name,
                    base_url,
                    best_prefill_gpus,
                    sweep_max_context_length,
                    args.prefill_interpolation_granularity,
                    attention_dp_size=best_prefill_mapping.get_attn_dp_size(),
                )

            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")

        # interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode parallel mapping
        best_decode_gpus = decode_data.num_gpus[selected_decode_idx]
        best_decode_mapping = decode_data.parallel_mappings[selected_decode_idx]
        logger.info(
            f"Profiling decode with {best_decode_gpus} GPUs with parallel mapping [{best_decode_mapping.label()}]..."
        )
        decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
        )
        decode_config = apply_parallel_mapping_to_config(
            decode_config,
            best_decode_mapping,
            EngineType.DECODE,
            config_modifier,
            args.num_gpus_per_node,
        )
        logger.debug(f"Dynamo config: {decode_config}")

        work_dir = f"{args.output_dir}/selected_decode_interpolation"
        os.makedirs(work_dir, exist_ok=True)

        decode_config_fn = f"{work_dir}/config.yaml"
        with open(decode_config_fn, "w") as f:
            yaml.dump(decode_config, f)

        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
        elif args.use_ai_configurator:
            attention_dp_size = best_decode_mapping.get_attn_dp_size()
            max_kv_tokens = ai_configurator_perf_estimator.get_max_kv_tokens(
                args.isl, args.osl, tp_size=best_decode_mapping.get_tp_size()
            )
            profile_decode_aiconfigurator(
                work_dir,
                best_decode_gpus,  # num_gpus
                max_kv_tokens,
                sweep_max_context_length,
                args.decode_interpolation_granularity,
                ai_configurator_perf_estimator,
                attention_dp_size,
                tp_size=best_decode_mapping.get_tp_size(),
            )
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=decode_config["metadata"]["name"],
            )
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(decode_config_fn)
            logger.info("Waiting for deployment to be ready...")
            await client.wait_for_deployment_ready()
            logger.info("Deployment is ready")

            logger.info("Getting deployment logs...")
            await client.get_deployment_logs()
            logger.info(
                f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
            )

            attention_dp_size = best_decode_mapping.get_attn_dp_size()
            max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
                f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                attention_dp_size=attention_dp_size,
            )

            base_url = client.get_service_url()

            profile_decode(
                work_dir,
                model_name,
                model_name,
                base_url,
                best_decode_gpus,
                max_kv_tokens,
                sweep_max_context_length,
                args.decode_interpolation_granularity,
                attention_dp_size,
            )

            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")

        # generate DGD with planner based on profiling results
        config, mocker_config = generate_dgd_config_with_planner(
            config_path=args.config,
            config_modifier=config_modifier,
            output_dir=args.output_dir,
            args=args,
            best_prefill_mapping=best_prefill_mapping,
            best_decode_mapping=best_decode_mapping,
            num_gpus_per_node=args.num_gpus_per_node,
        )
        logger.debug(f"Final DGD config with planner: {config}")

        # save DGD config with planner; support multi-document output when a ConfigMap is included
        with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
            if isinstance(config, list):
                yaml.dump_all(config, f)
            else:
                yaml.dump(config, f)

        # save mocker config with planner for testing purposes
        logger.debug(f"Mocker config with planner: {mocker_config}")
        with open(f"{args.output_dir}/mocker_config_with_planner.yaml", "w") as f:
            if isinstance(mocker_config, list):
                yaml.dump_all(mocker_config, f)
            else:
                yaml.dump(mocker_config, f)

    except Exception as e:
        logger.error(f"Profile job failed with error: {e}")
        raise
    finally:
        # Always clean up any remaining deployments, even if the job failed
        logger.info("Performing final cleanup of any remaining deployments...")
        await cleanup_remaining_deployments(deployment_clients, args.namespace)
        logger.info("Final cleanup completed.")


if __name__ == "__main__":
    args = create_profiler_parser()

    # Also write log records to <output_dir>/profile_sla.log, using the same
    # record format as the console handler. The directory is created first
    # so the file handler can open its log file.
    os.makedirs(args.output_dir, exist_ok=True)
    formatter = logging.Formatter(
        fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    log_file_handler = logging.FileHandler(f"{args.output_dir}/profile_sla.log")
    log_file_handler.setFormatter(formatter)
    log_file_handler.setLevel(logging.INFO)
    logger.addHandler(log_file_handler)

    # Drive the async profiling workflow to completion.
    asyncio.run(run_profile(args))