Unverified Commit d5c10e7a authored by AllentDan, committed by GitHub

Add Restful API (#223)

* add restful api

* refine

* add simple doc

* lint

* add uvicorn requirement

* more args

* add llama2

* docstring

* update doc

* save

* refine

* lint

* better decode

* add v1/embedding

* add GenerateRequest

* add llama2 chat template

* correct profiling

* update documents

* add length judge

* add faq

* update doc and rename req_que to req_queue

* fix md link, use get_logger, fix sequence_end bug

* use another doc link for go to avoid lint error

* add api_client.py

* update doc

* update doc

* update function interface

* update FAQ

* resolve comments
parent 7785142d
import json
import multiprocessing as mp
import os
import random
import time
from typing import Iterable, List
import fire
import numpy as np
import requests
from sentencepiece import SentencePieceProcessor
from lmdeploy.utils import get_logger
def get_streaming_response(prompt: str,
api_url: str,
instance_id: int,
request_output_len: int,
stream: bool = True,
sequence_start: bool = True,
sequence_end: bool = False,
ignore_eos: bool = False) -> Iterable[List[str]]:
headers = {'User-Agent': 'Test Client'}
pload = {
'prompt': prompt,
'stream': stream,
'instance_id': instance_id,
'request_output_len': request_output_len,
'sequence_start': sequence_start,
'sequence_end': sequence_end,
'ignore_eos': ignore_eos
}
response = requests.post(api_url,
headers=headers,
json=pload,
stream=stream)
for chunk in response.iter_lines(chunk_size=8192,
decode_unicode=False,
delimiter=b'\0'):
if chunk:
data = json.loads(chunk.decode('utf-8'))
output = data['text']
tokens = data['tokens']
yield output, tokens
class Tokenizer:
def __init__(self, model_path: str):
# reload tokenizer
assert os.path.isfile(model_path), model_path
self.sp_model = SentencePieceProcessor(model_file=model_path)
def encode(self, prompts: List):
prompts_token_ids = self.sp_model.Encode(prompts,
add_bos=False,
add_eos=False)
return [len(token_ids) for token_ids in prompts_token_ids]
def infer(server_addr: str, session_id: int, req_queue: mp.Queue,
res_que: mp.Queue):
stats = []
while not req_queue.empty():
prompt, input_seqlen, output_seqlen = req_queue.get()
get_logger('profile_restful_api').info(
f'request info: session {session_id}, '
f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}')
timestamps = []
tokens = []
start = time.perf_counter()
for res, token in get_streaming_response(
prompt,
server_addr,
session_id,
request_output_len=output_seqlen,
sequence_start=True,
sequence_end=True):
timestamps.append(time.perf_counter())
tokens.append(token)
first_token_latency = timestamps[1] - start
token_latency = timestamps[-1] - timestamps[0]
token = tokens[-1] - tokens[0]
stats.append([first_token_latency, token, token_latency])
res_que.put((session_id, stats))
def warmup(server_addr: str,
concurrency: int,
output_seqlen: int,
warmup_round: int = 1):
print('start to warmup ...')
def _infer(server_addr, session_id):
for _ in range(warmup_round):
for _, _ in get_streaming_response(
'',
server_addr,
session_id,
request_output_len=output_seqlen,
sequence_start=True,
sequence_end=True):
continue
_start = time.perf_counter()
procs = []
for i in range(concurrency):
proc = mp.Process(target=_infer, args=(server_addr, i + 1))
procs.append(proc)
proc.start()
for proc in procs:
proc.join()
_end = time.perf_counter()
print(f'end warmup, elapsed time: {round(_end - _start, 2)} s')
def read_dataset(tokenizer_path: str, dataset_path: str, samples: int,
session_len: int):
start = time.perf_counter()
with open(dataset_path) as f:
dataset = json.load(f)
dataset = [data for data in dataset if len(data['conversations']) >= 2]
# Only keep the first two turns of each conversation.
dataset = [(data['conversations'][0]['value'],
data['conversations'][1]['value']) for data in dataset]
prompts = [prompt for prompt, _ in dataset]
completions = [completion for _, completion in dataset]
print(f'elapsed time for reading data: '
f'{round(time.perf_counter() - start, 2)} s')
start = time.perf_counter()
tokenizer = Tokenizer(tokenizer_path)
prompts_token_lens = tokenizer.encode(prompts)
completions_token_lens = tokenizer.encode(completions)
print(f'elapsed time for tokenization: '
f'{round(time.perf_counter() - start, 2)} s')
start = time.perf_counter()
filtered_dataset = []
for (prompt, _), input_len, output_len in zip(dataset, prompts_token_lens,
completions_token_lens):
if input_len + output_len > session_len:
# ignore too long conversation
continue
filtered_dataset.append([prompt, input_len, output_len])
if samples > 0:
filtered_dataset = random.sample(filtered_dataset, samples)
que = mp.Queue()
for data in filtered_dataset:
que.put(data)
print(f'elapsed time for filtering: '
f'{round(time.perf_counter() - start, 2)} s')
return que, len(filtered_dataset)
def main(server_addr: str,
tokenizer_path: str,
dataset_path: str,
concurrency: int = 1,
session_len: int = 2048,
samples: int = 1000):
api_url = server_addr + '/generate'
warmup(api_url, concurrency, session_len - 1)
req_queue, n_req = read_dataset(tokenizer_path, dataset_path, samples,
session_len)
res_que = mp.Queue()
procs = []
_start = time.perf_counter()
for i in range(concurrency):
proc = mp.Process(target=infer,
args=(api_url, i + 1, req_queue, res_que))
procs.append(proc)
proc.start()
for proc in procs:
proc.join()
_end = time.perf_counter()
elapsed_time = _end - _start
stats = []
while not res_que.empty():
session_id, _stats = res_que.get()
print(f'\n{"-" * 50}\n'
f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n')
stats.append(np.array(_stats))
stats = np.concatenate(stats).reshape(-1, 3)
first_token_latency_min = np.min(stats[:, 0], axis=0)
first_token_latency_max = np.max(stats[:, 0], axis=0)
first_token_latency_ave = np.mean(stats[:, 0], axis=0)
token_throughput = np.sum(stats[:, 1], axis=0) / elapsed_time
req_throughput = n_req / elapsed_time
print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n'
f'elapsed_time: {elapsed_time:.2f}s\n'
f'first_token latency(min, max, ave): '
f'{first_token_latency_min:.2f}s, {first_token_latency_max:.2f}s, '
f'{first_token_latency_ave:.2f}s\n'
f'token throughput: {token_throughput:.2f} token/s\n'
f'req throughput: {req_throughput:.2f} req/s\n'
f'{"-" * 50}\n')
if __name__ == '__main__':
fire.Fire(main)
......@@ -174,7 +174,7 @@ def main(tritonserver_addr: str,
f'{first_token_latency_min:.2f}s, {first_token_latency_max:.2f}s, '
f'{first_token_latency_ave:.2f}s\n'
f'token throughput: {token_throughput:.2f} token/s\n'
- f'req throughput: {req_throughput} req/s\n'
+ f'req throughput: {req_throughput:.2f} req/s\n'
f'{"-" * 50}\n')
......
# Restful API
### Launch Service
```shell
python3 -m lmdeploy.serve.openai.api_server ./workspace server_name server_port --instance_num 32 --tp 1
```
Then, users can open the Swagger UI at http://{server_name}:{server_port}/docs for detailed API usage.
We provide four RESTful APIs in total. Three of them follow the OpenAI format. However, we recommend that users try
our own `generate` API, which provides more arguments for users to modify and delivers better performance.
### Python
Here is an example of using our own `generate` API.
```python
import json
import requests
from typing import Iterable, List
def get_streaming_response(prompt: str,
api_url: str,
instance_id: int,
request_output_len: int,
stream: bool = True,
sequence_start: bool = True,
sequence_end: bool = True,
ignore_eos: bool = False) -> Iterable[List[str]]:
headers = {'User-Agent': 'Test Client'}
pload = {
'prompt': prompt,
'stream': stream,
'instance_id': instance_id,
'request_output_len': request_output_len,
'sequence_start': sequence_start,
'sequence_end': sequence_end,
'ignore_eos': ignore_eos
}
response = requests.post(
api_url, headers=headers, json=pload, stream=stream)
for chunk in response.iter_lines(
chunk_size=8192, decode_unicode=False, delimiter=b'\0'):
if chunk:
data = json.loads(chunk.decode('utf-8'))
output = data['text']
tokens = data['tokens']
yield output, tokens
for output, tokens in get_streaming_response(
"Hi, how are you?", "http://{server_name}:{server_port}/generate", 0,
512):
print(output, end='')
```
### Golang/Rust
Golang can also build an HTTP request to use the service. You may refer
to [the net/http documentation](https://pkg.go.dev/net/http) for details on building your own client.
Besides, Rust supports building a client in [many ways](https://blog.logrocket.com/best-rust-http-client/).
### cURL
cURL is a handy tool for inspecting the output of the API.
List Models:
```bash
curl http://{server_name}:{server_port}/v1/models
```
Generate:
```bash
curl http://{server_name}:{server_port}/generate \
-H "Content-Type: application/json" \
-d '{
"model": "internlm-chat-7b",
"prompt": "Hello! Ho are you?",
"sequence_start": true,
"sequence_end": true
}'
```
Chat Completions:
```bash
curl http://{server_name}:{server_port}/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "internlm-chat-7b",
"messages": [{"role": "user", "content": "Hello! Ho are you?"}]
}'
```
Embeddings:
```bash
curl http://{server_name}:{server_port}/v1/embeddings \
-H "Content-Type: application/json" \
-d '{
"model": "internlm-chat-7b",
"input": "Hello world!"
}'
```
### FAQ
1. When a user gets `"finish_reason":"length"`, it means the session is too long to be continued.
Please add `"renew_session": true` to the next request, as shown in the sketch after this list.
2. When OOM appears on the server side, please reduce `instance_num` when launching the service.
3. When requests with the same `instance_id` to `generate` get an empty return value and a negative `tokens`, please consider setting `sequence_start=false` for the second question and all the following ones.
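
Below is a minimal Python sketch of item 1, based on the request and response fields shown above. The server address and model name are placeholders taken from the earlier examples; adapt them to your deployment.
```python
import requests

# Placeholder address; replace {server_name}/{server_port} with your own.
api_url = 'http://{server_name}:{server_port}/v1/chat/completions'
messages = [{'role': 'user', 'content': 'Hello! How are you?'}]

resp = requests.post(api_url,
                     json={
                         'model': 'internlm-chat-7b',
                         'messages': messages
                     }).json()

# If the previous reply stopped because the session ran out of length,
# retry the request with "renew_session": true (FAQ item 1).
if resp['choices'][0]['finish_reason'] == 'length':
    resp = requests.post(api_url,
                         json={
                             'model': 'internlm-chat-7b',
                             'messages': messages,
                             'renew_session': True
                         }).json()
print(resp['choices'][0]['message']['content'])
```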
# Restful API
### Launch Service
Run the script:
```shell
python3 -m lmdeploy.serve.openai.api_server ./workspace server_name server_port --instance_num 32 --tp 1
```
Then, users can open the Swagger UI at http://{server_name}:{server_port}/docs to view all the APIs and their usage in detail.
We provide four RESTful APIs in total, three of which follow the OpenAI format. However, we recommend that users try the other API we provide, `generate`.
It delivers better performance and provides more arguments for users to customize.
### Python
Here is a Python example showing how to use `generate`.
```python
import json
import requests
from typing import Iterable, List
def get_streaming_response(prompt: str,
api_url: str,
instance_id: int,
request_output_len: int,
stream: bool = True,
sequence_start: bool = True,
sequence_end: bool = True,
ignore_eos: bool = False) -> Iterable[List[str]]:
headers = {'User-Agent': 'Test Client'}
pload = {
'prompt': prompt,
'stream': stream,
'instance_id': instance_id,
'request_output_len': request_output_len,
'sequence_start': sequence_start,
'sequence_end': sequence_end,
'ignore_eos': ignore_eos
}
response = requests.post(
api_url, headers=headers, json=pload, stream=stream)
for chunk in response.iter_lines(
chunk_size=8192, decode_unicode=False, delimiter=b'\0'):
if chunk:
data = json.loads(chunk.decode('utf-8'))
output = data['text']
tokens = data['tokens']
yield output, tokens
for output, tokens in get_streaming_response(
"Hi, how are you?", "http://{server_name}:{server_port}/generate", 0,
512):
print(output, end='')
```
### Golang/Rust
Golang can also build an HTTP request to use the service. You may refer to [the net/http documentation](https://pkg.go.dev/net/http) to build your own client.
Rust also offers [many ways](https://blog.logrocket.com/best-rust-http-client/) to build a client and use the service.
### cURL
cURL can also be used to inspect the output of the API.
List models:
```bash
curl http://{server_name}:{server_port}/v1/models
```
Generate:
```bash
curl http://{server_name}:{server_port}/generate \
-H "Content-Type: application/json" \
-d '{
"model": "internlm-chat-7b",
"prompt": "Hello! Ho are you?",
"sequence_start": true,
"sequence_end": true
}'
```
Chat Completions:
```bash
curl http://{server_name}:{server_port}/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "internlm-chat-7b",
"messages": [{"role": "user", "content": "Hello! Ho are you?"}]
}'
```
Embeddings:
```bash
curl http://{server_name}:{server_port}/v1/embeddings \
-H "Content-Type: application/json" \
-d '{
"model": "internlm-chat-7b",
"input": "Hello world!"
}'
```
### FAQ
1. When the returned result has `"finish_reason":"length"`, it means the session length has exceeded the maximum.
Please add `"renew_session": true` to the next request.
2. When the server runs out of GPU memory (OOM), reduce `instance_num` when launching the service.
3. When requests with the same `instance_id` sent to `generate` return an empty string and a negative `tokens` value, it is most likely because `sequence_start=false` was not set for the second question and the following ones, as shown in the sketch below.
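
Below is a minimal Python sketch of item 3, reusing the `get_streaming_response` helper from the Python section above with a placeholder server address: keep `sequence_start=True` only for the first question of a session.
```python
# Placeholder address; replace {server_name}/{server_port} with your own.
api_url = 'http://{server_name}:{server_port}/generate'
questions = ['Hi, how are you?', 'What can you do?']
for i, question in enumerate(questions):
    for output, tokens in get_streaming_response(
            question,
            api_url,
            instance_id=0,
            request_output_len=512,
            sequence_start=(i == 0),  # False for follow-up questions
            sequence_end=False):
        print(output, end='')
```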
# Copyright (c) OpenMMLab. All rights reserved.
from abc import abstractmethod
from typing import List
from mmengine import Registry
MODELS = Registry('model', locations=['lmdeploy.model'])
......@@ -35,6 +38,50 @@ class BaseModel:
"""
return prompt
@staticmethod
def _translate_messages(messages: List):
"""Translate messages into system, user speaking list, assistant
speaking list.
Args:
messages (List): chat history
Returns:
Tuple: consists of system (str), users (List[str]),
assistants (List[str])
"""
system = None
users = []
assistants = []
assert isinstance(messages, List)
for message in messages:
msg_role = message['role']
if msg_role == 'system':
system = message['content']
elif msg_role == 'user':
users.append(message['content'])
elif msg_role == 'assistant':
assistants.append(message['content'])
else:
raise ValueError(f'Unknown role: {msg_role}')
assistants.append(None)
return system, users, assistants
@abstractmethod
def messages2prompt(self, messages, sequence_start=True):
"""Return the prompt that is concatenated with other elements in the
chat template. When the messages arg is a string, return
self.get_prompt(messages). When it is a chat history, return the
prompt translated from the chat history.
Args:
messages (str | List): user's input prompt
Returns:
str: the concatenated prompt
"""
if isinstance(messages, str):
return self.get_prompt(messages)
# chat history processing in derived classes
@property
def stop_words(self):
"""Return the stop-words' token ids."""
......@@ -68,9 +115,30 @@ class Vicuna(BaseModel):
str: the concatenated prompt
"""
if sequence_start:
- return f'{self.system} {self.user}: {prompt} {self.assistant}:'
+ return f'{self.system} {self.user}: {prompt} {self.assistant}: '
else:
- return f'</s>{self.user}: {prompt} {self.assistant}:'
+ return f'</s>{self.user}: {prompt} {self.assistant}: '
def messages2prompt(self, messages, sequence_start=True):
"""Return the prompt that is concatenated with other elements in the
chat template.
Args:
messages (str | List): user's input prompt
Returns:
str: the concatenated prompt
"""
if isinstance(messages, str):
return self.get_prompt(messages, sequence_start)
system, users, assistants = self._translate_messages(messages)
system = self.system if not system else system
ret = system + ' '
for user, assistant in zip(users, assistants):
if assistant:
ret += f'{self.user}: {user} {self.assistant}: {assistant}</s>'
else:
ret += f'{self.user}: {user} {self.assistant}: '
return ret
@MODELS.register_module(name='internlm')
......@@ -116,6 +184,27 @@ class InternLMChat7B(BaseModel):
return f'\n{self.user}:{prompt}{self.eoh}\n' \
f'{self.assistant}:'
def messages2prompt(self, messages, sequence_start=True):
"""Return the prompt that is concatenated with other elements in the
chat template.
Args:
messages (str | List): user's input prompt
Returns:
str: the concatenated prompt
"""
if isinstance(messages, str):
return self.get_prompt(messages, sequence_start)
system, users, assistants = self._translate_messages(messages)
ret = '<BOS>'
for user, assistant in zip(users, assistants):
if assistant:
ret += f'{self.user}:{user}{self.eoh}\n{self.assistant}:' \
f'{assistant}{self.eoa}'
else:
ret += f'{self.user}:{user}{self.eoh}\n{self.assistant}:'
return ret
@property
def stop_words(self):
"""Return the stop-words' token ids."""
......@@ -215,6 +304,29 @@ If a question does not make any sense, or is not factually coherent, explain why
return f'{self.b_inst} {prompt} {self.e_inst} '
def messages2prompt(self, messages, sequence_start=True):
"""Return the prompt that is concatenated with other elements in the
chat template.
Args:
messages (str | List): user's input prompt
Returns:
str: the concatenated prompt
"""
if isinstance(messages, str):
return self.get_prompt(messages, sequence_start)
system, users, assistants = self._translate_messages(messages)
system = self.default_sys_prompt if not system else system
ret = f'<BOS>{self.b_inst} {self.b_sys} {system} {self.e_sys}'
for i, (user, assistant) in enumerate(zip(users, assistants)):
if i != 0:
ret += f'{self.b_inst} '
if assistant:
ret += f'{user} {self.e_inst} {assistant}'
else:
ret += f'{user} {self.e_inst} '
return ret
@MODELS.register_module(name='qwen-7b')
class Qwen7BChat(BaseModel):
......
# Copyright (c) OpenMMLab. All rights reserved.
# Copyright (c) OpenMMLab. All rights reserved.
import json
from typing import Iterable, List
import fire
import requests
def get_streaming_response(prompt: str,
api_url: str,
instance_id: int,
request_output_len: int = 512,
stream: bool = True,
sequence_start: bool = True,
sequence_end: bool = True,
ignore_eos: bool = False) -> Iterable[List[str]]:
headers = {'User-Agent': 'Test Client'}
pload = {
'prompt': prompt,
'stream': stream,
'instance_id': instance_id,
'request_output_len': request_output_len,
'sequence_start': sequence_start,
'sequence_end': sequence_end,
'ignore_eos': ignore_eos
}
response = requests.post(api_url,
headers=headers,
json=pload,
stream=stream)
for chunk in response.iter_lines(chunk_size=8192,
decode_unicode=False,
delimiter=b'\0'):
if chunk:
data = json.loads(chunk.decode('utf-8'))
output = data['text']
tokens = data['tokens']
finish_reason = data['finish_reason']
yield output, tokens, finish_reason
def input_prompt():
"""Input a prompt in the consolo interface."""
print('\ndouble enter to end input >>> ', end='')
sentinel = '' # ends when this string is seen
return '\n'.join(iter(input, sentinel))
def main(server_name: str, server_port: int, session_id: int = 0):
nth_round = 1
while True:
prompt = input_prompt()
if prompt == 'exit':
exit(0)
else:
for output, tokens, finish_reason in get_streaming_response(
prompt,
f'http://{server_name}:{server_port}/generate',
instance_id=session_id,
request_output_len=512,
sequence_start=(nth_round == 1),
sequence_end=False):
if finish_reason == 'length':
print('WARNING: exceeded session max length.'
' Please end the session.')
continue
print(output, end='')
nth_round += 1
if __name__ == '__main__':
fire.Fire(main)
# Copyright (c) OpenMMLab. All rights reserved.
import json
import os
import time
from http import HTTPStatus
from typing import AsyncGenerator, Optional
import fire
import uvicorn
from fastapi import BackgroundTasks, FastAPI, Request
from fastapi.responses import JSONResponse, StreamingResponse
from lmdeploy.serve.openai.async_engine import AsyncEngine
from lmdeploy.serve.openai.protocol import ( # noqa: E501
ChatCompletionRequest, ChatCompletionResponse,
ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice,
ChatCompletionStreamResponse, ChatMessage, DeltaMessage, EmbeddingsRequest,
EmbeddingsResponse, ErrorResponse, GenerateRequest, ModelCard, ModelList,
ModelPermission, UsageInfo)
os.environ['TM_LOG_LEVEL'] = 'ERROR'
class VariableInterface:
"""A IO interface maintaining variables."""
async_engine: AsyncEngine = None
request_hosts = []
app = FastAPI()
def get_model_list():
"""Available models.
Only one model is provided for now.
"""
return [VariableInterface.async_engine.tm_model.model_name]
@app.get('/v1/models')
def available_models():
"""Show available models."""
model_cards = []
for model_name in get_model_list():
model_cards.append(
ModelCard(id=model_name,
root=model_name,
permission=[ModelPermission()]))
return ModelList(data=model_cards)
def create_error_response(status: HTTPStatus, message: str):
"""Create error response according to http status and message.
Args:
status (HTTPStatus): HTTP status codes and reason phrases
message (str): error message
"""
return JSONResponse(ErrorResponse(message=message,
type='invalid_request_error').dict(),
status_code=status.value)
async def check_request(request) -> Optional[JSONResponse]:
"""Check if a request is valid."""
if request.model in get_model_list():
return
ret = create_error_response(
HTTPStatus.NOT_FOUND, f'The model `{request.model}` does not exist.')
return ret
@app.post('/v1/chat/completions')
async def chat_completions_v1(request: ChatCompletionRequest,
raw_request: Request = None):
"""Completion API similar to OpenAI's API.
Refer to `https://platform.openai.com/docs/api-reference/chat/create`
for the API specification.
The request should be a JSON object with the following fields:
- model: model name. Available from /v1/models.
- messages: string prompt or chat history in OpenAI format.
- temperature (float): to modulate the next token probability
- top_p (float): If set to float < 1, only the smallest set of most
probable tokens with probabilities that add up to top_p or higher
are kept for generation.
- n (int): How many chat completion choices to generate for each input
message. Only support one here.
- stream: whether to stream the results or not. Defaults to false.
- max_tokens (int): output token nums
- repetition_penalty (float): The parameter for repetition penalty.
1.0 means no penalty
Additional arguments supported by LMDeploy:
- renew_session (bool): Whether to renew the session. Can be used when the
session length is exceeded.
- ignore_eos (bool): indicator for ignoring eos
Currently we do not support the following features:
- function_call (Users should implement this by themselves)
- logit_bias (not supported yet)
- presence_penalty (replaced with repetition_penalty)
- frequency_penalty (replaced with repetition_penalty)
"""
instance_id = int(raw_request.client.host.replace('.', ''))
error_check_ret = await check_request(request)
if error_check_ret is not None:
return error_check_ret
model_name = request.model
request_id = str(instance_id)
created_time = int(time.time())
result_generator = VariableInterface.async_engine.generate_openai(
request.messages,
instance_id,
request.stream,
request.renew_session,
request_output_len=request.max_tokens if request.max_tokens else 512,
stop=request.stop,
top_p=request.top_p,
temperature=request.temperature,
repetition_penalty=request.repetition_penalty,
ignore_eos=request.ignore_eos)
async def abort_request() -> None:
async for _ in VariableInterface.async_engine.generate_openai(
request.messages,
instance_id,
request.stream,
request.renew_session,
stop=True):
pass
def create_stream_response_json(
index: int,
text: str,
finish_reason: Optional[str] = None,
) -> str:
choice_data = ChatCompletionResponseStreamChoice(
index=index,
delta=DeltaMessage(role='assistant', content=text),
finish_reason=finish_reason,
)
response = ChatCompletionStreamResponse(
id=request_id,
created=created_time,
model=model_name,
choices=[choice_data],
)
response_json = response.json(ensure_ascii=False)
return response_json
async def completion_stream_generator() -> AsyncGenerator[str, None]:
# First chunk with role
for i in range(request.n):
choice_data = ChatCompletionResponseStreamChoice(
index=i,
delta=DeltaMessage(role='assistant'),
finish_reason=None,
)
chunk = ChatCompletionStreamResponse(id=request_id,
choices=[choice_data],
model=model_name)
data = chunk.json(exclude_unset=True, ensure_ascii=False)
yield f'data: {data}\n\n'
async for res in result_generator:
response_json = create_stream_response_json(
index=0,
text=res.response,
)
yield f'data: {response_json}\n\n'
yield 'data: [DONE]\n\n'
# Streaming response
if request.stream:
background_tasks = BackgroundTasks()
# Abort the request if the client disconnects.
background_tasks.add_task(abort_request)
return StreamingResponse(completion_stream_generator(),
media_type='text/event-stream',
background=background_tasks)
# Non-streaming response
final_res = None
async for res in result_generator:
if await raw_request.is_disconnected():
# Abort the request if the client disconnects.
await abort_request()
return create_error_response(HTTPStatus.BAD_REQUEST,
'Client disconnected')
final_res = res
assert final_res is not None
choices = []
choice_data = ChatCompletionResponseChoice(
index=0,
message=ChatMessage(role='assistant', content=final_res.response),
finish_reason=final_res.finish_reason,
)
choices.append(choice_data)
total_tokens = sum([
final_res.history_token_len, final_res.input_token_len,
final_res.generate_token_len
])
usage = UsageInfo(
prompt_tokens=final_res.input_token_len,
completion_tokens=final_res.generate_token_len,
total_tokens=total_tokens,
)
response = ChatCompletionResponse(
id=request_id,
created=created_time,
model=model_name,
choices=choices,
usage=usage,
)
return response
@app.post('/v1/embeddings')
async def create_embeddings(request: EmbeddingsRequest,
raw_request: Request = None):
"""Creates embeddings for the text."""
error_check_ret = await check_request(request)
if error_check_ret is not None:
return error_check_ret
embedding = await VariableInterface.async_engine.get_embeddings(
request.input)
data = [{'object': 'embedding', 'embedding': embedding, 'index': 0}]
token_num = len(embedding)
return EmbeddingsResponse(
data=data,
model=request.model,
usage=UsageInfo(
prompt_tokens=token_num,
total_tokens=token_num,
completion_tokens=None,
),
).dict(exclude_none=True)
@app.post('/generate')
async def generate(request: GenerateRequest, raw_request: Request = None):
"""Generate completion for the request.
The request should be a JSON object with the following fields:
- prompt: the prompt to use for the generation.
- stream: whether to stream the results or not.
- sequence_start (bool): indicator for starting a sequence.
- sequence_end (bool): indicator for ending a sequence
- instance_id: determines which instance will be called. If it is not
specified (left as -1), an id derived from the request host IP is used.
- request_output_len (int): output token nums
- step (int): the offset of the k/v cache
- top_p (float): If set to float < 1, only the smallest set of most
probable tokens with probabilities that add up to top_p or higher
are kept for generation.
- top_k (int): The number of the highest probability vocabulary
tokens to keep for top-k-filtering
- temperature (float): to modulate the next token probability
- repetition_penalty (float): The parameter for repetition penalty.
1.0 means no penalty
- ignore_eos (bool): indicator for ignoring eos
"""
if request.instance_id == -1:
instance_id = int(raw_request.client.host.replace('.', ''))
request.instance_id = instance_id
generation = VariableInterface.async_engine.generate(
request.prompt,
request.instance_id,
stream_response=request.stream,
sequence_start=request.sequence_start,
sequence_end=request.sequence_end,
request_output_len=request.request_output_len,
top_p=request.top_p,
top_k=request.top_k,
temperature=request.temperature,
repetition_penalty=request.repetition_penalty,
ignore_eos=request.ignore_eos)
# Streaming case
async def stream_results() -> AsyncGenerator[bytes, None]:
async for out in generation:
ret = {
'text': out.response,
'tokens': out.generate_token_len,
'finish_reason': out.finish_reason
}
yield (json.dumps(ret) + '\0').encode('utf-8')
if request.stream:
return StreamingResponse(stream_results())
else:
ret = {}
async for out in generation:
ret = {
'text': out.response,
'tokens': out.generate_token_len,
'finish_reason': out.finish_reason
}
return JSONResponse(ret)
def main(model_path: str,
server_name: str = 'localhost',
server_port: int = 23333,
instance_num: int = 32,
tp: int = 1):
"""An example to perform model inference through the command line
interface.
Args:
model_path (str): the path of the deployed model
server_name (str): host ip for serving
server_port (int): server port
instance_num (int): number of instances of turbomind model
tp (int): tensor parallel
"""
VariableInterface.async_engine = AsyncEngine(model_path=model_path,
instance_num=instance_num,
tp=tp)
uvicorn.run(app=app, host=server_name, port=server_port, log_level='info')
if __name__ == '__main__':
fire.Fire(main)
# Copyright (c) OpenMMLab. All rights reserved.
import asyncio
import dataclasses
import os.path as osp
import random
from typing import Literal, Optional
from lmdeploy import turbomind as tm
from lmdeploy.model import MODELS, BaseModel
from lmdeploy.turbomind.tokenizer import Tokenizer
@dataclasses.dataclass
class GenOut:
"""Pack all response information together."""
response: str
history_token_len: int
input_token_len: int
generate_token_len: int
finish_reason: Optional[Literal['stop', 'length']] = None
class AsyncEngine:
"""Async inference engine. Maintaining a bunch of tm_model instances.
Args:
model_path (str): the path of the deployed model
instance_num (int): instance numbers to be created
tp (int): tensor parallel
"""
def __init__(self, model_path, instance_num=32, tp=1) -> None:
tokenizer_model_path = osp.join(model_path, 'triton_models',
'tokenizer')
tokenizer = Tokenizer(tokenizer_model_path)
self.tm_model = tm.TurboMind(model_path,
eos_id=tokenizer.eos_token_id,
tp=tp)
self.tokenizer = tokenizer
self.generators = [
self.tm_model.create_instance() for i in range(instance_num)
]
self.instance_num = instance_num
self.model: BaseModel = MODELS.get(self.tm_model.model_name)()
self.available = [True] * instance_num
self.starts = [None] * instance_num
self.steps = {}
async def get_embeddings(self, prompt):
prompt = self.model.get_prompt(prompt)
input_ids = self.tokenizer.encode(prompt)
return input_ids
async def get_generator(self, instance_id):
"""Only return the model instance if it is available."""
while self.available[instance_id] is False:
await asyncio.sleep(0.1)
return self.generators[instance_id]
async def generate(
self,
messages,
instance_id,
stream_response=True,
sequence_start=True,
sequence_end=False,
step=0,
request_output_len=512,
stop=False,
top_k=40,
top_p=0.8,
temperature=0.8,
repetition_penalty=1.0,
ignore_eos=False,
):
"""Generate responses.
Args:
messages (str | List): chat history or prompt
instance_id (int): the instance id, in practice derived from the request host IP
stream_response (bool): whether to return responses in a streaming fashion
request_output_len (int): output token nums
sequence_start (bool): indicator for starting a sequence
sequence_end (bool): indicator for ending a sequence
step (int): the offset of the k/v cache
stop (bool): whether stop inference
top_p (float): If set to float < 1, only the smallest set of most
probable tokens with probabilities that add up to top_p or higher
are kept for generation.
top_k (int): The number of the highest probability vocabulary
tokens to keep for top-k-filtering
temperature (float): to modulate the next token probability
repetition_penalty (float): The parameter for repetition penalty.
1.0 means no penalty
ignore_eos (bool): indicator for ignoring eos
"""
session_id = instance_id
instance_id %= self.instance_num
if str(session_id) not in self.steps:
self.steps[str(session_id)] = 0
if step != 0:
self.steps[str(session_id)] = step
seed = random.getrandbits(64)
prompt = self.model.messages2prompt(messages, sequence_start)
input_ids = self.tokenizer.encode(prompt)
finish_reason = 'stop' if stop else None
if self.steps[str(session_id)] + len(
input_ids) >= self.tm_model.session_len:
finish_reason = 'length'
yield GenOut('', self.steps[str(session_id)], len(input_ids), 0,
finish_reason)
else:
generator = await self.get_generator(instance_id)
self.available[instance_id] = False
response_size = 0
async for outputs in generator.async_stream_infer(
session_id=session_id,
input_ids=[input_ids],
stream_output=stream_response,
request_output_len=request_output_len,
sequence_start=(sequence_start),
sequence_end=sequence_end,
step=self.steps[str(session_id)],
stop=stop,
top_k=top_k,
top_p=top_p,
temperature=temperature,
repetition_penalty=repetition_penalty,
ignore_eos=ignore_eos,
random_seed=seed if sequence_start else None):
res, tokens = outputs[0]
# decode res
response = self.tokenizer.decode(res[response_size:])
# response, history token len, input token len, gen token len
yield GenOut(response, self.steps[str(session_id)],
len(input_ids), tokens, finish_reason)
response_size = tokens
# update step
self.steps[str(session_id)] += len(input_ids) + tokens
if sequence_end:
self.steps[str(session_id)] = 0
self.available[instance_id] = True
async def generate_openai(
self,
messages,
instance_id,
stream_response=True,
renew_session=False,
request_output_len=512,
stop=False,
top_k=40,
top_p=0.8,
temperature=0.8,
repetition_penalty=1.0,
ignore_eos=False,
):
"""Generate responses.
Args:
messages (str | List): chat history or prompt
instance_id (int): the instance id, in practice derived from the request host IP
stream_response (bool): whether to return responses in a streaming fashion
renew_session (bool): renew the session
request_output_len (int): output token nums
stop (bool): whether stop inference
top_p (float): If set to float < 1, only the smallest set of most
probable tokens with probabilities that add up to top_p or higher
are kept for generation.
top_k (int): The number of the highest probability vocabulary
tokens to keep for top-k-filtering
temperature (float): to modulate the next token probability
repetition_penalty (float): The parameter for repetition penalty.
1.0 means no penalty
ignore_eos (bool): indicator for ignoring eos
"""
session_id = instance_id
instance_id %= self.instance_num
sequence_start = False
generator = await self.get_generator(instance_id)
self.available[instance_id] = False
if renew_session and str(session_id) in self.steps and self.steps[str(
session_id)] > 0: # renew a session
empty_prompt = self.model.messages2prompt('', False)
empty_input_ids = self.tokenizer.encode(empty_prompt)
for outputs in generator.stream_infer(session_id=session_id,
input_ids=[empty_input_ids],
request_output_len=1,
sequence_start=False,
sequence_end=True):
pass
self.steps[str(session_id)] = 0
if str(session_id) not in self.steps:
self.steps[str(session_id)] = 0
if self.steps[str(session_id)] == 0:
sequence_start = True
seed = random.getrandbits(64)
prompt = self.model.messages2prompt(messages, sequence_start)
input_ids = self.tokenizer.encode(prompt)
finish_reason = 'stop' if stop else None
if self.steps[str(session_id)] + len(
input_ids) >= self.tm_model.session_len:
finish_reason = 'length'
yield GenOut('', self.steps[str(session_id)], len(input_ids), 0,
finish_reason)
else:
response_size = 0
async for outputs in generator.async_stream_infer(
session_id=session_id,
input_ids=[input_ids],
stream_output=stream_response,
request_output_len=request_output_len,
sequence_start=(sequence_start),
sequence_end=False,
step=self.steps[str(session_id)],
stop=stop,
top_k=top_k,
top_p=top_p,
temperature=temperature,
repetition_penalty=repetition_penalty,
ignore_eos=ignore_eos,
random_seed=seed if sequence_start else None):
res, tokens = outputs[0]
# decode res
response = self.tokenizer.decode(res[response_size:])
# response, history token len, input token len, gen token len
yield GenOut(response, self.steps[str(session_id)],
len(input_ids), tokens, finish_reason)
response_size = tokens
# update step
self.steps[str(session_id)] += len(input_ids) + tokens
self.available[instance_id] = True
# Copyright (c) OpenMMLab. All rights reserved.
# Modified from
# https://github.com/lm-sys/FastChat/blob/168ccc29d3f7edc50823016105c024fe2282732a/fastchat/protocol/openai_api_protocol.py
import time
from typing import Any, Dict, List, Literal, Optional, Union
import shortuuid
from pydantic import BaseModel, Field
class ErrorResponse(BaseModel):
"""Error responses."""
object: str = 'error'
message: str
code: int
class ModelPermission(BaseModel):
"""Model permissions."""
id: str = Field(default_factory=lambda: f'modelperm-{shortuuid.random()}')
object: str = 'model_permission'
created: int = Field(default_factory=lambda: int(time.time()))
allow_create_engine: bool = False
allow_sampling: bool = True
allow_logprobs: bool = True
allow_search_indices: bool = True
allow_view: bool = True
allow_fine_tuning: bool = False
organization: str = '*'
group: Optional[str] = None
is_blocking: bool = False
class ModelCard(BaseModel):
"""Model cards."""
id: str
object: str = 'model'
created: int = Field(default_factory=lambda: int(time.time()))
owned_by: str = 'lmdeploy'
root: Optional[str] = None
parent: Optional[str] = None
permission: List[ModelPermission] = []
class ModelList(BaseModel):
"""Model list consists of model cards."""
object: str = 'list'
data: List[ModelCard] = []
class UsageInfo(BaseModel):
"""Usage information."""
prompt_tokens: int = 0
total_tokens: int = 0
completion_tokens: Optional[int] = 0
class ChatCompletionRequest(BaseModel):
"""Chat completion request."""
model: str
messages: Union[str, List[Dict[str, str]]]
temperature: Optional[float] = 0.7
top_p: Optional[float] = 1.0
n: Optional[int] = 1
max_tokens: Optional[int] = 512
stop: Optional[bool] = False
stream: Optional[bool] = False
presence_penalty: Optional[float] = 0.0
frequency_penalty: Optional[float] = 0.0
user: Optional[str] = None
# additional argument of lmdeploy
repetition_penalty: Optional[float] = 1.0
renew_session: Optional[bool] = False
ignore_eos: Optional[bool] = False
class ChatMessage(BaseModel):
"""Chat messages."""
role: str
content: str
class ChatCompletionResponseChoice(BaseModel):
"""Chat completion response choices."""
index: int
message: ChatMessage
finish_reason: Optional[Literal['stop', 'length']]
class ChatCompletionResponse(BaseModel):
"""Chat completion response."""
id: str = Field(default_factory=lambda: f'chatcmpl-{shortuuid.random()}')
object: str = 'chat.completion'
created: int = Field(default_factory=lambda: int(time.time()))
model: str
choices: List[ChatCompletionResponseChoice]
usage: UsageInfo
class DeltaMessage(BaseModel):
"""Delta messages."""
role: Optional[str] = None
content: Optional[str] = None
class ChatCompletionResponseStreamChoice(BaseModel):
"""Chat completion response stream choice."""
index: int
delta: DeltaMessage
finish_reason: Optional[Literal['stop', 'length']]
class ChatCompletionStreamResponse(BaseModel):
"""Chat completion stream response."""
id: str = Field(default_factory=lambda: f'chatcmpl-{shortuuid.random()}')
object: str = 'chat.completion.chunk'
created: int = Field(default_factory=lambda: int(time.time()))
model: str
choices: List[ChatCompletionResponseStreamChoice]
class CompletionRequest(BaseModel):
"""Completion request."""
model: str
prompt: Union[str, List[Any]]
suffix: Optional[str] = None
temperature: Optional[float] = 0.7
n: Optional[int] = 1
max_tokens: Optional[int] = 16
stop: Optional[Union[str, List[str]]] = None
stream: Optional[bool] = False
top_p: Optional[float] = 1.0
logprobs: Optional[int] = None
echo: Optional[bool] = False
presence_penalty: Optional[float] = 0.0
frequency_penalty: Optional[float] = 0.0
user: Optional[str] = None
class CompletionResponseChoice(BaseModel):
"""Completion response choices."""
index: int
text: str
logprobs: Optional[int] = None
finish_reason: Optional[Literal['stop', 'length']]
class CompletionResponse(BaseModel):
"""Completion response."""
id: str = Field(default_factory=lambda: f'cmpl-{shortuuid.random()}')
object: str = 'text_completion'
created: int = Field(default_factory=lambda: int(time.time()))
model: str
choices: List[CompletionResponseChoice]
usage: UsageInfo
class CompletionResponseStreamChoice(BaseModel):
"""Completion response stream choice."""
index: int
text: str
logprobs: Optional[float] = None
finish_reason: Optional[Literal['stop', 'length']] = None
class CompletionStreamResponse(BaseModel):
"""Completion stream response."""
id: str = Field(default_factory=lambda: f'cmpl-{shortuuid.random()}')
object: str = 'text_completion'
created: int = Field(default_factory=lambda: int(time.time()))
model: str
choices: List[CompletionResponseStreamChoice]
class EmbeddingsRequest(BaseModel):
"""Embedding request."""
model: str = None
input: Union[str, List[Any]]
user: Optional[str] = None
class EmbeddingsResponse(BaseModel):
"""Embedding response."""
object: str = 'list'
data: List[Dict[str, Any]]
model: str
usage: UsageInfo
class GenerateRequest(BaseModel):
"""Generate request."""
prompt: str
instance_id: int = -1
sequence_start: bool = True
sequence_end: bool = False
stream: bool = False
request_output_len: int = 512
top_p: float = 0.8
top_k: int = 40
temperature: float = 0.8
repetition_penalty: float = 1.0
ignore_eos: bool = False
# Copyright (c) OpenMMLab. All rights reserved.
import asyncio
import os.path as osp
import sys
from configparser import ConfigParser
......@@ -207,6 +208,13 @@ class TurboMindInstance:
t.start()
self.threads[device_id] = t
async def async_stream_infer(self, *args, **kwargs):
"""Async wrapper of self.stream_infer."""
for output in self.stream_infer(*args, **kwargs):
# Allow the pipeline to add new requests into the queue.
await asyncio.sleep(0)
yield output
def stream_infer(self,
session_id,
input_ids,
......