# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from pathlib import Path
5
from typing import Dict, List
6
7
8

from benchmarks.utils.genai import run_concurrency_sweep
from benchmarks.utils.plot import generate_plots
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from deploy.utils.kubernetes import is_running_in_cluster


def has_http_scheme(url: str) -> bool:
    """Return True when *url* begins with ``http://`` or ``https://``.

    The comparison is case-insensitive; no stripping is performed, so
    leading whitespace makes the check fail.
    """
    lowered = url.lower()
    return any(lowered.startswith(prefix) for prefix in ("http://", "https://"))


def normalize_service_url(endpoint: str) -> str:
    """Return *endpoint* stripped of surrounding whitespace, with a scheme if needed.

    An endpoint that already carries an HTTP(S) scheme is returned as-is
    (after stripping). A scheme-less endpoint gets ``http://`` prepended only
    when running inside a cluster; otherwise it is returned unchanged, on the
    expectation that earlier validation already required a scheme.
    """
    candidate = endpoint.strip()
    # Short-circuit keeps the cluster probe from running for URLs that already have a scheme.
    needs_scheme = not has_http_scheme(candidate) and is_running_in_cluster()
    return f"http://{candidate}" if needs_scheme else candidate
24
25
26


def print_concurrency_start(
    label: str, model: str, isl: int, osl: int, std: int
) -> None:
    """Print the banner announcing the start of a concurrency sweep.

    Args:
        label: Name of the endpoint being benchmarked, shown in the banner.
        model: Model identifier echoed in the parameter line.
        isl: Value printed as "ISL" (presumably input sequence length).
        osl: Value printed as "OSL" (presumably output sequence length).
        std: Value printed as "StdDev".
    """
    print(f"⚙️  Starting {label} concurrency sweep!", flush=True)
    print(
        "⏱️  This may take several minutes - running through multiple concurrency levels...",
        flush=True,
    )
    # Flushed like the two lines above so the whole banner is emitted
    # together even when stdout is block-buffered (e.g. piped logs).
    print(
        f"🎯 Model: {model} | ISL: {isl} | OSL: {osl} | StdDev: {std}",
        flush=True,
    )


38
def run_endpoint_benchmark(
    label: str,
    endpoint: str,
    model: str,
    isl: int,
    osl: int,
    std: int,
    output_dir: Path,
) -> None:
    """Run benchmark for an existing endpoint with custom label.

    Results for the endpoint are written under ``output_dir / label``, which
    is created if it does not exist.

    Args:
        label: Name for this endpoint; also the results sub-directory name.
        endpoint: Endpoint address; normalized via ``normalize_service_url``.
        model: Model name forwarded to the sweep as ``model_name``.
        isl: Forwarded to the sweep as ``isl``.
        osl: Forwarded to the sweep as ``osl``.
        std: Forwarded to the sweep as ``stddev``.
        output_dir: Base directory under which the per-label directory lives.
    """
    # Normalize endpoint to a usable URL (handles in-cluster scheme-less inputs)
    service_url = normalize_service_url(endpoint)
    # Single source of truth for the per-endpoint results location.
    results_dir = output_dir / label

    print(f"🚀 Starting benchmark of endpoint '{label}': {service_url}")
    print(f"📁 Results will be saved to: {results_dir}")
    print_concurrency_start(label, model, isl, osl, std)

    # Create output directory
    results_dir.mkdir(parents=True, exist_ok=True)

    run_concurrency_sweep(
        service_url=service_url,
        model_name=model,
        isl=isl,
        osl=osl,
        stddev=std,
        output_dir=results_dir,
    )
    print("✅ Endpoint benchmark completed successfully!")


69
def print_final_summary(output_dir: Path, labels: List[str]) -> None:
    """Generate the plots and print the final benchmark summary.

    Despite the name, this also calls ``generate_plots`` to produce the
    plots under ``output_dir / "plots"`` before printing.

    Args:
        output_dir: Base results directory passed to ``generate_plots`` as
            ``base_output_dir``.
        labels: Names of the benchmarked endpoints; the "Benchmarked" line is
            omitted when this is empty.
    """
    plots_dir = output_dir / "plots"

    print("📊 Generating performance plots...")
    generate_plots(base_output_dir=output_dir, output_dir=plots_dir)
    print(f"📈 Plots saved to: {plots_dir}")
    # NOTE(review): assumes generate_plots writes SUMMARY.txt into plots_dir — confirm.
    print(f"📋 Summary saved to: {plots_dir / 'SUMMARY.txt'}")

    print()
    print("🎉 Benchmark workflow completed successfully!")
    print(f"📁 All results available at: {output_dir}")

    if labels:
        print(f"🚀 Benchmarked: {', '.join(labels)}")

    print(f"📊 View plots at: {plots_dir}")
84
85


86
def run_benchmark_workflow(
    inputs: Dict[str, str],
    isl: int = 2000,
    std: int = 10,
    osl: int = 256,
    model: str = "Qwen/Qwen3-0.6B",
    output_dir: str = "benchmarks/results",
) -> None:
    """Main benchmark workflow orchestrator for HTTP endpoints (and in-cluster internal service URLs).

    Benchmarks every endpoint in *inputs* in iteration order, then generates
    plots and prints the final summary. An exception from any endpoint's
    benchmark propagates and stops the workflow before the summary step.

    Args:
        inputs: Mapping of label -> endpoint to benchmark.
        isl: Forwarded to each endpoint benchmark.
        std: Forwarded to each endpoint benchmark.
        osl: Forwarded to each endpoint benchmark.
        model: Model name forwarded to each endpoint benchmark.
        output_dir: Base directory for all results; created if missing.
    """
    output_dir_path = Path(output_dir)
    output_dir_path.mkdir(parents=True, exist_ok=True)

    # Run endpoint benchmarks
    benchmarked_labels: List[str] = []
    for label, endpoint in inputs.items():
        run_endpoint_benchmark(label, endpoint, model, isl, osl, std, output_dir_path)
        # Record the label only after its benchmark finished without raising.
        benchmarked_labels.append(label)

    # Generate final summary
    print_final_summary(output_dir_path, benchmarked_labels)