Unverified commit 1823a00d, authored by Ming Yang and committed by GitHub
Browse files

[Misc] Support bench serve long context (#24373)


Signed-off-by: Ming Yang <minos.future@gmail.com>
parent ed16d0f2
...@@ -45,3 +45,34 @@ def test_bench_serve(server): ...@@ -45,3 +45,34 @@ def test_bench_serve(server):
print(result.stderr) print(result.stderr)
assert result.returncode == 0, f"Benchmark failed: {result.stderr}" assert result.returncode == 0, f"Benchmark failed: {result.stderr}"
@pytest.mark.benchmark
def test_bench_serve_chat(server):
    """Run ``vllm bench serve`` against the chat completions endpoint.

    The benchmark CLI is launched as a subprocess pointed at the host and
    port exposed by the ``server`` fixture, using a small random dataset
    (5 prompts, 32 input tokens, 4 output tokens). The test passes when the
    process exits with return code 0.
    """
    # Where to send the requests and which model to ask for.
    target_args = [
        "--model",
        MODEL_NAME,
        "--host",
        server.host,
        "--port",
        str(server.port),
    ]
    # Tiny synthetic workload so the run finishes quickly.
    workload_args = [
        "--dataset-name",
        "random",
        "--random-input-len",
        "32",
        "--random-output-len",
        "4",
        "--num-prompts",
        "5",
    ]
    # Exercise the OpenAI-compatible chat route rather than the default one.
    endpoint_args = [
        "--endpoint",
        "/v1/chat/completions",
        "--endpoint-type",
        "openai-chat",
    ]
    command = ["vllm", "bench", "serve", *target_args, *workload_args,
               *endpoint_args]

    result = subprocess.run(command, capture_output=True, text=True)

    # Echo both streams so a failing run can be diagnosed from the test log.
    for captured in (result.stdout, result.stderr):
        print(captured)

    assert result.returncode == 0, f"Benchmark failed: {result.stderr}"
...@@ -17,6 +17,47 @@ from tqdm.asyncio import tqdm ...@@ -17,6 +17,47 @@ from tqdm.asyncio import tqdm
AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60) AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)
class StreamedResponseHandler:
    """Handles streaming HTTP responses by accumulating chunks until complete
    messages are available.

    Chunks may be cut at arbitrary byte offsets, so a chunk can end in the
    middle of a message or even in the middle of a multi-byte UTF-8
    character. Undecodable trailing bytes are held back by an incremental
    decoder and undelivered text is kept in ``buffer`` until a later chunk
    completes it.
    """

    def __init__(self):
        # Function-scope import so the class stays self-contained without
        # touching the module's import block.
        import codecs

        # Decoded text that has not yet been returned as a complete message.
        self.buffer = ""
        # Stateful decoder: keeps the leading bytes of a character that was
        # split across two chunks instead of raising UnicodeDecodeError.
        self._decoder = codecs.getincrementaldecoder("utf-8")()

    def add_chunk(self, chunk_bytes: bytes) -> list[str]:
        """Add a chunk of bytes to the buffer and return any complete
        messages.

        Args:
            chunk_bytes: Raw bytes read from the response stream; may contain
                zero, one or several messages, possibly truncated.

        Returns:
            The stripped, non-empty messages completed by this chunk, in
            arrival order. Each still carries its ``data: `` prefix (or a
            leading ``:`` for comments); callers are expected to remove it.

        Raises:
            UnicodeDecodeError: If the stream contains bytes that are not
                valid UTF-8 (a character merely split across chunks is not
                an error).
        """
        self.buffer += self._decoder.decode(chunk_bytes)

        # Split by double newlines (SSE message separator). Everything before
        # the last separator is complete; the tail stays in the buffer.
        *complete, self.buffer = self.buffer.split("\n\n")
        messages = [
            stripped for part in complete if (stripped := part.strip())
        ]

        # The separator may be missing from the tail (e.g. the caller strips
        # whitespace from each chunk), so also accept a remainder that is
        # already a whole payload: the [DONE] sentinel or valid JSON.
        if self.buffer.startswith("data: "):
            message_content = self.buffer.removeprefix("data: ").strip()
            if message_content == "[DONE]":
                messages.append(self.buffer.strip())
                self.buffer = ""
            elif message_content:
                try:
                    json.loads(message_content)
                except json.JSONDecodeError:
                    # Incomplete JSON, wait for more chunks.
                    pass
                else:
                    messages.append(self.buffer.strip())
                    self.buffer = ""

        return messages
@dataclass @dataclass
class RequestFuncInput: class RequestFuncInput:
"""The input for the request function.""" """The input for the request function."""
...@@ -102,46 +143,50 @@ async def async_request_openai_completions( ...@@ -102,46 +143,50 @@ async def async_request_openai_completions(
headers=headers) as response: headers=headers) as response:
if response.status == 200: if response.status == 200:
first_chunk_received = False first_chunk_received = False
async for chunk_bytes in response.content: handler = StreamedResponseHandler()
async for chunk_bytes in response.content.iter_any():
chunk_bytes = chunk_bytes.strip() chunk_bytes = chunk_bytes.strip()
if not chunk_bytes: if not chunk_bytes:
continue continue
chunk_bytes = chunk_bytes.decode("utf-8")
# NOTE: SSE comments (often used as pings) start with
# a colon. These are not JSON data payload and should
# be skipped.
if chunk_bytes.startswith(":"):
continue
chunk = chunk_bytes.removeprefix("data: ") messages = handler.add_chunk(chunk_bytes)
for message in messages:
# NOTE: SSE comments (often used as pings) start with
# a colon. These are not JSON data payload and should
# be skipped.
if message.startswith(":"):
continue
if chunk != "[DONE]": chunk = message.removeprefix("data: ")
data = json.loads(chunk)
# NOTE: Some completion API might have a last if chunk != "[DONE]":
# usage summary response without a token so we data = json.loads(chunk)
# want to check a token was generated
if choices := data.get("choices"):
# Note that text could be empty here
# e.g. for special tokens
text = choices[0].get("text")
timestamp = time.perf_counter()
# First token
if not first_chunk_received:
first_chunk_received = True
ttft = time.perf_counter() - st
output.ttft = ttft
# Decoding phase # NOTE: Some completion API might have a last
else: # usage summary response without a token so we
output.itl.append(timestamp - # want to check a token was generated
most_recent_timestamp) if choices := data.get("choices"):
# Note that text could be empty here
# e.g. for special tokens
text = choices[0].get("text")
timestamp = time.perf_counter()
# First token
if not first_chunk_received:
first_chunk_received = True
ttft = time.perf_counter() - st
output.ttft = ttft
most_recent_timestamp = timestamp # Decoding phase
generated_text += text or "" else:
elif usage := data.get("usage"): output.itl.append(timestamp -
output.output_tokens = usage.get( most_recent_timestamp)
"completion_tokens")
most_recent_timestamp = timestamp
generated_text += text or ""
elif usage := data.get("usage"):
output.output_tokens = usage.get(
"completion_tokens")
if first_chunk_received: if first_chunk_received:
output.success = True output.success = True
else: else:
...@@ -227,41 +272,44 @@ async def async_request_openai_chat_completions( ...@@ -227,41 +272,44 @@ async def async_request_openai_chat_completions(
async with session.post(url=api_url, json=payload, async with session.post(url=api_url, json=payload,
headers=headers) as response: headers=headers) as response:
if response.status == 200: if response.status == 200:
async for chunk_bytes in response.content: handler = StreamedResponseHandler()
async for chunk_bytes in response.content.iter_any():
chunk_bytes = chunk_bytes.strip() chunk_bytes = chunk_bytes.strip()
if not chunk_bytes: if not chunk_bytes:
continue continue
chunk_bytes = chunk_bytes.decode("utf-8")
# NOTE: SSE comments (often used as pings) start with
# a colon. These are not JSON data payload and should
# be skipped.
if chunk_bytes.startswith(":"):
continue
chunk = chunk_bytes.removeprefix("data: ") messages = handler.add_chunk(chunk_bytes)
for message in messages:
# NOTE: SSE comments (often used as pings) start with
# a colon. These are not JSON data payload and should
# be skipped.
if message.startswith(":"):
continue
chunk = message.removeprefix("data: ")
if chunk != "[DONE]": if chunk != "[DONE]":
timestamp = time.perf_counter() timestamp = time.perf_counter()
data = json.loads(chunk) data = json.loads(chunk)
if choices := data.get("choices"): if choices := data.get("choices"):
content = choices[0]["delta"].get("content") content = choices[0]["delta"].get("content")
# First token # First token
if ttft == 0.0: if ttft == 0.0:
ttft = timestamp - st ttft = timestamp - st
output.ttft = ttft output.ttft = ttft
# Decoding phase # Decoding phase
else: else:
output.itl.append(timestamp - output.itl.append(timestamp -
most_recent_timestamp) most_recent_timestamp)
generated_text += content or "" generated_text += content or ""
elif usage := data.get("usage"): elif usage := data.get("usage"):
output.output_tokens = usage.get( output.output_tokens = usage.get(
"completion_tokens") "completion_tokens")
most_recent_timestamp = timestamp most_recent_timestamp = timestamp
output.generated_text = generated_text output.generated_text = generated_text
output.success = True output.success = True
...@@ -347,36 +395,40 @@ async def async_request_openai_audio( ...@@ -347,36 +395,40 @@ async def async_request_openai_audio(
data=form, data=form,
headers=headers) as response: headers=headers) as response:
if response.status == 200: if response.status == 200:
async for chunk_bytes in response.content: handler = StreamedResponseHandler()
async for chunk_bytes in response.content.iter_any():
chunk_bytes = chunk_bytes.strip() chunk_bytes = chunk_bytes.strip()
if not chunk_bytes: if not chunk_bytes:
continue continue
chunk = chunk_bytes.decode("utf-8").removeprefix( messages = handler.add_chunk(chunk_bytes)
"data: ") for message in messages:
if chunk != "[DONE]": chunk = message.decode("utf-8").removeprefix(
timestamp = time.perf_counter() "data: ")
data = json.loads(chunk) if chunk != "[DONE]":
timestamp = time.perf_counter()
if choices := data.get("choices"): data = json.loads(chunk)
content = choices[0]["delta"].get(
"content") if choices := data.get("choices"):
# First token content = choices[0]["delta"].get(
if ttft == 0.0: "content")
ttft = timestamp - st # First token
output.ttft = ttft if ttft == 0.0:
ttft = timestamp - st
# Decoding phase output.ttft = ttft
else:
output.itl.append( # Decoding phase
timestamp - most_recent_timestamp) else:
output.itl.append(
generated_text += content or "" timestamp - most_recent_timestamp)
elif usage := data.get("usage"):
output.output_tokens = usage.get( generated_text += content or ""
"completion_tokens") elif usage := data.get("usage"):
output.output_tokens = usage.get(
most_recent_timestamp = timestamp "completion_tokens")
most_recent_timestamp = timestamp
output.generated_text = generated_text output.generated_text = generated_text
output.success = True output.success = True
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment