change / sglang / Commits / a1d03892

Unverified commit a1d03892, authored Sep 09, 2025 by Sundara Raman Ramachandran, committed by GitHub on Sep 10, 2025.

[Benchmark] Prefil-only benchmark scripts (#10240)

Parent: dccf52f9
Showing 3 changed files with 1153 additions and 0 deletions (+1153, -0)
benchmark/prefill_only/bench_embeddings.py   +148  -0
benchmark/prefill_only/bench_score.py        +192  -0
benchmark/prefill_only/util.py               +813  -0
benchmark/prefill_only/bench_embeddings.py  (new file, 0 → 100644) @ a1d03892
"""
SGLang Embeddings Benchmark Script
This script benchmarks SGLang's /v1/embeddings API performance using HTTP requests.
Features:
- HTTP-only implementation
- Uses /v1/embeddings API endpoint directly
- Configurable RPS, duration, and batch sizes
- Progress tracking and detailed metrics
- Poisson and constant request distributions
Usage:
- Update configuration variables at the top of the file
- Ensure SGLang server is running on the configured HTTP_URL
- Run: python bench_embeddings.py
"""
import
asyncio
import
logging
from
transformers
import
AutoTokenizer
from
util
import
(
BenchmarkConfig
,
generate_text_with_token_count
,
run_benchmark_main
,
run_generic_benchmark
,
)
# Configure logging
logging
.
basicConfig
(
level
=
logging
.
INFO
,
format
=
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger
=
logging
.
getLogger
(
__name__
)
###############################################################################
# CONFIG
###############################################################################
# Create benchmark configuration
config
=
BenchmarkConfig
()
config
.
rps_values
=
[
500
]
config
.
duration_secs_values
=
[
60
]
config
.
num_unique_requests
=
100
config
.
distribution
=
"POISSON"
config
.
profile
=
False
config
.
freeze_gc
=
True
# Enable GC freeze functionality
# Profiler output directory - by default uses present working directory (pwd)
# Uncomment and customize the line below to override the default location:
# config.profiler_dir = "/sglang-oss-trace"
# HTTP Configuration
HTTP_URL
=
"http://localhost:30000/v1/embeddings"
# Embeddings API Config
EMBEDDINGS_MODEL_PATH
=
"/Qwen/Qwen3-Embedding-0.6B"
BATCH_SIZE
=
[
1
]
# Number of items per request (batch size)
# Configurable input token length
EMBEDDINGS_INPUT_TOKENS
=
500
# Default token length
# Load tokenizer once for embeddings text generation
print
(
"Loading tokenizer for embeddings input generation..."
)
embeddings_tokenizer
=
AutoTokenizer
.
from_pretrained
(
EMBEDDINGS_MODEL_PATH
)
# Generate input text with the specified token length using pre-loaded tokenizer
EMBEDDINGS_INPUT_TEXT
=
generate_text_with_token_count
(
EMBEDDINGS_MODEL_PATH
,
EMBEDDINGS_INPUT_TOKENS
,
config
.
special_replicated_token
,
tokenizer
=
embeddings_tokenizer
,
)
###############################################################################
# REQUEST GENERATION (in parallel)
###############################################################################
def
build_embeddings_request
(
index
:
int
,
item_count
:
int
)
->
tuple
:
"""Build a single embeddings request."""
try
:
# For embeddings, input can be a string or list of strings
if
item_count
==
1
:
input_data
=
EMBEDDINGS_INPUT_TEXT
else
:
input_data
=
[
EMBEDDINGS_INPUT_TEXT
for
_
in
range
(
item_count
)]
req
=
{
"input"
:
input_data
,
"model"
:
EMBEDDINGS_MODEL_PATH
,
}
return
(
index
,
req
)
except
Exception
as
e
:
logger
.
error
(
f
"Error building request
{
index
}
:
{
e
}
"
)
return
(
index
,
None
)
def
validate_embeddings_response
(
response_data
:
dict
)
->
bool
:
"""Validate embeddings API response."""
return
"data"
in
response_data
def
build_warmup_embeddings_request
()
->
dict
:
"""Build a warmup request for the embeddings API."""
return
{
"input"
:
EMBEDDINGS_INPUT_TEXT
,
"model"
:
EMBEDDINGS_MODEL_PATH
,
}
###############################################################################
# MAIN
###############################################################################
async
def
run_benchmark
(
rps
,
duration_secs
,
item_count
):
"""Run a single embeddings benchmark with the given RPS value."""
return
await
run_generic_benchmark
(
rps
=
rps
,
duration_secs
=
duration_secs
,
item_count
=
item_count
,
config
=
config
,
http_url
=
HTTP_URL
,
build_request_func
=
build_embeddings_request
,
response_validator
=
validate_embeddings_response
,
api_name
=
"EMBEDDINGS"
,
request_description
=
"embeddings requests"
,
)
async
def
main
():
additional_info
=
{
"Input text length"
:
f
"
{
EMBEDDINGS_INPUT_TOKENS
}
tokens"
,
"Input text preview"
:
(
EMBEDDINGS_INPUT_TEXT
[:
100
]
+
"..."
if
len
(
EMBEDDINGS_INPUT_TEXT
)
>
100
else
EMBEDDINGS_INPUT_TEXT
),
}
await
run_benchmark_main
(
config
,
run_benchmark
,
"EMBEDDINGS"
,
HTTP_URL
,
BATCH_SIZE
,
additional_info
,
build_warmup_embeddings_request
,
)
if
__name__
==
"__main__"
:
asyncio
.
run
(
main
())
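Before running the benchmark it can help to confirm that the configured endpoint responds the way the script expects. The following is a minimal standalone sketch, not part of the commit; it assumes an SGLang server is already serving an embedding model on localhost:30000, and the model value should match whatever the server was launched with (the script above uses "/Qwen/Qwen3-Embedding-0.6B").

import asyncio

import aiohttp


async def check_embeddings(url: str = "http://localhost:30000/v1/embeddings") -> None:
    # One-off request mirroring what the benchmark sends per item.
    payload = {"input": "hello world", "model": "/Qwen/Qwen3-Embedding-0.6B"}
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=payload) as resp:
            data = await resp.json()
            # The benchmark's validator only checks for a "data" field in the response.
            print(resp.status, "data" in data, len(data.get("data", [])))


if __name__ == "__main__":
    asyncio.run(check_embeddings())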
benchmark/prefill_only/bench_score.py  (new file, 0 → 100644) @ a1d03892
"""
SGLang Scoring Benchmark Script
This script benchmarks SGLang's scoring API performance using HTTP requests.
Current Features:
- HTTP-only implementation (open source compatible)
- Uses /v1/score API endpoint directly
- Single item scoring with batching support
- Configurable RPS, duration, and batch sizes
- Progress tracking and detailed metrics
- Poisson and constant request distributions
Usage:
- Update configuration variables at the top of the file
- Ensure SGLang server is running on the configured HTTP_URL
- Run: python bench_score.py
- Each request will contain ITEM_COUNT_VALUES items for batch scoring
"""
import
asyncio
from
transformers
import
AutoTokenizer
from
util
import
(
BenchmarkConfig
,
generate_text_with_token_count
,
run_benchmark_main
,
run_generic_benchmark
,
)
###############################################################################
# CONFIG
###############################################################################
# Create benchmark configuration
config
=
BenchmarkConfig
()
config
.
rps_values
=
[
160
]
config
.
duration_secs_values
=
[
60
]
config
.
num_unique_requests
=
100
config
.
distribution
=
"POISSON"
config
.
profile
=
False
config
.
freeze_gc
=
True
# Enable GC freeze functionality
# Profiler output directory - by default uses present working directory (pwd)
# Uncomment and customize the line below to override the default location:
# config.profiler_dir = "/sglang-oss-trace"
# HTTP Configuration
HTTP_URL
=
"http://localhost:30000/v1/score"
# Use score API directly
# Score API Config
# ITEM_COUNT_VALUES determines number of items per score request (batch size)
SCORE_QUERY_TOKENS
=
120
SCORE_ITEM_TOKENS
=
180
SCORE_MODEL_PATH
=
"Qwen/Qwen3-0.6B"
SCORE_LABEL_TOKEN_IDS
=
[
9454
,
2753
]
# Yes/No token IDs
ITEM_COUNT_VALUES
=
[
10
]
# Number of items per request
# Special token to replicate for precise token counting
SPECIAL_REPLICATED_TOKEN
=
"<|im_start|>"
###############################################################################
# REQUEST GENERATION (in parallel)
###############################################################################
def
create_score_request_builder
():
"""Create a score request builder function with shared tokenizer."""
# Load tokenizer once here to verify special token and get precise counts
print
(
"Loading tokenizer..."
)
tokenizer
=
AutoTokenizer
.
from_pretrained
(
SCORE_MODEL_PATH
)
# Verify that our special token produces exactly 1 token
special_token_count
=
len
(
tokenizer
.
encode
(
config
.
special_replicated_token
,
add_special_tokens
=
False
)
)
print
(
f
"Special token '
{
config
.
special_replicated_token
}
' produces "
f
"
{
special_token_count
}
token(s)"
)
def
generate_text_with_token_count_local
(
num_toks
):
"""Generate text with precise token count using replicated token."""
return
generate_text_with_token_count
(
SCORE_MODEL_PATH
,
num_toks
,
config
.
special_replicated_token
,
tokenizer
=
tokenizer
,
)
def
build_score_request
(
index
:
int
,
item_count
:
int
)
->
tuple
:
"""Build a single score request."""
try
:
# Generate query and items for score API
query
=
generate_text_with_token_count_local
(
SCORE_QUERY_TOKENS
)
items
=
[
generate_text_with_token_count_local
(
SCORE_ITEM_TOKENS
)
for
_
in
range
(
item_count
)
]
# Return as dict for score API format
score_data
=
{
"query"
:
query
,
"items"
:
items
,
"label_token_ids"
:
SCORE_LABEL_TOKEN_IDS
,
"model"
:
SCORE_MODEL_PATH
,
}
return
(
index
,
score_data
)
except
Exception
as
e
:
print
(
f
"Error building request
{
index
}
:
{
e
}
"
)
return
(
index
,
None
)
return
build_score_request
def
validate_score_response
(
response_data
:
dict
)
->
bool
:
"""Validate score API response."""
return
"scores"
in
response_data
or
"logprobs"
in
response_data
def
build_warmup_score_request
()
->
dict
:
"""Build a warmup request for the score API."""
# Load tokenizer once for warmup generation
tokenizer
=
AutoTokenizer
.
from_pretrained
(
SCORE_MODEL_PATH
)
warmup_query
=
generate_text_with_token_count
(
SCORE_MODEL_PATH
,
SCORE_QUERY_TOKENS
,
config
.
special_replicated_token
,
tokenizer
=
tokenizer
,
)
warmup_items
=
[
generate_text_with_token_count
(
SCORE_MODEL_PATH
,
SCORE_ITEM_TOKENS
,
config
.
special_replicated_token
,
tokenizer
=
tokenizer
,
)
for
_
in
range
(
3
)
]
return
{
"query"
:
warmup_query
,
"items"
:
warmup_items
,
"label_token_ids"
:
SCORE_LABEL_TOKEN_IDS
,
"model"
:
SCORE_MODEL_PATH
,
# Add missing parameters for consistency with the original warmup
"apply_softmax"
:
True
,
"item_first"
:
False
,
}
###############################################################################
# MAIN
###############################################################################
async
def
run_benchmark
(
rps
,
duration_secs
,
item_count
):
"""Run a single benchmark with the given RPS value."""
# Create the request builder function with shared tokenizer
build_request_func
=
create_score_request_builder
()
return
await
run_generic_benchmark
(
rps
=
rps
,
duration_secs
=
duration_secs
,
item_count
=
item_count
,
config
=
config
,
http_url
=
HTTP_URL
,
build_request_func
=
build_request_func
,
response_validator
=
validate_score_response
,
api_name
=
"SINGLE_ITEM_SCORING"
,
request_description
=
"score requests"
,
)
async
def
main
():
"""Main function that runs benchmarks for all RPS values."""
additional_info
=
{
"Query tokens per request"
:
SCORE_QUERY_TOKENS
,
"Item tokens per item"
:
SCORE_ITEM_TOKENS
,
}
await
run_benchmark_main
(
config
,
run_benchmark
,
"SINGLE_ITEM_SCORING"
,
HTTP_URL
,
ITEM_COUNT_VALUES
,
additional_info
,
build_warmup_score_request
,
)
if
__name__
==
"__main__"
:
asyncio
.
run
(
main
())
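For reference, each score request this script sends reduces to a JSON body of the shape documented in the old script's request helper (visible in the util.py diff below): a query string, a list of items, the target label token IDs, and the model path. The sketch here is illustrative rather than part of the commit; the benchmark generates the query/items by replicating "<|im_start|>" to hit exact token counts (120 for the query, 180 per item), and the semantics noted for apply_softmax and item_first are assumptions, not confirmed by this diff.

import json

payload = {
    "query": "Is the following passage relevant to the question? ...",   # illustrative text
    "items": ["candidate passage 1 ...", "candidate passage 2 ..."],      # illustrative text
    "label_token_ids": [9454, 2753],  # Yes/No token IDs used by the script
    "model": "Qwen/Qwen3-0.6B",
    "apply_softmax": True,   # (assumed) normalize probabilities over the label tokens
    "item_first": False,     # (assumed) query precedes each item when prompts are built
}
print(json.dumps(payload, indent=2))

The benchmark only checks that the response contains a "scores" or "logprobs" field, so any server returning either shape passes validation.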
benchmark/score/bench_score.py → benchmark/prefill_only/util.py  @ a1d03892
"""
"""
SGLang Scoring Benchmark Script
Common utilities for SGLang benchmark scripts.
This script benchmarks SGLang's scoring API performance using HTTP requests.
Current Features:
- HTTP-only implementation (open source compatible)
- Uses /v1/score API endpoint directly
- Single item scoring with batching support
- Configurable RPS, duration, and batch sizes
- Progress tracking and detailed metrics
- Poisson and constant request distributions
Usage:
- Update configuration variables at the top of the file
- Ensure SGLang server is running on the configured HTTP_URL
- Run: python bench_score.py
- Each request will contain ITEM_COUNT_VALUES items for batch scoring
This module contains shared code for benchmarking different SGLang APIs
including scoring, embeddings, and other endpoints.
"""
"""
import
asyncio
import
asyncio
import
concurrent.futures
# For parallel prompt generation
import
concurrent.futures
import
json
import
json
import
os
import
os
import
random
import
random
from
statistics
import
mean
from
statistics
import
mean
from
typing
import
Any
,
Callable
,
Dict
,
List
,
Optional
,
Tuple
import
aiohttp
import
aiohttp
import
numpy
as
np
import
numpy
as
np
from
tqdm
import
tqdm
from
tqdm
import
tqdm
from
transformers
import
AutoTokenizer
from
transformers
import
AutoTokenizer
###############################################################################
# CONFIG
class
BenchmarkConfig
:
###############################################################################
"""Configuration for benchmark parameters."""
# Server Configuration
SERVER_TYPE
=
"HTTP"
# Fixed to HTTP for open source
def
__init__
(
self
):
# Common benchmark settings
# HTTP Configuration
self
.
server_type
=
"HTTP"
HTTP_URL
=
"http://localhost:30000/v1/score"
# Use score API directly
self
.
rps_values
=
[
70
]
self
.
duration_secs_values
=
[
60
]
# Score API Config
self
.
num_unique_requests
=
100
# ITEM_COUNT_VALUES determines number of items per score request (batch size)
self
.
distribution
=
"POISSON"
# Options: "CONSTANT", "POISSON"
SCORE_QUERY_TOKENS
=
120
self
.
profile
=
False
SCORE_ITEM_TOKENS
=
180
SCORE_MODEL_PATH
=
"Qwen/Qwen3-0.6B"
# Garbage Collection Control
SCORE_LABEL_TOKEN_IDS
=
[
9454
,
2753
]
# Yes/No token IDs
self
.
freeze_gc
=
True
# Enable/disable garbage collection freezing
# Array of RPS values to test
# Profiler configuration
RPS_VALUES
=
[
70
]
self
.
profiler_dir
=
(
# Array of duration values to test
os
.
getcwd
()
DURATION_SECS_VALUES
=
[
60
]
# Duration values in seconds
)
# Default profiler output directory (current working directory)
# Array of item count values to test
ITEM_COUNT_VALUES
=
[
10
]
# Number of items per request
# Special token for text generation
# Number of unique requests to generate (will be reused)
self
.
special_replicated_token
=
"<|im_start|>"
NUM_UNIQUE_REQUESTS
=
100
DISTRIBUTION
=
"POISSON"
# Options: "CONSTANT", "POISSON"
def
generate_text_with_token_count
(
# Profiling Configuration
model_path
:
str
,
PROFILE
=
False
# Enable profiling with START_PROFILE/STOP_PROFILE prompts
num_tokens
:
int
,
# Directory for profiler output
special_token
:
str
=
"<|im_start|>"
,
SGLANG_TORCH_PROFILER_DIR
=
"/shared/user/sglang-oss-trace/remove-decode"
tokenizer
:
Optional
[
Any
]
=
None
,
if
PROFILE
:
)
->
str
:
os
.
environ
[
"SGLANG_TORCH_PROFILER_DIR"
]
=
SGLANG_TORCH_PROFILER_DIR
# Special token to replicate for precise token counting
SPECIAL_REPLICATED_TOKEN
=
"<|im_start|>"
###############################################################################
# REQUEST GENERATION (in parallel)
###############################################################################
def
prepare_all_requests_parallel
(
num_requests
,
item_count
):
"""
"""
Generates unique requests in parallel, then reuses them to create the
Generate text with precise token count using a replicated token.
full request list. Returns a list of str prompts for HTTP.
Args:
model_path: Path to the model for tokenizer
num_tokens: Target number of tokens
special_token: Token to replicate
tokenizer: Optional pre-loaded tokenizer to avoid repeated loading
Returns:
Generated text with approximately the target token count
"""
"""
# Load tokenizer once here to verify special token and get precise counts
if
tokenizer
is
None
:
print
(
"Loading tokenizer..."
)
tokenizer
=
AutoTokenizer
.
from_pretrained
(
model_path
)
tokenizer
=
AutoTokenizer
.
from_pretrained
(
SCORE_MODEL_PATH
)
# Verify that our special token produces exactly 1 token
# Verify token count
special_token_count
=
len
(
special_token_count
=
len
(
tokenizer
.
encode
(
special_token
,
add_special_tokens
=
False
))
tokenizer
.
encode
(
SPECIAL_REPLICATED_TOKEN
,
add_special_tokens
=
False
)
)
print
(
f
"Special token '
{
SPECIAL_REPLICATED_TOKEN
}
' produces "
f
"
{
special_token_count
}
token(s)"
)
def
generate_text_with_token_count
(
num_toks
):
if
special_token_count
==
1
:
"""Generate text with precise token count using replicated token."""
# Simple case: token maps to exactly 1 token
if
special_token_count
==
1
:
return
special_token
*
num_tokens
# Simple case: token maps to exactly 1 token
else
:
return
SPECIAL_REPLICATED_TOKEN
*
num_toks
print
(
f
"Special token '
{
special_token
}
' produces
{
special_token_count
}
tokens"
)
else
:
# Handle case where special token produces multiple tokens
print
(
repetitions
=
(
num_tokens
+
special_token_count
-
1
)
//
special_token_count
f
"Special token '
{
SPECIAL_REPLICATED_TOKEN
}
' produces more than 1 token!!!"
text
=
special_token
*
repetitions
)
# Handle case where special token produces multiple tokens
# Repeat the token enough times to get at least num_toks tokens
repetitions
=
(
num_toks
+
special_token_count
-
1
)
//
special_token_count
text
=
SPECIAL_REPLICATED_TOKEN
*
repetitions
# Verify we got the expected token count (approximately)
actual_tokens
=
len
(
tokenizer
.
encode
(
text
,
add_special_tokens
=
False
))
if
actual_tokens
<
num_toks
:
print
(
f
"Warning: Generated
{
actual_tokens
}
tokens, "
f
"expected
{
num_toks
}
"
)
return
text
# Verify we got the expected token count
actual_tokens
=
len
(
tokenizer
.
encode
(
text
,
add_special_tokens
=
False
))
if
actual_tokens
<
num_tokens
:
print
(
f
"Warning: Generated
{
actual_tokens
}
tokens, expected
{
num_tokens
}
"
)
def
build_request
(
index
):
return
text
"""Build a single request using the shared tokenizer."""
try
:
# Generate query and items for score API
def
setup_profiler
(
config
:
BenchmarkConfig
,
benchmark_name
:
str
)
->
None
:
query
=
generate_text_with_token_count
(
SCORE_QUERY_TOKENS
)
"""
items
=
[
Set up profiler environment if profiling is enabled.
generate_text_with_token_count
(
SCORE_ITEM_TOKENS
)
for
_
in
range
(
item_count
)
Args:
]
config: Benchmark configuration
benchmark_name: Name of the benchmark (used in directory path)
# Return as dict for score API format
"""
score_data
=
{
if
config
.
profile
:
"query"
:
query
,
# Create benchmark-specific subdirectory
"items"
:
items
,
profiler_path
=
os
.
path
.
join
(
"label_token_ids"
:
SCORE_LABEL_TOKEN_IDS
,
config
.
profiler_dir
,
benchmark_name
.
lower
().
replace
(
"_"
,
"-"
)
"model"
:
SCORE_MODEL_PATH
,
)
}
os
.
environ
[
"SGLANG_TORCH_PROFILER_DIR"
]
=
profiler_path
return
(
index
,
score_data
)
print
(
f
"Profiler enabled. Output directory:
{
profiler_path
}
"
)
else
:
print
(
"Profiler disabled"
)
def
prepare_all_requests_parallel
(
num_requests
:
int
,
item_count
:
int
,
build_request_func
:
Callable
[[
int
,
int
],
Tuple
[
int
,
Any
]],
config
:
BenchmarkConfig
,
description
:
str
=
"requests"
,
)
->
List
[
Any
]:
"""
Generic function to generate unique requests in parallel, then reuse them.
Args:
num_requests: Total number of requests needed
item_count: Number of items per request (batch size)
build_request_func: Function that takes (index, item_count) and returns (index, request_data)
config: Benchmark configuration
description: Description for progress bars
Returns:
List of request data objects
"""
def
build_request_wrapper
(
index
):
"""Wrapper to call the provided build_request_func."""
try
:
return
build_request_func
(
index
,
item_count
)
except
Exception
as
e
:
except
Exception
as
e
:
print
(
f
"Error building request
{
index
}
:
{
e
}
"
)
print
(
f
"Error building request
{
index
}
:
{
e
}
"
)
return
(
index
,
None
)
return
(
index
,
None
)
# Generate only the unique requests
# Generate only the unique requests
unique_requests
=
[
None
]
*
NUM_UNIQUE_REQUESTS
unique_requests
=
[
None
]
*
config
.
num_unique_requests
# Use ThreadPoolExecutor instead of ProcessPoolExecutor to avoid
# tokenizer loading issues across processes
max_workers
=
min
(
8
,
os
.
cpu_count
()
or
1
)
# Limit to 8 threads max
max_workers
=
min
(
8
,
os
.
cpu_count
()
or
1
)
# Limit to 8 threads max
with
concurrent
.
futures
.
ThreadPoolExecutor
(
max_workers
=
max_workers
)
as
executor
:
with
concurrent
.
futures
.
ThreadPoolExecutor
(
max_workers
=
max_workers
)
as
executor
:
futures
=
[]
futures
=
[]
for
i
in
tqdm
(
for
i
in
tqdm
(
range
(
NUM_UNIQUE_REQUESTS
),
desc
=
"Submitting prompt generation tasks"
range
(
config
.
num_unique_requests
),
desc
=
f
"Submitting
{
description
}
generation tasks"
,
):
):
future
=
executor
.
submit
(
build_request
,
i
)
future
=
executor
.
submit
(
build_request
_wrapper
,
i
)
futures
.
append
(
future
)
futures
.
append
(
future
)
# Collect results as they complete
# Collect results as they complete
for
f
in
tqdm
(
for
f
in
tqdm
(
concurrent
.
futures
.
as_completed
(
futures
),
concurrent
.
futures
.
as_completed
(
futures
),
desc
=
"Building unique
requests
"
,
desc
=
f
"Building unique
{
description
}
"
,
total
=
NUM_UNIQUE_REQUESTS
,
total
=
config
.
num_unique_requests
,
):
):
try
:
try
:
index
,
req_data
=
f
.
result
()
index
,
req_data
=
f
.
result
()
...
@@ -173,34 +167,141 @@ def prepare_all_requests_parallel(num_requests, item_count):
...
@@ -173,34 +167,141 @@ def prepare_all_requests_parallel(num_requests, item_count):
print
(
print
(
f
"Successfully generated
{
len
(
valid_requests
)
}
out of "
f
"Successfully generated
{
len
(
valid_requests
)
}
out of "
f
"
{
NUM_UNIQUE_REQUESTS
}
unique
requests"
f
"
{
config
.
num_
unique
_
requests
}
unique
{
description
}
"
)
)
# Create the full request list by cycling through unique requests
# Create the full request list by cycling through unique requests
print
(
print
(
f
"Reusing
{
len
(
valid_requests
)
}
unique
requests
to create "
f
"Reusing
{
len
(
valid_requests
)
}
unique
{
description
}
to create "
f
"
{
num_requests
}
total requests..."
f
"
{
num_requests
}
total requests..."
)
)
all_requests
=
[]
all_requests
=
[]
for
i
in
tqdm
(
range
(
num_requests
),
desc
=
"Reusing
requests
"
):
for
i
in
tqdm
(
range
(
num_requests
),
desc
=
f
"Reusing
{
description
}
"
):
unique_index
=
i
%
len
(
valid_requests
)
unique_index
=
i
%
len
(
valid_requests
)
all_requests
.
append
(
valid_requests
[
unique_index
])
all_requests
.
append
(
valid_requests
[
unique_index
])
print
(
"All
prompts/requests
prepared.
\n
"
)
print
(
f
"All
{
description
}
prepared.
\n
"
)
return
all_requests
return
all_requests
async def sleep_with_distribution(distribution: str, rps: float) -> None:
    """
    Sleep according to the specified distribution pattern.

    Args:
        distribution: "CONSTANT" or "POISSON"
        rps: Requests per second rate
    """
    if distribution == "CONSTANT":
        interval = 1 / rps
        await asyncio.sleep(interval)
    elif distribution == "POISSON":
        # For Poisson process, inter-arrival times follow exponential distribution
        interval = random.expovariate(rps)
        await asyncio.sleep(interval)
    else:
        raise ValueError(
            f"Unknown distribution: {distribution}. Use 'CONSTANT' or 'POISSON'."
        )


def build_http_request_json(request_data: Any) -> str:
    """
    Generic function to build HTTP request JSON.

    Args:
        request_data: The data to serialize to JSON

    Returns:
        JSON string representation of the request data
    """
    return json.dumps(request_data)
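The Poisson branch above draws inter-arrival gaps from an exponential distribution with rate rps, so gaps average 1/rps seconds but arrive in bursts rather than evenly spaced. A quick standalone sketch (not part of the committed file) that checks this property numerically:

import random
from statistics import mean

rps = 160
gaps = [random.expovariate(rps) for _ in range(100_000)]
# Mean gap should approach 1/rps; many gaps are much shorter, which is what
# makes the Poisson load pattern burstier than the CONSTANT one.
print(f"mean gap: {mean(gaps) * 1000:.2f} ms (expected ~{1000 / rps:.2f} ms)")
print(f"fraction of gaps under half the constant interval: "
      f"{sum(g < 0.5 / rps for g in gaps) / len(gaps):.2f}")  # ~0.39 for an exponential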
async def make_http_call(
    session: aiohttp.ClientSession,
    request_data: Any,
    request_id: int,
    results_queue: asyncio.Queue,
    http_url: str,
    response_validator: Callable[[Dict[str, Any]], bool],
    api_name: str = "API",
) -> None:
    """
    Generic HTTP call function for API requests.

    Args:
        session: aiohttp client session
        request_data: Data to send in the request
        request_id: Unique identifier for this request
        results_queue: Queue to put results
        http_url: URL to send the request to
        response_validator: Function to validate the response JSON
        api_name: Name of the API for error messages
    """
    try:
        start_time = asyncio.get_event_loop().time()
        request_json = build_http_request_json(request_data)
        headers = {"Content-Type": "application/json"}
        async with session.post(http_url, data=request_json, headers=headers) as resp:
            resp_text = await resp.text()
            if resp.status != 200:
                print(
                    f"[HTTP] {api_name} Request {request_id} failed with status "
                    f"{resp.status}: {resp_text}"
                )
                completion_time = asyncio.get_event_loop().time()
                await results_queue.put((request_id, 0, False, completion_time))
                return

            # Parse and validate response
            try:
                response_data = json.loads(resp_text)
                success = response_validator(response_data)
                if not success:
                    print(
                        f"[HTTP] {api_name} Request {request_id} failed response validation"
                    )
            except json.JSONDecodeError:
                print(
                    f"[HTTP] {api_name} Request {request_id} failed to parse JSON response"
                )
                success = False

            completion_time = asyncio.get_event_loop().time()
            elapsed_time = (completion_time - start_time) * 1000
            await results_queue.put((request_id, elapsed_time, success, completion_time))
    except Exception as e:
        print(f"[HTTP] {api_name} Error for request {request_id}: {e}")
        completion_time = asyncio.get_event_loop().time()
        await results_queue.put((request_id, 0, False, completion_time))
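Each call pushes a (request_id, elapsed_ms, success, completion_time) tuple onto the results queue; process_results later groups these by minute and reports average and p50/p90/p99 latency. As a minimal sketch, assuming the queue has already been filled by make_http_call, this is how such tuples reduce to percentiles (the helper name summarize is illustrative, not part of util.py):

import asyncio

import numpy as np


async def summarize(results_queue: asyncio.Queue, num_requests: int) -> None:
    # Drain (request_id, elapsed_ms, success, completion_time) tuples.
    latencies = []
    for _ in range(num_requests):
        _, elapsed_ms, success, _ = await results_queue.get()
        if success:
            latencies.append(elapsed_ms)
    if latencies:
        p50, p90, p99 = np.percentile(latencies, [50, 90, 99])
        print(f"avg={np.mean(latencies):.2f} ms  p50={p50:.2f}  p90={p90:.2f}  p99={p99:.2f}")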
async def send_profile_request(
    profile_text: str, http_url: str, session: Optional[aiohttp.ClientSession] = None
) -> None:
    """
    Send a profile request (START_PROFILE or STOP_PROFILE) and wait for completion.

    Args:
        profile_text: "START_PROFILE" or "STOP_PROFILE"
        http_url: Base HTTP URL (will derive profile endpoints from this)
        session: Optional aiohttp session to use
    """
    try:
        if session:
            print(f"Sending {profile_text} request via HTTP...")
            # Determine the correct endpoint
            if "/v1/" in http_url:
                base_url = http_url.rsplit("/v1/", 1)[0]  # Remove /v1/xxx
            else:
                base_url = http_url.rsplit("/", 1)[0]  # Remove last path component
            if profile_text == "START_PROFILE":
                endpoint_url = f"{base_url}/start_profile"
            elif profile_text == "STOP_PROFILE":
...
@@ -227,88 +328,135 @@ async def send_profile_request(profile_text, item_count, session=None):
        print(f"Error sending {profile_text} request: {e}")


async def call_freeze_gc_http(session: aiohttp.ClientSession, http_url: str) -> None:
    """
    Call the /freeze_gc HTTP endpoint.

    Args:
        session: aiohttp client session
        http_url: Base HTTP URL to derive the freeze_gc endpoint from
    """
    try:
        # Derive freeze_gc endpoint from the API URL
        if "/v1/" in http_url:
            freeze_gc_url = http_url.rsplit("/v1/", 1)[0] + "/freeze_gc"
        else:
            freeze_gc_url = http_url.rsplit("/", 1)[0] + "/freeze_gc"
        print(f"Calling freeze_gc endpoint: {freeze_gc_url}")
        async with session.post(freeze_gc_url) as resp:
            if resp.status == 200:
                print("freeze_gc called successfully")
            else:
                resp_text = await resp.text()
                print(f"freeze_gc failed with status {resp.status}: {resp_text}")
    except Exception as e:
        print(f"Failed to call freeze_gc: {e}")


async def send_warmup_requests(
    session: aiohttp.ClientSession,
    http_url: str,
    build_warmup_request_func: Callable[[], Any],
    num_warmup: int = 3,
) -> None:
    """
    Send warmup requests to HTTP server.

    Args:
        session: aiohttp client session
        http_url: URL to send warmup requests to
        build_warmup_request_func: Function that returns a warmup request object
        num_warmup: Number of warmup requests to send
    """
    print(f"Sending {num_warmup} HTTP warmup requests...")
    for i in range(num_warmup):
        try:
            warmup_data = build_warmup_request_func()
            request_json = build_http_request_json(warmup_data)
            headers = {"Content-Type": "application/json"}
            async with session.post(http_url, data=request_json, headers=headers) as resp:
                if resp.status == 200:
                    print(f"Warmup request {i + 1}/{num_warmup} completed successfully")
                else:
                    print(
                        f"Warmup request {i + 1}/{num_warmup} failed with status {resp.status}"
                    )
        except Exception as e:
            print(f"Warmup request {i + 1}/{num_warmup} failed with error: {e}")
    print("HTTP warmup requests completed")


async def perform_global_warmup_and_freeze(
    config: BenchmarkConfig,
    http_url: str,
    build_warmup_request_func: Callable[[], Any],
) -> None:
    """
    Perform warmup and optionally GC freeze operations once before all benchmark runs.

    Args:
        config: Benchmark configuration
        http_url: URL for API requests
        build_warmup_request_func: Function that returns a warmup request object
    """
    print("=" * 80)
    print(f"PERFORMING GLOBAL WARMUP{' AND GC FREEZE' if config.freeze_gc else ''}")
    print("=" * 80)
    print(f"Performing HTTP warmup{' and GC freeze' if config.freeze_gc else ''}...")
    async with aiohttp.ClientSession() as session:
        await send_warmup_requests(session, http_url, build_warmup_request_func)
        if config.freeze_gc:
            await call_freeze_gc_http(session, http_url)
    print(f"HTTP warmup{' and GC freeze' if config.freeze_gc else ''} completed successfully.")
    print(f"Global warmup{' and GC freeze' if config.freeze_gc else ''} operations completed.")
    print("=" * 80)


###############################################################################
# RESULTS
###############################################################################
async def process_results(
    results_queue: asyncio.Queue,
    num_requests: int,
    send_duration: float,
    total_duration: float,
    rps: int,
    duration_secs: int,
    item_count: int,
    test_start_time: float,
    config: BenchmarkConfig,
    http_mode: str = "UNKNOWN",
) -> List[Dict[str, Any]]:
    """
    Process benchmark results and group them by minute intervals.

    Args:
        results_queue: Queue containing result tuples
        num_requests: Total number of requests sent
        send_duration: Time taken to send all requests
        total_duration: Total time for all requests to complete
        rps: Target requests per second
        duration_secs: Test duration in seconds
        item_count: Number of items per request
        test_start_time: Start time of the test
        config: Benchmark configuration
        http_mode: Description of the HTTP mode/API being tested

    Returns:
        List of dictionaries containing minute-by-minute results
    """
    all_results = []
    # Collect all results
...
@@ -356,9 +504,9 @@ async def process_results(
            "minute_interval": minute + 1,
            "target_rps": rps,
            "item_count": item_count,
            "server_type": config.server_type,
            "distribution": config.distribution,
            "unique_requests": config.num_unique_requests,
            "total_requests": len(minute_data),
            "successful_requests": successful_requests,
            "failed_requests": failed_requests,
...
@@ -384,7 +532,7 @@ async def process_results(
        print(f"  P90 response time: {p90:.2f} ms")
        print(f"  P99 response time: {p99:.2f} ms")

    # Print overall summary
    all_response_times = [r["elapsed_time"] for r in all_results if r["success"]]
    total_successful = len([r for r in all_results if r["success"]])
    total_failed = len([r for r in all_results if not r["success"]])
...
@@ -402,12 +550,12 @@ async def process_results(
        f"Item Count {item_count}:"
    )
    print(f"  Test duration: {duration_secs} seconds")
    print(f"  Server type: {config.server_type}")
    print(f"  HTTP mode: {http_mode}")
    print(f"  Target RPS: {rps}")
    print(f"  Item count: {item_count}")
    print(f"  Distribution: {config.distribution}")
    print(f"  Unique requests generated: {config.num_unique_requests}")
    print(f"  Total requests sent: {num_requests}")
    print(f"  Successful requests: {total_successful}")
    print(f"  Failed requests: {total_failed}")
...
@@ -421,22 +569,170 @@ async def process_results(
    return minute_results


def print_csv_results(all_results: List[Dict[str, Any]]) -> None:
    """
    Print benchmark results in CSV format.

    Args:
        all_results: List of result dictionaries from process_results
    """
    print("\n" + "=" * 80)
    print("FINAL CSV RESULTS:")
    print("=" * 80)
    # CSV Header
    headers = [
        "test_duration_secs",
        "minute_interval",
        "target_rps",
        "item_count",
        "server_type",
        "distribution",
        "unique_requests",
        "total_requests",
        "successful_requests",
        "failed_requests",
        "send_duration_secs",
        "total_duration_secs",
        "avg_response_time_ms",
        "p50_response_time_ms",
        "p90_response_time_ms",
        "p99_response_time_ms",
    ]
    print(",".join(headers))
    # CSV Data
    for result in all_results:
        row = [
            result["test_duration_secs"],
            result["minute_interval"],
            result["target_rps"],
            result["item_count"],
            result["server_type"],
            result["distribution"],
            result["unique_requests"],
            result["total_requests"],
            result["successful_requests"],
            result["failed_requests"],
            f"{result['send_duration_secs']:.2f}",
            f"{result['total_duration_secs']:.2f}",
            f"{result['avg_response_time_ms']:.2f}",
            f"{result['p50_response_time_ms']:.2f}",
            f"{result['p90_response_time_ms']:.2f}",
            f"{result['p99_response_time_ms']:.2f}",
        ]
        print(",".join(map(str, row)))
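print_csv_results writes the summary to stdout between banner lines, so the CSV block has to be cut out of the captured output before it can be loaded elsewhere. A small sketch, assuming the benchmark's stdout was redirected to a file such as bench.log (the filename and helper are illustrative, not part of the commit):

import csv


def load_csv_block(log_path: str = "bench.log"):
    """Extract the FINAL CSV RESULTS block from a saved benchmark log."""
    with open(log_path) as f:
        lines = [line.rstrip("\n") for line in f]
    start = lines.index("FINAL CSV RESULTS:") + 2  # skip the "=" banner line
    # Header and data rows are the comma-containing lines after the banner.
    csv_lines = [line for line in lines[start:] if "," in line]
    return list(csv.DictReader(csv_lines))


if __name__ == "__main__":
    rows = load_csv_block()
    print(rows[0]["p99_response_time_ms"] if rows else "no rows")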
async def run_benchmark_main(
    config: BenchmarkConfig,
    run_single_benchmark_func,
    benchmark_name: str,
    http_url: str,
    item_count_values: List[int],
    additional_info: Optional[Dict[str, Any]] = None,
    build_warmup_request_func: Optional[Callable[[], Any]] = None,
) -> None:
    """
    Main benchmark orchestration function.

    Args:
        config: Benchmark configuration
        run_single_benchmark_func: Async function to run a single benchmark
        benchmark_name: Name of the benchmark (e.g., "SCORING", "EMBEDDINGS")
        http_url: URL of the API endpoint
        item_count_values: List of item counts to test
        additional_info: Additional information to print in the header
        build_warmup_request_func: Optional function to build warmup requests
    """
    total_combinations = (
        len(config.duration_secs_values)
        * len(config.rps_values)
        * len(item_count_values)
    )
    print(
        f"Running benchmarks for {len(config.duration_secs_values)} duration "
        f"values, {len(config.rps_values)} RPS values, and "
        f"{len(item_count_values)} item count values = "
        f"{total_combinations} total combinations"
    )
    print(f"Server Type: {config.server_type}")
    print(f"HTTP Mode: {benchmark_name}")
    print(f"API URL: {http_url}")
    if additional_info:
        for key, value in additional_info.items():
            print(f"{key}: {value}")
    print(f"Items per request (batch size): {item_count_values}")
    print(f"Profiling Enabled: {config.profile}")
    print(f"Duration values: {config.duration_secs_values}")
    print(f"RPS values: {config.rps_values}")
    print(f"Item count values: {item_count_values}")
    print("=" * 80)

    # Set up profiler environment
    setup_profiler(config, benchmark_name)

    # Perform global warmup and GC freeze operations if warmup function is provided
    if build_warmup_request_func is not None:
        await perform_global_warmup_and_freeze(config, http_url, build_warmup_request_func)

    all_results = []
    for duration_secs in config.duration_secs_values:
        for rps in config.rps_values:
            for item_count in item_count_values:
                result = await run_single_benchmark_func(rps, duration_secs, item_count)
                all_results.extend(result)  # Extend with minute results

    print_csv_results(all_results)


async def run_generic_benchmark(
    rps: int,
    duration_secs: int,
    item_count: int,
    config: BenchmarkConfig,
    http_url: str,
    build_request_func: Callable[[int, int], Tuple[int, Any]],
    response_validator: Callable[[Dict[str, Any]], bool],
    api_name: str,
    request_description: str = "requests",
) -> List[Dict[str, Any]]:
    """
    Generic benchmark runner that can be used for different APIs.

    Args:
        rps: Requests per second
        duration_secs: Duration of the test in seconds
        item_count: Number of items per request (batch size)
        config: Benchmark configuration
        http_url: URL of the API endpoint
        build_request_func: Function to build individual requests
        response_validator: Function to validate API responses
        api_name: Name of the API for logging
        request_description: Description for progress bars

    Returns:
        List of dictionaries containing minute-by-minute results
    """
    num_requests = int(rps * duration_secs)
    print(
        f"Starting benchmark with RPS={rps}, Duration={duration_secs}s, "
        f"Item Count={item_count}, num_requests={num_requests}"
    )
    print(f"Server Type: {config.server_type}")
    print(f"HTTP Mode: {api_name}")
    print(f"Profiling Enabled: {config.profile}")

    # Build requests in parallel (unmeasured)
    all_requests = prepare_all_requests_parallel(
        num_requests, item_count, build_request_func, config, request_description
    )
    results_queue = asyncio.Queue()
    tasks = []
...
@@ -444,26 +740,34 @@ async def run_benchmark(rps, duration_secs, item_count):
    # Track timing for sending requests
    send_start_time = asyncio.get_event_loop().time()

    # HTTP implementation
    async with aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=300)
    ) as session:
        # Send START_PROFILE if profiling is enabled
        if config.profile:
            await send_profile_request("START_PROFILE", http_url, session=session)

        # Add progress bar for sending requests
        with tqdm(
            total=len(all_requests),
            desc=f"Sending HTTP {request_description} at {rps} RPS",
            unit="req",
        ) as pbar:
            for i, request_data in enumerate(all_requests):
                request_id = i + 1
                tasks.append(
                    asyncio.create_task(
                        make_http_call(
                            session,
                            request_data,
                            request_id,
                            results_queue,
                            http_url,
                            response_validator,
                            api_name,
                        )
                    )
                )
...
@@ -472,27 +776,15 @@ async def run_benchmark(rps, duration_secs, item_count):
                # Throttle based on distribution
                if i < len(all_requests) - 1:
                    await sleep_with_distribution(config.distribution, rps)

        send_end_time = asyncio.get_event_loop().time()
        send_duration = send_end_time - send_start_time

        # Wait for all requests to complete with progress tracking
        print(f"Waiting for {len(tasks)} HTTP {request_description} to complete...")
        with tqdm(
            total=len(tasks), desc=f"Completing HTTP {request_description}", unit="req"
        ) as completion_pbar:
            completed_tasks = []
            for task in asyncio.as_completed(tasks):
...
@@ -501,8 +793,8 @@ async def run_benchmark(rps, duration_secs, item_count):
                completion_pbar.update(1)

        # Send STOP_PROFILE if profiling is enabled
        if config.profile:
            await send_profile_request("STOP_PROFILE", http_url, session=session)

        completion_end_time = asyncio.get_event_loop().time()
        total_duration = completion_end_time - send_start_time
...
@@ -516,88 +808,6 @@ async def run_benchmark(rps, duration_secs, item_count):
        duration_secs,
        item_count,
        send_start_time,
        config,
        api_name,
    )

(The rest of the left column is deleted by this commit: the old file's scoring-specific versions of these helpers, including build_http_request_json for the /v1/score payload, make_http_call against the module-level HTTP_URL, process_results and run_benchmark reading the module-level constants, its script-level main() that printed the configuration banner and the CSV summary, and its if __name__ == "__main__": asyncio.run(main()) entry point. That logic now lives in run_benchmark_main/print_csv_results above and in the per-API scripts bench_score.py and bench_embeddings.py.)
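Both new scripts follow the same pattern against util.py: define a request builder, a response validator, and a warmup builder, then hand them to run_generic_benchmark and run_benchmark_main. As a condensed sketch of how a benchmark for another endpoint could plug in, assuming a hypothetical /v1/completions target whose URL, payload fields, and validator below are illustrative and not part of this commit:

import asyncio

from util import BenchmarkConfig, run_benchmark_main, run_generic_benchmark

config = BenchmarkConfig()
config.rps_values = [50]
HTTP_URL = "http://localhost:30000/v1/completions"  # hypothetical target endpoint


def build_request(index: int, item_count: int) -> tuple:
    # Hypothetical fixed payload; the real scripts generate token-exact prompts instead.
    return (index, {"model": "Qwen/Qwen3-0.6B", "prompt": "hello", "max_tokens": 1})


def validate(response_data: dict) -> bool:
    return "choices" in response_data  # assumed response shape


def build_warmup_request() -> dict:
    return {"model": "Qwen/Qwen3-0.6B", "prompt": "hi", "max_tokens": 1}


async def run_benchmark(rps, duration_secs, item_count):
    return await run_generic_benchmark(
        rps=rps,
        duration_secs=duration_secs,
        item_count=item_count,
        config=config,
        http_url=HTTP_URL,
        build_request_func=build_request,
        response_validator=validate,
        api_name="COMPLETIONS",
        request_description="completion requests",
    )


async def main():
    await run_benchmark_main(
        config, run_benchmark, "COMPLETIONS", HTTP_URL, [1], None, build_warmup_request
    )


if __name__ == "__main__":
    asyncio.run(main())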