"tests/vscode:/vscode.git/clone" did not exist on "8c2072cf34d5146abf9df2668d94b6ca87ee0eb0"
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from pathlib import Path
5
from typing import Dict, List
6

7
from benchmarks.utils.aiperf import run_concurrency_sweep
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from deploy.utils.kubernetes import is_running_in_cluster


def has_http_scheme(url: str) -> bool:
    """Return True when *url* starts with ``http://`` or ``https://``.

    The comparison is case-insensitive; no other validation is performed.
    """
    lowered = url.lower()
    return any(lowered.startswith(scheme) for scheme in ("http://", "https://"))


def normalize_service_url(endpoint: str) -> str:
    """Return *endpoint* stripped of surrounding whitespace, as a usable URL.

    An endpoint that already has an HTTP(S) scheme is returned as-is (after
    stripping). A scheme-less endpoint gets ``http://`` prepended only when
    ``is_running_in_cluster()`` reports true; otherwise it is returned
    unchanged.
    """
    url = endpoint.strip()
    # The cluster check is only consulted for scheme-less endpoints.
    needs_scheme = not has_http_scheme(url) and is_running_in_cluster()
    # Outside the cluster, earlier validation is expected to have ensured a scheme.
    return f"http://{url}" if needs_scheme else url
23
24
25


def print_concurrency_start(
    label: str, model: str, isl: int, osl: int, std: int
) -> None:
    """Print concurrency sweep start messages.

    Emits three lines to stdout: a start banner naming ``label``, a notice
    that the sweep may take several minutes, and a summary of the run
    parameters.

    Args:
        label: Name shown in the start banner.
        model: Model identifier shown in the parameter summary.
        isl: Value reported as "ISL" (presumably input sequence length).
        osl: Value reported as "OSL" (presumably output sequence length).
        std: Value reported as "StdDev".
    """
    print(f"⚙️  Starting {label} concurrency sweep!", flush=True)
    print(
        "⏱️  This may take several minutes - running through multiple concurrency levels...",
        flush=True,
    )
    # Flush this line as well so all three banner lines are emitted together
    # and in order, even when stdout is block-buffered (e.g. piped).
    print(
        f"🎯 Model: {model} | ISL: {isl} | OSL: {osl} | StdDev: {std}",
        flush=True,
    )


37
def run_endpoint_benchmark(
    label: str,
    endpoint: str,
    model: str,
    isl: int,
    osl: int,
    std: int,
    output_dir: Path,
) -> None:
    """Run benchmark for an existing endpoint with custom label.

    Normalizes ``endpoint`` into a service URL, creates the
    ``output_dir / label`` directory, and runs ``run_concurrency_sweep``
    against the URL with results directed to that directory.

    Args:
        label: Name for this endpoint; also used as the results subdirectory.
        endpoint: Endpoint address, passed through ``normalize_service_url``.
        model: Model name forwarded to the sweep as ``model_name``.
        isl: Forwarded to the sweep as ``isl``.
        osl: Forwarded to the sweep as ``osl``.
        std: Forwarded to the sweep as ``stddev``.
        output_dir: Base directory under which the per-label directory is
            created.
    """
    # Normalize endpoint to a usable URL (handles in-cluster scheme-less inputs)
    service_url = normalize_service_url(endpoint)
    # Compute the per-endpoint results directory once and reuse it below.
    results_dir = output_dir / label

    print(f"🚀 Starting benchmark of endpoint '{label}': {service_url}")
    print(f"📁 Results will be saved to: {results_dir}")
    print_concurrency_start(label, model, isl, osl, std)

    # Create output directory
    results_dir.mkdir(parents=True, exist_ok=True)

    run_concurrency_sweep(
        service_url=service_url,
        model_name=model,
        isl=isl,
        osl=osl,
        stddev=std,
        output_dir=results_dir,
    )
    print("✅ Endpoint benchmark completed successfully!")


68
def print_final_summary(output_dir: Path, labels: List[str]) -> None:
    """Print final benchmark summary.

    Always prints a completion message and the results location. The
    list of benchmarked labels is printed only when ``labels`` is non-empty.

    Args:
        output_dir: Directory reported as holding all results.
        labels: Labels that were benchmarked, joined with ", " for display.
    """
    print("🎉 Benchmark workflow completed successfully!")
    print(f"📁 All results available at: {output_dir}")

    # Skip the "Benchmarked" line entirely when nothing was benchmarked.
    if labels:
        print(f"🚀 Benchmarked: {', '.join(labels)}")
75
76


77
def run_benchmark_workflow(
    inputs: Dict[str, str],
    isl: int = 2000,
    std: int = 10,
    osl: int = 256,
    model: str = "Qwen/Qwen3-0.6B",
    output_dir: str = "benchmarks/results",
) -> None:
    """Main benchmark workflow orchestrator for HTTP endpoints (and in-cluster internal service URLs).

    Creates ``output_dir`` if needed, benchmarks each entry of ``inputs`` in
    iteration order via ``run_endpoint_benchmark``, then prints a final
    summary listing the labels that were benchmarked.

    Args:
        inputs: Mapping of label to endpoint address.
        isl: Forwarded to each endpoint benchmark as ``isl``.
        std: Forwarded to each endpoint benchmark as ``std``.
        osl: Forwarded to each endpoint benchmark as ``osl``.
        model: Model name forwarded to each endpoint benchmark.
        output_dir: Base directory for results; created with parents if
            missing.
    """
    output_dir_path = Path(output_dir)
    output_dir_path.mkdir(parents=True, exist_ok=True)

    # Run endpoint benchmarks
    benchmarked_labels: List[str] = []
    for label, endpoint in inputs.items():
        run_endpoint_benchmark(label, endpoint, model, isl, osl, std, output_dir_path)
        # Record the label only after its benchmark returned without raising.
        benchmarked_labels.append(label)

    # Generate final summary
    print_final_summary(output_dir_path, benchmarked_labels)