Unverified commit 74a4f3c9, authored by q.yao and committed by GitHub

Streaming output (#71)



* streaming-output

* fix end

* fix profile

* support chinese streaming

* lint

* update chat

* lint

* fix benchmark

---------
Co-authored-by: grimoire <yaoqian@pjlab.org.cn>
parent 208b6841
 # import multiprocessing as mp
+import os.path as osp
 import time
 from queue import Queue
 from threading import Thread
@@ -17,8 +18,8 @@ def infer(model, session_id: int, input_ids: str, output_seqlen: int,
     stats = []
     for i in range(test_round):
         start = time.perf_counter()
-        timestamps = [start]
-        tokens = [0]
+        timestamps = []
+        tokens = []
         for outputs in chatbot.stream_infer(session_id,
                                             input_ids,
                                             request_output_len=output_seqlen,
@@ -30,9 +31,13 @@ def infer(model, session_id: int, input_ids: str, output_seqlen: int,
             tokens.append(token)
         # TODO: ignore first token
-        first_token_latency = timestamps[1] - start
-        token_latency = timestamps[-1] - timestamps[0]
-        token = tokens[-1] - tokens[0]
+        first_token_latency = timestamps[0] - start
+        if len(timestamps) == 1:
+            token_latency = timestamps[0] - start
+            token = tokens[0]
+        else:
+            token_latency = timestamps[-1] - timestamps[0]
+            token = tokens[-1] - tokens[0]
         stats.append([first_token_latency, token, token_latency])
     que.put((session_id, stats))
@@ -75,13 +80,14 @@ def warmup(model,
 def main(model_path: str,
          model_name: str,
-         tokenlizer: str,
          concurrency: int = 1,
          session_len: int = 2056,
          input_seqlen: int = 0,
          output_seqlen: int = 512,
         test_round: int = 10):
-    tokenizer = AutoTokenizer.from_pretrained(tokenlizer)
+    tokenizer_model_path = osp.join(model_path, 'triton_models', 'tokenizer')
+    tokenizer = AutoTokenizer.from_pretrained(tokenizer_model_path,
+                                              trust_remote_code=True)
     model = MODELS.get(model_name)()
     stop_words = model.stop_words
     tm_model = TurboMind(model_path=model_path, stop_words=stop_words)
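
Note on the benchmark change: each streamed chunk now appends a wall-clock timestamp and the cumulative token count, so first_token_latency is measured from the request start, while token and token_latency describe the tokens and time between the first and last chunk (with a fallback when only one chunk arrives). Purely as an illustration of how those per-round rows could be aggregated afterwards (not part of the commit; the helper name and derived metrics are assumptions), a minimal sketch:

import numpy as np

def summarize(stats):
    """Aggregate the per-round [first_token_latency, token, token_latency]
    rows built by infer() above; the metric names are illustrative."""
    stats = np.array(stats, dtype=float)
    first_token_latency = stats[:, 0]   # seconds until the first streamed chunk
    completion_tokens = stats[:, 1]     # tokens produced between first and last chunk
    completion_time = stats[:, 2]       # seconds between first and last chunk
    throughput = completion_tokens.sum() / max(completion_time.sum(), 1e-8)
    return {
        'first_token_latency_avg': first_token_latency.mean(),
        'first_token_latency_min': first_token_latency.min(),
        'decode_throughput_tok_per_s': throughput,
    }

# dummy numbers, purely to show the call
print(summarize([[0.12, 510, 4.8], [0.11, 511, 4.7]]))
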
 # Copyright (c) OpenMMLab. All rights reserved.
+import os
 import os.path as osp
 import random
@@ -8,6 +9,8 @@ from transformers import AutoTokenizer
 from lmdeploy import turbomind as tm
 from lmdeploy.model import MODELS
+os.environ['TM_LOG_LEVEL'] = 'ERROR'
+
 def input_prompt():
     print('\ndouble enter to end input >>> ', end='')
@@ -15,11 +18,22 @@ def input_prompt():
     return '\n'.join(iter(input, sentinel))
+def valid_str(string, coding='utf-8'):
+    invalid_chars = [b'\xef\xbf\xbd']
+    bstr = bytes(string, coding)
+    for invalid_char in invalid_chars:
+        bstr = bstr.replace(invalid_char, b'')
+    ret = bstr.decode(encoding=coding, errors='ignore')
+    return ret
+
 def main(model_name, model_path, session_id: int = 1):
-    tm_model = tm.TurboMind(model_path)
+    model = MODELS.get(model_name)()
+    tm_model = tm.TurboMind(model_path, stop_words=model.stop_words)
     generator = tm_model.create_instance()
     tokenizer_model_path = osp.join(model_path, 'triton_models', 'tokenizer')
-    tokenizer = AutoTokenizer.from_pretrained(tokenizer_model_path, trust_remote_code=True)
+    tokenizer = AutoTokenizer.from_pretrained(tokenizer_model_path,
+                                              trust_remote_code=True)
     model = MODELS.get(model_name)()
     nth_round = 1
@@ -31,13 +45,27 @@ def main(model_name, model_path, session_id: int = 1):
         if prompt == 'exit':
             exit(0)
         elif prompt == 'end':
-            pass
+            prompt = model.get_prompt('', nth_round == 1)
+            input_ids = tokenizer.encode(prompt, add_special_tokens=False)
+            for outputs in generator.stream_infer(session_id=session_id,
+                                                  input_ids=[input_ids],
+                                                  request_output_len=512,
+                                                  sequence_start=False,
+                                                  sequence_end=True):
+                pass
+            nth_round = 1
+            step = 0
+            seed = random.getrandbits(64)
         else:
             prompt = model.get_prompt(prompt, nth_round == 1)
             input_ids = tokenizer.encode(prompt, add_special_tokens=False)
+            print(f'session {session_id}')
+            print(f'{prompt}', end='', flush=True)
+            response_size = 0
             for outputs in generator.stream_infer(
                     session_id=session_id,
                     input_ids=[input_ids],
+                    stream_output=True,
                     request_output_len=512,
                     sequence_start=(nth_round == 1),
                     sequence_end=False,
@@ -51,13 +79,17 @@ def main(model_name, model_path, session_id: int = 1):
                     random_seed=seed if nth_round == 1 else None):
                 res, tokens = outputs[0]
                 # decode res
-                response = tokenizer.decode(res[step:],
-                                            skip_special_tokens=True)
-                print(f'session {session_id}, {tokens}, {response}')
-                # update step
-                step = tokens - 1
+                response = tokenizer.decode(
+                    res, skip_special_tokens=True)[response_size:]
+                response = valid_str(response)
+                print(f'{response}', end='', flush=True)
+                response_size += len(response)
+            # update step
+            step += len(input_ids) + tokens
+            print()
             nth_round += 1
 if __name__ == '__main__':
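
Note on the Chinese streaming fix: instead of decoding only the newest tokens, the client now decodes the whole output each iteration, slices off the characters already printed (response_size), and runs the remainder through valid_str, which drops the U+FFFD replacement character that appears while a multi-byte character (e.g. Chinese) is still incomplete. A small self-contained illustration of that filtering, restating the helper from the diff (the example strings are made up):

def valid_str(string, coding='utf-8'):
    """Remove U+FFFD replacement characters left over from decoding an
    incomplete multi-byte sequence (same logic as the helper in the diff)."""
    invalid_chars = [b'\xef\xbf\xbd']
    bstr = bytes(string, coding)
    for invalid_char in invalid_chars:
        bstr = bstr.replace(invalid_char, b'')
    return bstr.decode(encoding=coding, errors='ignore')

# A truncated UTF-8 sequence decoded with errors='replace' contains '\ufffd';
# valid_str drops it so the partial chunk can be printed immediately.
partial = '你好'.encode('utf-8')[:-1].decode('utf-8', errors='replace')
print(partial)                     # '你' followed by the replacement character
assert valid_str(partial) == '你'
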
 # Copyright (c) OpenMMLab. All rights reserved.
 import os.path as osp
 import sys
+from queue import Queue
+from threading import Thread
 from typing import Iterable, List
 import numpy as np
@@ -78,13 +80,13 @@ class TurboMind:
         self.model = model
         self.stop_words = _stop_words(stop_words)
-    def create_instance(self, stream=0):
-        return TurboMindInstance(self, stream)
+    def create_instance(self, cuda_stream_id=0):
+        return TurboMindInstance(self, cuda_stream_id)
 class TurboMindInstance:
-    def __init__(self, tm_model, stream=0):
+    def __init__(self, tm_model, cuda_stream_id=0):
         self.tm_model = tm_model
         self.device_id = tm_model.device_id
@@ -92,7 +94,7 @@ class TurboMindInstance:
         self.stop_words = tm_model.stop_words
         self.eos_id = tm_model.eos_id
         self.session_len = tm_model.session_len
-        self.stream = stream
+        self.cuda_stream_id = cuda_stream_id
         # create instance
         model = tm_model.model
@@ -101,10 +103,25 @@ class TurboMindInstance:
         instance_comm = model.create_instance_comm(tm_model.gpu_count)
         model_inst = model.create_model_instance(self.device_id, self.rank,
-                                                 self.stream, nccl_params,
-                                                 custom_comms[0])
+                                                 self.cuda_stream_id,
+                                                 nccl_params, custom_comms[0])
+        # model_inst.register_callback(self._forward_callback)
         self.model_inst = model_inst
         self.instance_comm = instance_comm
+        self.que = Queue()
+        self.thread = None
+
+    def _forward_callback(self, result, ctx):
+        self.que.put((False, result))
+
+    def _forward_thread(self, inputs):
+
+        def _func():
+            output = self.model_inst.forward(inputs, self.instance_comm)
+            self.que.put((True, output))
+
+        self.thread = Thread(target=_func)
+        self.thread.start()
     def stream_infer(self,
                      session_id,
@@ -119,7 +136,11 @@ class TurboMindInstance:
                      temperature=0.8,
                      repetition_penalty=1.05,
                      ignore_eos=False,
-                     random_seed=None):
+                     random_seed=None,
+                     stream_output=False):
+        if stream_output:
+            self.model_inst.register_callback(self._forward_callback)
         if len(input_ids) == 0:
             input_ids = []
@@ -140,12 +161,13 @@
         input_ids = pad_sequence(input_ids,
                                  batch_first=True,
                                  padding_value=self.eos_id)
-        input_lengths = input_lengths.detach().cpu().numpy()
         if isinstance(session_id, int):
             session_id = [session_id]
         assert len(session_id) == batch_size
+        step = _broadcast_np(step, np.int32)
+
         inputs = dict(
             input_ids=input_ids,
             input_lengths=input_lengths,
@@ -156,7 +178,7 @@ class TurboMindInstance:
             runtime_top_p=_broadcast_np(top_p, np.float32),
             temperature=_broadcast_np(temperature, np.float32),
             repetition_penalty=_broadcast_np(repetition_penalty, np.float32),
-            step=_broadcast_np(step, np.int32),
+            step=step,
             # session input
             session_len=self.session_len *
@@ -183,12 +205,36 @@ class TurboMindInstance:
         if random_seed is not None:
             inputs['random_seed'] = _broadcast_np(random_seed, np.uint64)
         tm_inputs = _np_dict_to_tm_dict(inputs)
-        tm_outputs = self.model_inst.forward(tm_inputs, self.instance_comm)
-        outputs = _tm_dict_to_torch_dict(tm_outputs)
-        # TODO: Add stream output
-        output_ids = outputs['output_ids'][:, 0, :]
-        sequence_length = outputs['sequence_length'].long()[:, 0]
-        return [[(output[:l], l.item())]
-                for output, l in zip(output_ids, sequence_length)]
+
+        # start forward thread
+        self._forward_thread(tm_inputs)
+
+        seq_start = input_lengths + input_lengths.new_tensor(step)
+
+        # generator
+        while True:
+            while self.que.qsize() > 1:
+                self.que.get()
+
+            finish, tm_outputs = self.que.get()
+
+            outputs = _tm_dict_to_torch_dict(tm_outputs)
+            output_ids = outputs['output_ids'][:, 0, :]
+            sequence_length = outputs['sequence_length'].long()[:, 0].cpu()
+            output_ids = [
+                output_id[s:l] for output_id, s, l in zip(
+                    output_ids, seq_start, sequence_length)
+            ]
+            sequence_length -= seq_start.to(sequence_length.device)
+
+            yield [(output, l.item())
+                   for output, l in zip(output_ids, sequence_length)]
+
+            if finish:
+                while self.que.qsize() > 0:
+                    self.que.get()
+                self.thread.join()
+                break
+
+        if stream_output:
+            self.model_inst.unregister_callback()
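
Note on the stream_infer rewrite: the blocking forward() now runs in a background thread, the callback registered with the engine enqueues intermediate outputs as (False, result), the forward thread enqueues the final result as (True, result), and the generator repeatedly discards stale queue entries, yields the newest outputs, and joins the thread once the final result arrives. A stripped-down sketch of that producer/consumer pattern with a fake engine standing in for the TurboMind instance (every name below is illustrative, not the real API):

from queue import Queue
from threading import Thread
import time

class FakeEngine:
    """Stand-in for the C++ model instance: it invokes the registered
    callback with partial results and returns the final result from forward()."""

    def __init__(self):
        self._cb = None

    def register_callback(self, cb):
        self._cb = cb

    def forward(self, n_tokens):
        tokens = []
        for i in range(n_tokens):
            time.sleep(0.01)            # pretend to decode one token
            tokens.append(i)
            if self._cb is not None:
                self._cb(list(tokens))  # push a partial output
        return tokens                   # final output

def stream_infer(engine, n_tokens):
    que = Queue()
    engine.register_callback(lambda partial: que.put((False, partial)))

    thread = Thread(target=lambda: que.put((True, engine.forward(n_tokens))))
    thread.start()

    while True:
        # drop stale entries so only the most recent result is read
        while que.qsize() > 1:
            que.get()
        finish, output = que.get()
        yield output
        if finish:
            while que.qsize() > 0:
                que.get()
            thread.join()
            break

for output in stream_infer(FakeEngine(), 5):
    print(f'{len(output)} tokens so far')

Because only the newest queue entry is kept, a slow consumer skips intermediate chunks rather than falling behind; the chat client above compensates by decoding the full sequence each time and slicing by response_size.
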
@@ -2,6 +2,7 @@
 #include "src/turbomind/triton_backend/llama/LlamaTritonModel.h"
 #include "src/turbomind/triton_backend/transformer_triton_backend.hpp"
 #include <memory>
+#include <pybind11/functional.h>
 #include <pybind11/pybind11.h>
 #include <pybind11/stl.h>
 #include <pybind11/stl_bind.h>
@@ -307,7 +308,15 @@ PYBIND11_MODULE(_turbomind, m)
                ft::AbstractInstanceComm* inst_comm) { return model->forward(input_tensors, inst_comm); },
             py::call_guard<py::gil_scoped_release>(),
             "input_tensors"_a,
-            "inst_comm"_a = nullptr);
+            "inst_comm"_a = nullptr)
+        .def(
+            "register_callback",
+            [](AbstractTransformerModelInstance* self, triton_stream_cb_t cb, py::object ctx) {
+                self->registerCallback(cb, ctx.ptr());
+            },
+            "callback"_a,
+            "context"_a = nullptr)
+        .def("unregister_callback", &AbstractTransformerModelInstance::unRegisterCallback);
     // transformer model
     py::class_<AbstractTransformerModel, std::shared_ptr<AbstractTransformerModel>>(m, "AbstractTransformerModel")
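
Note on the new bindings: including pybind11/functional.h, together with the std::function-based triton_stream_cb_t in the header change below, is what lets a plain Python callable be passed to register_callback; the lambda forwards the optional context object as a raw pointer. A hypothetical Python-side view of the new surface, wrapped in a helper so it stands alone (model_inst, tm_inputs and instance_comm are assumed to be created exactly as in the TurboMindInstance diff above):

def run_with_streaming(model_inst, tm_inputs, instance_comm):
    """Hypothetical helper: register a Python callable as the stream callback,
    run forward(), then unregister. The arguments are assumed to come from
    create_model_instance() / _np_dict_to_tm_dict() as in the diff above."""
    chunks = []

    def on_stream(result, ctx):
        # called by the engine with the partial output tensors
        chunks.append(result)

    model_inst.register_callback(on_stream)   # 'context' defaults to None
    final = model_inst.forward(tm_inputs, instance_comm)
    model_inst.unregister_callback()
    return chunks, final
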
@@ -20,6 +20,7 @@
 #pragma once
+#include <functional>
 #include <memory>
 #include <sstream>
 #include <sys/time.h>
@@ -263,7 +264,7 @@ struct Tensor {
 } // namespace triton
-using triton_stream_cb_t = void(std::shared_ptr<std::unordered_map<std::string, triton::Tensor>>, void*);
+using triton_stream_cb_t = std::function<void(std::shared_ptr<std::unordered_map<std::string, triton::Tensor>>, void*)>;
 struct AbstractTransformerModel;
 struct AbstractTransformerModelInstance;
@@ -281,7 +282,7 @@ struct AbstractTransformerModelInstance {
         return forward(input_tensors);
     }
-    void registerCallback(triton_stream_cb_t* cb, void* ctx)
+    void registerCallback(triton_stream_cb_t cb, void* ctx)
     {
         stream_cb_ = cb;
         stream_ctx_ = ctx;
@@ -293,8 +294,8 @@ struct AbstractTransformerModelInstance {
         stream_ctx_ = nullptr;
     }
-    triton_stream_cb_t* stream_cb_ = nullptr;
+    triton_stream_cb_t stream_cb_ = nullptr;
     void* stream_ctx_ = nullptr;
 };
 struct AbstractTransformerModel {