genai.py 3.45 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import os
import subprocess
from pathlib import Path
from typing import List

# Default concurrency levels used for a sweep. They are replaced when the
# CONCURRENCIES environment variable holds a comma-separated list of positive
# integers (for example CONCURRENCIES="1,8,64"); see get_concurrency_levels().
DEFAULT_CONCURRENCIES: List[int] = [1, 2, 5, 10, 50, 100, 250]


def get_concurrency_levels() -> List[int]:
    """Return the concurrency levels to benchmark, in ascending order.

    The ``CONCURRENCIES`` environment variable, when set to a non-empty
    value, is parsed as a comma-separated list of positive integers
    (whitespace around each entry is ignored). Duplicate entries are
    collapsed so that the same level is never reported twice.

    If the variable is unset or empty, the defaults are used. If it cannot
    be parsed, or contains a value that is not strictly positive, a warning
    is printed and the defaults are used instead.

    Returns:
        A new list on every call, so callers may modify the result without
        affecting ``DEFAULT_CONCURRENCIES``.
    """
    raw = os.getenv("CONCURRENCIES")
    if not raw:
        # Hand out a copy: the module-level default list must stay pristine.
        return list(DEFAULT_CONCURRENCIES)

    try:
        # Parse comma-separated values; int() raises ValueError on bad input.
        levels = [int(token.strip()) for token in raw.split(",")]
        for level in levels:
            if level <= 0:
                raise ValueError(f"Concurrency level must be positive, got: {level}")
    except ValueError as e:
        print(f"WARNING: Invalid CONCURRENCIES environment variable: {e}")
        print(f"Using default concurrency levels: {DEFAULT_CONCURRENCIES}")
        return list(DEFAULT_CONCURRENCIES)

    # set() removes repeated levels; sorted() restores ascending order.
    return sorted(set(levels))


# Concurrency levels resolved once, when this module is imported, from the
# environment at that moment. run_concurrency_sweep() does not read this
# constant; it calls get_concurrency_levels() again each time it runs.
CONCURRENCIES: List[int] = get_concurrency_levels()


36
def run_aiperf(
    service_url: str,
    model_name: str,
    isl: int,
    osl: int,
    stddev: int,
    concurrency: int,
    output_dir: Path,
) -> None:
    """Run a single ``aiperf profile`` benchmark and wait for it to finish.

    Args:
        service_url: Endpoint URL, passed to aiperf as ``-u``.
        model_name: Model identifier, passed as both ``-m`` and
            ``--tokenizer``.
        isl: Value for ``--synthetic-input-tokens-mean``.
        osl: Value for ``--output-tokens-mean``; also sent as the
            ``max_tokens`` and ``min_tokens`` extra inputs.
        stddev: Value for ``--synthetic-input-tokens-stddev``.
        concurrency: Value for ``--concurrency``.
        output_dir: Directory (created if missing, parents included) that is
            used both as the child's working directory and as
            ``--artifact-dir``.

    Raises:
        subprocess.CalledProcessError: If aiperf exits with a non-zero
            status. The captured stdout and stderr are attached.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    # The child runs with cwd set to the output directory, so a relative
    # path handed to --artifact-dir would be interpreted relative to that cwd
    # and end up nested (output_dir/output_dir). Anchor it to an absolute
    # path so both uses point at the same directory.
    artifact_dir = output_dir.resolve()

    cmd = [
        "aiperf",
        "profile",
        "-m",
        model_name,
        "--endpoint-type",
        "chat",
        "--streaming",
        "-u",
        service_url,
        "--synthetic-input-tokens-mean",
        str(isl),
        "--synthetic-input-tokens-stddev",
        str(stddev),
        "--concurrency",
        str(concurrency),
        "--output-tokens-mean",
        str(osl),
        # min_tokens == max_tokens == osl together with ignore_eos:true
        # presumably pins the generated length; confirm against the backend.
        "--extra-inputs",
        f"max_tokens:{osl}",
        "--extra-inputs",
        f"min_tokens:{osl}",
        "--extra-inputs",
        "ignore_eos:true",
        "--tokenizer",
        model_name,
        "--artifact-dir",
        str(artifact_dir),
        # NOTE(review): everything after "--" looks like pass-through flags
        # for an underlying tool; confirm aiperf accepts them.
        "--",
        "-vv",
        "--max-threads=300",
    ]
    print(
        f"Running aiperf with isl {isl}, osl {osl}, concurrency {concurrency}",
        flush=True,
    )

    # Blocks until aiperf exits; both output streams are captured as text.
    completed = subprocess.run(
        cmd,
        cwd=str(artifact_dir),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    if completed.returncode != 0:
        print(f"Aiperf failed with error code: {completed.returncode}")
        if completed.stderr:
            print(f"stderr: {completed.stderr}")
        raise subprocess.CalledProcessError(
            completed.returncode,
            cmd,
            output=completed.stdout,
            stderr=completed.stderr,
        )

    print("Aiperf profiling completed successfully", flush=True)
    if completed.stdout:
        print(completed.stdout)


def run_concurrency_sweep(
    service_url: str, model_name: str, isl: int, osl: int, stddev: int, output_dir: Path
) -> None:
    """Benchmark the service once per configured concurrency level.

    The levels come from ``get_concurrency_levels()``, which is evaluated
    when this function is called. For each level ``N``, ``run_aiperf`` is
    invoked with the sub-directory ``output_dir / "cN"`` as its output
    directory. Nothing is caught here, so an exception raised by
    ``run_aiperf`` ends the sweep.
    """
    levels = get_concurrency_levels()
    banner = (
        f"Running concurrency sweep for {model_name} with ISL {isl} "
        f"and OSL {osl} and standard deviation {stddev}"
    )
    print(banner, flush=True)
    print(f"Concurrency levels: {levels}", flush=True)

    for level in levels:
        print(f"Starting concurrency level {level}", flush=True)
        # Each level gets its own artifact directory: c1, c2, c5, ...
        level_dir = output_dir / f"c{level}"
        run_aiperf(service_url, model_name, isl, osl, stddev, level, level_dir)