"docs/gpu.mdx" did not exist on "27402cb7a28555a3efcaa5af054b1ce2d18e5442"
Unverified commit e3ac7fd5 authored by AllentDan, committed by GitHub

Fix stop requests by await before turbomind queue.get() (#850)

* await before get

* add comments

* recover stop back
parent 68fa4b9a
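Per the commit title, the old path could reach turbomind's blocking `queue.get()` without first yielding to the event loop, so stop requests issued by other coroutines were never scheduled. The new `async_stream_infer` in the diff below instead spins on `await asyncio.sleep(0)` until the queue has data and drains stale snapshots before reading the latest one. A minimal sketch of that polling pattern, outside the actual TurboMindInstance class (`poll_queue` is an illustrative name; the `(finish, outputs)` tuple shape follows the diff):

```python
import asyncio
from queue import Queue

async def poll_queue(que: Queue):
    """Illustrative sketch: consume a thread-fed queue without starving the loop."""
    while True:
        # Hand control back to the event loop until the forward thread has
        # produced something; a blocking que.get() here would prevent other
        # coroutines (e.g. an incoming stop request) from running.
        while que.qsize() == 0:
            await asyncio.sleep(0)
        # Keep only the most recent snapshot, as the diff below does.
        while que.qsize() > 1:
            que.get()
        finish, outputs = que.get()
        yield outputs
        if finish:
            break
```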
@@ -135,6 +135,7 @@ class AsyncEngine:
yield
except (Exception, asyncio.CancelledError) as e: # noqa
self.stop_session(session_id)
raise e
if str(session_id) in self.id2generator and self.id2generator[str(
session_id)] not in self.gens_set:
self.gens_set.add(self.id2generator[str(session_id)])
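The AsyncEngine hunk above restores the session teardown on errors and cancellation before re-raising. A hedged sketch of the same pattern from a caller's point of view; `engine.generate()` and the surrounding names are illustrative assumptions, only `stop_session()` comes from the hunk:

```python
import asyncio

async def consume(engine, session_id, prompt):
    """Illustrative only: cancelling this task also stops the backend session."""
    try:
        async for output in engine.generate(prompt, session_id):
            print(output)
    except (Exception, asyncio.CancelledError):
        # Same pattern as the hunk above: stop the session on the backend
        # before letting the error or cancellation propagate.
        engine.stop_session(session_id)
        raise
```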
@@ -456,14 +456,7 @@ class TurboMindInstance:
t.start()
self.threads[device_id] = t
async def async_stream_infer(self, *args, **kwargs):
"""Async wrapper of self.stream_infer."""
for output in self.stream_infer(*args, **kwargs):
# Allow the pipeline add new requests into the queue.
await asyncio.sleep(0)
yield output
def stream_infer(self,
def prepare_inputs(self,
session_id,
input_ids,
input_embeddings=None,
@@ -480,34 +473,7 @@ class TurboMindInstance:
ignore_eos=False,
random_seed=None,
stream_output=False):
"""Perform model inference.
Args:
session_id (int): the id of a session
input_ids (numpy.ndarray): the token ids of a prompt
input_embeddings (List[numpy.ndarray]): embeddings features
input_embedding_ranges (List[Tuple[int,int]]): the begin/end
offsets of input_embeddings to input_ids
request_output_len (int): the max number of to-be-generated tokens
sequence_start (bool): indicator for starting a sequence
sequence_end (bool): indicator for ending a sequence
step (int): the offset of the k/v cache
stop (bool): indicator for cancelling the session
top_p (float): If set to float < 1, only the smallest set of most
probable tokens with probabilities that add up to top_p or higher
are kept for generation.
top_k (int): The number of the highest probability vocabulary
tokens to keep for top-k-filtering
temperature (float): to modulate the next token probability
repetition_penalty (float): The parameter for repetition penalty.
1.0 means no penalty
ignore_eos (bool): indicator for ignoring eos
random_seed (int): seed used by sampling
stream_output (bool): indicator for stream output
"""
if stream_output and not stop:
self.model_insts[0].register_callback(self._forward_callback)
"""Convert inputs format."""
if len(input_ids) == 0:
input_ids = [[]]
if isinstance(input_ids[0], int):
@@ -608,8 +574,183 @@ class TurboMindInstance:
if random_seed is not None:
inputs['random_seed'] = _broadcast_np(random_seed, np.uint64)
return inputs, input_lengths
async def async_stream_infer(self,
session_id,
input_ids,
input_embeddings=None,
input_embedding_ranges=None,
request_output_len: int = 512,
sequence_start: bool = True,
sequence_end: bool = False,
step=0,
stop=False,
top_p=0.8,
top_k=40,
temperature=0.8,
repetition_penalty=1.0,
ignore_eos=False,
random_seed=None,
stream_output=False):
"""Perform model inference.
Args:
session_id (int): the id of a session
input_ids (numpy.ndarray): the token ids of a prompt
input_embeddings (List[numpy.ndarray]): embeddings features
input_embedding_ranges (List[Tuple[int,int]]): the begin/end
offsets of input_embeddings to input_ids
request_output_len (int): the max number of to-be-generated tokens
sequence_start (bool): indicator for starting a sequence
sequence_end (bool): indicator for ending a sequence
step (int): the offset of the k/v cache
stop (bool): indicator for cancelling the session
top_p (float): If set to float < 1, only the smallest set of most
probable tokens with probabilities that add up to top_p or higher
are kept for generation.
top_k (int): The number of the highest probability vocabulary
tokens to keep for top-k-filtering
temperature (float): to modulate the next token probability
repetition_penalty (float): The parameter for repetition penalty.
1.0 means no penalty
ignore_eos (bool): indicator for ignoring eos
random_seed (int): seed used by sampling
stream_output (bool): indicator for stream output
"""
if stream_output and not stop:
self.model_insts[0].register_callback(self._forward_callback)
inputs, input_lengths = self.prepare_inputs(
session_id=session_id,
input_ids=input_ids,
input_embeddings=input_embeddings,
input_embedding_ranges=input_embedding_ranges,
request_output_len=request_output_len,
sequence_start=sequence_start,
sequence_end=sequence_end,
step=step,
stop=stop,
top_p=top_p,
top_k=top_k,
temperature=temperature,
repetition_penalty=repetition_penalty,
ignore_eos=ignore_eos,
random_seed=random_seed,
stream_output=stream_output)
tm_inputs = _np_dict_to_tm_dict(inputs)
# start forward thread
self.que = Queue()
self._forward_thread(tm_inputs)
seq_start = input_lengths + input_lengths.new_tensor(step)
# generator
while True:
# Thanks for https://github.com/frankxyy and his issue
# https://github.com/InternLM/lmdeploy/issues/832
while self.que.qsize() == 0:
await asyncio.sleep(0)
while self.que.qsize() > 1:
self.que.get()
finish, tm_outputs = self.que.get()
outputs = _tm_dict_to_torch_dict(tm_outputs)
output_ids = outputs['output_ids'][:, 0, :]
sequence_length = outputs['sequence_length'].long()[:, 0]
output_ids = [
output_id[s:l] for output_id, s, l in zip(
output_ids, seq_start, sequence_length)
]
sequence_length -= seq_start.to(sequence_length.device)
outputs = []
for output, len_ in zip(output_ids, sequence_length):
output, len_ = output, len_.item()
if len(output) > 0 and output[-1].item(
) == self.eos_id and not ignore_eos:
outputs.append((output[:-1], len_ - 1))
elif len(output) > 0 and output[-1].item() in self.stop_tokens:
outputs.append((output[:-1], len_))
else:
outputs.append((output, len_))
yield outputs
if finish:
for t in self.threads:
t.join()
while self.que.qsize() > 0:
self.que.get()
break
if stream_output and not stop:
self.model_insts[0].unregister_callback()
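For reference, a hedged usage sketch of the new coroutine generator added above; the wiring of `tm_instance` and the prompt token ids is assumed, while the keyword arguments and the list of `(token_ids, length)` pairs per yield follow the diff:

```python
import asyncio

async def demo(tm_instance, input_ids):
    # Stream partial outputs as they are produced by the forward thread.
    async for outputs in tm_instance.async_stream_infer(
            session_id=1,
            input_ids=input_ids,
            request_output_len=64,
            stream_output=True):
        token_ids, length = outputs[0]
        print(length, token_ids)

# Assumed entry point (not part of the commit):
# asyncio.run(demo(tm_instance, input_ids))
```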
def stream_infer(self,
session_id,
input_ids,
input_embeddings=None,
input_embedding_ranges=None,
request_output_len: int = 512,
sequence_start: bool = True,
sequence_end: bool = False,
step=0,
stop=False,
top_p=0.8,
top_k=40,
temperature=0.8,
repetition_penalty=1.0,
ignore_eos=False,
random_seed=None,
stream_output=False):
"""Perform model inference.
Args:
session_id (int): the id of a session
input_ids (numpy.ndarray): the token ids of a prompt
input_embeddings (List[numpy.ndarray]): embeddings features
input_embedding_ranges (List[Tuple[int,int]]): the begin/end
offsets of input_embeddings to input_ids
request_output_len (int): the max number of to-be-generated tokens
sequence_start (bool): indicator for starting a sequence
sequence_end (bool): indicator for ending a sequence
step (int): the offset of the k/v cache
stop (bool): indicator for cancelling the session
top_p (float): If set to float < 1, only the smallest set of most
probable tokens with probabilities that add up to top_p or higher
are kept for generation.
top_k (int): The number of the highest probability vocabulary
tokens to keep for top-k-filtering
temperature (float): to modulate the next token probability
repetition_penalty (float): The parameter for repetition penalty.
1.0 means no penalty
ignore_eos (bool): indicator for ignoring eos
random_seed (int): seed used by sampling
stream_output (bool): indicator for stream output
"""
if stream_output and not stop:
self.model_insts[0].register_callback(self._forward_callback)
inputs, input_lengths = self.prepare_inputs(
session_id=session_id,
input_ids=input_ids,
input_embeddings=input_embeddings,
input_embedding_ranges=input_embedding_ranges,
request_output_len=request_output_len,
sequence_start=sequence_start,
sequence_end=sequence_end,
step=step,
stop=stop,
top_p=top_p,
top_k=top_k,
temperature=temperature,
repetition_penalty=repetition_penalty,
ignore_eos=ignore_eos,
random_seed=random_seed,
stream_output=stream_output)
tm_inputs = _np_dict_to_tm_dict(inputs)
# start forward thread
self.que = Queue()
self._forward_thread(tm_inputs)
......