Commit 53cb1582 authored by Pan Zezhong

add lock for single request handling, add test server script

parent f0fd0463
@@ -22,3 +22,8 @@
 cache/
 #GGUF
 *.gguf
+# txt
+*.txt
+*.http
@@ -384,7 +384,7 @@ class JiugeForCauslLM:
         temperature = request.get("temperature", 1.0)
         topk = request.get("top_k", 1)
         topp = request.get("top_p", 1.0)
-        max_tokens = request.get("max_tokens", 512)
+        max_tokens = request.get("max_tokens", self.meta.dctx)
         input_content = self.tokenizer.apply_chat_template(
             conversation=messages,
             add_generation_prompt=True,
...
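With this change, a request that omits max_tokens now defaults to the model's full context window instead of a fixed 512 tokens. A minimal sketch of the effect, assuming self.meta.dctx is the model's maximum context length (the attribute name comes from the diff; resolve_max_tokens below is a hypothetical helper for illustration only):

def resolve_max_tokens(request: dict, dctx: int) -> int:
    # Default to the model's context window instead of a hard-coded 512.
    return request.get("max_tokens", dctx)

assert resolve_max_tokens({}, 4096) == 4096                # falls back to full context
assert resolve_max_tokens({"max_tokens": 64}, 4096) == 64  # explicit client cap wins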
@@ -3,7 +3,7 @@ from libinfinicore_infer import DeviceType
 from fastapi import FastAPI, Request
 from fastapi.responses import StreamingResponse, JSONResponse
-import asyncio
+import anyio
 import uvicorn
 import time
 import uuid
@@ -53,11 +53,18 @@ signal.signal(signal.SIGTERM, signal_handler)  # Handle docker stop / system shutdown
 app = FastAPI()
 
+# TO REMOVE: Global lock to ensure only one request is handled at a time
+# Remove this after multiple requests handling is implemented
+request_lock = anyio.Lock()
+
 
-async def chat_stream(id_, request_data, request: Request):
-    try:
-        chunk = json.dumps(
-            {
+def chunk_json(id_, content=None, role=None, finish_reason=None):
+    delta = {}
+    if content:
+        delta["content"] = content
+    if role:
+        delta["role"] = role
+    return {
         "id": id_,
         "object": "chat.completion.chunk",
         "created": int(time.time()),
@@ -66,12 +73,19 @@ async def chat_stream(id_, request_data, request: Request):
         "choices": [
             {
                 "index": 0,
-                "delta": {"role": "assistant", "content": ""},
+                "delta": delta,
                 "logprobs": None,
-                "finish_reason": None,
+                "finish_reason": finish_reason,
             }
         ],
-            },
+    }
+
+
+async def chat_stream(id_, request_data, request: Request):
+    try:
+        await request_lock.acquire()
+        chunk = json.dumps(
+            chunk_json(id_, content="", role="assistant"),
             ensure_ascii=False,
         )
         yield f"{chunk}\n\n"
@@ -81,36 +95,15 @@ async def chat_stream(id_, request_data, request: Request):
                 print("Client disconnected. Aborting stream.")
                 break
             chunk = json.dumps(
-                {
-                    "id": id_,
-                    "object": "chat.completion.chunk",
-                    "created": int(time.time()),
-                    "model": "jiuge",
-                    "system_fingerprint": None,
-                    "choices": [
-                        {
-                            "index": 0,
-                            "delta": {"content": token},
-                            "logprobs": None,
-                            "finish_reason": None,
-                        }
-                    ],
-                },
+                chunk_json(id_, content=token),
                 ensure_ascii=False,
             )
             yield f"{chunk}\n\n"
     finally:
+        if request_lock.locked():
+            request_lock.release()
         chunk = json.dumps(
-            {
-                "id": id_,
-                "object": "chat.completion.chunk",
-                "created": int(time.time()),
-                "model": "jiuge",
-                "system_fingerprint": None,
-                "choices": [
-                    {"index": 0, "delta": {}, "logprobs": None, "finish_reason": "stop"}
-                ],
-            },
+            chunk_json(id_, finish_reason="stop"),
             ensure_ascii=False,
         )
         yield f"{chunk}\n\n"
@@ -121,20 +114,9 @@ def chat(id_, request_data):
         request_data,
         kv_cache,
     )
-    response = {
-        "id": id_,
-        "object": "chat.completion",
-        "created": int(time.time()),
-        "model": "jiuge",
-        "choices": [
-            {
-                "index": 0,
-                "message": {"role": "assistant", "content": output_text.strip()},
-                "finish_reason": "stop",
-            }
-        ],
-    }
+    response = chunk_json(
+        id_, content=output_text.strip(), role="assistant", finish_reason="stop"
+    )
     return JSONResponse(response)
...
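For reference, the shape chunk_json builds (and that the non-streaming chat endpoint now reuses) follows the OpenAI chat.completion.chunk format. An illustrative payload with a made-up id and timestamp, assuming the lines elided between the hunks keep the model and system_fingerprint fields seen in the removed code:

# Illustrative only: what chunk_json(id_, content="hi", role="assistant")
# would return under the assumptions above.
example = {
    "id": "chatcmpl-0000",                # id_ supplied by the caller
    "object": "chat.completion.chunk",
    "created": 1700000000,                # int(time.time())
    "model": "jiuge",
    "system_fingerprint": None,
    "choices": [
        {
            "index": 0,
            "delta": {"role": "assistant", "content": "hi"},  # only the keys that were set
            "logprobs": None,
            "finish_reason": None,
        }
    ],
}

Note that the non-streaming response now carries "object": "chat.completion.chunk" and a delta instead of a message, which strict OpenAI-compatible clients may not expect.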
import requests
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

API_URL = "http://localhost:8000/jiuge/chat/completions"
MODEL = "FM9G-7B"
PROMPT = ["给我讲个故事", "山东最高的山是?"]  # "Tell me a story" / "What is the highest mountain in Shandong?"
CONCURRENCY = 10  # number of concurrent users


def single_run(user_id):
    payload = {
        "model": MODEL,
        "messages": [{"role": "user", "content": PROMPT[user_id % len(PROMPT)]}],
        "stream": True,
    }
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    print(f"[User {user_id}] Sending request...")
    start = time.perf_counter()
    resp = requests.post(API_URL, headers=headers, json=payload, stream=True)
    resp.raise_for_status()
    ttfb = resp.elapsed.total_seconds()  # time from send until response headers arrived
    header_received = time.perf_counter()
    if resp.encoding is None:
        resp.encoding = 'utf-8'
    tokens = 0
    chunks = []
    for line in resp.iter_lines(decode_unicode=True):
        if not line or line.strip() == "[DONE]":
            continue
        s = line.strip()
        if s.startswith("data:"):
            s = s[len("data:"):].strip()
        try:
            data = json.loads(s)
        except json.JSONDecodeError:
            continue
        text = data.get("choices", [{}])[0].get("delta", {}).get("content")
        if text:
            chunks.append(text)
            tokens += 1
    stream_done = time.perf_counter()
    # timing breakdown
    stream_time = stream_done - header_received
    total_time = stream_done - start
    time_per_token_ms = (stream_time / tokens * 1000) if tokens else float('inf')
    tps = tokens / stream_time if stream_time > 0 else 0
    return {
        "user": user_id,
        "ttfb": ttfb,
        "stream_time": stream_time,
        "total_time": total_time,
        "tokens": tokens,
        "time_per_token_ms": time_per_token_ms,
        "tps": tps,
        "chunks": chunks,
    }
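single_run tolerates both bare JSON lines (what the server above emits) and SSE-style "data:"-prefixed lines. A quick standalone illustration of the two accepted inputs; extract_content is a hypothetical helper that mirrors the parsing logic above, and the sample line is made up:

import json

def extract_content(line: str):
    # Accept both bare JSON lines and "data: {...}" lines;
    # return the delta content, if any.
    s = line.strip()
    if s.startswith("data:"):
        s = s[len("data:"):].strip()
    try:
        data = json.loads(s)
    except json.JSONDecodeError:
        return None
    return data.get("choices", [{}])[0].get("delta", {}).get("content")

sample = '{"choices": [{"delta": {"content": "hi"}}]}'
assert extract_content(sample) == "hi"
assert extract_content("data: " + sample) == "hi"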
def main():
    worst = None
    best = None
    worst_stream = -1.0
    best_stream = float('inf')
    results = []
    with ThreadPoolExecutor(max_workers=CONCURRENCY) as e:
        futures = [e.submit(single_run, uid) for uid in range(CONCURRENCY)]
        for future in as_completed(futures):
            r = future.result()
            results.append(r)
            print(
                f"User {r['user']} → TTFB = {r['ttfb']:.3f}s, latency = {r['stream_time']:.3f}s, "
                f"tokens = {r['tokens']}, time/token = {r['time_per_token_ms']:.2f} ms, "
                f"TPS = {r['tps']:.1f} tok/s"
            )
            if r['stream_time'] > worst_stream:
                worst_stream = r['stream_time']
                worst = r
            if r['stream_time'] < best_stream:
                best_stream = r['stream_time']
                best = r

    with open("responses.txt", "w", encoding="utf-8") as fw:
        for r in results:
            fw.write(f"[User {r['user']}]\n")
            text = "".join(r["chunks"])
            fw.write(text)
            fw.write("\n\n")

    n = CONCURRENCY
    avg_ttfb = sum(r['ttfb'] for r in results) / n
    avg_token = sum(r['tokens'] for r in results) / n
    avg_stream = sum(r['stream_time'] for r in results) / n
    avg_tps = sum(r['tps'] for r in results) / n
    avg_time_per_token = sum(r['time_per_token_ms'] for r in results) / n

    print(f"\n✅ All {n} requests completed.")
    print(f"Averages → TTFB = {avg_ttfb:.3f}s, latency = {avg_stream:.3f}s, "
          f"tokens = {avg_token:.1f}, TPS = {avg_tps:.1f} tok/s, time/token = {avg_time_per_token:.2f} ms")

    if best:
        print("\nFastest user:")
        print(
            f"User {best['user']} → latency = {best['stream_time']:.3f}s, "
            f"tokens = {best['tokens']}, TPS = {best['tps']:.1f} tok/s, "
            f"time/token = {best['time_per_token_ms']:.2f} ms"
        )
    if worst:
        print("\nSlowest user:")
        print(
            f"User {worst['user']} → latency = {worst['stream_time']:.3f}s, "
            f"tokens = {worst['tokens']}, TPS = {worst['tps']:.1f} tok/s, "
            f"time/token = {worst['time_per_token_ms']:.2f} ms"
        )


if __name__ == "__main__":
    main()
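A rough sanity model for reading the numbers this script prints: under the global request lock, streams are served one at a time, so each user's total_time should grow roughly linearly with their position in the queue. An illustrative back-of-the-envelope check (numbers are made up, not measured):

# Illustrative only: if each stream holds the lock for about T seconds,
# the k-th of N concurrent users finishes after roughly (k + 1) * T seconds,
# because every earlier stream must complete first.
T = 4.0   # hypothetical single-stream duration in seconds
N = 10    # matches CONCURRENCY above
expected_total = [(k + 1) * T for k in range(N)]
print(expected_total)  # [4.0, 8.0, ..., 40.0]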