092/backend_request_func.py
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import io
import json
import os
import sys
import time
import traceback
from dataclasses import dataclass, field
from typing import Optional, Union

import aiohttp
import huggingface_hub.constants
from tqdm.asyncio import tqdm
from transformers import (AutoTokenizer, PreTrainedTokenizer,
                          PreTrainedTokenizerFast)

# NOTE(simon): do not import vLLM here so the benchmark script
# can run without vLLM installed.

AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)
@dataclass
class RequestFuncInput:
    prompt: str
    api_url: str
    prompt_len: int
    output_len: int
    model: str
    model_name: Optional[str] = None
    logprobs: Optional[int] = None
    extra_body: Optional[dict] = None
    multi_modal_content: Optional[dict] = None
    ignore_eos: bool = False
    language: Optional[str] = None
@dataclass
class RequestFuncOutput:
    generated_text: str = ""
    success: bool = False
    latency: float = 0.0
    output_tokens: int = 0
    ttft: float = 0.0  # Time to first token
    itl: list[float] = field(
        default_factory=list)  # list of inter-token latencies
    tpot: float = 0.0  # avg next-token latencies
    prompt_len: int = 0
    error: str = ""
async def async_request_tgi(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    api_url = request_func_input.api_url
    assert api_url.endswith("generate_stream")

    async with aiohttp.ClientSession(trust_env=True,
                                     timeout=AIOHTTP_TIMEOUT) as session:
        params = {
            "max_new_tokens": request_func_input.output_len,
            "do_sample": True,
            "temperature": 0.01,  # TGI does not accept 0.0 temperature.
            "top_p": 0.99,  # TGI does not accept 1.0 top_p.
            "truncate": request_func_input.prompt_len,
            "ignore_eos_token": request_func_input.ignore_eos,
        }
        payload = {
            "inputs": request_func_input.prompt,
            "parameters": params,
        }
        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len
        if request_func_input.ignore_eos:
            output.output_tokens = request_func_input.output_len
        else:
            output.output_tokens = None

        ttft = 0.0
        st = time.perf_counter()
        most_recent_timestamp = st
        try:
            async with session.post(url=api_url, json=payload) as response:
                if response.status == 200:
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue
                        chunk_bytes = chunk_bytes.decode("utf-8")

                        # NOTE: Sometimes TGI returns a ping response without
                        # any data, we should skip it.
                        if chunk_bytes.startswith(":"):
                            continue
                        chunk = chunk_bytes.removeprefix("data:")

                        data = json.loads(chunk)
                        timestamp = time.perf_counter()
                        # First token
                        if ttft == 0.0:
                            ttft = time.perf_counter() - st
                            output.ttft = ttft

                        # Decoding phase
                        else:
                            output.itl.append(timestamp -
                                              most_recent_timestamp)

                        most_recent_timestamp = timestamp

                    output.latency = most_recent_timestamp - st
                    output.success = True
                    output.generated_text = data["generated_text"]
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output
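
# --- Illustrative sketch, not part of the original file ---
# A minimal driver showing how these request functions are meant to be
# awaited: fire a few copies of one request concurrently and report the
# collected per-request metrics. The TGI endpoint below is an assumption.
def _example_run_tgi(num_requests: int = 4) -> None:
    import asyncio
    from dataclasses import replace

    # Rewrite the assumed example input to point at a (hypothetical) TGI
    # streaming endpoint, as required by the assert in async_request_tgi.
    tgi_input = replace(_EXAMPLE_REQUEST,
                        api_url="http://localhost:3000/generate_stream")

    async def _main() -> list[RequestFuncOutput]:
        pbar = tqdm(total=num_requests)
        results = await asyncio.gather(
            *(async_request_tgi(tgi_input, pbar)
              for _ in range(num_requests)))
        pbar.close()
        return results

    for result in asyncio.run(_main()):
        print(f"success={result.success} ttft={result.ttft:.3f}s "
              f"latency={result.latency:.3f}s")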
async def async_request_trt_llm(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    api_url = request_func_input.api_url
    assert api_url.endswith("generate_stream")

    async with aiohttp.ClientSession(trust_env=True,
                                     timeout=AIOHTTP_TIMEOUT) as session:
        payload = {
            "accumulate_tokens": True,
            "text_input": request_func_input.prompt,
            "temperature": 0.0,
            "top_p": 1.0,
            "max_tokens": request_func_input.output_len,
            "stream": True,
        }
        if request_func_input.ignore_eos:
            payload["min_length"] = request_func_input.output_len
        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        ttft = 0.0
        st = time.perf_counter()
        most_recent_timestamp = st
        try:
            async with session.post(url=api_url, json=payload) as response:
                if response.status == 200:
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue

                        chunk = chunk_bytes.decode("utf-8").removeprefix(
                            "data:")

                        data = json.loads(chunk)
                        output.generated_text += data["text_output"]
                        timestamp = time.perf_counter()
                        # First token
                        if ttft == 0.0:
                            ttft = timestamp - st
                            output.ttft = ttft

                        # Decoding phase
                        else:
                            output.itl.append(timestamp -
                                              most_recent_timestamp)

                        most_recent_timestamp = timestamp

                    output.latency = most_recent_timestamp - st
                    output.success = True

                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output
async def async_request_deepspeed_mii(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    api_url = request_func_input.api_url
    assert api_url.endswith(("completions", "profile")), (
        "OpenAI Completions API URL must end with 'completions' or 'profile'.")

    async with aiohttp.ClientSession(trust_env=True,
                                     timeout=AIOHTTP_TIMEOUT) as session:
        payload = {
            "model": request_func_input.model,
            "prompt": request_func_input.prompt,
            "max_tokens": request_func_input.output_len,
            "temperature": 0.01,  # deepspeed-mii does not accept 0.0 temp.
            "top_p": 1.0,
        }
        headers = {
            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"
        }
        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        # NOTE: DeepSpeed-MII doesn't support streaming as of Jan 28 2024,
        # will use 0 as placeholder.
        # See https://github.com/microsoft/DeepSpeed-MII/pull/311
        output.ttft = 0

        st = time.perf_counter()
        try:
            async with session.post(url=api_url, json=payload,
                                    headers=headers) as response:
                if response.status == 200:
                    parsed_resp = await response.json()
                    output.latency = time.perf_counter() - st
                    # Only mark the request successful when a generated text
                    # was actually extracted from the response.
                    if "choices" in parsed_resp:
                        output.generated_text = parsed_resp["choices"][0][
                            "text"]
                        output.success = True
                    elif "text" in parsed_resp:
                        output.generated_text = parsed_resp["text"][0]
                        output.success = True
                    else:
                        output.error = ("Unexpected response format: "
                                        "neither 'choices' nor 'text' found")
                        output.success = False
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output
async def async_request_openai_completions(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    api_url = request_func_input.api_url
    assert api_url.endswith(("completions", "profile")), (
        "OpenAI Completions API URL must end with 'completions' or 'profile'.")

    async with aiohttp.ClientSession(trust_env=True,
                                     timeout=AIOHTTP_TIMEOUT) as session:
        payload = {
            "model": request_func_input.model_name
            if request_func_input.model_name else request_func_input.model,
            "prompt": request_func_input.prompt,
            "temperature": 0.0,
            "repetition_penalty": 1.0,
            "max_tokens": request_func_input.output_len,
            "logprobs": request_func_input.logprobs,
            "stream": True,
            "stream_options": {
                "include_usage": True,
            },
        }
        if request_func_input.ignore_eos:
            payload["ignore_eos"] = request_func_input.ignore_eos
        if request_func_input.extra_body:
            payload.update(request_func_input.extra_body)
        headers = {
            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"
        }

        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        generated_text = ""
        st = time.perf_counter()
        most_recent_timestamp = st
        try:
            async with session.post(url=api_url, json=payload,
                                    headers=headers) as response:
                if response.status == 200:
                    first_chunk_received = False
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue

                        chunk = chunk_bytes.decode("utf-8").removeprefix(
                            "data: ")
                        if chunk != "[DONE]":
                            data = json.loads(chunk)

                            # NOTE: Some completion API might have a last
                            # usage summary response without a token so we
                            # want to check a token was generated
                            if choices := data.get("choices"):
                                # Note that text could be empty here
                                # e.g. for special tokens
                                text = choices[0].get("text")
                                timestamp = time.perf_counter()
                                # First token
                                if not first_chunk_received:
                                    first_chunk_received = True
                                    ttft = time.perf_counter() - st
                                    output.ttft = ttft

                                # Decoding phase
                                else:
                                    output.itl.append(timestamp -
                                                      most_recent_timestamp)

                                most_recent_timestamp = timestamp
                                generated_text += text or ""
                            if usage := data.get("usage"):
                                output.output_tokens = usage.get(
                                    "completion_tokens")
                    if first_chunk_received:
                        output.success = True
                    else:
                        output.success = False
                        output.error = (
                            "Never received a valid chunk to calculate TTFT. "
                            "This response will be marked as failed!")
                    output.generated_text = generated_text
                    output.latency = most_recent_timestamp - st
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output
async def async_request_openai_chat_completions(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    api_url = request_func_input.api_url
    assert api_url.endswith(("chat/completions", "profile")), (
        "OpenAI Chat Completions API URL must end with 'chat/completions' "
        "or 'profile'.")

    async with aiohttp.ClientSession(trust_env=True,
                                     timeout=AIOHTTP_TIMEOUT) as session:
        content = [{"type": "text", "text": request_func_input.prompt}]
        if request_func_input.multi_modal_content:
            content.append(request_func_input.multi_modal_content)
        payload = {
            "model": request_func_input.model_name
            if request_func_input.model_name else request_func_input.model,
            "messages": [
                {
                    "role": "user",
                    "content": content
                },
            ],
            "temperature": 0.0,
            "max_completion_tokens": request_func_input.output_len,
            "stream": True,
            "stream_options": {
                "include_usage": True,
            },
        }
        if request_func_input.ignore_eos:
            payload["ignore_eos"] = request_func_input.ignore_eos
        if request_func_input.extra_body:
            payload.update(request_func_input.extra_body)
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
        }

        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        generated_text = ""
        ttft = 0.0
        st = time.perf_counter()
        most_recent_timestamp = st
        try:
            async with session.post(url=api_url, json=payload,
                                    headers=headers) as response:
                if response.status == 200:
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue
                        chunk_bytes = chunk_bytes.decode("utf-8")
                        # NOTE: SSE comments (often used as pings) start with
                        # a colon. These are not JSON data payload and should
                        # be skipped.
                        if chunk_bytes.startswith(":"):
                            continue

                        chunk = chunk_bytes.removeprefix("data: ")
                        if chunk != "[DONE]":
                            timestamp = time.perf_counter()
                            data = json.loads(chunk)

                            if choices := data.get("choices"):
                                content = choices[0]["delta"].get("content")
                                # First token
                                if ttft == 0.0:
                                    ttft = timestamp - st
                                    output.ttft = ttft

                                # Decoding phase
                                else:
                                    output.itl.append(timestamp -
                                                      most_recent_timestamp)

                                generated_text += content or ""
                            elif usage := data.get("usage"):
                                output.output_tokens = usage.get(
                                    "completion_tokens")

                            most_recent_timestamp = timestamp

                    output.generated_text = generated_text
                    output.success = True
                    output.latency = most_recent_timestamp - st
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output
async def async_request_openai_audio(
    request_func_input: RequestFuncInput,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    # Lazy import without PlaceholderModule to avoid vllm dep.
    import soundfile
    api_url = request_func_input.api_url
    assert api_url.endswith(("transcriptions", "translations")), (
        "OpenAI Audio API URL must end with 'transcriptions' "
        "or 'translations'.")

    async with aiohttp.ClientSession(trust_env=True,
                                     timeout=AIOHTTP_TIMEOUT) as session:
        content = [{"type": "text", "text": request_func_input.prompt}]
        payload = {
            "model": request_func_input.model_name
            if request_func_input.model_name else request_func_input.model,
            "temperature": 0.0,
            "max_completion_tokens": request_func_input.output_len,
            "stream": True,
            "language": "en",
            # Flattened due to multipart/form-data
            "stream_include_usage": True,
            "stream_continuous_usage_stats": True,
        }
        if request_func_input.extra_body:
            payload.update(request_func_input.extra_body)
        headers = {
            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
        }

        # Send audio file
        def to_bytes(y, sr):
            buffer = io.BytesIO()
            soundfile.write(buffer, y, sr, format="WAV")
            buffer.seek(0)
            return buffer

        with to_bytes(*request_func_input.multi_modal_content["audio"]) as f:
            form = aiohttp.FormData()
            form.add_field("file", f, content_type="audio/wav")
            for key, value in payload.items():
                form.add_field(key, str(value))

            output = RequestFuncOutput()
            output.prompt_len = request_func_input.prompt_len

            generated_text = ""
            ttft = 0.0
            st = time.perf_counter()
            most_recent_timestamp = st
            try:
                async with session.post(url=api_url,
                                        data=form,
                                        headers=headers) as response:
                    if response.status == 200:
                        async for chunk_bytes in response.content:
                            chunk_bytes = chunk_bytes.strip()
                            if not chunk_bytes:
                                continue

                            chunk = chunk_bytes.decode("utf-8").removeprefix(
                                "data: ")
                            if chunk != "[DONE]":
                                timestamp = time.perf_counter()
                                data = json.loads(chunk)

                                if choices := data.get("choices"):
                                    content = choices[0]["delta"].get(
                                        "content")
                                    # First token
                                    if ttft == 0.0:
                                        ttft = timestamp - st
                                        output.ttft = ttft

                                    # Decoding phase
                                    else:
                                        output.itl.append(
                                            timestamp - most_recent_timestamp)

                                    generated_text += content or ""
                                elif usage := data.get("usage"):
                                    output.output_tokens = usage.get(
                                        "completion_tokens")

                                most_recent_timestamp = timestamp

                        output.generated_text = generated_text
                        output.success = True
                        output.latency = most_recent_timestamp - st
                    else:
                        output.error = response.reason or ""
                        output.success = False
            except Exception:
                output.success = False
                exc_info = sys.exc_info()
                output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output
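
# --- Illustrative sketch, not part of the original file ---
# `async_request_openai_audio` expects `multi_modal_content["audio"]` to be a
# `(waveform, sample_rate)` pair that `soundfile.write` can serialize. A
# synthetic one-second sine wave is enough for a smoke test; the endpoint and
# model id below are assumptions, and numpy is an extra dependency.
def _example_audio_input() -> RequestFuncInput:
    import numpy as np

    sr = 16000
    t = np.linspace(0.0, 1.0, sr, endpoint=False)
    waveform = 0.1 * np.sin(2 * np.pi * 440.0 * t)  # one second of 440 Hz
    return RequestFuncInput(
        prompt="",
        api_url="http://localhost:8000/v1/audio/transcriptions",  # assumed
        prompt_len=0,
        output_len=64,
        model="openai/whisper-large-v3",  # assumed model id
        multi_modal_content={"audio": (waveform, sr)},
    )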
def get_model(pretrained_model_name_or_path: str) -> str:
    if os.getenv("VLLM_USE_MODELSCOPE", "False").lower() == "true":
        from modelscope import snapshot_download

        from vllm.model_executor.model_loader.weight_utils import get_lock

        # Use file lock to prevent multiple processes from
        # downloading the same model weights at the same time.
        with get_lock(pretrained_model_name_or_path):
            model_path = snapshot_download(
                model_id=pretrained_model_name_or_path,
                local_files_only=huggingface_hub.constants.HF_HUB_OFFLINE,
                ignore_file_pattern=[".*.pt", ".*.safetensors", ".*.bin"],
            )

            return model_path
    return pretrained_model_name_or_path
def get_tokenizer(
    pretrained_model_name_or_path: str,
    tokenizer_mode: str = "auto",
    trust_remote_code: bool = False,
    **kwargs,
) -> Union[PreTrainedTokenizer, PreTrainedTokenizerFast]:
    if pretrained_model_name_or_path is not None and not os.path.exists(
            pretrained_model_name_or_path):
        pretrained_model_name_or_path = get_model(
            pretrained_model_name_or_path)
    if tokenizer_mode == "slow":
        if kwargs.get("use_fast", False):
            raise ValueError(
                "Cannot use the fast tokenizer in slow tokenizer mode.")
        kwargs["use_fast"] = False
    if tokenizer_mode == "mistral":
        try:
            from vllm.transformers_utils.tokenizer import MistralTokenizer
        except ImportError as e:
            raise ImportError("MistralTokenizer requires vllm package.\n"
                              "Please install it with `pip install vllm` "
                              "to use mistral tokenizer mode.") from e
        return MistralTokenizer.from_pretrained(
            str(pretrained_model_name_or_path))
    else:
        return AutoTokenizer.from_pretrained(
            pretrained_model_name_or_path,
            trust_remote_code=trust_remote_code,
            **kwargs,
        )
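
# --- Illustrative sketch, not part of the original file ---
# Typical use of `get_tokenizer`: resolve the tokenizer once, then measure
# prompt lengths in tokens when filling `RequestFuncInput.prompt_len`. The
# model id here is an assumption.
def _example_measure_prompt_len(prompt: str) -> int:
    tokenizer = get_tokenizer("gpt2", tokenizer_mode="auto")  # assumed model
    return len(tokenizer(prompt).input_ids)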
ASYNC_REQUEST_FUNCS = {
    "tgi": async_request_tgi,
    "vllm": async_request_openai_completions,
    "lmdeploy": async_request_openai_completions,
    "deepspeed-mii": async_request_deepspeed_mii,
    "openai": async_request_openai_completions,
    "openai-chat": async_request_openai_chat_completions,
    "openai-audio": async_request_openai_audio,
    "tensorrt-llm": async_request_trt_llm,
    "scalellm": async_request_openai_completions,
    "sglang": async_request_openai_completions,
    "llama.cpp": async_request_openai_completions,
}

OPENAI_COMPATIBLE_BACKENDS = [
    k for k, v in ASYNC_REQUEST_FUNCS.items()
    if v in (async_request_openai_completions,
             async_request_openai_chat_completions)
]
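
# --- Illustrative sketch, not part of the original file ---
# A benchmark harness would typically resolve its `--backend` flag through
# the registry above and await the selected request function; the default
# backend name and the reuse of `_EXAMPLE_REQUEST` are assumptions.
def _example_dispatch(backend: str = "vllm") -> RequestFuncOutput:
    import asyncio

    request_func = ASYNC_REQUEST_FUNCS[backend]  # KeyError on unknown backend
    return asyncio.run(request_func(_EXAMPLE_REQUEST))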