from jiuge import JiugeForCauslLM
from libinfinicore_infer import DeviceType

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse

import anyio
import uvicorn
import time
import uuid
import sys
import signal
import json

# Shared by both argument-error paths (byte-identical to the original message).
_USAGE = (
    "Usage: python launch_server.py "
    "[--cpu | --nvidia| --cambricon | --ascend | --metax | --moore] "
    "<path/to/model_dir> [n_device]"
)

# Flag -> device backend. A dict dispatch replaces the original elif chain.
_DEVICE_FLAGS = {
    "--cpu": DeviceType.DEVICE_TYPE_CPU,
    "--nvidia": DeviceType.DEVICE_TYPE_NVIDIA,
    "--cambricon": DeviceType.DEVICE_TYPE_CAMBRICON,
    "--ascend": DeviceType.DEVICE_TYPE_ASCEND,
    "--metax": DeviceType.DEVICE_TYPE_METAX,
    "--moore": DeviceType.DEVICE_TYPE_MOORE,
}

# Exit with usage on too few arguments or an unrecognized device flag,
# exactly as the original two separate checks did.
if len(sys.argv) < 3 or sys.argv[1] not in _DEVICE_FLAGS:
    print(_USAGE)
    sys.exit(1)
model_path = sys.argv[2]
device_type = _DEVICE_FLAGS[sys.argv[1]]
# Optional third positional arg: number of devices (defaults to 1).
ndev = int(sys.argv[3]) if len(sys.argv) > 3 else 1

model = JiugeForCauslLM(model_path, device_type, ndev)
# Single shared KV cache — consistent with the one-request-at-a-time lock below.
kv_cache = model.create_kv_cache()


def signal_handler(sig, frame):
    """Free the KV cache and native model instance, then exit.

    Registered below for SIGINT and SIGTERM so the model's device
    resources are released cleanly instead of the process being killed
    mid-inference. Order matters: the cache is dropped before the model
    instance that owns it is destroyed.
    """
    print(f"Received signal {sig}, cleaning up...")
    model.drop_kv_cache(kv_cache)
    model.destroy_model_instance()
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)  # Handle Ctrl+C
signal.signal(signal.SIGTERM, signal_handler)  # Handle docker stop / system shutdown

app = FastAPI()

# TO REMOVE: Global lock to ensure only one request is handled at a time
# Remove this after multiple requests handling is implemented
request_lock = anyio.Lock()


def chunk_json(id_, content=None, role=None, finish_reason=None):
    """Build one OpenAI-style ``chat.completion.chunk`` payload.

    Args:
        id_: Completion id echoed in every chunk of one response.
        content: Delta text. Included whenever it is not None — an empty
            string is kept so the opening chunk can carry ``content: ""``.
        role: Delta role (e.g. "assistant"); included when non-empty.
        finish_reason: "stop" on the terminating chunk, otherwise None.

    Returns:
        A dict ready to be JSON-serialized as one streamed chunk.
    """
    delta = {}
    # Fix: `is not None`, not truthiness. The stream's first chunk is built
    # with content="" alongside the role, and a bare `if content:` silently
    # dropped that empty-string content from the delta.
    if content is not None:
        delta["content"] = content
    if role:
        delta["role"] = role
    return {
        "id": id_,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": "jiuge",
        "system_fingerprint": None,
        "choices": [
            {
                "index": 0,
                "delta": delta,
                "logprobs": None,
                "finish_reason": finish_reason,
            }
        ],
    }

async def chat_stream(id_, request_data, request: Request):
    """Yield a chat completion as a stream of JSON chunk lines.

    Serializes on the global ``request_lock`` (one request at a time),
    emits an opening role chunk, then one chunk per generated token, and
    always terminates with a ``finish_reason: "stop"`` chunk.

    NOTE(review): chunks are yielded as bare ``{json}\\n\\n`` without the
    SSE ``data: `` prefix despite the ``text/event-stream`` media type —
    confirm clients parse this format before changing it.
    """
    try:
        await request_lock.acquire()
        # Opening chunk: role + empty content, per the streaming convention.
        chunk = json.dumps(
            chunk_json(id_, content="", role="assistant"),
            ensure_ascii=False,
        )
        yield f"{chunk}\n\n"

        async for token in model.chat_stream_async(request_data, kv_cache):
            # Stop generating as soon as the client goes away.
            if await request.is_disconnected():
                print("Client disconnected. Aborting stream.")
                break
            chunk = json.dumps(
                chunk_json(id_, content=token),
                ensure_ascii=False,
            )
            yield f"{chunk}\n\n"
    finally:
        # NOTE(review): the lock is released before the final chunk is
        # yielded, so another request could start while the "stop" chunk
        # is still in flight — presumably acceptable; verify.
        if request_lock.locked():
            request_lock.release()
        # Terminating chunk is emitted even on disconnect/error paths.
        chunk = json.dumps(
            chunk_json(id_, finish_reason="stop"),
            ensure_ascii=False,
        )
        yield f"{chunk}\n\n"


def chat(id_, request_data):
    """Run a full (non-streaming) completion and return it as JSON.

    NOTE(review): the body reuses the streaming chunk schema
    ("chat.completion.chunk" with a ``delta``) rather than the usual
    non-streaming "chat.completion" shape with a ``message`` — confirm
    clients expect this before changing it.
    """
    output_text = model.chat(
        request_data,
        kv_cache,
    )
    response = chunk_json(
        id_, content=output_text.strip(), role="assistant", finish_reason="stop"
    )
    return JSONResponse(response)


@app.post("/chat/completions")
async def chat_completions(request: Request):
    """Handle POST /chat/completions in streaming (SSE) or blocking mode.

    Returns 400 when the body has no ``messages``; otherwise dispatches
    on the optional ``stream`` flag (default False).
    """
    data = await request.json()

    if not data.get("messages"):
        return JSONResponse(content={"error": "No message provided"}, status_code=400)

    stream = data.get("stream", False)
    # Fresh completion id, echoed in every chunk of this response.
    id_ = f"cmpl-{uuid.uuid4().hex}"
    if stream:
        return StreamingResponse(
            chat_stream(id_, data, request), media_type="text/event-stream"
        )
    else:
        return chat(id_, data)


if __name__ == "__main__":
    # Single-process server; concurrency is gated by request_lock above.
    uvicorn.run(app, host="0.0.0.0", port=8000)

"""
curl -N -H "Content-Type: application/json" \
     -X POST http://127.0.0.1:8000/chat/completions \
     -d '{
       "model": "jiuge",
       "messages": [
         {"role": "user", "content": "山东最高的山是?"}
       ],
       "temperature": 1.0,
       "top_k": 50,
       "top_p": 0.8,
       "max_tokens": 512,
       "stream": true
     }'
"""