# aiperf.py
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import os
import random
import subprocess
21
from typing import Optional, Tuple
22

23
24
25
26
27
28
from benchmarks.profiler.utils.defaults import (
    AIPERF_PREFILL_ATTN_DP_NUM_REQ_RATIO,
    AIPERF_PREFILL_BENCHMARK_OSL,
    AIPERF_WARMUP_REQUEST_PER_DP_RANK,
)

29
30
31
32
33
34
35
36
37
38
39
# Module logger: records at INFO and above are written through a dedicated
# StreamHandler using a timestamped "<time> - <logger> - <level> - <message>" layout.
formatter = logging.Formatter(
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)

logger = logging.getLogger(__name__)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)


40
def _get_common_aiperf_cmd(
    artifact_dir,
    seed=100,
    model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    tokenizer="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    base_url="http://localhost:8000",
    warmup_request_count: int = AIPERF_WARMUP_REQUEST_PER_DP_RANK,
) -> list:
    """Build the ``aiperf profile`` arguments shared by the prefill and decode commands.

    Args:
        artifact_dir: Value passed to ``--artifact-dir``.
        seed: Value passed (as a string) to ``--random-seed``.
        model: Value passed to ``--model``.
        tokenizer: Value passed to ``--tokenizer``.
        base_url: Value passed to ``--url``.
        warmup_request_count: Value passed (as a string) to
            ``--warmup-request-count``.

    Returns:
        The argv list, starting with ``"aiperf", "profile"``. Callers append
        the workload-specific flags (token lengths, concurrency, request count).
    """
    return [
        "aiperf",
        "profile",
        "--model",
        model,
        "--tokenizer",
        tokenizer,
        # Streaming chat-completions endpoint.
        "--endpoint-type",
        "chat",
        "--endpoint",
        "/v1/chat/completions",
        "--streaming",
        "--url",
        base_url,
        # ignore_eos is supplied twice: as a top-level extra input and nested
        # under "nvext".
        "--extra-inputs",
        "ignore_eos:true",
        "--extra-inputs",
        '{"nvext":{"ignore_eos":true}}',
        "--warmup-request-count",
        str(warmup_request_count),
        "--artifact-dir",
        artifact_dir,
        "--random-seed",
        str(seed),
        # Fixed per-request timeout of 1800 seconds.
        "--request-timeout-seconds",
        "1800",
    ]


77
def get_prefill_aiperf_cmd(
    isl,
    artifact_dir,
    seed=100,
    model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    tokenizer="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    osl=AIPERF_PREFILL_BENCHMARK_OSL,
    base_url="http://localhost:8000",
    concurrency: int = 1,
    request_count: int = 1,
    warmup_request_count: int = AIPERF_WARMUP_REQUEST_PER_DP_RANK,
) -> list:
    """Build the full ``aiperf profile`` command for a prefill benchmark.

    Args:
        isl: Input sequence length; used as the synthetic input token mean
            (stddev is fixed to 0).
        artifact_dir: Directory passed to ``--artifact-dir``.
        seed: Random seed forwarded to the common command.
        model: Model name forwarded to the common command.
        tokenizer: Tokenizer name forwarded to the common command.
        osl: Output sequence length; used as the output token mean (stddev 0)
            and as both ``max_tokens`` and ``min_tokens`` extra inputs.
        base_url: Server URL forwarded to the common command.
        concurrency: Value passed to ``--concurrency``.
        request_count: Value passed to ``--request-count``.
        warmup_request_count: Value passed to ``--warmup-request-count``.

    Returns:
        The argv list: the common arguments followed by the prefill-specific
        flags.
    """
    common_args = _get_common_aiperf_cmd(
        artifact_dir,
        seed,
        model,
        tokenizer,
        base_url,
        warmup_request_count=warmup_request_count,
    )
    prefill_args = [
        "--synthetic-input-tokens-mean",
        str(isl),
        "--synthetic-input-tokens-stddev",
        "0",
        "--output-tokens-mean",
        str(osl),
        "--output-tokens-stddev",
        "0",
        # max_tokens == min_tokens pins the requested output length to osl.
        "--extra-inputs",
        f"max_tokens:{osl}",
        "--extra-inputs",
        f"min_tokens:{osl}",
        "--concurrency",
        str(concurrency),
        "--request-count",
        str(request_count),
    ]
    return common_args + prefill_args


116
def get_decode_aiperf_cmd(
    isl,
    osl,
    artifact_dir,
    num_request,
    seed=100,
    model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    tokenizer="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    base_url="http://localhost:8000",
    warmup_request_count: int = AIPERF_WARMUP_REQUEST_PER_DP_RANK,
) -> list:
    """Build the full ``aiperf profile`` command for a decode benchmark.

    Args:
        isl: Input sequence length; used as the synthetic input token mean
            (stddev is fixed to 0).
        osl: Output sequence length; used as the output token mean (stddev 0)
            and as both ``max_tokens`` and ``min_tokens`` extra inputs.
        artifact_dir: Directory passed to ``--artifact-dir``.
        num_request: Used for ``--concurrency``, ``--num-dataset-entries`` and
            ``--request-count`` alike.
        seed: Random seed forwarded to the common command.
        model: Model name forwarded to the common command.
        tokenizer: Tokenizer name forwarded to the common command.
        base_url: Server URL forwarded to the common command.
        warmup_request_count: Value passed to ``--warmup-request-count``.

    Returns:
        The argv list: the common arguments followed by the decode-specific
        flags.
    """
    common_args = _get_common_aiperf_cmd(
        artifact_dir,
        seed,
        model,
        tokenizer,
        base_url,
        warmup_request_count=warmup_request_count,
    )
    num_request_arg = str(num_request)
    decode_args = [
        "--synthetic-input-tokens-mean",
        str(isl),
        "--synthetic-input-tokens-stddev",
        "0",
        "--output-tokens-mean",
        str(osl),
        "--output-tokens-stddev",
        "0",
        # max_tokens == min_tokens pins the requested output length to osl.
        "--extra-inputs",
        f"max_tokens:{osl}",
        "--extra-inputs",
        f"min_tokens:{osl}",
        # Concurrency, dataset size and request count are all num_request.
        "--concurrency",
        num_request_arg,
        "--num-dataset-entries",
        num_request_arg,
        "--request-count",
        num_request_arg,
    ]
    return common_args + decode_args


156
def get_aiperf_result(artifact_dir: str) -> dict:
    """Load the first ``profile_export_aiperf.json`` found under *artifact_dir*.

    The directory tree is walked with ``os.walk`` and the search stops at the
    first directory that contains the file.

    Args:
        artifact_dir: Root directory to search.

    Returns:
        The parsed JSON content of the result file.

    Raises:
        FileNotFoundError: If no ``profile_export_aiperf.json`` exists anywhere
            under *artifact_dir*.
    """
    result_file_name = "profile_export_aiperf.json"
    for root, _, files in os.walk(artifact_dir):
        if result_file_name in files:
            json_file_path = os.path.join(root, result_file_name)
            break
    else:
        # The walk finished without finding the file.
        raise FileNotFoundError(
            f"profile_export_aiperf.json not found in {artifact_dir}"
        )
    # JSON is UTF-8 by specification; do not depend on the locale's default encoding.
    with open(json_file_path, "r", encoding="utf-8") as f:
        return json.load(f)


170
def benchmark_prefill(
    isl,
    aiperf_artifact_dir,
    model_name,
    tokenizer,
    base_url="http://localhost:8000",
    concurrency: int = 1,
    request_count: int = 1,
    warmup_request_count: int = 3,
):
    """Run one AIPerf prefill benchmark and return its parsed result.

    Args:
        isl: Input sequence length for the synthetic prompts.
        aiperf_artifact_dir: Directory AIPerf writes artifacts to; the result
            file is read back from it.
        model_name: Model name passed to AIPerf.
        tokenizer: Tokenizer name passed to AIPerf.
        base_url: URL of the server under test.
        concurrency: Number of concurrent requests.
        request_count: Total number of measured requests.
        warmup_request_count: Number of warm-up requests.
            NOTE(review): the default here is the literal 3, while the command
            builders default to AIPERF_WARMUP_REQUEST_PER_DP_RANK; confirm the
            two are meant to agree.

    Returns:
        The dict loaded by ``get_aiperf_result`` when AIPerf exits with
        status 0, otherwise ``None`` (the exit code and stderr are logged).
        Exceptions raised by ``get_aiperf_result`` are not caught here.
    """
    logger.info(f"Running aiperf with isl {isl}")
    aiperf_cmd = get_prefill_aiperf_cmd(
        isl,
        aiperf_artifact_dir,
        model=model_name,
        tokenizer=tokenizer,
        base_url=base_url,
        concurrency=concurrency,
        request_count=request_count,
        warmup_request_count=warmup_request_count,
    )
    logger.debug(f"aiperf cmd: {aiperf_cmd}")

    # Block until AIPerf exits, capturing both output streams as text.
    completed = subprocess.run(
        aiperf_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=False,
    )
    if completed.returncode != 0:
        logger.error(f"AIPerf failed with error code: {completed.returncode}")
        logger.error(f"stderr: {completed.stderr}")
        return None

    logger.info("AIperf profiling completed successfully")
    logger.debug(completed.stdout)
    return get_aiperf_result(aiperf_artifact_dir)


211
212
213
214
215
216
217
def get_prefill_ttft(
    isl: int,
    aiperf_artifact_dir: str,
    model_name: str,
    tokenizer: str,
    base_url: str = "http://localhost:8000",
    attention_dp_size: int = 1,
    attn_dp_num_req_ratio: int = AIPERF_PREFILL_ATTN_DP_NUM_REQ_RATIO,
) -> Optional[float]:
    """
    Run the prefill benchmark and extract the time to first token (TTFT).

    Non-DEP path (``attention_dp_size <= 1``): run ``benchmark_prefill`` with
    its defaults and return the ``avg`` of ``time_to_first_token``.

    DEP path (``attention_dp_size > 1``): send
    ``attention_dp_size * attn_dp_num_req_ratio`` concurrent requests in a
    single burst, take the ``max`` TTFT across the burst, subtract the decode
    time spent between prefill rounds
    (``avg ITL * (AIPERF_PREFILL_BENCHMARK_OSL - 1) * (attn_dp_num_req_ratio - 1)``)
    and divide by ``attn_dp_num_req_ratio``. Using several rounds accounts for
    the error margin caused by the first batch being launched too early
    without enough requests.

    Args:
        isl: Input sequence length.
        aiperf_artifact_dir: Directory AIPerf writes artifacts to.
        model_name: Model name passed to AIPerf.
        tokenizer: Tokenizer name passed to AIPerf.
        base_url: URL of the server under test.
        attention_dp_size: Attention data-parallel size; values above 1 select
            the DEP path.
        attn_dp_num_req_ratio: Number of request rounds per attention DP rank
            in the DEP path; defaults to AIPERF_PREFILL_ATTN_DP_NUM_REQ_RATIO
            and must be greater than 0.

    Returns:
        The TTFT as a float (presumably milliseconds, as reported by AIPerf;
        confirm against the AIPerf export), or ``None`` when the benchmark
        failed or the metric could not be extracted.
    """
    # DEP-aware measurement (waves of size attention_dp_size)
    if attention_dp_size > 1:
        assert attn_dp_num_req_ratio > 0, "attn_dp_num_req_ratio must be greater than 0"
        total_concurrency = attention_dp_size * attn_dp_num_req_ratio
        logger.info(
            f"DEP prefill measurement: isl={isl}, attn_dp={attention_dp_size}, attn_dp_num_req_ratio={attn_dp_num_req_ratio}, "
            f"total_concurrency={total_concurrency}"
        )
        # Run aiperf with the requested concurrency; allow normal warmup behavior
        aiperf_result = benchmark_prefill(
            isl,
            aiperf_artifact_dir,
            model_name,
            tokenizer,
            base_url=base_url,
            concurrency=total_concurrency,
            request_count=total_concurrency,
            warmup_request_count=AIPERF_WARMUP_REQUEST_PER_DP_RANK * attention_dp_size,
        )
        if aiperf_result is None:
            # The benchmark run failed; benchmark_prefill already logged why.
            return None
        try:
            max_ttft = float(aiperf_result["time_to_first_token"]["max"])
            # subtract the decoding time in-between prefill runs
            max_ttft -= (
                float(aiperf_result["inter_token_latency"]["avg"])
                * (AIPERF_PREFILL_BENCHMARK_OSL - 1)
                * (attn_dp_num_req_ratio - 1)
            )
            return max_ttft / float(attn_dp_num_req_ratio)
        except (KeyError, TypeError, ValueError):
            logger.warning(
                "Failed to extract max TTFT from AIPerf result for DEP prefill"
            )
            return None

    # Default path (non-DEP): use AIPerf's TTFT metric
    aiperf_result = benchmark_prefill(
        isl,
        aiperf_artifact_dir,
        model_name,
        tokenizer,
        base_url=base_url,
    )
    if aiperf_result is None:
        # The benchmark run failed; benchmark_prefill already logged why.
        return None
    try:
        return float(aiperf_result["time_to_first_token"]["avg"])
    except (KeyError, TypeError, ValueError):
        logger.warning("Failed to extract TTFT from AIPerf result")
        return None


def get_decode_itl_and_thpt_per_gpu(
    isl: int,
    osl: int,
    num_request: int,
    aiperf_artifact_dir: str,
    model_name: str,
    tokenizer: str,
    base_url: str = "http://localhost:8000",
    num_gpus: int = 1,
    attention_dp_size: int = 1,
) -> Tuple[Optional[float], Optional[float]]:
    """
    Run the decode benchmark and extract (ITL, output token throughput per GPU).

    Args:
        isl: Input sequence length.
        osl: Output sequence length.
        num_request: Number of requests (also used as concurrency by the
            decode command).
        aiperf_artifact_dir: Directory AIPerf writes artifacts to.
        model_name: Model name passed to AIPerf.
        tokenizer: Tokenizer name passed to AIPerf.
        base_url: URL of the server under test.
        num_gpus: Divisor for the total throughput; values below 1 are
            treated as 1.
        attention_dp_size: Scales the warm-up request count
            (AIPERF_WARMUP_REQUEST_PER_DP_RANK per rank).

    Returns:
        ``(itl, thpt_per_gpu)`` where ``itl`` is the ``avg`` of
        ``inter_token_latency`` and ``thpt_per_gpu`` is the ``avg`` of
        ``output_token_throughput`` divided by the GPU count.
        ``(None, None)`` when the benchmark failed or a metric is missing.
    """
    aiperf_result = benchmark_decode(
        isl,
        osl,
        num_request,
        aiperf_artifact_dir,
        model_name,
        tokenizer,
        base_url=base_url,
        warmup_request_count=AIPERF_WARMUP_REQUEST_PER_DP_RANK * attention_dp_size,
    )
    if aiperf_result is None:
        return None, None
    try:
        itl = float(aiperf_result["inter_token_latency"]["avg"])
        thpt_total = float(aiperf_result["output_token_throughput"]["avg"])
    except (KeyError, TypeError, ValueError):
        logger.warning("Failed to extract decode metrics from AIPerf result")
        return None, None
    # Clamp the divisor so a zero or negative GPU count cannot divide by zero.
    thpt_per_gpu = thpt_total / max(num_gpus, 1)
    return itl, thpt_per_gpu


313
314
315
316
def benchmark_decode(
    isl,
    osl,
    num_request,
    aiperf_artifact_dir,
    model_name,
    tokenizer,
    base_url="http://localhost:8000",
    warmup_request_count: int = AIPERF_WARMUP_REQUEST_PER_DP_RANK,
):
    """Run an AIPerf decode benchmark (warm-up pass, then measured pass).

    Both passes use the same freshly drawn random seed so that the prompts are
    identical; the intent is that the measured pass can skip prefill
    computation already done by the warm-up pass.

    Args:
        isl: Input sequence length.
        osl: Output sequence length.
        num_request: Number of requests (also the concurrency).
        aiperf_artifact_dir: Artifact directory for the measured pass; the
            warm-up pass writes to ``f"{aiperf_artifact_dir}_warmup"``.
        model_name: Model name passed to AIPerf.
        tokenizer: Tokenizer name passed to AIPerf.
        base_url: URL of the server under test.
        warmup_request_count: ``--warmup-request-count`` for the warm-up pass.

    Returns:
        The dict loaded by ``get_aiperf_result`` when the measured pass exits
        with status 0, otherwise ``None`` (the exit code and stderr are
        logged). Exceptions raised by ``get_aiperf_result`` are not caught here.
    """
    logger.info(f"Profiling decode with num_request {num_request}...")

    # first warm-up the engine by pre-computing all prefill tokens
    # we use the same random seed to make sure the prompt is the same
    seed = random.randint(0, 1000000)

    warmup_cmd = get_decode_aiperf_cmd(
        isl,
        osl,
        f"{aiperf_artifact_dir}_warmup",
        num_request,
        seed=seed,
        model=model_name,
        tokenizer=tokenizer,
        base_url=base_url,
        warmup_request_count=warmup_request_count,
    )
    logger.debug(f"aiperf warmup cmd: {warmup_cmd}")
    warmup_run = subprocess.run(
        warmup_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=False,
    )
    if warmup_run.returncode != 0:
        # Best effort: a failed warm-up does not abort the measurement, but it
        # should not go unnoticed either.
        logger.warning(
            f"AIPerf decode warm-up run exited with code {warmup_run.returncode}; "
            "continuing with the measured run"
        )
        logger.warning(f"warm-up stderr: {warmup_run.stderr}")

    # then send out the real requests, hopefully, this will skip all prefill computation
    # NOTE(review): warmup_request_count is not forwarded here, so the measured
    # run uses get_decode_aiperf_cmd's default; confirm this is intended.
    aiperf_cmd = get_decode_aiperf_cmd(
        isl,
        osl,
        aiperf_artifact_dir,
        num_request,
        seed=seed,
        model=model_name,
        tokenizer=tokenizer,
        base_url=base_url,
    )
    logger.debug(f"aiperf cmd: {aiperf_cmd}")
    completed = subprocess.run(
        aiperf_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=False,
    )
    if completed.returncode != 0:
        logger.error(f"AIPerf failed with error code: {completed.returncode}")
        logger.error(f"stderr: {completed.stderr}")
        return None

    logger.info("AIperf profiling completed successfully")
    logger.debug(completed.stdout)
    return get_aiperf_result(aiperf_artifact_dir)