"fern/pages/design-docs/disagg-serving.md" did not exist on "cea1902de7c54bd0a890b70559b1057e82fd7279"
profile_sla.py 32.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16
import asyncio
17
18
19
import logging
import math
import os
20
from dataclasses import dataclass, field
21
22
23

import numpy as np
import yaml
24

25
26
27
28
from benchmarks.profiler.utils.aiperf import (
    get_decode_itl_and_thpt_per_gpu,
    get_prefill_ttft,
)
29
from benchmarks.profiler.utils.config_modifiers import CONFIG_MODIFIERS
30
31
32
33
34
35
from benchmarks.profiler.utils.config_modifiers.parallelization_mapping import (
    ParallelizationMapping,
    apply_parallel_mapping_to_config,
    get_candidate_parallel_mappings,
)
from benchmarks.profiler.utils.defaults import EngineType
36
from benchmarks.profiler.utils.dgd_generation import generate_dgd_config_with_planner
37
from benchmarks.profiler.utils.estimate_perf import AIConfiguratorPerfEstimator
38
39
from benchmarks.profiler.utils.plot import (
    plot_decode_performance,
40
    plot_pd_joint_results,
41
42
    plot_prefill_performance,
)
43
from benchmarks.profiler.utils.profile_decode import (
44
    get_num_request_range,
45
46
47
48
49
50
51
    profile_decode,
    profile_decode_aiconfigurator,
)
from benchmarks.profiler.utils.profile_prefill import (
    profile_prefill,
    profile_prefill_aiconfigurator,
)
52
from benchmarks.profiler.utils.profiler_argparse import create_profiler_parser
53
54
55
56
from deploy.utils.dynamo_deployment import (
    DynamoDeploymentClient,
    cleanup_remaining_deployments,
)
57
from dynamo.planner.defaults import WORKER_COMPONENT_NAMES
58

59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

@dataclass
class PrefillProfileData:
    """Column-oriented store of prefill profiling measurements.

    Each attribute is a list; index ``i`` across all lists describes one
    measured data point. ``add_data`` appends to every list at once so the
    columns stay aligned.

    Attributes:
        num_gpus: GPU count used for each data point.
        ttft: Value recorded as ``ttft`` for each data point.
        thpt_per_gpu: Value recorded as ``thpt_per_gpu`` for each data point.
        parallel_mapping_labels: Label string of the parallel mapping used.
        parallel_mappings: The ``ParallelizationMapping`` object used.
    """

    num_gpus: list[int] = field(default_factory=list)
    ttft: list[float] = field(default_factory=list)
    thpt_per_gpu: list[float] = field(default_factory=list)
    parallel_mapping_labels: list[str] = field(default_factory=list)
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        ttft: float,
        thpt_per_gpu: float,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Append one data point, extending every column by exactly one entry."""
        # Pair each column with its new value so all lists grow in lockstep.
        column_updates = (
            (self.num_gpus, num_gpus),
            (self.ttft, ttft),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in column_updates:
            column.append(value)


@dataclass
class DecodeProfileData:
    """Column-oriented store of decode profiling measurements.

    Each attribute is a list; index ``i`` across all lists describes one
    measured data point. ``add_data`` appends to every list at once so the
    columns stay aligned.

    Attributes:
        num_gpus: GPU count used for each data point.
        itl: Value recorded as ``itl`` for each data point.
        thpt_per_gpu: Value recorded as ``thpt_per_gpu`` for each data point.
        concurrency: Value recorded as ``concurrency`` for each data point.
        kv_cache_size: Value recorded as ``kv_cache_size`` for each data point.
        parallel_mapping_labels: Label string of the parallel mapping used.
        parallel_mappings: The ``ParallelizationMapping`` object used.
    """

    num_gpus: list[int] = field(default_factory=list)
    itl: list[float] = field(default_factory=list)
    thpt_per_gpu: list[float] = field(default_factory=list)
    concurrency: list[int] = field(default_factory=list)
    kv_cache_size: list[int] = field(default_factory=list)
    parallel_mapping_labels: list[str] = field(default_factory=list)
    parallel_mappings: list[ParallelizationMapping] = field(default_factory=list)

    def add_data(
        self,
        num_gpus: int,
        itl: float,
        thpt_per_gpu: float,
        concurrency: int,
        kv_cache_size: int,
        parallel_mapping_label: str,
        parallel_mapping: ParallelizationMapping,
    ) -> None:
        """Append one data point, extending every column by exactly one entry."""
        # Pair each column with its new value so all lists grow in lockstep.
        column_updates = (
            (self.num_gpus, num_gpus),
            (self.itl, itl),
            (self.thpt_per_gpu, thpt_per_gpu),
            (self.concurrency, concurrency),
            (self.kv_cache_size, kv_cache_size),
            (self.parallel_mapping_labels, parallel_mapping_label),
            (self.parallel_mappings, parallel_mapping),
        )
        for column, value in column_updates:
            column.append(value)


118
119
120
121
122
123
124
125
126
127
128
# Module-level logger: INFO and above is echoed to a stream handler using a
# timestamped "<time> - <logger name> - <level> - <message>" layout.
logger = logging.getLogger(__name__)
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)


129
130
131
async def run_profile(args):
    """Run the full SLA profiling job described by ``args``.

    Stages, in order:
      1. Load the deployment config from ``args.config`` and resolve the
         maximum context length used for the interpolation sweeps.
      2. For every power-of-two GPU count between
         ``args.min_num_gpus_per_engine`` and ``args.max_num_gpus_per_engine``
         and every candidate parallel mapping, measure (or estimate with the
         AI configurator) prefill TTFT and decode ITL / throughput per GPU.
      3. Pick the prefill and decode mappings that meet ``args.ttft`` /
         ``args.itl`` with the highest throughput per GPU (falling back to the
         lowest-latency mapping if none meets the target).
      4. Run the prefill and decode interpolation sweeps on the selected
         mappings.
      5. Generate the DGD config with planner and write it to
         ``<output_dir>/config_with_planner.yaml``.

    In dry-run mode (``args.dry_run``) configs are written but no deployment
    is created and no measurement is taken.

    Args:
        args: Parsed profiler arguments. ``args.aic_backend`` and
            ``args.aic_hf_id`` may be filled in from ``args.backend`` /
            ``args.model`` when unset.

    Returns:
        None. Returns early (after logging an error) when profiling produced
        no prefill or no decode results.

    Raises:
        ValueError: If no max context length is available, or required
            AI-configurator arguments are missing.
        AssertionError: If an MoE model is profiled with a backend other than
            ``sglang``.
        Exception: Any other failure is logged and re-raised; remaining
            deployments are cleaned up in all cases.
    """
    # List to track all created deployment clients for cleanup in case of failure
    deployment_clients = []

    # Inherit aic_backend from backend if not explicitly set
    if not args.aic_backend:
        args.aic_backend = args.backend

    try:
        # Log MoE model support
        if args.model_info.is_moe:
            logger.info(
                "MoE (Mixture of Experts) model profiling, sweeping TEP/DEP size for prefill and decode"
            )
            assert args.backend in [
                "sglang"
            ], "MoE model support is only available for SGLang"
        else:
            logger.info(
                "Dense model profiling, sweeping TP size for prefill and decode"
            )

        config_modifier = CONFIG_MODIFIERS[args.backend]

        with open(args.config, "r") as f:
            config = yaml.safe_load(f)

        if args.dgd_image:
            config = config_modifier.update_image(config, args.dgd_image)
            logger.debug(f"Using DGD image: {args.dgd_image}")

        # Powers of two within [min_num_gpus_per_engine, max_num_gpus_per_engine]
        profile_num_gpus = [
            2**i
            for i in range(int(math.log2(args.max_num_gpus_per_engine)) + 1)
            if args.min_num_gpus_per_engine <= 2**i <= args.max_num_gpus_per_engine
        ]
        if args.model_info.is_moe:
            logger.info(f"Profiling MoE GPU counts (TEP/DEP): {profile_num_gpus}")
        else:
            logger.info(f"Profiling dense model GPU counts (TP): {profile_num_gpus}")

        os.makedirs(args.output_dir, exist_ok=True)

        model_name = config_modifier.get_model_name(config)

        # Determine sweep max context length: allow user-provided cap to override model's if smaller
        use_specified_max_context_len = getattr(args, "max_context_length", None)
        model_max_context_len = args.model_info.max_context_length
        if not use_specified_max_context_len and not model_max_context_len:
            raise ValueError(
                "No max_context_length available from args.max_context_length or model_info from HF config"
            )
        elif not use_specified_max_context_len:
            sweep_max_context_length = model_max_context_len
            logger.info(
                f"Using model's maximum context length: {model_max_context_len}"
            )
        elif not model_max_context_len:
            sweep_max_context_length = use_specified_max_context_len
            logger.info(
                f"Using user-provided max_context_length: {use_specified_max_context_len}"
            )
        else:
            sweep_max_context_length = min(
                use_specified_max_context_len, model_max_context_len
            )
            logger.info(
                f"Using minimum of user-provided and model's maximum context length: {sweep_max_context_length}"
            )

        if args.use_ai_configurator:
            if not args.aic_system:
                raise ValueError(
                    "Must provide --aic-system when using --use-ai-configurator."
                )

            # Fallback to args.model if aic_hf_id is not provided
            if not args.aic_hf_id:
                if args.model:
                    logger.info(
                        f"--aic-hf-id not provided, using --model ({args.model}) as HuggingFace ID for AI configurator"
                    )
                    args.aic_hf_id = args.model
                else:
                    raise ValueError(
                        "Must provide --aic-hf-id or --model when using --use-ai-configurator."
                    )

            logger.info("Using aiconfigurator to estimate performance...")
            ai_configurator_perf_estimator = AIConfiguratorPerfEstimator(
                args.aic_hf_id,
                args.aic_system.lower(),
                args.aic_backend,
                args.aic_backend_version,
            )
        else:
            if args.aic_system or args.aic_hf_id or args.aic_backend_version:
                logger.warning(
                    "Ignoring --aic-system, --aic-hf-id, and/or --backend-version "
                    "when not using --use-ai-configurator."
                )

        # first profile prefill
        prefill_data = PrefillProfileData()
        logger.info("Profiling prefill...")
        base_prefill_config = config_modifier.convert_config(
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
        )
        frontend_port = config_modifier.get_port(config)
        itl: float | None = None
        thpt_per_gpu: float | None = None
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling prefill with {num_gpus} GPUs...")
            candidate_mappings = get_candidate_parallel_mappings(
                num_gpus, args.model_info, EngineType.PREFILL
            )

            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                prefill_config = apply_parallel_mapping_to_config(
                    base_prefill_config,
                    mapping,
                    EngineType.PREFILL,
                    config_modifier,
                    args.num_gpus_per_node,
                )
                logger.debug(f"Dynamo config: {prefill_config}")

                # Work dir includes mapping label (safe chars only)
                parallel_mapping_tag = (
                    mapping.label().replace("=", "").replace("/", "_")
                )
                work_dir = (
                    f"{args.output_dir}/prefill_{num_gpus}gpus_{parallel_mapping_tag}"
                )
                os.makedirs(work_dir, exist_ok=True)

                prefill_config_fn = f"{work_dir}/config.yaml"
                with open(prefill_config_fn, "w") as f:
                    yaml.dump(prefill_config, f)

                # ttft stays None in dry-run mode, so no data point is recorded
                ttft = None
                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
                elif args.use_ai_configurator:
                    logger.info("Using ai-configurator to estimate prefill latency")
                    perf_dict = ai_configurator_perf_estimator.estimate_prefill_perf(
                        args.isl,
                        tp_size=mapping.get_tp_size(),
                    )
                    ttft = perf_dict["context_latency"]
                    logger.info(f"Estimated prefill TTFT: {ttft:.2f}ms")
                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=prefill_config["metadata"]["name"],
                    )
                    logger.info(
                        f"Created client with service_name: {client.service_name}"
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(prefill_config_fn)
                    logger.info("Waiting for deployment to be ready...")
                    await client.wait_for_deployment_ready()
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )

                    # run ai-perf
                    base_url = client.get_service_url()
                    ai_perf_artifact_dir = f"{work_dir}/aiperf_isl{args.isl}"
                    ttft = get_prefill_ttft(
                        args.isl,
                        ai_perf_artifact_dir,
                        model_name,
                        model_name,
                        base_url,
                        attention_dp_size=mapping.get_attn_dp_size(),
                    )

                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")

                if ttft is not None:
                    prefill_data.add_data(
                        num_gpus=num_gpus,
                        ttft=ttft,
                        thpt_per_gpu=args.isl
                        / ttft
                        / num_gpus
                        * 1000
                        * mapping.get_attn_dp_size(),
                        parallel_mapping_label=mapping.label(),
                        parallel_mapping=mapping,
                    )

        # Plot the results as a 2D scatter plot
        if prefill_data.num_gpus and prefill_data.ttft and prefill_data.thpt_per_gpu:
            plot_prefill_performance(prefill_data, args.ttft, args.output_dir)

        # then profile decode
        decode_data = DecodeProfileData()
        logger.info("Profiling decode...")
        base_decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
        )
        for num_gpus in profile_num_gpus:
            logger.info(f"Profiling decode with {num_gpus} GPUs...")
            candidate_mappings = get_candidate_parallel_mappings(
                num_gpus, args.model_info, EngineType.DECODE
            )

            for mapping in candidate_mappings:
                # Apply parallel mapping to config
                decode_config = apply_parallel_mapping_to_config(
                    base_decode_config,
                    mapping,
                    EngineType.DECODE,
                    config_modifier,
                    args.num_gpus_per_node,
                )
                logger.debug(f"Dynamo config: {decode_config}")

                parallel_mapping_tag = (
                    mapping.label()
                    .replace("=", "")
                    .replace("/", "_")  # safe chars for directory
                )
                work_dir = (
                    f"{args.output_dir}/decode_{num_gpus}gpus_{parallel_mapping_tag}"
                )
                os.makedirs(work_dir, exist_ok=True)

                decode_config_fn = f"{work_dir}/config.yaml"
                with open(decode_config_fn, "w") as f:
                    yaml.dump(decode_config, f)

                if args.dry_run:
                    logger.info("Skipping deployment creation in dry run mode")
                elif args.use_ai_configurator:
                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
                    max_concurrency = ai_configurator_perf_estimator.get_max_batch_size(
                        args.isl, args.osl, tp_size=mapping.get_tp_size()
                    )
                    max_kv_tokens = max_concurrency * (args.isl + args.osl)
                else:
                    client = DynamoDeploymentClient(
                        namespace=args.namespace,
                        base_log_dir=work_dir,
                        model_name=model_name,
                        service_name=args.service_name,
                        frontend_port=frontend_port,
                        deployment_name=decode_config["metadata"]["name"],
                    )
                    deployment_clients.append(client)  # Track for cleanup
                    await client.create_deployment(decode_config_fn)
                    logger.info("Waiting for deployment to be ready...")
                    await client.wait_for_deployment_ready()
                    logger.info("Deployment is ready")

                    logger.info("Getting deployment logs...")
                    await client.get_deployment_logs()
                    logger.info(
                        f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                    )

                    # Compute max_concurrency and max_kv_tokens to know which
                    # num_request to sweep over.
                    attention_dp_size = mapping.get_attn_dp_size()
                    max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
                        f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                        attention_dp_size=attention_dp_size,
                    )
                    max_concurrency = max_kv_tokens // (args.isl + args.osl)

                if not args.dry_run:
                    attention_dp_size = mapping.get_attn_dp_size()
                    sweep_num_request = get_num_request_range(
                        attention_dp_size,
                        max_concurrency,
                        args.decode_interpolation_granularity,
                    )
                    logger.info(
                        f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}"
                    )

                    for num_request in sweep_num_request:
                        itl = thpt_per_gpu = None
                        if args.use_ai_configurator:
                            logger.info(
                                "Using ai-configurator to estimate decode latency."
                            )
                            perf_dict = ai_configurator_perf_estimator.estimate_perf(
                                args.isl,
                                args.osl,
                                num_request,
                                mode=EngineType.DECODE,
                                tp_size=mapping.get_tp_size(),
                            )

                            itl = perf_dict["tpot"]
                            thpt_per_gpu = perf_dict["tokens/s/gpu"]
                            logger.info(f"Estimated decode ITL: {itl:.2f}ms")
                            logger.info(
                                f"Estimated decode throughput per GPU: {thpt_per_gpu:.2f} tokens/s/GPU"
                            )
                        else:
                            base_url = client.get_service_url()
                            ai_perf_artifact_dir = f"{work_dir}/aiperf_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}"
                            itl, thpt_per_gpu = get_decode_itl_and_thpt_per_gpu(
                                args.isl,
                                args.osl,
                                num_request,
                                ai_perf_artifact_dir,
                                model_name,
                                model_name,
                                base_url=base_url,
                                num_gpus=num_gpus,
                            )

                        if itl is not None and thpt_per_gpu is not None:
                            decode_data.add_data(
                                num_gpus=num_gpus,
                                itl=itl,
                                thpt_per_gpu=thpt_per_gpu,
                                concurrency=num_request,
                                kv_cache_size=max_kv_tokens,
                                parallel_mapping_label=mapping.label(),
                                parallel_mapping=mapping,
                            )

                if not args.dry_run and not args.use_ai_configurator:
                    logger.info("Cleaning up deployment...")
                    await client.delete_deployment()
                    deployment_clients.remove(client)
                    logger.info("Deployment deleted")

        # Plot all decode results after profiling is complete
        if decode_data.num_gpus:
            plot_decode_performance(decode_data, args.itl, args.output_dir)

        if prefill_data.num_gpus and decode_data.num_gpus:
            plot_pd_joint_results(
                args.isl, args.osl, prefill_data, decode_data, args.output_dir
            )

        if args.dry_run:
            logger.info("Skipping recommendations in dry run mode")
        else:
            logger.info("Analyzing results and generate recommendations...")
            # Safety guards: no results → exit early with a clear message
            if not prefill_data.num_gpus:
                logger.error("No prefill results produced; skipping recommendations.")
                # Without this return, min() below raises ValueError on the
                # empty ttft list; mirrors the decode guard further down.
                return

            # select best parallel mapping for prefill
            if min(prefill_data.ttft) > args.ttft:
                logger.warning(
                    "No engine configuration satisfies the TTFT requirement, please try a smaller model or more powerful hardware"
                )
                selected_prefill_idx = int(np.argmin(np.array(prefill_data.ttft)))
            else:
                valid_indices = [
                    i for i, ttft in enumerate(prefill_data.ttft) if ttft <= args.ttft
                ]
                # Among valid TP sizes, select the one with highest throughput per GPU
                valid_thpts = [prefill_data.thpt_per_gpu[i] for i in valid_indices]
                max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                selected_prefill_idx = max_thpt_idx
            logger.info(
                f"Suggested prefill parallel mapping: {prefill_data.parallel_mapping_labels[selected_prefill_idx]} on {prefill_data.num_gpus[selected_prefill_idx]} GPU(s) (TTFT {prefill_data.ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_data.thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)"
            )

            # select best parallel mapping for decode
            if not decode_data.num_gpus:
                logger.error("No decode results produced; skipping recommendations.")
                return
            if min(decode_data.itl) > args.itl:
                logger.warning(
                    "No engine configuration satisfies the ITL requirement, please try a smaller model or more powerful hardware"
                )
                selected_decode_idx = int(np.argmin(np.array(decode_data.itl)))
            else:
                valid_indices = [
                    i for i, itl in enumerate(decode_data.itl) if itl <= args.itl
                ]
                # Among valid TP sizes, select the one with highest throughput per GPU
                valid_thpts = [decode_data.thpt_per_gpu[i] for i in valid_indices]
                max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
                selected_decode_idx = max_thpt_idx
            logger.info(
                f"Suggested decode parallel mapping: {decode_data.parallel_mapping_labels[selected_decode_idx]} on {decode_data.num_gpus[selected_decode_idx]} GPU(s) (ITL {decode_data.itl[selected_decode_idx]:.2f} ms, throughput {decode_data.thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)"
            )

        if args.dry_run:
            # use min value for prefill and decode GPU counts
            prefill_data.num_gpus = [args.min_num_gpus_per_engine]
            decode_data.num_gpus = [args.min_num_gpus_per_engine]
            prefill_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
            decode_data.parallel_mappings = [
                ParallelizationMapping(tp=args.min_num_gpus_per_engine)
            ]
            selected_prefill_idx = 0
            selected_decode_idx = 0

        # interpolate ISL - TTFT with best prefill parallel mapping
        best_prefill_gpus = prefill_data.num_gpus[selected_prefill_idx]
        best_prefill_mapping = prefill_data.parallel_mappings[selected_prefill_idx]
        logger.info(
            f"Profiling prefill under best {best_prefill_gpus} GPU(s) with parallel mapping [{best_prefill_mapping.label()}] with different ISL..."
        )
        prefill_config = config_modifier.convert_config(
            config, EngineType.PREFILL, is_moe_model=args.model_info.is_moe
        )
        prefill_config = apply_parallel_mapping_to_config(
            prefill_config,
            best_prefill_mapping,
            EngineType.PREFILL,
            config_modifier,
            args.num_gpus_per_node,
        )
        logger.debug(f"Dynamo config: {prefill_config}")

        work_dir = f"{args.output_dir}/selected_prefill_interpolation"
        os.makedirs(work_dir, exist_ok=True)

        prefill_config_fn = f"{work_dir}/config.yaml"
        with open(prefill_config_fn, "w") as f:
            yaml.dump(prefill_config, f)

        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
        elif args.use_ai_configurator:
            profile_prefill_aiconfigurator(
                work_dir,
                best_prefill_gpus,  # num_gpus
                sweep_max_context_length,
                args.prefill_interpolation_granularity,
                ai_configurator_perf_estimator,
                tp_size=best_prefill_mapping.get_tp_size(),
            )
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=prefill_config["metadata"]["name"],
            )
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(prefill_config_fn)
            logger.info("Waiting for deployment to be ready...")
            try:
                await client.wait_for_deployment_ready()
                logger.info("Deployment is ready")

                skip_profile = False
            except TimeoutError:
                logger.error(
                    "Deployment or model failed to become ready within timeout, skipping profiling"
                )
                skip_profile = True

            # Only touch the deployment when it actually became ready; a
            # timed-out deployment is just cleaned up below.
            if not skip_profile:
                logger.info("Getting deployment logs...")
                await client.get_deployment_logs()
                logger.info(
                    f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
                )

                base_url = client.get_service_url()

                profile_prefill(
                    work_dir,
                    model_name,
                    model_name,
                    base_url,
                    best_prefill_gpus,
                    sweep_max_context_length,
                    args.prefill_interpolation_granularity,
                    attention_dp_size=best_prefill_mapping.get_attn_dp_size(),
                )

            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")

        # interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode parallel mapping
        best_decode_gpus = decode_data.num_gpus[selected_decode_idx]
        best_decode_mapping = decode_data.parallel_mappings[selected_decode_idx]
        logger.info(
            f"Profiling decode with {best_decode_gpus} GPUs with parallel mapping [{best_decode_mapping.label()}]..."
        )
        decode_config = config_modifier.convert_config(
            config, EngineType.DECODE, is_moe_model=args.model_info.is_moe
        )
        decode_config = apply_parallel_mapping_to_config(
            decode_config,
            best_decode_mapping,
            EngineType.DECODE,
            config_modifier,
            args.num_gpus_per_node,
        )
        logger.debug(f"Dynamo config: {decode_config}")

        work_dir = f"{args.output_dir}/selected_decode_interpolation"
        os.makedirs(work_dir, exist_ok=True)

        decode_config_fn = f"{work_dir}/config.yaml"
        with open(decode_config_fn, "w") as f:
            yaml.dump(decode_config, f)

        if args.dry_run:
            logger.info("Skipping deployment creation in dry run mode")
        elif args.use_ai_configurator:
            attention_dp_size = best_decode_mapping.get_attn_dp_size()
            max_kv_tokens = ai_configurator_perf_estimator.get_max_kv_tokens(
                args.isl, args.osl, tp_size=best_decode_mapping.get_tp_size()
            )
            profile_decode_aiconfigurator(
                work_dir,
                best_decode_gpus,  # num_gpus
                max_kv_tokens,
                sweep_max_context_length,
                args.decode_interpolation_granularity,
                ai_configurator_perf_estimator,
                attention_dp_size,
                tp_size=best_decode_mapping.get_tp_size(),
            )
        else:
            client = DynamoDeploymentClient(
                namespace=args.namespace,
                base_log_dir=work_dir,
                model_name=model_name,
                service_name=args.service_name,
                frontend_port=frontend_port,
                deployment_name=decode_config["metadata"]["name"],
            )
            deployment_clients.append(client)  # Track for cleanup
            await client.create_deployment(decode_config_fn)
            logger.info("Waiting for deployment to be ready...")
            await client.wait_for_deployment_ready()
            logger.info("Deployment is ready")

            logger.info("Getting deployment logs...")
            await client.get_deployment_logs()
            logger.info(
                f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
            )

            attention_dp_size = best_decode_mapping.get_attn_dp_size()
            max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
                f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log",
                attention_dp_size=attention_dp_size,
            )

            base_url = client.get_service_url()

            profile_decode(
                work_dir,
                model_name,
                model_name,
                base_url,
                best_decode_gpus,
                max_kv_tokens,
                sweep_max_context_length,
                args.decode_interpolation_granularity,
                attention_dp_size,
            )

            logger.info("Cleaning up deployment...")
            await client.delete_deployment()
            deployment_clients.remove(client)
            logger.info("Deployment deleted")

        # generate DGD with planner based on profiling results
        config = generate_dgd_config_with_planner(
            config_path=args.config,
            config_modifier=config_modifier,
            best_prefill_gpus=best_prefill_gpus,
            best_decode_gpus=best_decode_gpus,
            output_dir=args.output_dir,
            args=args,
            is_moe_model=args.model_info.is_moe,
            num_gpus_per_node=args.num_gpus_per_node,
        )
        logger.debug(f"Final DGD config with planner: {config}")

        # save DGD config with planner; support multi-document output when a ConfigMap is included
        with open(f"{args.output_dir}/config_with_planner.yaml", "w") as f:
            if isinstance(config, list):
                yaml.dump_all(config, f)
            else:
                yaml.dump(config, f)

    except Exception as e:
        logger.error(f"Profile job failed with error: {e}")
        raise
    finally:
        # Always clean up any remaining deployments, even if the job failed
        logger.info("Performing final cleanup of any remaining deployments...")
        await cleanup_remaining_deployments(deployment_clients, args.namespace)
        logger.info("Final cleanup completed.")


if __name__ == "__main__":
    args = create_profiler_parser()

    # Mirror log records (INFO and above) into <output_dir>/profile_sla.log,
    # using the same layout as the console handler.
    os.makedirs(args.output_dir, exist_ok=True)
    log_file_handler = logging.FileHandler(f"{args.output_dir}/profile_sla.log")
    formatter = logging.Formatter(
        fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    log_file_handler.setFormatter(formatter)
    log_file_handler.setLevel(logging.INFO)
    logger.addHandler(log_file_handler)

    # Drive the async profiling job to completion on a fresh event loop.
    asyncio.run(run_profile(args))