# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import os
import subprocess
from pathlib import Path
from typing import List

# Default concurrency levels for the sweep. Override with the CONCURRENCIES
# environment variable (comma-separated positive integers).
DEFAULT_CONCURRENCIES: List[int] = [1, 2, 5, 10, 50, 100, 250]

# Default request count per concurrency level. Override with the REQUEST_COUNT
# environment variable. A value of 0 (or leaving the variable unset) selects
# "auto" mode, where each level runs
# max(concurrency * REQUEST_COUNT_SCALE_FACTOR, 10) requests so that the
# concurrency level is fully utilized and each slot runs enough requests for
# stable measurements.
DEFAULT_REQUEST_COUNT: int = 0
REQUEST_COUNT_SCALE_FACTOR: int = 3


def get_concurrency_levels() -> List[int]:
    """Return the concurrency levels to benchmark, in ascending order.

    The ``CONCURRENCIES`` environment variable, when set and non-empty, is
    parsed as a comma-separated list of positive integers (whitespace around
    each value is ignored). Duplicate values are collapsed, because each level
    maps to a single per-level artifact directory in the sweep.

    If the variable is unset, empty, or contains a non-integer or non-positive
    entry, a warning is printed (for the invalid case) and the defaults are
    used instead.

    Returns:
        A new list of levels. It is never the ``DEFAULT_CONCURRENCIES`` object
        itself, so callers may mutate the result without affecting the
        module-level defaults.
    """
    raw_levels = os.getenv("CONCURRENCIES")
    if not raw_levels:
        return list(DEFAULT_CONCURRENCIES)

    try:
        # Parse comma-separated values; int() raises ValueError on bad tokens.
        levels = [int(token.strip()) for token in raw_levels.split(",")]
        for level in levels:
            if level <= 0:
                raise ValueError(f"Concurrency level must be positive, got: {level}")
    except ValueError as e:
        print(f"WARNING: Invalid CONCURRENCIES environment variable: {e}")
        print(f"Using default concurrency levels: {DEFAULT_CONCURRENCIES}")
        return list(DEFAULT_CONCURRENCIES)

    # set() drops repeated levels; sorted() gives a deterministic ascending sweep.
    return sorted(set(levels))


def get_request_count() -> int:
    """Resolve the per-level request count from the environment.

    Reads ``REQUEST_COUNT``; when it is unset or empty the module default is
    returned. A value that is not an integer, or that is negative, triggers a
    warning and also falls back to the default.

    Returns:
        The configured request count. A result of 0 means 'auto' mode, in
        which the count is computed separately for each concurrency level.
    """
    raw_value = os.getenv("REQUEST_COUNT")
    if not raw_value:
        return DEFAULT_REQUEST_COUNT

    try:
        parsed = int(raw_value.strip())
        if parsed < 0:
            raise ValueError(f"Request count must be non-negative, got: {parsed}")
    except ValueError as err:
        print(f"WARNING: Invalid REQUEST_COUNT environment variable: {err}")
        return DEFAULT_REQUEST_COUNT

    return parsed


# Concurrency levels resolved once, when this module is imported, from the
# CONCURRENCIES environment variable (falling back to the defaults).
CONCURRENCIES: List[int] = get_concurrency_levels()


def run_aiperf(
    service_url: str,
    model_name: str,
    isl: int,
    osl: int,
    stddev: int,
    concurrency: int,
    output_dir: Path,
    request_count: int = 0,
) -> None:
    """Run a single ``aiperf profile`` benchmark at one concurrency level.

    The benchmark targets a streaming chat endpoint and pins the output length
    by sending ``max_tokens`` and ``min_tokens`` equal to ``osl`` together with
    ``ignore_eos:true``. Captured stdout is printed on success; on failure the
    return code and captured stderr are printed before raising.

    Args:
        service_url: URL passed to aiperf via ``-u``.
        model_name: Model identifier; also passed as the ``--tokenizer`` value.
        isl: Mean synthetic input length, in tokens.
        osl: Mean output length, in tokens.
        stddev: Standard deviation of the synthetic input length, in tokens.
        concurrency: Value passed to ``--concurrency``.
        output_dir: Artifact directory; created if missing and also used as
            the working directory of the aiperf process.
        request_count: Total requests to issue. A value <= 0 selects auto
            mode: ``max(concurrency * REQUEST_COUNT_SCALE_FACTOR, 10)``.

    Raises:
        subprocess.CalledProcessError: If aiperf exits with a non-zero status.
            The exception carries the captured stdout and stderr.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Auto-compute request count: need enough requests to fully utilize concurrency
    # and run each slot at least REQUEST_COUNT_SCALE_FACTOR times for stable measurements
    if request_count <= 0:
        request_count = max(concurrency * REQUEST_COUNT_SCALE_FACTOR, 10)
    elif request_count < concurrency:
        print(
            f"WARNING: request_count ({request_count}) < concurrency ({concurrency}). "
            f"Actual in-flight concurrency will be capped at {request_count}.",
            flush=True,
        )

    cmd = [
        "aiperf",
        "profile",
        "-m",
        model_name,
        "--endpoint-type",
        "chat",
        "--streaming",
        "-u",
        service_url,
        "--synthetic-input-tokens-mean",
        str(isl),
        "--synthetic-input-tokens-stddev",
        str(stddev),
        "--concurrency",
        str(concurrency),
        "--request-count",
        str(request_count),
        "--output-tokens-mean",
        str(osl),
        "--extra-inputs",
        f"max_tokens:{osl}",
        "--extra-inputs",
        f"min_tokens:{osl}",
        "--extra-inputs",
        "ignore_eos:true",
        "--tokenizer",
        model_name,
        "--artifact-dir",
        str(output_dir),
    ]
    print(
        f"Running aiperf with isl {isl}, osl {osl}, concurrency {concurrency}, request_count {request_count}",
        flush=True,
    )

    # subprocess.run() kills and reaps the child if waiting is interrupted
    # (e.g. KeyboardInterrupt), so an aborted sweep does not leave aiperf running.
    # check=False because the failure details are printed before raising.
    result = subprocess.run(
        cmd,
        cwd=str(output_dir),
        capture_output=True,
        text=True,
        check=False,
    )

    if result.returncode != 0:
        print(f"Aiperf failed with error code: {result.returncode}", flush=True)
        if result.stderr:
            print(f"stderr: {result.stderr}", flush=True)
        raise subprocess.CalledProcessError(
            result.returncode, cmd, output=result.stdout, stderr=result.stderr
        )

    print("Aiperf profiling completed successfully", flush=True)
    if result.stdout:
        print(result.stdout)


def run_concurrency_sweep(
    service_url: str, model_name: str, isl: int, osl: int, stddev: int, output_dir: Path
) -> None:
    """Run aiperf once per configured concurrency level.

    The concurrency levels and the request count are read from the environment
    when the sweep starts. Each level writes its artifacts to a separate
    ``c<level>`` subdirectory of ``output_dir``. An exception raised by any
    level propagates immediately, so later levels are not run.

    Args:
        service_url: URL of the service under test.
        model_name: Model identifier forwarded to each aiperf run.
        isl: Mean synthetic input length, in tokens.
        osl: Mean output length, in tokens.
        stddev: Standard deviation of the synthetic input length, in tokens.
        output_dir: Parent directory for the per-level artifact directories.
    """
    concurrency_levels = get_concurrency_levels()
    request_count = get_request_count()

    print(
        f"Running concurrency sweep for {model_name} with ISL {isl} and OSL {osl} and standard deviation {stddev}",
        flush=True,
    )
    print(f"Concurrency levels: {concurrency_levels}", flush=True)

    # A request count of 0 means "auto": it is computed per level by run_aiperf.
    if request_count > 0:
        request_count_label = str(request_count)
    else:
        request_count_label = f"auto (max(concurrency*{REQUEST_COUNT_SCALE_FACTOR}, 10))"
    print(f"Request count: {request_count_label}", flush=True)

    for c in concurrency_levels:
        print(f"Starting concurrency level {c}", flush=True)
        run_aiperf(
            service_url,
            model_name,
            isl,
            osl,
            stddev,
            c,
            output_dir / f"c{c}",
            request_count=request_count,
        )