Unverified Commit 88efa735 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix: e2e aiperf profiling on NAT dataset (#5990)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 910d74f5
......@@ -19,7 +19,7 @@ import re
from collections import defaultdict
from aiperf.common.tokenizer import Tokenizer
from aiperf.dataset.synthesis.rolling_hasher import texts_to_hashes
from aiperf.dataset.synthesis.rolling_hasher import RollingHasher
from tqdm import tqdm
......@@ -49,8 +49,8 @@ def parse_args():
parser.add_argument(
"--block-size",
type=int,
default=128,
help="Block size for hash generation (default: 128)",
default=64,
help="Block size for hash generation (default: 64)",
)
parser.add_argument(
"--num-requests",
......@@ -64,6 +64,12 @@ def parse_args():
default=0,
help="Skip the first N requests (default: 0)",
)
parser.add_argument(
"--delay",
type=int,
default=500,
help="Delay in ms to add between LLM calls within a session (simulates tool call timing). If not specified, no delay field is added.",
)
return parser.parse_args()
......@@ -173,6 +179,39 @@ def extract_llm_calls(request: dict) -> list[dict]:
return llm_calls
def texts_to_hashes_and_lengths(
    tokenizer: Tokenizer,
    texts: list[str],
    block_size: int,
) -> tuple[list[list[int]], list[int]]:
    """
    Tokenize every text and derive its block hash IDs and token count.

    One ``RollingHasher`` is created up front and reused for all texts.
    Each text is encoded with ``tokenizer.encode``, split into consecutive
    slices of at most ``block_size`` tokens, and the slices are handed to
    ``hasher.hash_token_blocks``. A text that encodes to zero tokens yields
    an empty hash list without calling the hasher.

    Args:
        tokenizer: Object providing ``encode(text)``.
        texts: Input texts, processed in order.
        block_size: Number of tokens per block (the last block may be shorter).

    Returns:
        Tuple of (hash_ids_list, token_lengths) where:
        - hash_ids_list: one hash ID sequence per input text
        - token_lengths: one token count per input text
    """
    rolling_hasher = RollingHasher(block_size=block_size)
    per_text_hashes: list[list[int]] = []
    per_text_lengths: list[int] = []

    for text in texts:
        token_ids = tokenizer.encode(text)
        per_text_lengths.append(len(token_ids))

        chunks: list[list[int]] = [
            token_ids[start : start + block_size]
            for start in range(0, len(token_ids), block_size)
        ]
        # Only touch the hasher when there is something to hash.
        per_text_hashes.append(
            rolling_hasher.hash_token_blocks(chunks) if chunks else []
        )

    return per_text_hashes, per_text_lengths
def chat_inputs_to_text(chat_inputs: list) -> str:
"""
Convert chat_inputs array to a single text string for hashing.
......@@ -200,6 +239,7 @@ def convert_to_mooncake(
block_size: int,
skip_requests: int = 0,
num_requests: int | None = None,
delay: int | None = None,
) -> list[dict]:
"""
Convert NAT requests to mooncake format.
......@@ -210,6 +250,7 @@ def convert_to_mooncake(
block_size: Block size for hash generation
skip_requests: Number of requests to skip
num_requests: Maximum number of requests to process
delay: Delay in ms to add between LLM calls within a session
Returns:
List of mooncake-format dicts
......@@ -222,7 +263,7 @@ def convert_to_mooncake(
print(f"Processing {len(requests)} requests...")
# Phase 1: Collect all texts and metadata
all_entries = [] # List of (session_id, prompt_tokens, completion_tokens, text)
all_entries = [] # List of (session_id, completion_tokens, text)
for req in tqdm(requests, desc="Extracting LLM calls"):
request_number = req.get("request_number", 0)
......@@ -235,10 +276,6 @@ def convert_to_mooncake(
continue
for call in llm_calls:
# Skip calls with zero tokens (might be incomplete)
if call["prompt_tokens"] == 0:
continue
# Convert chat_inputs to text for hashing
text = chat_inputs_to_text(call["chat_inputs"])
......@@ -251,7 +288,6 @@ def convert_to_mooncake(
all_entries.append(
(
session_id,
call["prompt_tokens"],
call["completion_tokens"],
text,
)
......@@ -261,24 +297,33 @@ def convert_to_mooncake(
print("No valid LLM calls found")
return []
# Phase 2: Batch hash all texts at once (single hasher instance)
all_texts = [entry[3] for entry in all_entries]
print(f"Hashing {len(all_texts)} texts...")
# Phase 2: Tokenize texts to get hash IDs and token lengths
all_texts = [entry[2] for entry in all_entries]
print(f"Tokenizing and hashing {len(all_texts)} texts...")
tokenizer = Tokenizer.from_pretrained(tokenizer_name)
all_hash_ids = texts_to_hashes(tokenizer, all_texts, block_size)
all_hash_ids, all_input_lengths = texts_to_hashes_and_lengths(
tokenizer, all_texts, block_size
)
# Phase 3: Build mooncake entries
mooncake_data = []
for (session_id, prompt_tokens, completion_tokens, _), hash_ids in zip(
all_entries, all_hash_ids, strict=True
seen_sessions = set()
for (session_id, completion_tokens, _), hash_ids, input_length in zip(
all_entries, all_hash_ids, all_input_lengths, strict=True
):
mooncake_entry = {
"session_id": session_id,
"input_length": prompt_tokens,
"input_length": input_length,
"output_length": completion_tokens,
"hash_ids": hash_ids,
}
# Add delay for all but the first entry in each session
if delay is not None:
if session_id in seen_sessions:
mooncake_entry["delay"] = delay
else:
seen_sessions.add(session_id)
mooncake_data.append(mooncake_entry)
return mooncake_data
......@@ -386,6 +431,7 @@ def main():
args.block_size,
skip_requests=args.skip_requests,
num_requests=args.num_requests,
delay=args.delay,
)
# Print statistics
......
......@@ -41,7 +41,7 @@ classifiers = [
dependencies = [
"aiconfigurator[webapp] @ git+https://github.com/ai-dynamo/aiconfigurator.git@release/0.6.0",
"aiperf @ git+https://github.com/ai-dynamo/aiperf.git@40530ed231fb01ae1cfe3c9d22e43a0e7143780b",
"aiperf @ git+https://github.com/ai-dynamo/aiperf.git@c3fc969e9e30e9ddad35b2f613aa7c1d418f2de9",
"matplotlib",
"networkx",
"numpy",
......
......@@ -41,7 +41,8 @@ This will start both etcd and NATS with the required configurations in the backg
- **`ping.sh`** - Simple test script to verify the setup is working
- **`prefix_ratio_benchmark.py`** - Main benchmarking script that sweeps prefix ratios
- **`real_data_benchmark.py`** - Benchmarking script that uses real mooncake-style trace data
- **`plot_prefix_ratio_comparison.py`** - Generates comparison plots from benchmark results
- **`agent_benchmark.py`** - Concurrency-based benchmarking for multi-turn conversation traces
- **`mock_server.py`** - Simple mock server to receive and log requests from aiperf
## Usage Instructions
......@@ -245,6 +246,61 @@ python real_data_benchmark.py --input-dataset trace.jsonl --prefix-root-multipli
> ```
> However, by the time of release, the aiperf version included in the vLLM runtime container should be up to date enough to use as-is.
### Step 4 (Alternative): Agent Benchmark (Concurrency-Based Multi-Turn)
For benchmarking with multi-turn conversation traces using concurrency-based load generation (instead of timestamp-based replay), use `agent_benchmark.py`. This is useful for testing how the system handles multiple concurrent agent sessions.
```bash
python agent_benchmark.py --input-dataset trace.jsonl --concurrency 10
```
**Key parameters:**
- `--concurrency`: Number of concurrent sessions to maintain (default: 10)
- `--delay`: Override delay (ms) between turns within a session. Set to 0 to remove all delays.
Examples:
```bash
# Run with 20 concurrent sessions using delays from trace file
python agent_benchmark.py --input-dataset trace.jsonl --concurrency 20
# Run with no delays between turns (stress test)
python agent_benchmark.py --input-dataset trace.jsonl --concurrency 10 --delay 0
# Run with fixed 1-second delay between turns
python agent_benchmark.py --input-dataset trace.jsonl --concurrency 10 --delay 1000
```
### Trace Dataset Format (JSONL)
Both `real_data_benchmark.py` and `agent_benchmark.py` accept trace datasets in JSONL format (one JSON object per line). The format is compatible with the [Mooncake trace format](https://github.com/kvcache-ai/Mooncake).
#### Fields
| Field | Type | Description |
|-------|------|-------------|
| `input_length` | int | Number of input tokens for this request |
| `output_length` | int | Number of output tokens to generate |
| `session_id` | string | Groups turns into multi-turn conversations. Requests with the same `session_id` are processed sequentially. |
| `hash_ids` | list[int] | List of hash IDs representing prefix blocks for KV cache routing. Shared hash IDs indicate shared prefixes. |
| `delay` | int | Delay in milliseconds to wait before sending this turn (applied after the previous turn in the same session completes). Not applied to first turns. |
#### Example Trace File
```jsonl
{"session_id": "conv_0", "input_length": 9176, "output_length": 152, "hash_ids": [0, 1, 2, 3, 4, 5]}
{"session_id": "conv_0", "input_length": 9368, "output_length": 104, "hash_ids": [0, 1, 2, 3, 4, 5, 6, 7], "delay": 500}
{"session_id": "conv_0", "input_length": 9516, "output_length": 164, "hash_ids": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], "delay": 500}
{"session_id": "conv_1", "input_length": 9445, "output_length": 143, "hash_ids": [0, 1, 2, 10, 11, 12, 13]}
{"session_id": "conv_1", "input_length": 9628, "output_length": 123, "hash_ids": [0, 1, 2, 10, 11, 12, 13, 14, 15], "delay": 500}
```
In this example:
- `conv_0` and `conv_1` are two separate conversations that can run concurrently
- Within each conversation, turns are processed sequentially
- Subsequent turns have a 500ms delay after the previous turn completes
- `hash_ids` show prefix sharing: both conversations share prefix blocks `[0, 1, 2]`
## Benchmarking Results
We benchmarked the Dynamo KV Router against a baseline round-robin routing strategy to evaluate the performance benefits of cache-aware routing. The experiments were conducted using deepseek-ai/DeepSeek-R1-Distill-Llama-8B on 8 L40S GPUs under aggregated serving, with the following configuration:
......
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Agent benchmark script for running concurrency-based benchmarks with multi-turn
conversation traces. Uses aiperf with concurrency mode to run multiple sessions
in parallel while maintaining sequential ordering within each session.
Expected input JSON format (JSONL - one JSON object per line):
{
"session_id": "conv_0", # Groups turns into conversations (required for multi-turn)
"input_length": 9176, # Number of input tokens (required)
"output_length": 152, # Number of output tokens (required)
"hash_ids": [0, 1, 2, ...], # List of hash IDs for prefix caching (optional)
"delay": 500 # Delay in ms before this turn (optional, applied after previous turn completes)
}
"""
import argparse
import json
import os
import subprocess
from common import (
DEFAULT_BLOCK_SIZE,
add_common_args,
get_common_aiperf_flags,
resolve_tokenizer,
setup_logger,
)
logger = setup_logger(__name__)
def count_dataset_entries(input_dataset):
    """Return the number of non-blank lines in the JSONL file at ``input_dataset``."""
    with open(input_dataset, "r") as dataset_file:
        # Whitespace-only lines are not entries, so they are left out of the tally.
        return sum(1 for raw_line in dataset_file if raw_line.strip())
def get_aiperf_cmd(
    model,
    tokenizer,
    input_dataset,
    artifact_dir,
    seed,
    concurrency,
    block_size,
    request_count,
    url="http://localhost:8000",
):
    """
    Assemble the ``aiperf profile`` argument list for a concurrency-based trace run.

    The numeric settings (concurrency, request count, block size, seed) are
    stringified; the remaining values are passed through unchanged. The flags
    returned by ``get_common_aiperf_flags()`` are appended at the end.

    Returns:
        The command as a list of arguments.
    """
    flag_values = (
        ("--model", model),
        ("--tokenizer", tokenizer),
        ("--url", url),
        ("--input-file", input_dataset),
        ("--custom-dataset-type", "mooncake_trace"),
        ("--concurrency", str(concurrency)),
        ("--request-count", str(request_count)),
        ("--prompt-input-tokens-block-size", str(block_size)),
        ("--random-seed", str(seed)),
        ("--artifact-dir", artifact_dir),
    )

    command = ["aiperf", "profile"]
    for flag, value in flag_values:
        command += [flag, value]
    command.extend(get_common_aiperf_flags())
    return command
def prepare_dataset(input_dataset, output_path, delay_override=None):
    """
    Return the dataset path to benchmark with, rewriting delays when asked.

    With no override the input path is returned untouched and nothing is
    written. Otherwise every non-blank JSONL line is parsed, delays are
    rewritten, and the result is saved to ``output_path``:

    - the first entry seen for each non-None ``session_id`` loses its ``delay``;
    - with an override of 0, every other entry loses its ``delay`` as well;
    - with any other override, every other entry (including entries that have
      no ``session_id``) gets ``delay`` set to the override.

    Args:
        input_dataset: Path to input JSONL file
        output_path: Path to write modified dataset
        delay_override: Replacement delay in ms, or None to keep the input as-is

    Returns:
        Path to the dataset to use (original or modified)
    """
    if delay_override is None:
        return input_dataset

    logger.info(f"Overriding delay values with: {delay_override}ms")

    with open(input_dataset, "r") as source:
        stripped_lines = (raw.strip() for raw in source)
        records = [json.loads(text) for text in stripped_lines if text]

    started_sessions = set()
    for record in records:
        sid = record.get("session_id")
        is_first_turn = sid is not None and sid not in started_sessions
        if is_first_turn:
            started_sessions.add(sid)

        # First turns never carry a delay; an override of 0 strips it everywhere.
        if is_first_turn or delay_override == 0:
            record.pop("delay", None)
        else:
            record["delay"] = delay_override

    with open(output_path, "w") as sink:
        for record in records:
            sink.write(json.dumps(record) + "\n")

    logger.info(f"Modified dataset saved to: {output_path}")
    return output_path
def run_benchmark(
    model,
    tokenizer,
    trace_dataset,
    artifact_dir,
    url,
    seed,
    concurrency,
    block_size,
    request_count,
):
    """
    Build the aiperf command via ``get_aiperf_cmd`` and execute it.

    The run parameters and the full command line are logged first. A
    non-zero exit is logged with its return code and the
    ``subprocess.CalledProcessError`` is re-raised to the caller.
    """
    command = get_aiperf_cmd(
        model,
        tokenizer,
        trace_dataset,
        artifact_dir,
        seed,
        concurrency,
        block_size,
        request_count,
        url,
    )

    logger.info(
        f"Running aiperf with concurrency={concurrency}, request_count={request_count}"
    )
    logger.info(f"Dataset: {trace_dataset}")
    logger.info(f"Command: {' '.join(command)}")

    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as exc:
        logger.error(f"AIPerf failed with error code: {exc.returncode}")
        raise
    logger.info("AIPerf profiling completed successfully")
def main():
    """
    Command-line entry point for the agent benchmark.

    Parses the arguments, fills in the tokenizer and request count when they
    were not given, optionally writes a delay-overridden copy of the trace
    into the output directory, and runs aiperf with its artifacts stored
    under ``<output-dir>/aiperf_artifacts``.
    """
    parser = argparse.ArgumentParser(
        description="Run concurrency-based benchmark with multi-turn conversation traces"
    )

    # Options shared with the other benchmark scripts come first.
    add_common_args(parser)

    parser.add_argument(
        "--output-dir",
        type=str,
        default="agent_benchmark_results",
        help="Output directory for results",
    )

    # Dataset
    parser.add_argument(
        "--input-dataset",
        type=str,
        required=True,
        help="Path to the input trace dataset file (JSONL format)",
    )

    # Load shape
    parser.add_argument(
        "--concurrency",
        type=int,
        default=10,
        help="Number of concurrent sessions to maintain (default: 10)",
    )
    parser.add_argument(
        "--delay",
        type=int,
        default=None,
        help="Override delay (ms) between turns within a session. "
        "If not set, uses delay values from the trace file. "
        "Set to 0 to remove all delays.",
    )
    parser.add_argument(
        "--block-size",
        type=int,
        default=DEFAULT_BLOCK_SIZE,
        help=f"Block size for prompt generation from hash_ids (default: {DEFAULT_BLOCK_SIZE})",
    )
    parser.add_argument(
        "--request-count",
        type=int,
        default=None,
        help="Number of requests to send. If not set, defaults to number of entries in input dataset.",
    )

    args = parser.parse_args()
    resolve_tokenizer(args)

    # Without an explicit count, send one request per dataset entry.
    if args.request_count is None:
        args.request_count = count_dataset_entries(args.input_dataset)
        logger.info(
            f"Request count not specified, using dataset entry count: {args.request_count}"
        )

    os.makedirs(args.output_dir, exist_ok=True)

    # Pick the trace to replay: the original, or a copy with overridden delays.
    if args.delay is None:
        dataset_to_run = args.input_dataset
        logger.info(f"Using original trace dataset: {dataset_to_run}")
    else:
        rewritten_path = os.path.join(args.output_dir, "modified_trace.jsonl")
        dataset_to_run = prepare_dataset(
            args.input_dataset, rewritten_path, args.delay
        )

    artifacts = os.path.join(args.output_dir, "aiperf_artifacts")
    os.makedirs(artifacts, exist_ok=True)

    run_benchmark(
        args.model,
        args.tokenizer,
        dataset_to_run,
        artifacts,
        args.url,
        args.seed,
        args.concurrency,
        args.block_size,
        args.request_count,
    )

    logger.info(f"Results saved to: {artifacts}")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Common utilities shared across router benchmark scripts."""
import logging
# Default values
DEFAULT_MODEL = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
DEFAULT_URL = "http://localhost:8000"
DEFAULT_SEED = 0
DEFAULT_BLOCK_SIZE = 64
DEFAULT_MOONCAKE_BLOCK_SIZE = 512
def setup_logger(name: str) -> logging.Logger:
    """
    Return the logger called ``name``, configured for INFO console output.

    ``logging.getLogger`` hands back the same object for the same name, so a
    console handler is attached only when the logger has none yet. Without
    that guard every repeated call would add another ``StreamHandler`` and
    each message would be printed once per call.

    Args:
        name: Logger name, typically the caller's ``__name__``.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # Already configured (by an earlier call or by the caller): leave handlers alone.
    if logger.handlers:
        return logger

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            "%Y-%m-%d %H:%M:%S",
        )
    )
    logger.addHandler(console_handler)
    return logger
def add_common_args(parser):
    """
    Register the CLI options every benchmark script accepts.

    Adds, in order: ``--model``, ``--tokenizer``, ``--url``, ``--seed`` and
    the ``--use-expected-osl`` switch to ``parser``.
    """
    option_specs = (
        ("--model", {"type": str, "default": DEFAULT_MODEL, "help": "Model name"}),
        (
            "--tokenizer",
            {
                "type": str,
                "default": None,
                "help": "Tokenizer name (defaults to model)",
            },
        ),
        ("--url", {"type": str, "default": DEFAULT_URL, "help": "Server URL"}),
        (
            "--seed",
            {
                "type": int,
                "default": DEFAULT_SEED,
                "help": "Random seed for reproducibility (default: 0)",
            },
        ),
        (
            "--use-expected-osl",
            {
                "action": "store_true",
                "help": "Pass expected_output_tokens to nvext for router tracking",
            },
        ),
    )
    # Registration order is kept because it determines the --help listing.
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
def resolve_tokenizer(args):
    """Fall back to ``args.model`` when ``args.tokenizer`` is None (mutates ``args``)."""
    if args.tokenizer is not None:
        return
    args.tokenizer = args.model
def get_common_aiperf_flags():
    """Return a fresh list of the aiperf flags shared by all benchmark scripts."""
    endpoint_flags = [
        "--endpoint-type",
        "chat",
        "--endpoint",
        "v1/chat/completions",
        "--streaming",
    ]
    request_flags = ["--extra-inputs", "ignore_eos:true", "--no-gpu-telemetry"]

    # Each HTTP header is passed as its own "-H <header>" pair.
    header_flags = []
    for header in ("Authorization: Bearer NOT USED", "Accept: text/event-stream"):
        header_flags += ["-H", header]

    return endpoint_flags + request_flags + header_flags
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Simple mock server to receive and log requests from aiperf loadgen.
Prints each request with timestamp, useful for debugging request patterns,
verifying session ordering, and inspecting delays between requests.
Usage:
python mock_server.py --port 8000
Then run aiperf pointing to this server:
python agent_benchmark.py --input-dataset trace.jsonl --url http://localhost:8000
"""
import argparse
import asyncio
import json
import time
from datetime import datetime
from aiohttp import web
class RequestLogger:
"""Tracks and logs incoming requests with timing information."""
def __init__(self, output_file: str | None = None, payload_file: str | None = None):
self.request_count = 0
self.start_time = None
self.session_last_seen: dict[str, float] = {}
self.output_file = output_file
self.payload_file = payload_file
self._file_handle = None
self._payload_handle = None
if output_file:
self._file_handle = open(output_file, "w")
if payload_file:
self._payload_handle = open(payload_file, "w")
def close(self):
"""Close the output files if open."""
if self._file_handle:
self._file_handle.close()
if self._payload_handle:
self._payload_handle.close()
def log_request(self, request_data: dict, headers: dict) -> None:
"""Log a request with timestamp and relevant metadata."""
now = time.time()
if self.start_time is None:
self.start_time = now
self.request_count += 1
elapsed = now - self.start_time
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
# Extract useful fields
model = request_data.get("model", "unknown")
messages = request_data.get("messages", [])
# Support both max_completion_tokens (new) and max_tokens (legacy)
max_tokens = request_data.get(
"max_completion_tokens", request_data.get("max_tokens", "N/A")
)
# Check for session tracking via x-correlation-id header
correlation_id = headers.get(
"x-correlation-id", headers.get("X-Correlation-Id")
)
# Calculate delay since last request from same session
session_delay = None
if correlation_id and correlation_id in self.session_last_seen:
session_delay = (now - self.session_last_seen[correlation_id]) * 1000 # ms
if correlation_id:
self.session_last_seen[correlation_id] = now
# Build log line
log_parts = [
f"[{timestamp}]",
f"#{self.request_count:4d}",
f"elapsed={elapsed:7.2f}s",
f"model={model}",
f"msgs={len(messages)}",
f"max_tokens={max_tokens}",
]
if correlation_id:
# Truncate for display
short_id = correlation_id[:8] if len(correlation_id) > 8 else correlation_id
log_parts.append(f"session={short_id}")
if session_delay is not None:
log_parts.append(f"delay={session_delay:.0f}ms")
log_line = " | ".join(log_parts)
print(log_line)
if self._file_handle:
self._file_handle.write(log_line + "\n")
self._file_handle.flush()
# Dump full payload to separate file
if self._payload_handle:
payload_entry = {
"request_number": self.request_count,
"timestamp": timestamp,
"elapsed_s": elapsed,
"correlation_id": correlation_id,
"headers": {
k: v
for k, v in headers.items()
if k.lower() not in ("authorization", "accept")
},
"body": request_data,
}
self._payload_handle.write(json.dumps(payload_entry) + "\n")
self._payload_handle.flush()
logger: RequestLogger = None # Initialized in main()
async def health_check(request: web.Request) -> web.Response:
    """Answer the health probe with a fixed ``{"status": "ok"}`` JSON body."""
    payload = {"status": "ok"}
    return web.json_response(payload)
async def chat_completions(request: web.Request) -> web.StreamResponse:
    """
    Handle /v1/chat/completions requests with a mock completion.

    The request is logged through the module-level ``logger``. If the body
    has a truthy ``stream`` field, one server-sent-event chunk is written per
    requested token, followed by a final chunk with ``finish_reason="stop"``
    and a ``[DONE]`` marker. Otherwise a single JSON completion is returned.

    Returns:
        400 for a body that is not valid JSON or not a JSON object, else the
        streaming or JSON response described above.
    """
    try:
        body = await request.json()
    except json.JSONDecodeError:
        return web.json_response({"error": "Invalid JSON"}, status=400)
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, number, ...) has no fields
        # to read; reject it instead of failing on body.get() below.
        return web.json_response(
            {"error": "Request body must be a JSON object"}, status=400
        )

    # Log the request
    logger.log_request(body, dict(request.headers))

    # Snapshot the id now: the shared counter moves on whenever another
    # request is logged while this handler is suspended at an await, which
    # would otherwise change the id in the middle of a stream.
    completion_id = f"chatcmpl-mock-{logger.request_count}"
    model = body.get("model", "mock")

    # Support both max_completion_tokens (new) and max_tokens (legacy).
    # An explicit null counts as "not set" so range() never receives None.
    max_tokens = body.get("max_completion_tokens")
    if max_tokens is None:
        max_tokens = body.get("max_tokens")
    if max_tokens is None:
        max_tokens = 10

    if not body.get("stream", False):
        # Non-streaming response
        return web.json_response(
            {
                "id": completion_id,
                "object": "chat.completion",
                "created": int(time.time()),
                "model": model,
                "choices": [
                    {
                        "index": 0,
                        "message": {"role": "assistant", "content": "x" * max_tokens},
                        "finish_reason": "stop",
                    }
                ],
                "usage": {
                    "prompt_tokens": 10,
                    "completion_tokens": max_tokens,
                    "total_tokens": 10 + max_tokens,
                },
            }
        )

    def sse_chunk(delta: dict, finish_reason) -> bytes:
        """Encode one chat.completion.chunk as a server-sent-event frame."""
        chunk = {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "delta": delta,
                    "finish_reason": finish_reason,
                }
            ],
        }
        return f"data: {json.dumps(chunk)}\n\n".encode()

    # Return a streaming response
    response = web.StreamResponse(
        status=200,
        headers={
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
    await response.prepare(request)

    # Each chunk simulates a token being generated; the first one carries the
    # assistant role with empty content, the rest a single "x".
    for i in range(max_tokens):
        delta = {"content": "x"} if i > 0 else {"role": "assistant", "content": ""}
        await response.write(sse_chunk(delta, None))
        await asyncio.sleep(0.001)  # Small delay between tokens

    # Send final chunk with finish_reason
    await response.write(sse_chunk({}, "stop"))
    await response.write(b"data: [DONE]\n\n")
    await response.write_eof()
    return response
async def models_list(request: web.Request) -> web.Response:
    """Serve /v1/models with a list containing the single entry ``mock-model``."""
    mock_model = {
        "id": "mock-model",
        "object": "model",
        "created": int(time.time()),
        "owned_by": "mock",
    }
    listing = {"object": "list", "data": [mock_model]}
    return web.json_response(listing)
def create_app() -> web.Application:
    """Build the aiohttp application and register the three mock endpoints."""
    application = web.Application()
    routes = application.router
    routes.add_get("/health", health_check)
    routes.add_get("/v1/models", models_list)
    routes.add_post("/v1/chat/completions", chat_completions)
    return application
def main():
    """
    Command-line entry point for the mock server.

    Parses the options, installs the module-level ``logger`` that the request
    handlers use, prints a startup banner and serves until ``web.run_app``
    returns; the log files are closed on the way out.
    """
    global logger

    parser = argparse.ArgumentParser(
        description="Mock server to receive and log aiperf requests"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8000,
        help="Port to listen on (default: 8000)",
    )
    parser.add_argument(
        "--host",
        type=str,
        default="0.0.0.0",
        help="Host to bind to (default: 0.0.0.0)",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="mock_server_logs.txt",
        help="Output file for request logs (default: mock_server_logs.txt)",
    )
    parser.add_argument(
        "--payload-output",
        type=str,
        default="mock_server_payloads.jsonl",
        help="Output file for full request payloads in JSONL format (default: mock_server_payloads.jsonl)",
    )
    options = parser.parse_args()

    # The handlers read this global, so it must exist before the app starts.
    logger = RequestLogger(
        output_file=options.output, payload_file=options.payload_output
    )

    banner = (
        f"Starting mock server on {options.host}:{options.port}",
        f"Logging requests to: {options.output}",
        f"Dumping payloads to: {options.payload_output}",
        "Waiting for requests from aiperf...",
        "-" * 80,
    )
    for banner_line in banner:
        print(banner_line)

    try:
        web.run_app(create_app(), host=options.host, port=options.port, print=None)
    finally:
        logger.close()
if __name__ == "__main__":
main()
......@@ -16,6 +16,6 @@ curl -X POST http://localhost:${PORT}/v1/chat/completions \
{"role": "user", "content": "What is 2+2?"}
],
"stream": true,
"max_tokens": 10,
"max_completion_tokens": 10,
"ignore_eos": true
}'
\ No newline at end of file
......@@ -5,31 +5,27 @@
import argparse
import json
import logging
import os
import subprocess
from typing import Dict, List, Optional
from typing import Dict, Optional
import matplotlib
matplotlib.use("Agg") # Use non-interactive backend
import matplotlib.pyplot as plt
# Setup logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
from common import (
add_common_args,
get_common_aiperf_flags,
resolve_tokenizer,
setup_logger,
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger = setup_logger(__name__)
def get_aiperf_cmd(
model,
tokenizer, # Add tokenizer parameter
tokenizer,
prefix_ratio,
isl,
osl,
......@@ -51,18 +47,13 @@ def get_aiperf_cmd(
nvext_dict["expected_output_tokens"] = osl
nvext_json = json.dumps({"nvext": nvext_dict})
return [
cmd = [
"aiperf",
"profile",
"--model",
model,
"--tokenizer",
tokenizer, # Use the tokenizer parameter instead of model
"--endpoint-type",
"chat",
"--endpoint",
"v1/chat/completions",
"--streaming",
tokenizer,
"--url",
url,
"--synthetic-input-tokens-mean",
......@@ -74,8 +65,6 @@ def get_aiperf_cmd(
"--output-tokens-stddev",
str(round(osl / 4)),
"--extra-inputs",
"ignore_eos:true",
"--extra-inputs",
nvext_json,
"--concurrency",
str(concurrency),
......@@ -93,11 +82,9 @@ def get_aiperf_cmd(
artifact_dir,
"--dataset-sampling-strategy",
"shuffle",
"-H",
"Authorization: Bearer NOT USED",
"-H",
"Accept: text/event-stream",
]
cmd.extend(get_common_aiperf_flags())
return cmd
def get_aiperf_result(artifact_dir: str) -> dict:
......@@ -117,90 +104,9 @@ def get_aiperf_result(artifact_dir: str) -> dict:
return json.load(f)
def run_benchmark_single_url(
    model,
    tokenizer,
    prefix_ratio,
    isl,
    osl,
    requests,
    concurrency,
    seed,
    num_prefix_prompts,
    artifact_dir,
    url,
    use_expected_osl=False,
) -> Optional[Dict]:
    """
    Run aiperf against one URL and load its result.

    The command is built with ``get_aiperf_cmd`` and executed with aiperf's
    output going straight to the terminal.

    Returns:
        The dict produced by ``get_aiperf_result(artifact_dir)``, or None
        when aiperf exits with a non-zero status.
    """
    command = get_aiperf_cmd(
        model,
        tokenizer,
        prefix_ratio,
        isl,
        osl,
        requests,
        concurrency,
        seed,
        num_prefix_prompts,
        artifact_dir,
        url,
        use_expected_osl,
    )
    logger.info(f"Running command for URL {url}: {' '.join(command)}")

    try:
        # Output is not captured so aiperf's progress stays visible.
        subprocess.run(command, check=True)
        logger.info(f"AIPerf profiling completed successfully for URL {url}")
        return get_aiperf_result(artifact_dir)
    except subprocess.CalledProcessError as exc:
        logger.error(f"AIPerf failed for URL {url} with error code: {exc.returncode}")
        return None
def aggregate_results(results: List[Optional[Dict]]) -> Optional[Dict]:
    """
    Average the TTFT and ITL percentiles over the per-URL results.

    ``None`` entries are ignored. For ``time_to_first_token`` and
    ``inter_token_latency`` the p25, p50 and p75 values are each averaged
    (arithmetic mean) across the remaining results.

    Returns:
        A dict with the two metrics, each mapping p25/p50/p75 to its mean,
        or None when ``results`` is empty or contains only None.
    """
    if not results:
        return None

    usable = [entry for entry in results if entry is not None]
    if not usable:
        return None

    summary: Dict = {}
    for metric in ("time_to_first_token", "inter_token_latency"):
        per_percentile = {}
        for percentile in ("p25", "p50", "p75"):
            samples = [entry[metric][percentile] for entry in usable]
            per_percentile[percentile] = sum(samples) / len(samples)
        summary[metric] = per_percentile
    return summary
def run_benchmark(
model,
tokenizer, # Add tokenizer parameter
tokenizer,
prefix_ratio,
isl,
osl,
......@@ -209,25 +115,20 @@ def run_benchmark(
seed,
num_prefix_prompts,
output_dir,
urls,
url,
use_expected_osl=False,
) -> Optional[Dict]:
"""Run aiperf benchmark for a specific prefix ratio"""
logger.info(
f"Running benchmark with prefix_ratio={prefix_ratio}, seed={seed}, URLs={urls}"
f"Running benchmark with prefix_ratio={prefix_ratio}, seed={seed}, url={url}"
)
# If single URL, maintain existing behavior
if isinstance(urls, str):
urls = [urls]
if len(urls) == 1:
artifact_dir = f"{output_dir}/prefix_ratio_{prefix_ratio}_seed_{seed}"
os.makedirs(artifact_dir, exist_ok=True)
return run_benchmark_single_url(
aiperf_cmd = get_aiperf_cmd(
model,
tokenizer, # Pass tokenizer parameter
tokenizer,
prefix_ratio,
isl,
osl,
......@@ -236,95 +137,28 @@ def run_benchmark(
seed,
num_prefix_prompts,
artifact_dir,
urls[0],
use_expected_osl,
)
# Multiple URLs: split requests and concurrency
num_urls = len(urls)
base_requests_per_url = requests // num_urls
remainder_requests = requests % num_urls
base_concurrency_per_url = max(1, concurrency // num_urls)
# Launch parallel processes
processes = []
artifact_dirs = []
for i, url in enumerate(urls):
# Distribute remainder requests to first few URLs
url_requests = base_requests_per_url + (1 if i < remainder_requests else 0)
artifact_dir = f"{output_dir}/prefix_ratio_{prefix_ratio}_seed_{seed}_url_{i}"
os.makedirs(artifact_dir, exist_ok=True)
artifact_dirs.append(artifact_dir)
aiperf_cmd = get_aiperf_cmd(
model,
tokenizer, # Pass tokenizer parameter
prefix_ratio,
isl,
osl,
url_requests,
base_concurrency_per_url,
seed,
num_prefix_prompts,
artifact_dir,
url,
use_expected_osl,
)
logger.info(f"Launching process for URL {url}: {' '.join(aiperf_cmd)}")
# Run process without capturing output - let it stream to terminal
process = subprocess.Popen(aiperf_cmd)
processes.append((process, url, artifact_dir))
# Wait for all processes to complete and collect results
results: List[Optional[Dict]] = []
for process, url, artifact_dir in processes:
return_code = process.wait()
if return_code == 0:
logger.info(f"AIPerf completed successfully for URL {url}")
logger.info(f"Command: {' '.join(aiperf_cmd)}")
try:
aiperf_result = get_aiperf_result(artifact_dir)
results.append(aiperf_result)
except Exception as e:
logger.error(f"Failed to get results for URL {url}: {e}")
results.append(None)
else:
logger.error(f"AIPerf failed for URL {url} with error code: {return_code}")
results.append(None)
# Aggregate results
return aggregate_results(results)
subprocess.run(aiperf_cmd, check=True)
logger.info("AIPerf profiling completed successfully")
return get_aiperf_result(artifact_dir)
except subprocess.CalledProcessError as e:
logger.error(f"AIPerf failed with error code: {e.returncode}")
return None
def main():
parser = argparse.ArgumentParser(
description="Benchmark prefix ratios and plot results"
)
parser.add_argument(
"--model",
type=str,
default="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
help="Model name",
)
parser.add_argument(
"--tokenizer",
type=str,
default="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
help="Tokenizer name (defaults to model)",
)
parser.add_argument(
"--url",
type=str,
nargs="+", # Accept multiple URLs
default=["http://localhost:8000"],
# default=["http://localhost:8090", "http://localhost:8090"],
help="Server URL(s). Can specify multiple URLs for parallel benchmarking",
)
add_common_args(parser)
parser.add_argument(
"--output-dir",
type=str,
......@@ -336,7 +170,6 @@ def main():
parser.add_argument("--osl", type=int, default=200, help="Output sequence length")
parser.add_argument("--requests", type=int, default=200, help="Number of requests")
parser.add_argument("--concurrency", type=int, default=20, help="Concurrency level")
parser.add_argument("--seed", type=int, default=0, help="Initial random seed")
parser.add_argument(
"--prefix-ratios",
type=float,
......@@ -344,13 +177,9 @@ def main():
default=[0.1, 0.3, 0.5, 0.7, 0.9],
help="List of prefix ratios to test",
)
parser.add_argument(
"--use-expected-osl",
action="store_true",
help="Pass expected_output_tokens to nvext for router tracking",
)
args = parser.parse_args()
resolve_tokenizer(args)
# Create output directory
os.makedirs(args.output_dir, exist_ok=True)
......@@ -379,7 +208,7 @@ def main():
current_seed,
args.num_prefix_prompts,
args.output_dir,
args.url, # Now passing list of URLs
args.url,
args.use_expected_osl,
)
......
......@@ -5,23 +5,20 @@
import argparse
import json
import logging
import os
import subprocess
import numpy as np
from common import (
DEFAULT_MOONCAKE_BLOCK_SIZE,
add_common_args,
get_common_aiperf_flags,
resolve_tokenizer,
setup_logger,
)
from prefix_data_generator.synthesizer import Synthesizer
# Setup logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger = setup_logger(__name__)
def get_aiperf_cmd_for_trace(
......@@ -30,20 +27,16 @@ def get_aiperf_cmd_for_trace(
input_dataset,
artifact_dir,
seed,
block_size,
url="http://localhost:8888",
):
return [
cmd = [
"aiperf",
"profile",
"--model",
model,
"--tokenizer",
tokenizer,
"--endpoint-type",
"chat",
"--endpoint",
"v1/chat/completions",
"--streaming",
"--url",
url,
"--input-file",
......@@ -51,15 +44,15 @@ def get_aiperf_cmd_for_trace(
"--custom-dataset-type",
"mooncake_trace",
"--fixed-schedule-auto-offset",
"--prompt-input-tokens-block-size",
str(block_size),
"--random-seed",
str(seed),
"--artifact-dir",
artifact_dir,
"-H",
"Authorization: Bearer NOT USED",
"-H",
"Accept: text/event-stream",
]
cmd.extend(get_common_aiperf_flags())
return cmd
def run_benchmark_with_trace(
......@@ -69,6 +62,7 @@ def run_benchmark_with_trace(
artifact_dir,
url,
seed,
block_size,
):
"""Run aiperf benchmark with a trace dataset"""
aiperf_cmd = get_aiperf_cmd_for_trace(
......@@ -77,6 +71,7 @@ def run_benchmark_with_trace(
trace_dataset,
artifact_dir,
seed,
block_size,
url,
)
......@@ -100,25 +95,9 @@ def main():
description="Benchmark with real or synthesized mooncake-style trace data"
)
# Model and server configuration
parser.add_argument(
"--model",
type=str,
default="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
help="Model name",
)
parser.add_argument(
"--tokenizer",
type=str,
default=None,
help="Tokenizer name (defaults to model)",
)
parser.add_argument(
"--url",
type=str,
default="http://localhost:8000",
help="Server URL",
)
# Common arguments
add_common_args(parser)
parser.add_argument(
"--output-dir",
type=str,
......@@ -190,26 +169,12 @@ def main():
parser.add_argument(
"--block-size",
type=int,
default=512,
help="Block size for prefilling and decoding (default: 512)",
)
parser.add_argument(
"--seed",
type=int,
default=0,
help="Random seed for reproducibility (default: 0)",
)
parser.add_argument(
"--use-expected-osl",
action="store_true",
help="Pass expected_output_tokens to nvext for router tracking",
default=DEFAULT_MOONCAKE_BLOCK_SIZE,
help=f"Block size for prefilling and decoding (default: {DEFAULT_MOONCAKE_BLOCK_SIZE})",
)
args = parser.parse_args()
# Use tokenizer from model if not specified
if args.tokenizer is None:
args.tokenizer = args.model
resolve_tokenizer(args)
# Create output directory
os.makedirs(args.output_dir, exist_ok=True)
......@@ -350,6 +315,7 @@ def main():
artifact_dir,
args.url,
args.seed,
args.block_size,
)
logger.info(f"Results saved to: {artifact_dir}")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment