Unverified Commit edb7c6ec authored by Lyu Han, committed by GitHub

Fix profile_serving hung issue (#344)

* read data after start processes

* fix hang

* fix exceptions when request_output_len is 0
parent 9bfe03c6
 import json
+import logging
 import multiprocessing as mp
 import os
 import random
@@ -28,10 +29,8 @@ class Tokenizer:
 def infer(chatbot, session_id: int, req_que: mp.Queue, res_que: mp.Queue):
     stats = []
-    while not req_que.empty():
-        prompt, input_seqlen, output_seqlen = req_que.get()
-        print(f'request info: session {session_id}, '
-              f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}')
+    for prompt, input_seqlen, output_seqlen in iter(req_que.get,
+                                                    [None, None, None]):
         timestamps = []
         tokens = []
         start = time.perf_counter()
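Note: this loop is the heart of the hang fix. `multiprocessing.Queue.empty()` is not reliable across processes: a worker can see `empty()` return False, lose the race for the last item to another worker, and then block forever inside `get()`. Polling also breaks once the requests are enqueued only after the workers start (see the `main()` hunk below), because the queue may still be empty when a worker first checks it. A minimal sketch of the sentinel-terminated pattern, with illustrative names (`consume`, `SENTINEL`) that are not part of the patch:

import multiprocessing as mp

SENTINEL = [None, None, None]  # must compare equal to the item the producer puts last


def consume(que: mp.Queue):
    # iter(que.get, SENTINEL) calls que.get() repeatedly (blocking) and stops
    # only when the returned item equals the sentinel, so no polling is needed.
    for prompt, input_seqlen, output_seqlen in iter(que.get, SENTINEL):
        print(prompt, input_seqlen, output_seqlen)


if __name__ == '__main__':
    que = mp.Queue()
    proc = mp.Process(target=consume, args=(que, ))
    proc.start()            # the consumer may start before any data exists
    que.put(('hi', 4, 16))
    que.put(SENTINEL)       # one sentinel per consumer shuts it down cleanly
    proc.join()

The producer must push one sentinel per consumer, which is exactly what the reworked `main()` does below with `req_que.put([None, None, None])`.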
@@ -43,12 +42,13 @@ def infer(chatbot, session_id: int, req_que: mp.Queue, res_que: mp.Queue):
                 sequence_end=True):
             timestamps.append(time.perf_counter())
             tokens.append(token)
+        chatbot.reset_session()
-        first_token_latency = timestamps[1] - start
-        token_latency = timestamps[-1] - timestamps[0]
+        first_token_latency = np.round(timestamps[1] - start, 3)
+        token_latency = np.round(timestamps[-1] - timestamps[0], 3)
         token = tokens[-1] - tokens[0]
         stats.append([first_token_latency, token, token_latency])
+        print(f'session {session_id}: '
+              f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}')
     res_que.put((session_id, stats))
@@ -73,6 +73,7 @@ def warmup(tritonserver_addr: str,
     chatbots = [
         Chatbot(tritonserver_addr=tritonserver_addr,
                 ignore_eos=True,
+                log_level=logging.ERROR,
                 profile_generation=True) for _ in range(concurrency)
     ]
     procs = []
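Note: `log_level=logging.ERROR` is presumably passed so that the client's lower-severity messages neither flood nor slow the warmup and benchmark output; errors still surface. A minimal illustration of the standard-library behaviour this relies on:

import logging

logging.basicConfig(level=logging.ERROR)
log = logging.getLogger('profile_serving')
log.info('suppressed at ERROR level')    # not emitted
log.error('errors are still reported')   # emitted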
@@ -87,7 +88,7 @@
 def read_dataset(tokenizer_path: str, dataset_path: str, samples: int,
-                 session_len: int):
+                 session_len: int, que: mp.Queue):
     start = time.perf_counter()
     with open(dataset_path) as f:
         dataset = json.load(f)
@@ -119,12 +120,11 @@ def read_dataset(tokenizer_path: str, dataset_path: str, samples: int,
     if samples > 0:
         filtered_dataset = random.sample(filtered_dataset, samples)
-    que = mp.Queue()
     for data in filtered_dataset:
         que.put(data)
     print(f'elapsed time for filtering: '
           f'{round(time.perf_counter() - start, 2)} s')
-    return que, len(filtered_dataset)
+    return len(filtered_dataset)


 def main(tritonserver_addr: str,
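Note: `read_dataset` no longer creates the queue; the caller owns `req_que`, passes it in, and only the number of enqueued requests comes back (used later for request throughput). A minimal sketch of that producer-side shape, with a hypothetical `fill_requests` helper and stand-in data:

import multiprocessing as mp


def fill_requests(que: mp.Queue) -> int:
    # Stand-in for the filtered (prompt, input_seqlen, output_seqlen) tuples.
    requests = [('prompt-1', 8, 16), ('prompt-2', 12, 32)]
    for item in requests:
        que.put(item)
    return len(requests)  # the caller needs the count for req throughput


if __name__ == '__main__':
    que = mp.Queue()
    n_req = fill_requests(que)
    print(f'enqueued {n_req} requests')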
@@ -134,32 +134,39 @@ def main(tritonserver_addr: str,
          session_len: int = 2048,
          samples: int = 1000):
     warmup(tritonserver_addr, concurrency, session_len - 1)
-    req_que, n_req = read_dataset(tokenizer_path, dataset_path, samples,
-                                  session_len)
+    req_que = mp.Queue()
     res_que = mp.Queue()
     procs = []
     _start = time.perf_counter()
     for i in range(concurrency):
         chatbot = Chatbot(tritonserver_addr=tritonserver_addr,
                           display=False,
                           profile_serving=True,
-                          ignore_eos=True)
+                          ignore_eos=True,
+                          log_level=logging.ERROR)
         proc = mp.Process(target=infer,
                           args=(chatbot, i + 1, req_que, res_que))
         procs.append(proc)
         proc.start()
-    for proc in procs:
-        proc.join()
-    _end = time.perf_counter()
-    elapsed_time = _end - _start
+
+    # read data and put it to queue
+    n_req = read_dataset(tokenizer_path, dataset_path, samples, session_len,
+                         req_que)
+    for i in range(concurrency):
+        req_que.put([None, None, None])

     stats = []
-    while not res_que.empty():
+    for i in range(concurrency):
         session_id, _stats = res_que.get()
         print(f'\n{"-" * 50}\n'
-              f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n')
+              f'session {session_id}: processed reqs {len(_stats)}, '
+              f'stats: \n{_stats}\n{"-" * 50}\n')
         stats.append(np.array(_stats))
+
+    _end = time.perf_counter()
+    elapsed_time = _end - _start

     stats = np.concatenate(stats).reshape(-1, 3)
     first_token_latency_min = np.min(stats[:, 0], axis=0)
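Note: the reordered `main()` starts the consumer processes first, then reads the dataset into `req_que`, pushes one `[None, None, None]` sentinel per consumer, drains `res_que`, and joins the workers only at the very end. Draining before joining matters: a child process that has put items on a `multiprocessing.Queue` will not terminate until those items are flushed, so joining it while its results still sit in the queue can deadlock. A minimal, self-contained sketch of the same ordering (the `worker` function and the dummy payload are illustrative, not part of the patch):

import multiprocessing as mp


def worker(wid: int, req_que: mp.Queue, res_que: mp.Queue):
    # Consume until the per-worker sentinel (None here), then report a result.
    n = sum(1 for _ in iter(req_que.get, None))
    res_que.put((wid, n))


if __name__ == '__main__':
    req_que, res_que = mp.Queue(), mp.Queue()
    procs = [
        mp.Process(target=worker, args=(i, req_que, res_que)) for i in range(4)
    ]
    for p in procs:
        p.start()               # 1. start consumers before producing data
    for item in range(100):
        req_que.put(item)       # 2. then fill the request queue
    for _ in procs:
        req_que.put(None)       # 3. one sentinel per consumer
    for _ in procs:
        print(res_que.get())    # 4. drain results before joining
    for p in procs:
        p.join()                # 5. join last, once the result queue is empty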
@@ -169,14 +176,17 @@ def main(tritonserver_addr: str,
     req_throughput = n_req / elapsed_time
     print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n'
-          f'elapsed_time: {elapsed_time:.2f}s\n'
+          f'elapsed_time: {elapsed_time:.3f}s\n'
           f'first_token latency(min, max, ave): '
-          f'{first_token_latency_min:.2f}s, {first_token_latency_max:.2f}s, '
-          f'{first_token_latency_ave:.2f}s\n'
-          f'token throughput: {token_throughput:.2f} token/s\n'
-          f'req throughput: {req_throughput:.2f} req/s\n'
+          f'{first_token_latency_min:.3f}s, {first_token_latency_max:.3f}s, '
+          f'{first_token_latency_ave:.3f}s\n'
+          f'token throughput: {token_throughput:.3f} token/s\n'
+          f'req throughput: {req_throughput:.3f} req/s\n'
           f'{"-" * 50}\n')
+
+    for proc in procs:
+        proc.join()


 if __name__ == '__main__':
     fire.Fire(main)
@@ -631,7 +631,8 @@ class Chatbot:
                 output_ids = output_ids[:, :, n_input_token +
                                         preseq_length:sequence_length.squeeze(
                                         )]
-                last_token_id = output_ids[-1, -1, -1]
+                last_token_id = None if output_ids.shape[
+                    -1] == 0 else output_ids[-1, -1, -1]
                 if last_token_id == eos_id:
                     session.sequence_length = session.sequence_length - 1
                     output_ids = output_ids[:, :, :-1]
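Note: this is the fix for `request_output_len` being 0. When nothing is generated, the slice over the generated-token axis is empty, and indexing it with `[-1, -1, -1]` raises `IndexError`; guarding on `shape[-1] == 0` yields `None`, so the `eos_id` comparison below is simply false. A minimal numpy illustration (the real `output_ids` comes from the Triton response, and its exact shape here is only an assumption):

import numpy as np

output_ids = np.empty((1, 1, 0), dtype=np.int64)   # nothing generated

# output_ids[-1, -1, -1] would raise IndexError on the empty last axis.
last_token_id = None if output_ids.shape[-1] == 0 else output_ids[-1, -1, -1]
print(last_token_id)   # None, so `last_token_id == eos_id` is False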
@@ -652,6 +653,8 @@
                             output_ids.shape[-1])
             except Exception as e:
                 logger.error(f'catch exception: {e}')
+                logger.error(
+                    f'session {session.session_id}: prompt: {session.prompt}')
         # put session back to queue so that `_stream_infer` can update it in
         # `self.sessions`