22
23
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import math
import os
import subprocess

import numpy as np
import yaml
from utils.config import CONFIG_MODIFIERS
from utils.defaults import DECODE_NUM_REQUESTS_RANGE
from utils.genai_perf import benchmark_decode, benchmark_prefill
from utils.plot import (
    plot_decode_3d_surface,
    plot_decode_performance,
    plot_prefill_interpolation,
    plot_prefill_performance,
)
from utils.utils import (
    get_available_gpu_count,
    get_dynamo_serve_cmd,
    shutdown_deployment,
    wait_for_server_ready,
)

# Timestamped "<time> - <logger> - <level> - <message>" format for console output.
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# StreamHandler() with no argument writes to sys.stderr; pass INFO and above.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)

# Module-level logger used throughout this script.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(console_handler)

def _start_dynamo_serve(dynamo_config, work_dir, tp_size, example_dir):
    """Write ``dynamo_config`` into ``work_dir`` and launch ``dynamo serve`` on it.

    The config is dumped to ``<work_dir>/config.yaml`` and the server's combined
    stdout/stderr is redirected to ``<work_dir>/dynamo.log``.

    Args:
        dynamo_config: Backend config produced by the config modifier; it is
            serialized with ``yaml.dump``.
        work_dir: Output directory for this run; created if it does not exist.
        tp_size: Tensor-parallel size, used only for logging.
        example_dir: Working directory for the serve process.

    Returns:
        A ``(process, log_path)`` tuple: the ``subprocess.Popen`` handle and the
        path of the log file the server writes to.
    """
    logger.info(f"Dynamo config: {dynamo_config}")
    os.makedirs(work_dir, exist_ok=True)

    config_fn = f"{work_dir}/config.yaml"
    log_fn = f"{work_dir}/dynamo.log"
    with open(config_fn, "w") as f:
        yaml.dump(dynamo_config, f)

    logger.info(f"Starting dynamo serve with TP size {tp_size}...")
    dynamo_serve_cmd = get_dynamo_serve_cmd(config_fn)
    # The child gets its own duplicate of the log file descriptor, so closing
    # our handle when the `with` block exits does not stop the server logging.
    with open(log_fn, "w") as log_f:
        process = subprocess.Popen(
            dynamo_serve_cmd,
            stdout=log_f,
            stderr=subprocess.STDOUT,
            text=True,
            cwd=example_dir,
            preexec_fn=os.setsid,  # Use process group for clean termination
        )
    return process, log_fn


def _select_best_index(latencies, thpts_per_gpu, target):
    """Return the index of the best profiled point for a latency target.

    Among points whose latency is ``<= target``, the one with the highest
    per-GPU throughput wins (the first one on ties). If no point meets the
    target, the lowest-latency point is returned instead.

    Args:
        latencies: Measured latencies in ms, one per profiled point; must be
            non-empty.
        thpts_per_gpu: Per-GPU throughputs, parallel to ``latencies``.
        target: Latency target in ms.

    Returns:
        ``(index, target_met)``, where ``target_met`` tells whether any point
        satisfied the target.
    """
    valid_indices = [i for i, lat in enumerate(latencies) if lat <= target]
    if not valid_indices:
        return int(np.argmin(np.array(latencies))), False
    valid_thpts = [thpts_per_gpu[i] for i in valid_indices]
    return valid_indices[int(np.argmax(valid_thpts))], True


def main():
    """Profile a dynamo deployment and suggest TP sizes / planner bounds for an SLA.

    Steps, in order:
      1. Sweep power-of-two TP sizes, up to the number of available GPUs, for
         prefill (TTFT at ``--isl``) and for decode (ITL over a sweep of
         concurrent request counts).
      2. For each phase, pick the TP size with the best per-GPU throughput that
         meets ``--ttft`` / ``--itl`` (or the lowest-latency one if none does),
         and log suggested planner bounds.
      3. Re-launch the selected prefill/decode configs and sample TTFT vs ISL
         and ITL vs (KV-cache usage, context length). The raw samples are saved
         as ``raw_data.npz`` and plotted.

    Every launched server is shut down in a ``finally``. A failed readiness
    check or a benchmark error therefore does not leave it holding the GPUs and
    the serving port for the next launch.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--backend",
        type=str,
        default="vllm_v0",
        choices=["vllm_v0", "vllm_v1"],
        help="backend type (currently only vllm is supported)",
    )
    parser.add_argument(
        "--config", type=str, required=True, help="Path to the dynamo config file"
    )
    parser.add_argument(
        "--example-dir",
        type=str,
        default=None,
        help="path to the example directory, if not provided, will try to infer from config file location",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="profiling_results",
        help="Path to the output results directory",
    )
    parser.add_argument(
        "--isl", type=int, default=3000, help="target input sequence length"
    )
    parser.add_argument(
        "--osl", type=int, default=500, help="target output sequence length"
    )
    parser.add_argument(
        "--ttft", type=int, default=50, help="target Time To First Token in ms"
    )
    parser.add_argument(
        "--itl", type=int, default=10, help="target Inter Token Latency in ms"
    )
    # below are arguments used for interpolating TTFT and ITL under different ISL/OSL
    parser.add_argument(
        "--max-context-length",
        type=int,
        default=16384,
        help="maximum context length supported by the served model",
    )
    parser.add_argument(
        "--prefill-interpolation-granularity",
        type=int,
        default=16,
        help="how many samples to benchmark to interpolate TTFT under different ISL",
    )
    parser.add_argument(
        "--decode-interpolation-granularity",
        type=int,
        default=6,
        help="how many samples to benchmark to interpolate ITL under different active kv cache size and decode context length",
    )
    args = parser.parse_args()

    config_modifier = CONFIG_MODIFIERS[args.backend]

    if args.example_dir is None:
        logger.info(
            "Example directory not provided, inferring from config file location..."
        )
        # Resolve to an absolute path first. For a relative --config such as
        # "configs/x.yaml", the double dirname would otherwise yield "", which
        # is not a usable working directory for the serve process.
        args.example_dir = os.path.dirname(
            os.path.dirname(os.path.abspath(args.config))
        )
        if not os.path.isdir(args.example_dir):
            logger.error(
                "Failed to infer example directory, please provide explicitly using --example-dir <path-to-example-dir>"
            )
            raise SystemExit(1)

    with open(args.config, "r") as f:
        config = yaml.safe_load(f)

    # Get the number of available GPUs
    available_gpus = get_available_gpu_count()
    if not available_gpus or available_gpus < 1:
        # math.log2 below is undefined for 0 GPUs.
        logger.error("No available GPUs detected, cannot profile")
        raise SystemExit(1)

    profile_tp_size = [2**i for i in range(int(math.log2(available_gpus)) + 1)]
    logger.info(f"Profiling TP sizes: {profile_tp_size}")

    os.makedirs(args.output_dir, exist_ok=True)

    model_name = config_modifier.get_model_name(config)
    port = config_modifier.get_port(config)

    # first profile prefill
    prefill_tp_size = []
    prefill_ttft = []
    prefill_thpt_per_gpu = []
    logger.info("Profiling prefill...")
    prefill_config = config_modifier.convert_config(config, "prefill")
    for tp_size in profile_tp_size:
        logger.info(f"Profiling prefill with TP size {tp_size}...")
        prefill_config = config_modifier.set_config_tp_size(prefill_config, tp_size)
        work_dir = f"{args.output_dir}/prefill_tp{tp_size}"
        dynamo_process, _ = _start_dynamo_serve(
            prefill_config, work_dir, tp_size, args.example_dir
        )
        try:
            if not wait_for_server_ready(model_name, port):
                logger.error(
                    f"Server did not become ready, skip profiling tp={tp_size}"
                )
                break

            # run genai-perf
            genai_perf_artifact_dir = f"{work_dir}/gap_isl{args.isl}"
            gap_result = benchmark_prefill(
                args.isl, genai_perf_artifact_dir, model_name, port
            )
            if gap_result is not None:
                ttft = gap_result["time_to_first_token"]["avg"]
                prefill_tp_size.append(tp_size)
                prefill_ttft.append(ttft)
                # tokens/s/GPU: ISL tokens processed in `ttft` ms on tp_size GPUs
                prefill_thpt_per_gpu.append(args.isl / ttft / tp_size * 1000)
        finally:
            # Also runs on `break`, so a server that never became ready does
            # not keep holding the GPUs and the port.
            # NOTE(review): assumes shutdown_deployment tolerates a server that
            # failed to start — confirm in utils.utils.
            shutdown_deployment(dynamo_process)

    # Plot the results as a 2D scatter plot
    if prefill_tp_size:
        plot_prefill_performance(
            prefill_tp_size,
            prefill_ttft,
            prefill_thpt_per_gpu,
            args.ttft,
            args.output_dir,
        )

    # then profile decode
    decode_tp_size = []
    decode_itl = []
    decode_thpt_per_gpu = []
    decode_concurrency = []
    decode_kv_cache_size = []
    decode_results = []  # Store partial results for plotting later
    logger.info("Profiling decode...")
    decode_config = config_modifier.convert_config(config, "decode")
    for tp_size in profile_tp_size:
        logger.info(f"Profiling decode with TP size {tp_size}...")
        decode_config = config_modifier.set_config_tp_size(decode_config, tp_size)
        work_dir = f"{args.output_dir}/decode_tp{tp_size}"
        dynamo_process, dynamo_log_fn = _start_dynamo_serve(
            decode_config, work_dir, tp_size, args.example_dir
        )
        try:
            if not wait_for_server_ready(model_name, port):
                logger.error(
                    f"Server did not become ready, skip profiling tp={tp_size}"
                )
                break

            max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
                dynamo_log_fn
            )
            # Only sweep request counts whose full ISL+OSL footprint fits in
            # the reported KV cache capacity.
            max_concurrency = max_kv_tokens // (args.isl + args.osl)
            sweep_num_request = [
                num for num in DECODE_NUM_REQUESTS_RANGE if num < max_concurrency
            ]
            logger.info(
                f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}"
            )

            engine_decode_itl = []
            engine_decode_thpt_per_gpu = []
            for num_request in sweep_num_request:
                genai_perf_artifact_dir = f"{work_dir}/gap_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}"
                gap_result = benchmark_decode(
                    args.isl,
                    args.osl,
                    num_request,
                    genai_perf_artifact_dir,
                    model_name,
                    port,
                )
                if gap_result is not None:
                    itl = gap_result["inter_token_latency"]["avg"]
                    thpt_per_gpu = (
                        gap_result["output_token_throughput"]["avg"] / tp_size
                    )
                    engine_decode_itl.append(itl)
                    engine_decode_thpt_per_gpu.append(thpt_per_gpu)
                    decode_tp_size.append(tp_size)
                    decode_itl.append(itl)
                    decode_thpt_per_gpu.append(thpt_per_gpu)
                    decode_concurrency.append(num_request)
                    decode_kv_cache_size.append(max_kv_tokens)

            # Store partial results for plotting later
            decode_results.append(
                (tp_size, engine_decode_itl, engine_decode_thpt_per_gpu)
            )
        finally:
            # Also runs on `break`; see the prefill loop above.
            shutdown_deployment(dynamo_process)

    # Plot all decode results after profiling is complete
    if decode_results:
        plot_decode_performance(decode_results, args.itl, args.output_dir)

    logger.info("Analyzing results and generate recommendations...")
    # Without at least one measurement there is nothing to select from (and
    # min()/argmin() would raise on the empty lists).
    if not prefill_ttft:
        logger.error(
            "No successful prefill measurement, cannot generate recommendations"
        )
        raise SystemExit(1)
    if not decode_itl:
        logger.error(
            "No successful decode measurement, cannot generate recommendations"
        )
        raise SystemExit(1)

    # select best tp size for prefill
    selected_prefill_idx, prefill_target_met = _select_best_index(
        prefill_ttft, prefill_thpt_per_gpu, args.ttft
    )
    if not prefill_target_met:
        logger.info(
            "No TP size satisfies the TTFT requirement, please try a smaller model or a more powerful GPU SKU"
        )
    logger.info(
        f"Suggested prefill TP:{prefill_tp_size[selected_prefill_idx]} (TTFT {prefill_ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)"
    )

    # scale up if estimated TTFT is 120% of target TTFT
    prefill_queue_size_upper_bound = max(
        0.1, args.ttft * 1.2 / prefill_ttft[selected_prefill_idx] - 1
    )
    # scale down if estimated TTFT is 80% of target TTFT
    prefill_queue_size_lower_bound = max(
        0.1, args.ttft * 0.8 / prefill_ttft[selected_prefill_idx] - 1
    )
    logger.info(
        f"Suggested planner upper/lower bound for prefill queue size: {prefill_queue_size_upper_bound:.2f}/{prefill_queue_size_lower_bound:.2f}"
    )

    # select best tp size for decode
    selected_decode_idx, decode_target_met = _select_best_index(
        decode_itl, decode_thpt_per_gpu, args.itl
    )
    if not decode_target_met:
        logger.info(
            "No TP size satisfies the ITL requirement, please try a smaller model or a more powerful GPU SKU"
        )
    logger.info(
        f"Suggested decode TP:{decode_tp_size[selected_decode_idx]} (ITL {decode_itl[selected_decode_idx]:.2f} ms, throughput {decode_thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)"
    )

    # KV cache utilization for the selected TP and concurrency. Each request's
    # context is taken as ISL + OSL/2, i.e. halfway through generation.
    selected_decode_kv_cache_utilization = (
        decode_concurrency[selected_decode_idx]
        * (args.isl + args.osl / 2)
        / decode_kv_cache_size[selected_decode_idx]
    )
    # set a +- 20% range for the kv cache utilization
    logger.info(
        f"Suggested planner upper/lower bound for decode kv cache utilization: {min(1, selected_decode_kv_cache_utilization + 0.2):.2f}/{max(0.1, selected_decode_kv_cache_utilization - 0.2):.2f}"
    )

    # interpolate ISL - TTFT with best prefill TP
    best_prefill_tp = prefill_tp_size[selected_prefill_idx]
    interp_prefill_isl = []
    interp_prefill_ttft = []
    interp_prefill_thpt_per_gpu = []
    logger.info(
        f"Profiling prefill under best TP {best_prefill_tp} with different ISL..."
    )
    prefill_config = config_modifier.convert_config(config, "prefill")
    prefill_config = config_modifier.set_config_tp_size(
        prefill_config, best_prefill_tp
    )
    work_dir = f"{args.output_dir}/selected_prefill_interpolation"
    dynamo_process, _ = _start_dynamo_serve(
        prefill_config, work_dir, best_prefill_tp, args.example_dir
    )
    try:
        if not wait_for_server_ready(model_name, port):
            logger.error(
                f"Server did not become ready, skip profiling tp={best_prefill_tp}"
            )
        else:
            # Clamp to >= 1: range() rejects a zero step, and a non-positive
            # step would otherwise be produced when the ISL span is smaller
            # than the requested granularity.
            isl_step = max(
                1,
                (args.max_context_length - 100)
                // args.prefill_interpolation_granularity,
            )
            for isl in range(100, args.max_context_length, isl_step):
                # run genai-perf
                genai_perf_artifact_dir = f"{work_dir}/gap_isl{isl}"
                gap_result = benchmark_prefill(
                    isl, genai_perf_artifact_dir, model_name, port
                )
                if gap_result is not None:
                    ttft = gap_result["time_to_first_token"]["avg"]
                    interp_prefill_isl.append(isl)
                    interp_prefill_ttft.append(ttft)
                    interp_prefill_thpt_per_gpu.append(
                        isl / ttft / best_prefill_tp * 1000
                    )
    finally:
        shutdown_deployment(dynamo_process)

    # Save the ISL/TTFT samples and plot them. The quadratic fit
    # (y=ax^2+bx+c) presumably happens inside plot_prefill_interpolation —
    # verify; a quadratic needs at least 3 points.
    if len(interp_prefill_isl) > 2:
        logger.info("Interpolating prefill TTFT and throughput vs ISL...")

        # Convert to numpy arrays for easier manipulation
        prefill_isl_np = np.array(interp_prefill_isl)
        prefill_ttft_np = np.array(interp_prefill_ttft)
        prefill_thpt_per_gpu_np = np.array(interp_prefill_thpt_per_gpu)

        save_path = f"{work_dir}/raw_data.npz"
        np.savez(
            save_path,
            prefill_isl=prefill_isl_np,
            prefill_ttft=prefill_ttft_np,
            prefill_thpt_per_gpu=prefill_thpt_per_gpu_np,
        )

        # Call the plotting function
        plot_prefill_interpolation(
            prefill_isl_np, prefill_ttft_np, prefill_thpt_per_gpu_np, work_dir
        )
    else:
        logger.warning(
            "Not enough data points to perform interpolation (need at least 3 points)"
        )

    # interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode TP
    x_kv_usage = []
    y_context_length = []
    z_itl = []
    z_thpt_per_gpu = []
    best_decode_tp = decode_tp_size[selected_decode_idx]
    logger.info(f"Profiling decode with TP size {best_decode_tp}...")
    decode_config = config_modifier.set_config_tp_size(decode_config, best_decode_tp)
    work_dir = f"{args.output_dir}/selected_decode_interpolation"
    dynamo_process, dynamo_log_fn = _start_dynamo_serve(
        decode_config, work_dir, best_decode_tp, args.example_dir
    )
    decode_server_ready = False
    max_kv_tokens = None
    try:
        if not wait_for_server_ready(model_name, port):
            logger.error(
                f"Server did not become ready, skip profiling tp={best_decode_tp}"
            )
        else:
            decode_server_ready = True
            max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
                dynamo_log_fn
            )

            osl = 500  # not too large to reduce ITL variance, not too small to have stable measurement
            # Steps are clamped to >= 1; see the prefill interpolation above.
            isl_step = max(
                1,
                (args.max_context_length - osl)
                // args.decode_interpolation_granularity,
            )
            for isl in range(100, args.max_context_length - osl, isl_step):
                max_concurrency = max_kv_tokens // (isl + osl)
                num_request_step = max(
                    1, max_concurrency // args.decode_interpolation_granularity
                )
                for num_request in range(1, max_concurrency, num_request_step):
                    genai_perf_artifact_dir = (
                        f"{work_dir}/gap_isl{isl}_osl{osl}_n{num_request}"
                    )
                    gap_result = benchmark_decode(
                        isl, osl, num_request, genai_perf_artifact_dir, model_name, port
                    )
                    if gap_result is not None:
                        itl = gap_result["inter_token_latency"]["avg"]
                        # Context per request taken as ISL + OSL/2 (mid-generation).
                        x_kv_usage.append(
                            (isl + osl / 2) * num_request / max_kv_tokens
                        )
                        y_context_length.append(isl + osl / 2)
                        z_itl.append(itl)
                        z_thpt_per_gpu.append(
                            gap_result["output_token_throughput"]["avg"]
                            / best_decode_tp
                        )
    finally:
        shutdown_deployment(dynamo_process)

    if decode_server_ready:
        # Save the data points to a .npz file
        save_path = f"{work_dir}/raw_data.npz"
        np.savez(
            save_path,
            x_kv_usage=np.array(x_kv_usage),
            y_context_length=np.array(y_context_length),
            z_itl=np.array(z_itl),
            z_thpt_per_gpu=np.array(z_thpt_per_gpu),
            max_kv_tokens=np.array([max_kv_tokens]),
        )
        logger.info(f"Saved data points to {save_path}")

        if z_itl:
            # Plot 3D surface
            plot_decode_3d_surface(
                x_kv_usage, y_context_length, z_itl, best_decode_tp, work_dir
            )
        else:
            logger.warning("No decode data points collected, skip 3D surface plot")


if __name__ == "__main__":
    main()