"lib/llm/vscode:/vscode.git/clone" did not exist on "db8e52f2c3579ff28540e4072fc16e528c48d714"
Unverified Commit 3aa30778 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix: more fixes for stable router benchmarking (#3264)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 97a98546
......@@ -250,7 +250,12 @@ class Synthesizer:
return path + unique_user_prompt, True, context_len
def synthesize_requests(
self, num_requests: int, input_len_filter: Optional[int] = None
self,
num_requests: int,
max_isl: Optional[int] = None,
min_isl: Optional[int] = None,
min_osl: Optional[int] = None,
max_osl: Optional[int] = None,
) -> list[dict[str, Any]]:
timestamp = 0
......@@ -270,8 +275,17 @@ class Synthesizer:
input_len = len(path) * self.block_size
output_len = self.output_lens_sampler.sample()
if input_len_filter is not None and input_len > input_len_filter:
# Apply filtering for ISL
if max_isl is not None and input_len > max_isl:
continue
if min_isl is not None and input_len < min_isl:
continue
# Apply clipping for OSL (not filtering)
if min_osl is not None and output_len < min_osl:
output_len = min_osl
if max_osl is not None and output_len > max_osl:
output_len = max_osl
requests.append(
{
"timestamp": int(timestamp),
......@@ -379,6 +393,24 @@ def main():
default=None,
help="Maximum input sequence length to include in output (default: None, no filtering)",
)
parser.add_argument(
"--min-isl",
type=int,
default=None,
help="Minimum input sequence length to include in output (default: None, no filtering)",
)
parser.add_argument(
"--min-osl",
type=int,
default=None,
help="Minimum output sequence length - clips values below this threshold (default: None, no clipping)",
)
parser.add_argument(
"--max-osl",
type=int,
default=None,
help="Maximum output sequence length - clips values above this threshold (default: None, no clipping)",
)
parser.add_argument(
"--block-size",
type=int,
......@@ -395,12 +427,20 @@ def main():
dataset_file = Path(args.input_file).resolve()
if args.output_file is None:
output_file = dataset_file.with_stem(
f"{dataset_file.stem}_synth"
+ f"_{int(args.prefix_len_multiplier)}x{args.prefix_root_multiplier}+{args.prompt_len_multiplier}"
+ f"_speedup{args.speedup_ratio}"
+ f"_maxisl{args.max_isl}"
)
suffix_parts = [
f"{dataset_file.stem}_synth",
f"{int(args.prefix_len_multiplier)}x{args.prefix_root_multiplier}+{args.prompt_len_multiplier}",
f"speedup{args.speedup_ratio}",
]
if args.max_isl is not None:
suffix_parts.append(f"maxisl{args.max_isl}")
if args.min_isl is not None:
suffix_parts.append(f"minisl{args.min_isl}")
if args.min_osl is not None:
suffix_parts.append(f"minosl{args.min_osl}")
if args.max_osl is not None:
suffix_parts.append(f"maxosl{args.max_osl}")
output_file = dataset_file.with_stem("_".join(suffix_parts))
else:
output_file = Path(args.output_file).resolve()
......@@ -415,7 +455,13 @@ def main():
)
print("synthesizing requests...", flush=True)
requests = synthesizer.synthesize_requests(args.num_requests, args.max_isl)
requests = synthesizer.synthesize_requests(
args.num_requests,
max_isl=args.max_isl,
min_isl=args.min_isl,
min_osl=args.min_osl,
max_osl=args.max_osl,
)
print(f"synthesized {len(requests)} requests")
# Print statistics in a single table with metrics as rows and statistics as columns
......
......@@ -196,10 +196,10 @@ python prefix_ratio_benchmark.py --output-dir results/experiment1
Instead of synthetic benchmarks with controlled prefix ratios, you can benchmark using real trace data in [mooncake-style format](https://github.com/kvcache-ai/Mooncake/blob/d21da178bae8db9651cf18a76824c084145fc725/mooncake_trace.jsonl). This approach uses actual request patterns from production traces, potentially modified with synthesis parameters.
```bash
python real_data_benchmark.py --input-file mooncake_trace.jsonl
python real_data_benchmark.py --input-dataset mooncake_trace.jsonl
```
The script can apply various modifications on top of the original trace file to simulate different scenarios and workload conditions. This script accepts the same synthesis parameters as the [prefix data generator](../prefix_data_generator/README.md):
The script can apply various modifications on top of the original trace dataset to simulate different scenarios and workload conditions. This script accepts the same synthesis parameters as the [prefix data generator](../prefix_data_generator/README.md):
**Key parameters:**
- `--num-requests`: Number of requests to synthesize from the trace (default: use all)
......@@ -212,19 +212,26 @@ The script can apply various modifications on top of the original trace file to
Examples:
```bash
# Use original trace file as-is (no synthesis parameters specified)
python real_data_benchmark.py --input-file trace.jsonl
# Use original trace dataset as-is (no synthesis parameters specified)
python real_data_benchmark.py --input-dataset trace.jsonl
# Speed up request rate by 2x and use only first 1000 requests
python real_data_benchmark.py --input-file trace.jsonl --num-requests 1000 --speedup-ratio 2.0
python real_data_benchmark.py --input-dataset trace.jsonl --num-requests 1000 --speedup-ratio 2.0
# Double prefix lengths to test cache efficiency with longer shared contexts
python real_data_benchmark.py --input-file trace.jsonl --prefix-len-multiplier 2.0
python real_data_benchmark.py --input-dataset trace.jsonl --prefix-len-multiplier 2.0
# Create more diverse workload by replicating prefix tree 3 times
python real_data_benchmark.py --input-file trace.jsonl --prefix-root-multiplier 3
python real_data_benchmark.py --input-dataset trace.jsonl --prefix-root-multiplier 3
```
> [!Note]
> At the time of writing this documentation, you may need to install the latest genai-perf from the main source branch to run load generation on the trace files:
> ```bash
> pip install git+https://github.com/triton-inference-server/perf_analyzer.git#subdirectory=genai-perf
> ```
> However, by the time of release, the genai-perf version included in the vLLM runtime container should be up to date enough to use as-is.
## Troubleshooting
1. **Workers fail to start**: Check CUDA_VISIBLE_DEVICES and GPU availability
......
......@@ -27,12 +27,11 @@ logger.addHandler(console_handler)
def get_genai_perf_cmd_for_trace(
model,
tokenizer,
input_file,
input_dataset,
artifact_dir,
seed,
url="http://localhost:8888",
):
"""Build genai-perf command for trace file input"""
return [
"genai-perf",
"profile",
......@@ -48,7 +47,9 @@ def get_genai_perf_cmd_for_trace(
"--url",
url,
"--input-file",
input_file,
f"payload:{input_dataset}",
"--fixed-schedule",
"True",
"--random-seed",
str(seed),
"--artifact-dir",
......@@ -67,22 +68,22 @@ def get_genai_perf_cmd_for_trace(
def run_benchmark_with_trace(
model,
tokenizer,
trace_file,
trace_dataset,
artifact_dir,
url,
seed,
):
"""Run genai-perf benchmark with a trace file"""
"""Run genai-perf benchmark with a trace dataset"""
genai_perf_cmd = get_genai_perf_cmd_for_trace(
model,
tokenizer,
trace_file,
trace_dataset,
artifact_dir,
seed,
url,
)
logger.info(f"Running genai-perf with trace file: {trace_file}")
logger.info(f"Running genai-perf with trace dataset: {trace_dataset}")
logger.info(f"Command: {' '.join(genai_perf_cmd)}")
try:
......@@ -128,12 +129,12 @@ def main():
help="Output directory for results",
)
# Trace file and synthesis configuration (similar to synthesizer.py)
# Trace dataset and synthesis configuration (similar to synthesizer.py)
parser.add_argument(
"--input-file",
"--input-dataset",
type=str,
default="mooncake_trace.jsonl",
help="Path to the input mooncake-style trace file",
help="Path to the input mooncake-style trace dataset file",
)
parser.add_argument(
"--num-requests",
......@@ -171,6 +172,24 @@ def main():
default=None,
help="Maximum input sequence length to include in output (default: None, no filtering)",
)
parser.add_argument(
"--min-isl",
type=int,
default=None,
help="Minimum input sequence length to include in output (default: None, no filtering)",
)
parser.add_argument(
"--min-osl",
type=int,
default=None,
help="Minimum output sequence length - clips values below this threshold (default: None, no clipping)",
)
parser.add_argument(
"--max-osl",
type=int,
default=None,
help="Maximum output sequence length - clips values above this threshold (default: None, no clipping)",
)
parser.add_argument(
"--block-size",
type=int,
......@@ -202,18 +221,21 @@ def main():
or args.prefix_root_multiplier != 1
or args.prompt_len_multiplier != 1.0
or args.max_isl is not None
or args.min_isl is not None
or args.min_osl is not None
or args.max_osl is not None
)
if not needs_synthesis:
# No synthesis needed, use original file
trace_file_path = args.input_file
# No synthesis needed, use original dataset
trace_dataset_path = args.input_dataset
logger.info(
f"Using original trace file (no synthesis parameters modified): {trace_file_path}"
f"Using original trace dataset (no synthesis parameters modified): {trace_dataset_path}"
)
else:
# Generate synthetic data based on input file
# Generate synthetic data based on input dataset
logger.info("Generating synthetic trace data...")
logger.info(f" Base file: {args.input_file}")
logger.info(f" Base dataset: {args.input_dataset}")
logger.info(
f" Num requests: {args.num_requests if args.num_requests else 'all'}"
)
......@@ -221,7 +243,18 @@ def main():
logger.info(f" Prefix len multiplier: {args.prefix_len_multiplier}")
logger.info(f" Prefix root multiplier: {args.prefix_root_multiplier}")
logger.info(f" Prompt len multiplier: {args.prompt_len_multiplier}")
logger.info(f" Max ISL: {args.max_isl if args.max_isl else 'no limit'}")
logger.info(
f" Max ISL: {args.max_isl if args.max_isl else 'no limit'} (filtering)"
)
logger.info(
f" Min ISL: {args.min_isl if args.min_isl else 'no limit'} (filtering)"
)
logger.info(
f" Min OSL: {args.min_osl if args.min_osl else 'no clipping'} (clipping)"
)
logger.info(
f" Max OSL: {args.max_osl if args.max_osl else 'no clipping'} (clipping)"
)
logger.info(f" Random seed: {args.seed}")
# Set random seed for reproducibility
......@@ -229,7 +262,7 @@ def main():
# Create synthesizer
synthesizer = Synthesizer(
args.input_file,
args.input_dataset,
block_size=args.block_size,
speedup_ratio=args.speedup_ratio,
prefix_len_multiplier=args.prefix_len_multiplier,
......@@ -239,36 +272,42 @@ def main():
# Determine number of requests
if args.num_requests is None:
# Count requests in original file
with open(args.input_file, "r") as f:
# Count requests in original dataset
with open(args.input_dataset, "r") as f:
num_requests = sum(1 for _ in f)
logger.info(f"Using all {num_requests} requests from input file")
logger.info(f"Using all {num_requests} requests from input dataset")
else:
num_requests = args.num_requests
# Generate synthetic requests
requests = synthesizer.synthesize_requests(num_requests, args.max_isl)
requests = synthesizer.synthesize_requests(
num_requests,
max_isl=args.max_isl,
min_isl=args.min_isl,
min_osl=args.min_osl,
max_osl=args.max_osl,
)
logger.info(f"Generated {len(requests)} synthetic requests")
# Save synthetic data to a permanent file in output directory
synthetic_trace_filename = "synthetic_trace.jsonl"
trace_file_path = os.path.join(args.output_dir, synthetic_trace_filename)
trace_dataset_path = os.path.join(args.output_dir, synthetic_trace_filename)
# Write synthetic data to file
with open(trace_file_path, "w") as f:
with open(trace_dataset_path, "w") as f:
for request in requests:
f.write(json.dumps(request) + "\n")
logger.info(f"Synthetic trace data saved to: {trace_file_path}")
logger.info(f"Synthetic trace data saved to: {trace_dataset_path}")
# Run benchmark with the trace file
# Run benchmark with the trace dataset
artifact_dir = os.path.join(args.output_dir, "genai_perf_artifacts")
os.makedirs(artifact_dir, exist_ok=True)
run_benchmark_with_trace(
args.model,
args.tokenizer,
trace_file_path,
trace_dataset_path,
artifact_dir,
args.url,
args.seed,
......
......@@ -157,8 +157,8 @@ def parse_args():
parser.add_argument(
"--router-snapshot-threshold",
type=int,
default=10000,
help="KV Router: Number of messages in stream before triggering a snapshot. Defaults to 10000.",
default=1000000,
help="KV Router: Number of messages in stream before triggering a snapshot. Defaults to 1000000.",
)
parser.add_argument(
"--router-reset-states",
......
......@@ -27,7 +27,7 @@ The main KV-aware routing arguments:
- `--router-reset-states`: When specified, resets the router state on startup by clearing both the JetStream event stream and NATS object store, starting with a fresh state. By default (when this flag is not provided), the router persists state across restarts, downloading any available snapshot from NATS object store and continuing to consume events from where it left off. This enables routers to maintain KV cache awareness across restarts. **Warning**: Using `--router-reset-states` can bring existing router replicas into an inconsistent state. Only use this flag when launching the first router replica in a component, or consider using a different namespace/component for a clean slate.
- `--router-snapshot-threshold`: Sets the number of messages in the JetStream before triggering a snapshot. When the message count exceeds this threshold, a router will attempt to purge acknowledged messages from the stream and create a snapshot of the current radix tree state in NATS object store. Defaults to 10000. This helps manage stream size and provides faster initialization for routers that restart.
- `--router-snapshot-threshold`: Sets the number of messages in the JetStream before triggering a snapshot. When the message count exceeds this threshold, a router will attempt to purge acknowledged messages from the stream and create a snapshot of the current radix tree state in NATS object store. Defaults to 1000000. This helps manage stream size and provides faster initialization for routers that restart.
>[!Note]
> State persistence is only available when KV events are enabled (default). When using `--no-kv-events` with `ApproxKvIndexer`, state persistence is not currently supported.
......
......@@ -42,7 +42,7 @@ impl KvRouterConfig {
#[pymethods]
impl KvRouterConfig {
#[new]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, router_replica_sync=false, router_track_active_blocks=true, router_snapshot_threshold=10000, router_reset_states=false))]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, router_replica_sync=false, router_track_active_blocks=true, router_snapshot_threshold=1000000, router_reset_states=false))]
fn new(
overlap_score_weight: f64,
router_temperature: f64,
......
......@@ -119,7 +119,7 @@ impl Default for KvRouterConfig {
use_kv_events: true,
router_replica_sync: false,
router_track_active_blocks: true,
router_snapshot_threshold: Some(10000),
router_snapshot_threshold: Some(1000000),
router_reset_states: false,
}
}
......
......@@ -520,7 +520,7 @@ impl NatsQueue {
.and_then(|s| s.parse::<u64>().ok())
.map(time::Duration::from_secs)
.unwrap_or_else(|| time::Duration::from_secs(60 * 60));
// Always try to create the stream (removes the race condition)
let stream_config = jetstream::stream::Config {
name: self.stream_name.clone(),
subjects: vec![self.subject.clone()],
......@@ -528,40 +528,26 @@ impl NatsQueue {
..Default::default()
};
match client.jetstream().create_stream(stream_config).await {
Ok(_) => {
log::debug!("Successfully created NATS stream {}", self.stream_name);
}
Err(e) => {
// Log warning but continue - stream likely already exists
log::debug!(
"Failed to create NATS stream '{}': {e}. Stream likely already exists, continuing...",
self.stream_name
);
// Get or create the stream
let stream = client
.jetstream()
.get_or_create_stream(stream_config)
.await?;
// If reset_stream is true, purge all messages from the newly created stream
log::debug!("Stream {} is ready", self.stream_name);
// If reset_stream is true, purge all messages from the stream
if reset_stream {
match client
.jetstream()
.get_stream(&self.stream_name)
.await?
.purge()
.await
{
match stream.purge().await {
Ok(purge_info) => {
log::debug!(
log::info!(
"Successfully purged {} messages from NATS stream {}",
purge_info.purged,
self.stream_name
);
}
Err(e) => {
log::warn!(
"Failed to purge NATS stream '{}': {e}",
self.stream_name
);
}
}
log::warn!("Failed to purge NATS stream '{}': {e}", self.stream_name);
}
}
}
......@@ -574,7 +560,6 @@ impl NatsQueue {
..Default::default()
};
let stream = client.jetstream().get_stream(&self.stream_name).await?;
let subscriber = stream.create_consumer(consumer_config).await?;
self.subscriber = Some(subscriber);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment