Unverified commit 4292e3b8, authored by Cyrus Leung and committed by GitHub
Browse files

[Benchmark] Improve UX of sweep scripts (#35600)


Signed-off-by: DarkLight1337 <tlleungac@connect.ust.hk>
parent 24d6ea8a
...@@ -72,7 +72,7 @@ Follow these steps to run the script: ...@@ -72,7 +72,7 @@ Follow these steps to run the script:
] ]
``` ```
5. Determine where you want to save the results, and pass that to `--output-dir`. 5. Set `--output-dir` and optionally `--experiment-name` to control where to save the results.
Example command: Example command:
...@@ -82,7 +82,8 @@ vllm bench sweep serve \ ...@@ -82,7 +82,8 @@ vllm bench sweep serve \
--bench-cmd 'vllm bench serve --model meta-llama/Llama-2-7b-chat-hf --backend vllm --endpoint /v1/completions --dataset-name sharegpt --dataset-path benchmarks/ShareGPT_V3_unfiltered_cleaned_split.json' \ --bench-cmd 'vllm bench serve --model meta-llama/Llama-2-7b-chat-hf --backend vllm --endpoint /v1/completions --dataset-name sharegpt --dataset-path benchmarks/ShareGPT_V3_unfiltered_cleaned_split.json' \
--serve-params benchmarks/serve_hparams.json \ --serve-params benchmarks/serve_hparams.json \
--bench-params benchmarks/bench_hparams.json \ --bench-params benchmarks/bench_hparams.json \
-o benchmarks/results --output-dir benchmarks/results \
--experiment-name demo
``` ```
By default, each parameter combination is benchmarked 3 times to make the results more reliable. You can adjust the number of runs by setting `--num-runs`. By default, each parameter combination is benchmarked 3 times to make the results more reliable. You can adjust the number of runs by setting `--num-runs`.
...@@ -118,7 +119,8 @@ vllm bench sweep serve_workload \ ...@@ -118,7 +119,8 @@ vllm bench sweep serve_workload \
--serve-params benchmarks/serve_hparams.json \ --serve-params benchmarks/serve_hparams.json \
--bench-params benchmarks/bench_hparams.json \ --bench-params benchmarks/bench_hparams.json \
--num-runs 1 \ --num-runs 1 \
-o benchmarks/results --output-dir benchmarks/results \
--experiment-name demo
``` ```
The algorithm for exploring different workload levels can be summarized as follows: The algorithm for exploring different workload levels can be summarized as follows:
...@@ -186,7 +188,8 @@ vllm bench sweep startup \ ...@@ -186,7 +188,8 @@ vllm bench sweep startup \
--startup-cmd 'vllm bench startup --model Qwen/Qwen3-0.6B' \ --startup-cmd 'vllm bench startup --model Qwen/Qwen3-0.6B' \
--serve-params benchmarks/serve_hparams.json \ --serve-params benchmarks/serve_hparams.json \
--startup-params benchmarks/startup_hparams.json \ --startup-params benchmarks/startup_hparams.json \
-o benchmarks/results --output-dir benchmarks/results \
--experiment-name demo
``` ```
!!! important !!! important
...@@ -204,11 +207,10 @@ Control the variables to plot via `--var-x` and `--var-y`, optionally applying ` ...@@ -204,11 +207,10 @@ Control the variables to plot via `--var-x` and `--var-y`, optionally applying `
Example commands for visualizing [Workload Explorer](#workload-explorer) results: Example commands for visualizing [Workload Explorer](#workload-explorer) results:
```bash ```bash
# Name of the directory that stores the results EXPERIMENT_DIR=${1:-"benchmarks/results/demo"}
TIMESTAMP=$1
# Latency increases as the workload increases # Latency increases as the workload increases
vllm bench sweep plot benchmarks/results/$TIMESTAMP \ vllm bench sweep plot $EXPERIMENT_DIR \
--var-x max_concurrency \ --var-x max_concurrency \
--var-y median_ttft_ms \ --var-y median_ttft_ms \
--col-by _benchmark_name \ --col-by _benchmark_name \
...@@ -216,7 +218,7 @@ vllm bench sweep plot benchmarks/results/$TIMESTAMP \ ...@@ -216,7 +218,7 @@ vllm bench sweep plot benchmarks/results/$TIMESTAMP \
--fig-name latency_curve --fig-name latency_curve
# Throughput saturates as workload increases # Throughput saturates as workload increases
vllm bench sweep plot benchmarks/results/$TIMESTAMP \ vllm bench sweep plot $EXPERIMENT_DIR \
--var-x max_concurrency \ --var-x max_concurrency \
--var-y total_token_throughput \ --var-y total_token_throughput \
--col-by _benchmark_name \ --col-by _benchmark_name \
...@@ -224,7 +226,7 @@ vllm bench sweep plot benchmarks/results/$TIMESTAMP \ ...@@ -224,7 +226,7 @@ vllm bench sweep plot benchmarks/results/$TIMESTAMP \
--fig-name throughput_curve --fig-name throughput_curve
# Tradeoff between latency and throughput # Tradeoff between latency and throughput
vllm bench sweep plot benchmarks/results/$TIMESTAMP \ vllm bench sweep plot $EXPERIMENT_DIR \
--var-x total_token_throughput \ --var-x total_token_throughput \
--var-y median_ttft_ms \ --var-y median_ttft_ms \
--col-by _benchmark_name \ --col-by _benchmark_name \
...@@ -249,7 +251,9 @@ Higher concurrency or batch size can raise GPU efficiency (per-GPU), but can add ...@@ -249,7 +251,9 @@ Higher concurrency or batch size can raise GPU efficiency (per-GPU), but can add
Example: Example:
```bash ```bash
vllm bench sweep plot_pareto benchmarks/results/<timestamp> \ EXPERIMENT_DIR=${1:-"benchmarks/results/demo"}
vllm bench sweep plot_pareto $EXPERIMENT_DIR \
--label-by max_concurrency,tensor_parallel_size,pipeline_parallel_size --label-by max_concurrency,tensor_parallel_size,pipeline_parallel_size
``` ```
......
...@@ -499,7 +499,7 @@ class SweepPlotArgs: ...@@ -499,7 +499,7 @@ class SweepPlotArgs:
@classmethod @classmethod
def from_cli_args(cls, args: argparse.Namespace): def from_cli_args(cls, args: argparse.Namespace):
output_dir = Path(args.OUTPUT_DIR) output_dir = Path(args.EXPERIMENT_DIR)
if not output_dir.exists(): if not output_dir.exists():
raise ValueError(f"No parameter sweep results under {output_dir}") raise ValueError(f"No parameter sweep results under {output_dir}")
...@@ -531,11 +531,9 @@ class SweepPlotArgs: ...@@ -531,11 +531,9 @@ class SweepPlotArgs:
@classmethod @classmethod
def add_cli_args(cls, parser: argparse.ArgumentParser) -> argparse.ArgumentParser: def add_cli_args(cls, parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
parser.add_argument( parser.add_argument(
"OUTPUT_DIR", "EXPERIMENT_DIR",
type=str, type=str,
default="results", help="The directory containing the sweep results to plot.",
help="The directory containing the results to plot, "
"i.e., the `--output-dir` argument to the parameter sweep script.",
) )
parser.add_argument( parser.add_argument(
"--fig-dir", "--fig-dir",
......
...@@ -325,7 +325,7 @@ class SweepPlotParetoArgs: ...@@ -325,7 +325,7 @@ class SweepPlotParetoArgs:
@classmethod @classmethod
def from_cli_args(cls, args: argparse.Namespace): def from_cli_args(cls, args: argparse.Namespace):
output_dir = Path(args.OUTPUT_DIR) output_dir = Path(args.EXPERIMENT_DIR)
if not output_dir.exists(): if not output_dir.exists():
raise ValueError(f"No parameter sweep results under {output_dir}") raise ValueError(f"No parameter sweep results under {output_dir}")
...@@ -342,9 +342,8 @@ class SweepPlotParetoArgs: ...@@ -342,9 +342,8 @@ class SweepPlotParetoArgs:
@classmethod @classmethod
def add_cli_args(cls, parser: argparse.ArgumentParser): def add_cli_args(cls, parser: argparse.ArgumentParser):
parser.add_argument( parser.add_argument(
"OUTPUT_DIR", "EXPERIMENT_DIR",
type=str, type=str,
default="results",
help="The directory containing the sweep results to plot.", help="The directory containing the sweep results to plot.",
) )
parser.add_argument( parser.add_argument(
......
...@@ -4,6 +4,7 @@ import argparse ...@@ -4,6 +4,7 @@ import argparse
import contextlib import contextlib
import json import json
import shlex import shlex
from contextlib import contextmanager
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
...@@ -135,7 +136,7 @@ def run_benchmark( ...@@ -135,7 +136,7 @@ def run_benchmark(
def _get_comb_base_path( def _get_comb_base_path(
output_dir: Path, experiment_dir: Path,
serve_comb: ParameterSweepItem, serve_comb: ParameterSweepItem,
bench_comb: ParameterSweepItem, bench_comb: ParameterSweepItem,
*, *,
...@@ -149,7 +150,7 @@ def _get_comb_base_path( ...@@ -149,7 +150,7 @@ def _get_comb_base_path(
if extra_parts: if extra_parts:
parts.extend(extra_parts) parts.extend(extra_parts)
return output_dir / sanitize_filename("-".join(parts)) return experiment_dir / sanitize_filename("-".join(parts))
def _get_comb_run_path(base_path: Path, run_number: int | None): def _get_comb_run_path(base_path: Path, run_number: int | None):
...@@ -162,10 +163,10 @@ def _get_comb_run_path(base_path: Path, run_number: int | None): ...@@ -162,10 +163,10 @@ def _get_comb_run_path(base_path: Path, run_number: int | None):
def _comb_needs_server( def _comb_needs_server(
serve_comb: ParameterSweepItem, serve_comb: ParameterSweepItem,
bench_combs: ParameterSweep, bench_combs: ParameterSweep,
output_dir: Path, experiment_dir: Path,
): ):
for bench_comb in bench_combs: for bench_comb in bench_combs:
base_path = _get_comb_base_path(output_dir, serve_comb, bench_comb) base_path = _get_comb_base_path(experiment_dir, serve_comb, bench_comb)
if not _get_comb_run_path(base_path, run_number=None).exists(): if not _get_comb_run_path(base_path, run_number=None).exists():
return True return True
...@@ -179,11 +180,11 @@ def server_ctx( ...@@ -179,11 +180,11 @@ def server_ctx(
show_stdout: bool, show_stdout: bool,
serve_comb: ParameterSweepItem, serve_comb: ParameterSweepItem,
bench_params: ParameterSweep, bench_params: ParameterSweep,
output_dir: Path, experiment_dir: Path,
dry_run: bool, dry_run: bool,
server_ready_timeout: int = 300, server_ready_timeout: int = 300,
): ):
if not _comb_needs_server(serve_comb, bench_params, output_dir): if not _comb_needs_server(serve_comb, bench_params, experiment_dir):
return contextlib.nullcontext() return contextlib.nullcontext()
return run_server( return run_server(
...@@ -215,10 +216,10 @@ def run_comb( ...@@ -215,10 +216,10 @@ def run_comb(
*, *,
serve_comb: ParameterSweepItem, serve_comb: ParameterSweepItem,
bench_comb: ParameterSweepItem, bench_comb: ParameterSweepItem,
link_vars: list[tuple[str, str]],
base_path: Path, base_path: Path,
num_runs: int, num_runs: int,
dry_run: bool, dry_run: bool,
link_vars: list[tuple[str, str]],
): ):
if not _comb_is_valid(serve_comb, bench_comb, link_vars): if not _comb_is_valid(serve_comb, bench_comb, link_vars):
return None return None
...@@ -257,10 +258,10 @@ def run_combs( ...@@ -257,10 +258,10 @@ def run_combs(
server_ready_timeout: int, server_ready_timeout: int,
serve_params: ParameterSweep, serve_params: ParameterSweep,
bench_params: ParameterSweep, bench_params: ParameterSweep,
output_dir: Path, link_vars: list[tuple[str, str]],
experiment_dir: Path,
num_runs: int, num_runs: int,
dry_run: bool, dry_run: bool,
link_vars: list[tuple[str, str]],
): ):
all_data = list[dict[str, object]]() all_data = list[dict[str, object]]()
for serve_comb in serve_params: for serve_comb in serve_params:
...@@ -270,22 +271,22 @@ def run_combs( ...@@ -270,22 +271,22 @@ def run_combs(
show_stdout=show_stdout, show_stdout=show_stdout,
serve_comb=serve_comb, serve_comb=serve_comb,
bench_params=bench_params, bench_params=bench_params,
output_dir=output_dir, experiment_dir=experiment_dir,
dry_run=dry_run, dry_run=dry_run,
server_ready_timeout=server_ready_timeout, server_ready_timeout=server_ready_timeout,
) as server: ) as server:
for bench_comb in bench_params: for bench_comb in bench_params:
base_path = _get_comb_base_path(output_dir, serve_comb, bench_comb) base_path = _get_comb_base_path(experiment_dir, serve_comb, bench_comb)
comb_data = run_comb( comb_data = run_comb(
server, server,
bench_cmd, bench_cmd,
serve_comb=serve_comb, serve_comb=serve_comb,
bench_comb=bench_comb, bench_comb=bench_comb,
link_vars=link_vars,
base_path=base_path, base_path=base_path,
num_runs=num_runs, num_runs=num_runs,
dry_run=dry_run, dry_run=dry_run,
link_vars=link_vars,
) )
if comb_data is not None: if comb_data is not None:
...@@ -295,7 +296,7 @@ def run_combs( ...@@ -295,7 +296,7 @@ def run_combs(
return None return None
combined_df = pd.DataFrame.from_records(all_data) combined_df = pd.DataFrame.from_records(all_data)
combined_df.to_csv(output_dir / "summary.csv") combined_df.to_csv(experiment_dir / "summary.csv")
return combined_df return combined_df
...@@ -309,11 +310,12 @@ class SweepServeArgs: ...@@ -309,11 +310,12 @@ class SweepServeArgs:
server_ready_timeout: int server_ready_timeout: int
serve_params: ParameterSweep serve_params: ParameterSweep
bench_params: ParameterSweep bench_params: ParameterSweep
link_vars: list[tuple[str, str]]
output_dir: Path output_dir: Path
experiment_name: str
num_runs: int num_runs: int
dry_run: bool dry_run: bool
resume: str | None resume: bool
link_vars: list[tuple[str, str]]
parser_name: ClassVar[str] = "serve" parser_name: ClassVar[str] = "serve"
parser_help: ClassVar[str] = "Run vLLM server benchmark under multiple settings." parser_help: ClassVar[str] = "Run vLLM server benchmark under multiple settings."
...@@ -340,6 +342,11 @@ class SweepServeArgs: ...@@ -340,6 +342,11 @@ class SweepServeArgs:
link_vars = cls.parse_link_vars(args.link_vars) link_vars = cls.parse_link_vars(args.link_vars)
if args.experiment_name:
experiment_name = args.experiment_name
else:
experiment_name = datetime.now().strftime("%Y%m%d_%H%M%S")
num_runs = args.num_runs num_runs = args.num_runs
if num_runs < 1: if num_runs < 1:
raise ValueError("`num_runs` should be at least 1.") raise ValueError("`num_runs` should be at least 1.")
...@@ -351,11 +358,12 @@ class SweepServeArgs: ...@@ -351,11 +358,12 @@ class SweepServeArgs:
show_stdout=args.show_stdout, show_stdout=args.show_stdout,
serve_params=serve_params, serve_params=serve_params,
bench_params=bench_params, bench_params=bench_params,
link_vars=link_vars,
output_dir=Path(args.output_dir), output_dir=Path(args.output_dir),
experiment_name=experiment_name,
num_runs=num_runs, num_runs=num_runs,
dry_run=args.dry_run, dry_run=args.dry_run,
resume=args.resume, resume=args.resume,
link_vars=link_vars,
server_ready_timeout=args.server_ready_timeout, server_ready_timeout=args.server_ready_timeout,
) )
...@@ -392,6 +400,7 @@ class SweepServeArgs: ...@@ -392,6 +400,7 @@ class SweepServeArgs:
default=300, default=300,
help="Timeout in seconds to wait for the server to become ready.", help="Timeout in seconds to wait for the server to become ready.",
) )
parser.add_argument( parser.add_argument(
"--serve-params", "--serve-params",
type=str, type=str,
...@@ -402,6 +411,16 @@ class SweepServeArgs: ...@@ -402,6 +411,16 @@ class SweepServeArgs:
"If both `serve_params` and `bench_params` are given, " "If both `serve_params` and `bench_params` are given, "
"this script will iterate over their Cartesian product.", "this script will iterate over their Cartesian product.",
) )
parser.add_argument(
"--link-vars",
type=str,
default="",
help=(
"Comma-separated list of linked variables between serve and bench, "
"e.g. max_num_seqs=max_concurrency,max_model_len=random_input_len"
),
)
parser.add_argument( parser.add_argument(
"--bench-params", "--bench-params",
type=str, type=str,
...@@ -417,7 +436,15 @@ class SweepServeArgs: ...@@ -417,7 +436,15 @@ class SweepServeArgs:
"--output-dir", "--output-dir",
type=str, type=str,
default="results", default="results",
help="The directory to which results are written.", help="The main directory to which results are written.",
)
parser.add_argument(
"-e",
"--experiment-name",
type=str,
default=None,
help="The name of this experiment (defaults to current timestamp). "
"Results will be stored under `output_dir/experiment_name`.",
) )
parser.add_argument( parser.add_argument(
"--num-runs", "--num-runs",
...@@ -433,21 +460,10 @@ class SweepServeArgs: ...@@ -433,21 +460,10 @@ class SweepServeArgs:
) )
parser.add_argument( parser.add_argument(
"--resume", "--resume",
type=str, action="store_true",
default=None, help="Resume a previous execution of this script, i.e., only run "
help="Set this to the name of a directory under `output_dir` (which is a " "parameter combinations for which there are still no output files "
"timestamp) to resume a previous execution of this script, i.e., only run " "under `output_dir/experiment_name`.",
"parameter combinations for which there are still no output files.",
)
parser.add_argument(
"--link-vars",
type=str,
default="",
help=(
"Comma-separated list of linked variables between serve and bench, "
"e.g. max_num_seqs=max_concurrency,max_model_len=random_input_len"
),
) )
return parser return parser
...@@ -462,33 +478,52 @@ class SweepServeArgs: ...@@ -462,33 +478,52 @@ class SweepServeArgs:
pairs.append((a.strip(), b.strip())) pairs.append((a.strip(), b.strip()))
return pairs return pairs
def resolve_experiment_dir(self) -> Path:
experiment_dir = self.output_dir / self.experiment_name
def run_main(args: SweepServeArgs): if self.resume:
timestamp = args.resume or datetime.now().strftime("%Y%m%d_%H%M%S") if not experiment_dir.exists():
output_dir = args.output_dir / timestamp raise ValueError(f"Cannot resume from non-existent {experiment_dir=}")
else:
if experiment_dir.exists():
raise ValueError(f"Cannot overwrite existing {experiment_dir=}")
if args.resume and not output_dir.exists(): return experiment_dir
raise ValueError(f"Cannot resume from non-existent directory ({output_dir})")
@contextmanager
def run_ctx(self, experiment_dir: Path):
if self.dry_run:
yield
print(f"Experiment will be saved at: {experiment_dir}")
return
try: try:
yield
print(f"Experiment has been saved at: {experiment_dir}")
except BaseException as exc:
raise RuntimeError(
"The script was terminated early. Use `--resume` "
"to continue the script from its last checkpoint."
) from exc
def run_main(args: SweepServeArgs):
experiment_dir = args.resolve_experiment_dir()
with args.run_ctx(experiment_dir):
return run_combs( return run_combs(
serve_cmd=args.serve_cmd, serve_cmd=args.serve_cmd,
bench_cmd=args.bench_cmd, bench_cmd=args.bench_cmd,
link_vars=args.link_vars,
after_bench_cmd=args.after_bench_cmd, after_bench_cmd=args.after_bench_cmd,
show_stdout=args.show_stdout, show_stdout=args.show_stdout,
server_ready_timeout=args.server_ready_timeout, server_ready_timeout=args.server_ready_timeout,
serve_params=args.serve_params, serve_params=args.serve_params,
bench_params=args.bench_params, bench_params=args.bench_params,
output_dir=output_dir, experiment_dir=experiment_dir,
num_runs=args.num_runs, num_runs=args.num_runs,
dry_run=args.dry_run, dry_run=args.dry_run,
link_vars=args.link_vars,
) )
except BaseException as exc:
raise RuntimeError(
f"The script was terminated early. Use `--resume {timestamp}` "
f"to continue the script from its last checkpoint."
) from exc
def main(args: argparse.Namespace): def main(args: argparse.Namespace):
......
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
import argparse import argparse
import math import math
from dataclasses import asdict, dataclass from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import ClassVar, Literal, get_args from typing import ClassVar, Literal, get_args
...@@ -59,10 +58,10 @@ def run_comb_workload( ...@@ -59,10 +58,10 @@ def run_comb_workload(
*, *,
serve_comb: ParameterSweepItem, serve_comb: ParameterSweepItem,
bench_comb: ParameterSweepItem, bench_comb: ParameterSweepItem,
output_dir: Path, link_vars: list[tuple[str, str]],
experiment_dir: Path,
num_runs: int, num_runs: int,
dry_run: bool, dry_run: bool,
link_vars: list[tuple[str, str]],
workload_var: WorkloadVariable, workload_var: WorkloadVariable,
workload_value: int, workload_value: int,
) -> list[dict[str, object]] | None: ) -> list[dict[str, object]] | None:
...@@ -73,15 +72,15 @@ def run_comb_workload( ...@@ -73,15 +72,15 @@ def run_comb_workload(
bench_cmd, bench_cmd,
serve_comb=serve_comb, serve_comb=serve_comb,
bench_comb=bench_comb_workload, bench_comb=bench_comb_workload,
link_vars=link_vars,
base_path=_get_comb_base_path( base_path=_get_comb_base_path(
output_dir, experiment_dir,
serve_comb, serve_comb,
bench_comb, bench_comb,
extra_parts=("WL-", f"{workload_var}={workload_value}"), extra_parts=("WL-", f"{workload_var}={workload_value}"),
), ),
num_runs=num_runs, num_runs=num_runs,
dry_run=dry_run, dry_run=dry_run,
link_vars=link_vars,
) )
...@@ -91,12 +90,12 @@ def explore_comb_workloads( ...@@ -91,12 +90,12 @@ def explore_comb_workloads(
*, *,
serve_comb: ParameterSweepItem, serve_comb: ParameterSweepItem,
bench_comb: ParameterSweepItem, bench_comb: ParameterSweepItem,
link_vars: list[tuple[str, str]],
workload_var: WorkloadVariable, workload_var: WorkloadVariable,
workload_iters: int, workload_iters: int,
output_dir: Path, experiment_dir: Path,
num_runs: int, num_runs: int,
dry_run: bool, dry_run: bool,
link_vars: list[tuple[str, str]],
): ):
print("[WL START]") print("[WL START]")
print(f"Serve parameters: {serve_comb.as_text() or '(None)'}") print(f"Serve parameters: {serve_comb.as_text() or '(None)'}")
...@@ -125,10 +124,10 @@ def explore_comb_workloads( ...@@ -125,10 +124,10 @@ def explore_comb_workloads(
bench_cmd, bench_cmd,
serve_comb=serve_comb, serve_comb=serve_comb,
bench_comb=bench_comb | {"max_concurrency": 1}, bench_comb=bench_comb | {"max_concurrency": 1},
output_dir=output_dir, link_vars=link_vars,
experiment_dir=experiment_dir,
num_runs=num_runs, num_runs=num_runs,
dry_run=dry_run, dry_run=dry_run,
link_vars=link_vars,
workload_var=workload_var, workload_var=workload_var,
workload_value=1, workload_value=1,
) )
...@@ -137,10 +136,10 @@ def explore_comb_workloads( ...@@ -137,10 +136,10 @@ def explore_comb_workloads(
bench_cmd, bench_cmd,
serve_comb=serve_comb, serve_comb=serve_comb,
bench_comb=bench_comb | {"max_concurrency": dataset_size}, bench_comb=bench_comb | {"max_concurrency": dataset_size},
output_dir=output_dir, link_vars=link_vars,
experiment_dir=experiment_dir,
num_runs=num_runs, num_runs=num_runs,
dry_run=dry_run, dry_run=dry_run,
link_vars=link_vars,
workload_var=workload_var, workload_var=workload_var,
workload_value=dataset_size, workload_value=dataset_size,
) )
...@@ -177,10 +176,10 @@ def explore_comb_workloads( ...@@ -177,10 +176,10 @@ def explore_comb_workloads(
bench_cmd, bench_cmd,
serve_comb=serve_comb, serve_comb=serve_comb,
bench_comb=bench_comb, bench_comb=bench_comb,
output_dir=output_dir, link_vars=link_vars,
experiment_dir=experiment_dir,
num_runs=num_runs, num_runs=num_runs,
dry_run=dry_run, dry_run=dry_run,
link_vars=link_vars,
workload_var=workload_var, workload_var=workload_var,
workload_value=inter_workload_value, workload_value=inter_workload_value,
) )
...@@ -201,12 +200,12 @@ def explore_combs_workloads( ...@@ -201,12 +200,12 @@ def explore_combs_workloads(
server_ready_timeout: int, server_ready_timeout: int,
serve_params: ParameterSweep, serve_params: ParameterSweep,
bench_params: ParameterSweep, bench_params: ParameterSweep,
link_vars: list[tuple[str, str]],
workload_var: WorkloadVariable, workload_var: WorkloadVariable,
workload_iters: int, workload_iters: int,
output_dir: Path, experiment_dir: Path,
num_runs: int, num_runs: int,
dry_run: bool, dry_run: bool,
link_vars: list[tuple[str, str]],
): ):
if any(bench_comb.has_param(workload_var) for bench_comb in bench_params): if any(bench_comb.has_param(workload_var) for bench_comb in bench_params):
raise ValueError( raise ValueError(
...@@ -223,7 +222,7 @@ def explore_combs_workloads( ...@@ -223,7 +222,7 @@ def explore_combs_workloads(
server_ready_timeout=server_ready_timeout, server_ready_timeout=server_ready_timeout,
serve_comb=serve_comb, serve_comb=serve_comb,
bench_params=bench_params, bench_params=bench_params,
output_dir=output_dir, experiment_dir=experiment_dir,
dry_run=dry_run, dry_run=dry_run,
) as server: ) as server:
for bench_comb in bench_params: for bench_comb in bench_params:
...@@ -232,12 +231,12 @@ def explore_combs_workloads( ...@@ -232,12 +231,12 @@ def explore_combs_workloads(
bench_cmd, bench_cmd,
serve_comb=serve_comb, serve_comb=serve_comb,
bench_comb=bench_comb, bench_comb=bench_comb,
link_vars=link_vars,
workload_var=workload_var, workload_var=workload_var,
workload_iters=workload_iters, workload_iters=workload_iters,
output_dir=output_dir, experiment_dir=experiment_dir,
num_runs=num_runs, num_runs=num_runs,
dry_run=dry_run, dry_run=dry_run,
link_vars=link_vars,
) )
if comb_data is not None: if comb_data is not None:
...@@ -247,7 +246,7 @@ def explore_combs_workloads( ...@@ -247,7 +246,7 @@ def explore_combs_workloads(
return None return None
combined_df = pd.DataFrame.from_records(all_data) combined_df = pd.DataFrame.from_records(all_data)
combined_df.to_csv(output_dir / "summary.csv") combined_df.to_csv(experiment_dir / "summary.csv")
return combined_df return combined_df
...@@ -298,13 +297,9 @@ class SweepServeWorkloadArgs(SweepServeArgs): ...@@ -298,13 +297,9 @@ class SweepServeWorkloadArgs(SweepServeArgs):
def run_main(args: SweepServeWorkloadArgs): def run_main(args: SweepServeWorkloadArgs):
timestamp = args.resume or datetime.now().strftime("%Y%m%d_%H%M%S") experiment_dir = args.resolve_experiment_dir()
output_dir = args.output_dir / timestamp
if args.resume and not output_dir.exists():
raise ValueError(f"Cannot resume from non-existent directory ({output_dir})")
try: with args.run_ctx(experiment_dir):
return explore_combs_workloads( return explore_combs_workloads(
serve_cmd=args.serve_cmd, serve_cmd=args.serve_cmd,
bench_cmd=args.bench_cmd, bench_cmd=args.bench_cmd,
...@@ -313,18 +308,13 @@ def run_main(args: SweepServeWorkloadArgs): ...@@ -313,18 +308,13 @@ def run_main(args: SweepServeWorkloadArgs):
server_ready_timeout=args.server_ready_timeout, server_ready_timeout=args.server_ready_timeout,
serve_params=args.serve_params, serve_params=args.serve_params,
bench_params=args.bench_params, bench_params=args.bench_params,
link_vars=args.link_vars,
workload_var=args.workload_var, workload_var=args.workload_var,
workload_iters=args.workload_iters, workload_iters=args.workload_iters,
output_dir=output_dir, experiment_dir=experiment_dir,
num_runs=args.num_runs, num_runs=args.num_runs,
dry_run=args.dry_run, dry_run=args.dry_run,
link_vars=args.link_vars,
) )
except BaseException as exc:
raise RuntimeError(
f"The script was terminated early. Use `--resume {timestamp}` "
f"to continue the script from its last checkpoint."
) from exc
def main(args: argparse.Namespace): def main(args: argparse.Namespace):
......
...@@ -4,6 +4,7 @@ import argparse ...@@ -4,6 +4,7 @@ import argparse
import json import json
import shlex import shlex
import subprocess import subprocess
from contextlib import contextmanager
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from functools import lru_cache from functools import lru_cache
...@@ -111,7 +112,7 @@ def _apply_output_json(cmd: list[str], output_path: Path) -> list[str]: ...@@ -111,7 +112,7 @@ def _apply_output_json(cmd: list[str], output_path: Path) -> list[str]:
def _get_comb_base_path( def _get_comb_base_path(
output_dir: Path, experiment_dir: Path,
serve_comb: ParameterSweepItem, serve_comb: ParameterSweepItem,
startup_comb: ParameterSweepItem, startup_comb: ParameterSweepItem,
) -> Path: ) -> Path:
...@@ -120,7 +121,8 @@ def _get_comb_base_path( ...@@ -120,7 +121,8 @@ def _get_comb_base_path(
parts.extend(("SERVE-", serve_comb.name)) parts.extend(("SERVE-", serve_comb.name))
if startup_comb: if startup_comb:
parts.extend(("STARTUP-", startup_comb.name)) parts.extend(("STARTUP-", startup_comb.name))
return output_dir / sanitize_filename("-".join(parts))
return experiment_dir / sanitize_filename("-".join(parts))
def _get_comb_run_path(base_path: Path, run_number: int | None) -> Path: def _get_comb_run_path(base_path: Path, run_number: int | None) -> Path:
...@@ -225,7 +227,7 @@ def run_combs( ...@@ -225,7 +227,7 @@ def run_combs(
*, *,
serve_params: ParameterSweep, serve_params: ParameterSweep,
startup_params: ParameterSweep, startup_params: ParameterSweep,
output_dir: Path, experiment_dir: Path,
num_runs: int, num_runs: int,
show_stdout: bool, show_stdout: bool,
dry_run: bool, dry_run: bool,
...@@ -233,7 +235,7 @@ def run_combs( ...@@ -233,7 +235,7 @@ def run_combs(
all_data = list[dict[str, object]]() all_data = list[dict[str, object]]()
for serve_comb in serve_params: for serve_comb in serve_params:
for startup_comb in startup_params: for startup_comb in startup_params:
base_path = _get_comb_base_path(output_dir, serve_comb, startup_comb) base_path = _get_comb_base_path(experiment_dir, serve_comb, startup_comb)
comb_data = run_comb( comb_data = run_comb(
startup_cmd, startup_cmd,
serve_comb=serve_comb, serve_comb=serve_comb,
...@@ -250,7 +252,7 @@ def run_combs( ...@@ -250,7 +252,7 @@ def run_combs(
return None return None
combined_df = pd.DataFrame.from_records(all_data) combined_df = pd.DataFrame.from_records(all_data)
combined_df.to_csv(output_dir / "summary.csv") combined_df.to_csv(experiment_dir / "summary.csv")
return combined_df return combined_df
...@@ -260,11 +262,11 @@ class SweepStartupArgs: ...@@ -260,11 +262,11 @@ class SweepStartupArgs:
serve_params: ParameterSweep serve_params: ParameterSweep
startup_params: ParameterSweep startup_params: ParameterSweep
output_dir: Path output_dir: Path
experiment_name: str
num_runs: int num_runs: int
show_stdout: bool show_stdout: bool
dry_run: bool dry_run: bool
resume: str | None resume: bool
strict_params: bool
parser_name: ClassVar[str] = "startup" parser_name: ClassVar[str] = "startup"
parser_help: ClassVar[str] = ( parser_help: ClassVar[str] = (
...@@ -286,13 +288,19 @@ class SweepStartupArgs: ...@@ -286,13 +288,19 @@ class SweepStartupArgs:
startup_params = ParameterSweep.from_records([{}]) startup_params = ParameterSweep.from_records([{}])
supported = _get_supported_startup_keys() supported = _get_supported_startup_keys()
strict_params = args.strict_params
serve_params = _filter_params( serve_params = _filter_params(
serve_params, supported=supported, strict=args.strict_params serve_params, supported=supported, strict=strict_params
) )
startup_params = _filter_params( startup_params = _filter_params(
startup_params, supported=supported, strict=args.strict_params startup_params, supported=supported, strict=strict_params
) )
if args.experiment_name:
experiment_name = args.experiment_name
else:
experiment_name = datetime.now().strftime("%Y%m%d_%H%M%S")
if args.num_runs < 1: if args.num_runs < 1:
raise ValueError("`num_runs` should be at least 1.") raise ValueError("`num_runs` should be at least 1.")
...@@ -301,11 +309,11 @@ class SweepStartupArgs: ...@@ -301,11 +309,11 @@ class SweepStartupArgs:
serve_params=serve_params, serve_params=serve_params,
startup_params=startup_params, startup_params=startup_params,
output_dir=Path(args.output_dir), output_dir=Path(args.output_dir),
experiment_name=experiment_name,
num_runs=args.num_runs, num_runs=args.num_runs,
show_stdout=args.show_stdout, show_stdout=args.show_stdout,
dry_run=args.dry_run, dry_run=args.dry_run,
resume=args.resume, resume=args.resume,
strict_params=args.strict_params,
) )
@classmethod @classmethod
...@@ -316,6 +324,7 @@ class SweepStartupArgs: ...@@ -316,6 +324,7 @@ class SweepStartupArgs:
default="vllm bench startup", default="vllm bench startup",
help="The command used to run the startup benchmark.", help="The command used to run the startup benchmark.",
) )
parser.add_argument( parser.add_argument(
"--serve-params", "--serve-params",
type=str, type=str,
...@@ -331,12 +340,27 @@ class SweepStartupArgs: ...@@ -331,12 +340,27 @@ class SweepStartupArgs:
help="Path to JSON file containing parameter combinations " help="Path to JSON file containing parameter combinations "
"for the `vllm bench startup` command.", "for the `vllm bench startup` command.",
) )
parser.add_argument(
"--strict-params",
action="store_true",
help="If set, unknown parameters in sweep files raise an error "
"instead of being ignored.",
)
parser.add_argument( parser.add_argument(
"-o", "-o",
"--output-dir", "--output-dir",
type=str, type=str,
default="results", default="results",
help="The directory to which results are written.", help="The main directory to which results are written.",
)
parser.add_argument(
"-e",
"--experiment-name",
type=str,
default=None,
help="The name of this experiment (defaults to current timestamp). "
"Results will be stored under `output_dir/experiment_name`.",
) )
parser.add_argument( parser.add_argument(
"--num-runs", "--num-runs",
...@@ -357,43 +381,56 @@ class SweepStartupArgs: ...@@ -357,43 +381,56 @@ class SweepStartupArgs:
) )
parser.add_argument( parser.add_argument(
"--resume", "--resume",
type=str,
default=None,
help="Set this to the name of a directory under `output_dir` (which is a "
"timestamp) to resume a previous execution of this script, i.e., only run "
"parameter combinations for which there are still no output files.",
)
parser.add_argument(
"--strict-params",
action="store_true", action="store_true",
help="If set, unknown parameters in sweep files raise an error " help="Resume a previous execution of this script, i.e., only run "
"instead of being ignored.", "parameter combinations for which there are still no output files "
"under `output_dir/experiment_name`.",
) )
return parser return parser
def resolve_experiment_dir(self) -> Path:
experiment_dir = self.output_dir / self.experiment_name
def run_main(args: SweepStartupArgs): if self.resume:
timestamp = args.resume or datetime.now().strftime("%Y%m%d_%H%M%S") if not experiment_dir.exists():
output_dir = args.output_dir / timestamp raise ValueError(f"Cannot resume from non-existent {experiment_dir=}")
else:
if experiment_dir.exists():
raise ValueError(f"Cannot overwrite existing {experiment_dir=}")
if args.resume and not output_dir.exists(): return experiment_dir
raise ValueError(f"Cannot resume from non-existent directory ({output_dir})")
@contextmanager
def run_ctx(self, experiment_dir: Path):
if self.dry_run:
yield
print(f"Experiment will be saved at: {experiment_dir}")
return
try: try:
yield
print(f"Experiment has been saved at: {experiment_dir}")
except BaseException as exc:
raise RuntimeError(
"The script was terminated early. Use `--resume` "
"to continue the script from its last checkpoint."
) from exc
def run_main(args: SweepStartupArgs):
experiment_dir = args.resolve_experiment_dir()
with args.run_ctx(experiment_dir):
return run_combs( return run_combs(
startup_cmd=args.startup_cmd, startup_cmd=args.startup_cmd,
serve_params=args.serve_params, serve_params=args.serve_params,
startup_params=args.startup_params, startup_params=args.startup_params,
output_dir=output_dir, experiment_dir=experiment_dir,
num_runs=args.num_runs, num_runs=args.num_runs,
show_stdout=args.show_stdout, show_stdout=args.show_stdout,
dry_run=args.dry_run, dry_run=args.dry_run,
) )
except BaseException as exc:
raise RuntimeError(
f"The script was terminated early. Use `--resume {timestamp}` "
f"to continue the script from its last checkpoint."
) from exc
def main(args: argparse.Namespace): def main(args: argparse.Namespace):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment