"vscode:/vscode.git/clone" did not exist on "07d578942325a102452b6e7d6545b9590ecb727b"
real_data_priority_benchmark.py 9.23 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
#!/usr/bin/env python3

# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Priority queue benchmark: splits a trace into priority tiers, runs a baseline
(no priority tagging) and a priority-tagged run with the same split, then
produces a bar chart comparing TTFT across tiers."""

import argparse
import copy
import json
import os
import subprocess

import matplotlib.pyplot as plt
import numpy as np
from common import (
    add_common_args,
    add_synthesis_args,
    get_aiperf_cmd_for_trace,
    prepare_trace_dataset,
    resolve_tokenizer,
    setup_logger,
)

logger = setup_logger(__name__)

TIERS = ["low", "medium", "high"]


def parse_float_list(s):
    """Parse a comma-separated string into a list of floats."""
    return [float(x.strip()) for x in s.split(",")]


def split_trace(requests, distribution, seed):
    """Split requests into priority tiers by distribution. Deterministic given seed."""
    rng = np.random.RandomState(seed)
    labels = rng.choice(len(distribution), size=len(requests), p=distribution)
    return {
        tier: [r for r, label in zip(requests, labels) if label == i]
        for i, tier in enumerate(TIERS)
    }


def offset_hash_ids(tier_requests):
    """Return a deep copy of tier_requests with all hash_ids shifted by max_hash_id + 1.

    Preserves the prefix tree structure (relative ordering and sharing)
    while ensuring no KV cache hits from a previous run.
    """
    max_hash_id = max(
        h
        for requests in tier_requests.values()
        for req in requests
        for h in req["hash_ids"]
    )
    offset = max_hash_id + 1
    shifted = {}
    for tier, requests in tier_requests.items():
        shifted[tier] = []
        for req in requests:
            r = copy.copy(req)
            r["hash_ids"] = [h + offset for h in r["hash_ids"]]
            shifted[tier].append(r)
    return shifted


def write_trace_file(requests, path):
    """Write a list of request dicts to a JSONL file."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        for request in requests:
            f.write(json.dumps(request) + "\n")


def run_concurrent_streams(
    args, tier_requests, priority_values, run_dir, tag_priority, logger, seed=None
):
    """Launch concurrent aiperf subprocesses for each tier.

    Args:
        tag_priority: If True, inject nvext.agent_hints.latency_sensitivity per tier.
    """
    processes = []
    log_files = []
    for tier, pj in zip(TIERS, priority_values):
        tier_dir = os.path.join(run_dir, f"{tier}_priority")
        os.makedirs(tier_dir, exist_ok=True)

        trace_path = os.path.join(tier_dir, "trace.jsonl")
        write_trace_file(tier_requests[tier], trace_path)

        artifact_dir = os.path.join(tier_dir, "aiperf_artifacts")
        os.makedirs(artifact_dir, exist_ok=True)

        cmd = get_aiperf_cmd_for_trace(
            args.model,
            args.tokenizer,
            trace_path,
            artifact_dir,
            seed if seed is not None else args.seed,
            args.block_size,
            args.url,
        )
        cmd.extend(["--log-level", "WARNING", "--ui-type", "none"])
        if tag_priority:
            cmd.extend(
                [
                    "--extra-inputs",
                    json.dumps({"nvext": {"agent_hints": {"latency_sensitivity": pj}}}),
                ]
            )

        log_path = os.path.join(tier_dir, "aiperf.log")
        log_file = open(log_path, "w")
        log_files.append(log_file)

        label = "priority" if tag_priority else "baseline"
        logger.info(f"Launching {tier} tier ({label}, latency_sensitivity={pj})")
        logger.info(f"  Command: {' '.join(cmd)}")

        proc = subprocess.Popen(cmd, stdout=log_file, stderr=subprocess.STDOUT)
        processes.append((tier, proc))

    failed = []
    for tier, proc in processes:
        proc.wait()
        if proc.returncode == 0:
            logger.info(f"  {tier} tier completed successfully")
        else:
            logger.error(f"  {tier} tier failed with exit code {proc.returncode}")
            failed.append(tier)

    for log_file in log_files:
        log_file.close()

    if failed:
        label = "priority" if tag_priority else "baseline"
        logger.error(f"Failed tiers in {label} run: {', '.join(failed)}")
        logger.error("Check the aiperf.log files in each tier directory for details")
        raise SystemExit(1)


def load_ttft(run_dir, tier):
    """Load TTFT stats from an aiperf result JSON."""
    result_path = os.path.join(
        run_dir, f"{tier}_priority", "aiperf_artifacts", "profile_export_aiperf.json"
    )
    with open(result_path, "r") as f:
        data = json.load(f)
    ttft = data["time_to_first_token"]
    return ttft["p50"], ttft["p25"], ttft["p75"]


def plot_ttft_comparison(baseline_dir, priority_dir, output_path, priority_values):
    """Create a grouped bar chart comparing TTFT between baseline and priority runs."""
    x = np.arange(len(TIERS))
    width = 0.35

    baseline_medians = []
    baseline_lo = []
    baseline_hi = []
    priority_medians = []
    priority_lo = []
    priority_hi = []

    for tier in TIERS:
        p50, p25, p75 = load_ttft(baseline_dir, tier)
        baseline_medians.append(p50)
        baseline_lo.append(p50 - p25)
        baseline_hi.append(p75 - p50)

        p50, p25, p75 = load_ttft(priority_dir, tier)
        priority_medians.append(p50)
        priority_lo.append(p50 - p25)
        priority_hi.append(p75 - p50)

    fig, ax = plt.subplots(figsize=(8, 5))

    ax.bar(
        x - width / 2,
        baseline_medians,
        width,
        yerr=[baseline_lo, baseline_hi],
        label="Baseline (no priority)",
        capsize=4,
    )
    ax.bar(
        x + width / 2,
        priority_medians,
        width,
        yerr=[priority_lo, priority_hi],
        label="With latency_sensitivity",
        capsize=4,
    )

    tier_labels = [
        f"{tier.capitalize()}\n(ls={pj})" for tier, pj in zip(TIERS, priority_values)
    ]
    ax.set_xticks(x)
    ax.set_xticklabels(tier_labels)
    ax.set_ylabel("TTFT (ms)")
    ax.set_title("Time to First Token by Priority Tier")
    ax.legend()
    ax.grid(axis="y", alpha=0.3)

    fig.tight_layout()
    fig.savefig(output_path, dpi=150)
    logger.info(f"Plot saved to: {output_path}")
    plt.close(fig)


def main():
    parser = argparse.ArgumentParser(
        description="Priority benchmark: compare TTFT with and without priority tagging"
    )

    add_common_args(parser)
    add_synthesis_args(parser)

    parser.add_argument(
        "--priority-distribution",
        type=str,
        default="0.5,0.3,0.2",
        help="Comma-separated fractions for low/medium/high tiers (default: 0.5,0.3,0.2)",
    )
    parser.add_argument(
        "--priority-values",
        type=str,
        default="0,1,2",
        help="Comma-separated latency_sensitivity values for low/medium/high tiers (default: 0,1,2)",
    )

    args = parser.parse_args()
    resolve_tokenizer(args)

    distribution = parse_float_list(args.priority_distribution)
    priority_values = parse_float_list(args.priority_values)

    if len(distribution) != len(TIERS):
        parser.error(
            f"--priority-distribution must have {len(TIERS)} values, got {len(distribution)}"
        )
    if len(priority_values) != len(TIERS):
        parser.error(
            f"--priority-values must have {len(TIERS)} values, got {len(priority_values)}"
        )
    if abs(sum(distribution) - 1.0) > 1e-6:
        parser.error(
            f"--priority-distribution must sum to 1.0, got {sum(distribution)}"
        )

    os.makedirs(args.output_dir, exist_ok=True)

    # Prepare the trace dataset (synthesis if needed)
    requests, _ = prepare_trace_dataset(args, args.output_dir, logger)

    # Split into priority tiers (deterministic via seed)
    tier_requests = split_trace(requests, distribution, args.seed)
    for tier in TIERS:
        logger.info(f"  {tier} priority: {len(tier_requests[tier])} requests")

265
266
267
    # Offset hash_ids for the priority run so it starts with a cold KV cache,
    # keeping the comparison fair. Same seed for both runs so prompts match.
    priority_tier_requests = offset_hash_ids(tier_requests)
268
269
270
271
272
273
274
275
276
277
278

    # Run 1: Baseline (same split, no priority tagging)
    baseline_dir = os.path.join(args.output_dir, "baseline")
    logger.info("=== Running baseline (no priority tagging) ===")
    run_concurrent_streams(
        args,
        tier_requests,
        priority_values,
        baseline_dir,
        tag_priority=False,
        logger=logger,
279
        seed=args.seed,
280
281
    )

282
    # Run 2: With priority tagging (offset hash_ids for cold cache)
283
284
285
286
    priority_dir = os.path.join(args.output_dir, "priority")
    logger.info("=== Running with priority tagging ===")
    run_concurrent_streams(
        args,
287
        priority_tier_requests,
288
289
290
291
        priority_values,
        priority_dir,
        tag_priority=True,
        logger=logger,
292
        seed=args.seed,
293
294
295
296
297
298
299
300
301
302
303
    )

    # Plot comparison
    plot_path = os.path.join(args.output_dir, "ttft_comparison.png")
    plot_ttft_comparison(baseline_dir, priority_dir, plot_path, priority_values)

    logger.info(f"All runs completed. Results saved to: {args.output_dir}")


if __name__ == "__main__":
    main()