Unverified commit 5c9e1e28 authored by Lyu Han, committed by GitHub

Report first-token-latency and token-latency percentiles (#736)

* update profile scripts

* add top_p, top_k and temperature as input arguments

* fix input_ids

* update profile_throughput

* update profile_restful_api

* update profile_serving

* update

* update

* add progress bar

* remove TODO comments

* update

* remove useless profile_* argument

* remove log level

* change concurrency default value to 64

* update restful_api.md

* update according to review comments

* fix docstring
parent 8add942d
@@ -29,7 +29,7 @@ pip install nvidia-ml-py
```bash
python profile_generation.py \
    /path/to/your/model \
    --concurrency 1 8 --prompt-tokens 1 512 --completion-tokens 2048 512
```
......
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import csv
import logging
import os
import time
from dataclasses import dataclass
from queue import Queue
@@ -18,37 +16,47 @@ from pynvml import (NVMLError, nvmlDeviceGetCount, nvmlDeviceGetHandleByIndex,
                    nvmlInit, nvmlShutdown, nvmlSystemGetDriverVersion)
from tqdm import tqdm

from lmdeploy.turbomind import TurboMind
def infer(model, session_id: int, input_ids: List, output_seqlen: int,
          test_round: int, que: Queue):
    chatbot = model.create_instance()
    stats = []
    for _ in range(test_round):
        token_latency_stats = [0] * (output_seqlen + 1)
        prev = time.perf_counter()
        n_pre_token = 0
        """
        The iterator provided by `stream_infer` denotes the number of generated tokens so far,
        which is represented by the variable `n_token`.
        Please note that `n_token` is not a continuous value. In other words, during the iteration,
        its value might be 5, 7, 8, 16, and so on, rather than 1, 2, 3, 4, etc.
        So, it is quite difficult to get the latency of each generated token.
        As a work-around, we set the latency `now-prev` of each iteration to the first token of
        the newly generated tokens, and leave the latency of the rest tokens being 0.
        For example, in the first iteration, 5 tokens are generated.
        The time elapsed in this iteration `now-prev` is set to the latency of the first token of
        the 5 tokens, i.e. `token_latency_stats[0]`, and `token_latency_stats[1:4]` is set to 0.
        """  # noqa: E501
        for outputs in chatbot.stream_infer(session_id,
                                            input_ids,
                                            request_output_len=output_seqlen,
                                            sequence_start=True,
                                            sequence_end=True,
                                            ignore_eos=True,
                                            stream_output=True):
            _, n_token = outputs[0]
            now = time.perf_counter()
            if n_pre_token != n_token:
                token_latency_stats[n_pre_token] = np.round(now - prev, 3)
                n_pre_token = n_token
            prev = now

        assert output_seqlen <= n_token <= output_seqlen + 1, \
            f'Error. session_id({session_id}) request {output_seqlen} ' \
            f'tokens, but generate {n_token} tokens'
        stats.append(token_latency_stats[:output_seqlen])
    que.put((session_id, stats))
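

# --- Illustrative sketch (not part of this commit) ---
# A self-contained toy example of the latency-attribution scheme described in
# the docstring of `infer` above: the elapsed time of an iteration is charged
# to the first of the tokens generated in that iteration, and the remaining
# tokens are left at 0. The iteration data below is made up for illustration.
def _demo_token_latency_attribution():
    # each tuple is (n_token reported so far, seconds elapsed in the iteration)
    iterations = [(5, 0.30), (7, 0.02), (8, 0.01), (16, 0.08)]
    output_seqlen = 16
    token_latency_stats = [0.0] * (output_seqlen + 1)
    n_pre_token = 0
    for n_token, delta in iterations:
        if n_pre_token != n_token:
            # charge the whole iteration to the first newly generated token
            token_latency_stats[n_pre_token] = delta
            n_pre_token = n_token
    # token_latency_stats[0] == 0.30 is the first-token latency;
    # token_latency_stats[5] == 0.02, [7] == 0.01, [8] == 0.08; the rest are 0
    return token_latency_stats[:output_seqlen]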
@@ -93,15 +101,19 @@ def profile_throughput(model_path: str,
                       input_seqlen: int = 1,
                       output_seqlen: int = 512,
                       test_round: int = 10,
                       tp: int = 1,
                       **kwargs):
    # avoid turbomind checking chat template name by setting
    # `model_name='llama'`
    tm_model = TurboMind(model_path=model_path,
                         tp=tp,
                         model_name='llama',
                         **kwargs)
    tokenizer = tm_model.tokenizer

    # make up a prompt that can be tokenized into {input_seqlen} tokens
    assert input_seqlen > 0, 'input_seqlen should > 0'
    input_ids = tokenizer('hi').input_ids
    input_ids = input_ids * input_seqlen

    warmup(tm_model, concurrency, input_ids, output_seqlen)
@@ -110,7 +122,6 @@ def profile_throughput(model_path: str,
    procs = []
    _start = time.perf_counter()

    for i in range(concurrency):
        proc = Thread(target=infer,
                      args=(tm_model, i + 1, input_ids, output_seqlen,
...@@ -128,33 +139,49 @@ def profile_throughput(model_path: str, ...@@ -128,33 +139,49 @@ def profile_throughput(model_path: str,
_end = time.perf_counter() _end = time.perf_counter()
elapsed_time = _end - _start elapsed_time = _end - _start
stats = [] token_latency_stats = []
while not que.empty(): while not que.empty():
session_id, _stats = que.get() _, _stats = que.get()
print(f'\n{"-" * 50}\n' token_latency_stats += _stats
f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n')
stats.append(_stats) # The shape is [concurrency*test_round, output_seqlen]
token_latency_stats = np.stack(token_latency_stats, axis=0)
stats = np.array(stats).reshape(-1, 3)
first_token_latency_min = np.round(
first_token_latency_min = np.min(stats[:, 0], axis=0) np.min(token_latency_stats[:, 0], axis=0), 3)
first_token_latency_max = np.max(stats[:, 0], axis=0) first_token_latency_max = np.round(
first_token_latency_ave = np.mean(stats[:, 0], axis=0) np.max(token_latency_stats[:, 0], axis=0), 3)
token_latency_min = np.min(stats[:, 2], axis=0) first_token_latency_ave = np.round(
token_latency_max = np.max(stats[:, 2], axis=0) np.mean(token_latency_stats[:, 0], axis=0), 3)
token_latency_ave = np.mean(stats[:, 2], axis=0) token_latency_max = np.round(np.max(np.sum(token_latency_stats, axis=1)),
throughput = np.sum(stats[:, 1], axis=0) / np.sum(stats[:, 2], 3)
axis=0) * concurrency token_latency_min = np.round(np.min(np.sum(token_latency_stats, axis=1)),
print(f'\n{"-" * 50}\nconcurrency: {concurrency}, input_tokens: ' 3)
f'{input_seqlen}, output_tokens: {output_seqlen}\n' token_latency_ave = np.round(np.mean(np.sum(token_latency_stats, axis=1)),
f'elapsed_time: {elapsed_time:.2f}s\n' 3)
# sort token_latency without the first token's latency
sorted_token_latency = np.sort(token_latency_stats[:, 1:].flatten())
percentiles = [
np.round(
sorted_token_latency[int(percent * len(sorted_token_latency))], 3)
for percent in [0.5, 0.75, 0.95, 0.99]
]
throughput = np.round(token_latency_stats.size / elapsed_time, 2)
print(f'\n{"-" * 50}\ntotal time: {elapsed_time:.2f}s\n'
f'concurrency: {concurrency}, test_round: {test_round}\n'
f'input_tokens: {input_seqlen}, output_tokens: {output_seqlen}\n'
f'first_token latency(min, max, ave): ' f'first_token latency(min, max, ave): '
f'{first_token_latency_min:.2f}s, {first_token_latency_max:.2f}s, ' f'{first_token_latency_min}s, {first_token_latency_max}s, '
f'{first_token_latency_ave:.2f}s\ntoken latency(min, max, ave): ' f'{first_token_latency_ave}s\ntotal_token latency(min, max, ave): '
f'{token_latency_min:.2f}s, {token_latency_max:.2f}s, ' f'{token_latency_min}s, {token_latency_max}s, '
f'{token_latency_ave:.2f}s\n' f'{token_latency_ave}s\n'
f'throughput: {throughput:.2f} token/s\n{"-" * 50}') f'token_latency percentiles(50%,75%,95%,99%)(s): {percentiles}\n'
return tm_model.model_name, throughput, tm_model.gpu_count f'throughput: {throughput} token/s\n{"-" * 50}')
return tm_model.model_name, \
[first_token_latency_min, first_token_latency_max,
first_token_latency_ave], \
percentiles, throughput, tm_model.gpu_count
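

# --- Illustrative sketch (not part of this commit) ---
# How the summary above is derived from the [concurrency*test_round,
# output_seqlen] latency matrix: column 0 holds first-token latencies, row
# sums give the total latency of each request, and the percentiles are read
# from the sorted non-first-token latencies. The toy numbers are made up.
def _demo_latency_summary():
    import numpy as np
    token_latency_stats = np.array([[0.30, 0.02, 0.01, 0.02],
                                    [0.28, 0.03, 0.02, 0.01]])
    first_token_latency_ave = np.round(np.mean(token_latency_stats[:, 0]), 3)
    total_token_latency_ave = np.round(
        np.mean(np.sum(token_latency_stats, axis=1)), 3)
    sorted_token_latency = np.sort(token_latency_stats[:, 1:].flatten())
    p50 = sorted_token_latency[int(0.5 * len(sorted_token_latency))]
    # first_token_latency_ave == 0.29, total_token_latency_ave == 0.345,
    # p50 == 0.02
    return first_token_latency_ave, total_token_latency_ave, p50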
class MemoryMonitor:
@@ -235,6 +262,8 @@ class ProfileResult:
    batch: int
    prompt_tokens: int
    completion_tokens: int
    first_token_latency: List
    percentiles: List
    throughput_per_proc: float
    throughput_per_node: float
    mem_per_proc: float
@@ -244,42 +273,67 @@ class ProfileResult:
def parse_args():
    parser = argparse.ArgumentParser(description='Regression Test')
    parser.add_argument('model_path',
                        type=str,
                        help='the path of the model in localhost or '
                        'the repo_id of the model in huggingface.co')
    parser.add_argument('--concurrency',
                        nargs='+',
                        type=int,
                        help='how many requests launched concurrently',
                        default=[1, 16, 32, 64])
    parser.add_argument(
        '--prompt-tokens',
        nargs='+',
        type=int,
        help='how many input tokens per request. One-to-one '
        'correspondence with completion-tokens',
        default=[1, 128, 128, 2048, 2048])
    parser.add_argument('--completion-tokens',
                        nargs='+',
                        type=int,
                        help='how many tokens to be generated. One-to-one '
                        'correspondence with prompt-tokens',
                        default=[128, 128, 2048, 128, 2048])
    parser.add_argument('--tp', type=int, help='Tensor parallel', default=1)
    parser.add_argument('--top_k',
                        type=int,
                        help='The number of highest probability vocabulary '
                        'tokens to keep for top-k-filtering',
                        default=1)
    parser.add_argument('--top_p',
                        type=float,
                        help='the set of most probable tokens with '
                        'probabilities that add up to top_p or higher '
                        'are kept for generation',
                        default=1.0)
    parser.add_argument('--temperature',
                        type=float,
                        help='The value used to modulate the next token '
                        'probabilities',
                        default=1.0)
    parser.add_argument('--csv',
                        type=str,
                        help='Where to save the result.',
                        default='profile_generation.csv')
    parser.add_argument('--log-level',
                        help='set log level',
                        default='ERROR',
                        choices=list(logging._nameToLevel.keys()))
    parser.add_argument('--test-round',
                        type=int,
                        help='number of test rounds',
                        default=10)
    args = parser.parse_args()
    return args
def main():
    args = parse_args()
    assert len(args.prompt_tokens) == len(args.completion_tokens), \
        f'mismatched size between `prompt-tokens` and `completion-tokens`' \
        f', {len(args.prompt_tokens)} vs {len(args.completion_tokens)}'

    os.environ['TM_LOG_LEVEL'] = args.log_level
    results: List[ProfileResult] = []
    for batch in tqdm(args.concurrency):
@@ -292,9 +346,14 @@ def main():
                concurrency=batch,
                input_seqlen=prompt_tokens,
                output_seqlen=completion_tokens,
                tp=args.tp,
                top_k=args.top_k,
                top_p=args.top_p,
                temperature=args.temperature,
                test_round=args.test_round)
            output = Pool(1).map(profile_target, (args.model_path, ))
            model_name, first_token_latency, percentiles, \
                throughput_per_proc, tp = output[0]
            time.sleep(5)  # wait a while for releasing GPU mem
            memory = MemoryMonitor.terminate()
            device_count = MemoryMonitor.device_count.value
@@ -303,25 +362,31 @@ def main():
                              batch=batch,
                              prompt_tokens=prompt_tokens,
                              completion_tokens=completion_tokens,
                              first_token_latency=first_token_latency,
                              percentiles=percentiles,
                              throughput_per_proc=throughput_per_proc,
                              throughput_per_node=throughput_per_proc / tp *
                              device_count,
                              mem_per_proc=memory,
                              mem_per_gpu=memory / tp,
                              mem_per_node=memory / tp * device_count))
    with open(args.csv, 'w') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow([
            'batch', 'prompt_tokens', 'completion_tokens',
            '1st_token_latency(min)(s)', '1st_token_latency(max)(s)',
            '1st_token_latency(ave)(s)', 'percentile50(s)', 'percentile75(s)',
            'percentile95(s)', 'percentile99(s)', 'throughput(token/s)',
            'mem_per_proc(GB)', 'mem_per_gpu(GB)'
        ])
        for re in results:
            writer.writerow([
                re.batch, re.prompt_tokens, re.completion_tokens,
                re.first_token_latency[0], re.first_token_latency[1],
                re.first_token_latency[2], re.percentiles[0],
                re.percentiles[1], re.percentiles[2], re.percentiles[3],
                f'{re.throughput_per_proc:.2f}', f'{re.mem_per_proc:.2f}',
                f'{re.mem_per_gpu:.2f}'
            ])
......
import csv
import json
import random
import time
from queue import Queue
from threading import Thread
from typing import List, Tuple

import fire
import numpy as np
from tqdm import tqdm

from lmdeploy.serve.openai.api_client import APIClient
from lmdeploy.tokenizer import Tokenizer
def sample_requests(
dataset_path: str,
num_requests: int,
tokenizer: Tokenizer,
) -> List[Tuple[str, int, int]]:
# Load the dataset.
with open(dataset_path) as f:
dataset = json.load(f)
# Filter out the conversations with less than 2 turns.
dataset = [data for data in dataset if len(data['conversations']) >= 2]
# Only keep the first two turns of each conversation.
dataset = [(data['conversations'][0]['value'],
data['conversations'][1]['value']) for data in dataset]
# Tokenize the prompts and completions.
prompts = [prompt for prompt, _ in dataset]
prompt_token_ids = tokenizer(prompts).input_ids
completions = [completion for _, completion in dataset]
completion_token_ids = tokenizer(completions).input_ids
tokenized_dataset = []
for i in range(len(dataset)):
output_len = len(completion_token_ids[i])
tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len))
# Filter out too long sequences.
filtered_dataset: List[Tuple[str, int, int]] = []
for prompt, prompt_token_ids, output_len in tokenized_dataset:
prompt_len = len(prompt_token_ids)
if prompt_len < 4 or output_len < 4:
# Prune too short sequences.
continue
if prompt_len > 1024 or prompt_len + output_len > 2048:
# Prune too long sequences.
continue
filtered_dataset.append((prompt, prompt_len, output_len))
# Sample the requests.
sampled_requests = random.sample(filtered_dataset, num_requests)
return sampled_requests
class Engine:
def __init__(self,
server_addr: str,
tokenzier_path: str,
temperature: float = 0.8,
top_p: float = 1.0,
csv: str = '',
**kwargs):
self.tokenizer = Tokenizer(tokenzier_path)
self.server_addr = server_addr
self.temperature = temperature
self.top_p = top_p
self.csv = csv
client = APIClient(self.server_addr)
self.model_name = client.available_models[0]
self.pbar = None
    def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int,
                   stream_output: bool):
        stats = []
        client = APIClient(self.server_addr)

        for prompt, input_seqlen, output_seqlen in iter(
                req_queue.get, [None, None, None]):
            timestamps = []
            timestamps.append(time.perf_counter())
            for output in client.chat_completions_v1(
                    model=self.model_name,
                    messages=prompt,
                    temperature=self.temperature,
                    top_p=self.top_p,
                    n=1,
                    max_tokens=output_seqlen,
                    stream=stream_output,
                    session_id=session_id,
                    ignore_eos=True):
                timestamps.append(time.perf_counter())

            first_token_latency = np.round(timestamps[1] - timestamps[0], 3)
            token_latency = np.round(timestamps[-1] - timestamps[0], 3)
            # assert output.pop('finish_reason') == 'length', \
            #     f'Error. session_id({session_id}) request {output_seqlen} ' \
            #     f'tokens, but `finish_reason` is not `length`'
            total_tokens = input_seqlen + output_seqlen
            stats.append([
                first_token_latency, output_seqlen, output_seqlen,
                total_tokens, token_latency
            ])
            self.pbar.update(1)
        res_queue.put((session_id, stats))
    def process_request(self,
                        requests,
                        concurrency: int = 1,
                        stream_output: bool = False):
        res_queue = Queue()
        req_queue = Queue()
        threads = []

        self.pbar = tqdm(total=len(requests))

        # feed request to q
        for req in requests:
            req_queue.put(req)
        for i in range(concurrency):
            req_queue.put([None, None, None])

        start = time.time()

        # start threads
        for i in range(concurrency):
            t = Thread(target=self._inference,
                       args=(req_queue, res_queue, i, stream_output))
            t.start()
            threads.append(t)

        # wait for finish
        for t in threads:
            t.join()

        elapsed_time = time.time() - start

        stats = []
        while not res_queue.empty():
            session_id, _stats = res_queue.get()
            # print(f'\n{"-" * 50}\n'
            #       f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n')
            stats.append(np.array(_stats))

        stats = np.concatenate(stats).reshape(-1, 5)
@@ -165,7 +158,7 @@ def main(server_addr: str,
        prompt_tokens = total_tokens - completion_tokens
        completion_token_throughput = completion_tokens / elapsed_time
        total_token_throughput = total_tokens / elapsed_time
        rqs = len(requests) / elapsed_time
        rqm = rqs * 60

        if (np.abs(stats[:, 1] - stats[:, 2]) <= 1).min() is False:
@@ -189,6 +182,72 @@ def main(server_addr: str,
            f'RPM (request per minute): {rqm:.3f} req/min\n'
            f'{"-" * 50}\n')
if self.csv:
with open(self.csv, 'w') as csvfile:
writer = csv.writer(csvfile)
writer.writerow([
'batch', 'num_prompts', 'prompt_tokens',
'completion_tokens', '1st_token_latency(min)(s)',
'1st_token_latency(max)(s)', '1st_token_latency(ave)(s)',
'output token thr(token/s)', 'total token thr(token/s)',
'RPM'
])
writer.writerow([
concurrency,
len(requests), prompt_tokens, completion_tokens,
f'{first_token_latency_min:.3f}' if stream_output else '-',
f'{first_token_latency_max:.3f}' if stream_output else '-',
f'{first_token_latency_ave:.3f}' if stream_output else '-',
f'{completion_token_throughput:.3f}',
f'{total_token_throughput:.3f}', f'{rqm:.3f}'
])
def main(server_addr: str,
tokenizer_path: str,
dataset: str,
concurrency: int = 64,
num_prompts: int = 2000,
top_p: float = 1.0,
temperature: float = 1.0,
stream_output: bool = False,
csv: str = './profile_api_server.csv',
seed: int = 0):
"""Benchmark the request througput of api server.
Args:
server_addr (str): http url of api_server with format http://0.0.0.0:0
tokenizer_path (str): Path to the tokenizer model in localhost
dataset (str): Path to the dataset
concurrency (int, optional): Number of working threads to process the sampled prompts.
Defaults to 64.
num_prompts (int, optional): Number of prompts to process. Defaults to 2000.
top_p (float, optional): the set of most probable tokens with
probabilities that add up to top_p or higher
are kept for generation. Defaults to 1.0.
temperature (float, optional): The value used to modulate the next token probabilities.
Defaults to 1.0.
stream_output (bool, optional): Indicator for streaming output. Defaults to False.
csv (str, optional): The path to save the result.
seed (int, optional): Seed used in sampling prompts from dataset. Defaults to 0.
""" # noqa
if not server_addr.startswith('http://'):
print(f'[WARNING] server_addr of the api_server should '
f'start with "http://", but got "{server_addr}"')
server_addr = 'http://' + server_addr.strip()
random.seed(seed)
engine = Engine(server_addr,
tokenizer_path,
top_p=top_p,
temperature=temperature,
csv=csv)
requests = sample_requests(dataset, num_prompts, engine.tokenizer)
engine.process_request(requests, concurrency, stream_output)
if __name__ == '__main__':
    fire.Fire(main)
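

# --- Illustrative usage (not part of this commit) ---
# Driving the benchmark above from Python; `fire.Fire(main)` exposes the same
# arguments on the command line. The server address, tokenizer path and
# dataset path below are placeholders, not values taken from this repository.
def _example_profile_api_server():
    main('http://0.0.0.0:23333',
         '/path/to/tokenizer',
         '/path/to/dataset.json',
         concurrency=64,
         num_prompts=2000,
         stream_output=True,
         csv='./profile_api_server.csv')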
import csv
import json
import random
import time
from queue import Queue
from threading import Thread
from typing import List, Tuple

import fire
import numpy as np
from tqdm import tqdm

from lmdeploy.serve.turbomind.chatbot import Chatbot
from lmdeploy.tokenizer import Tokenizer
def sample_requests(
dataset_path: str,
num_requests: int,
tokenizer: Tokenizer,
) -> List[Tuple[str, int, int]]:
# Load the dataset.
with open(dataset_path) as f:
dataset = json.load(f)
# Filter out the conversations with less than 2 turns.
dataset = [data for data in dataset if len(data['conversations']) >= 2]
# Only keep the first two turns of each conversation.
dataset = [(data['conversations'][0]['value'],
data['conversations'][1]['value']) for data in dataset]
# Tokenize the prompts and completions.
prompts = [prompt for prompt, _ in dataset]
prompt_token_ids = tokenizer(prompts).input_ids
completions = [completion for _, completion in dataset]
completion_token_ids = tokenizer(completions).input_ids
tokenized_dataset = []
for i in range(len(dataset)):
output_len = len(completion_token_ids[i])
tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len))
# Filter out too long sequences.
filtered_dataset: List[Tuple[str, int, int]] = []
for prompt, prompt_token_ids, output_len in tokenized_dataset:
prompt_len = len(prompt_token_ids)
if prompt_len < 4 or output_len < 4:
# Prune too short sequences.
continue
if prompt_len > 1024 or prompt_len + output_len > 2048:
# Prune too long sequences.
continue
filtered_dataset.append((prompt, prompt_len, output_len))
# Sample the requests.
sampled_requests = random.sample(filtered_dataset, num_requests)
return sampled_requests
class Engine:
def __init__(self,
server_addr: str,
tokenzier_path: str,
temperature: float = 0.8,
top_k: int = 1,
top_p: float = 1.0,
csv: str = '',
log_level: str = 'ERROR',
**kwargs):
self.server_addr = server_addr
self.tokenizer = Tokenizer(tokenzier_path)
self.temperature = temperature
self.top_k = top_k
self.top_p = top_p
self.csv = csv
self.log_level = log_level
self.pbar = None
def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int,
stream_output: bool):
chatbot = Chatbot(self.server_addr,
ignore_eos=True,
profile_serving=True,
top_k=self.top_k,
top_p=self.top_p,
temperature=self.temperature,
log_level=self.log_level)
        stats = []
        for prompt, input_seqlen, output_seqlen in iter(
                req_queue.get, [None, None, None]):
            timestamps = []
            tokens = []
            timestamps.append(time.perf_counter())
            for _, _, n_token in chatbot.stream_infer(
                    session_id,
                    prompt,
                    request_output_len=output_seqlen,
                    sequence_start=True,
                    sequence_end=True):
                timestamps.append(time.perf_counter())
                tokens.append(n_token)

            first_token_latency = np.round(timestamps[1] - timestamps[0], 3)
            token_latency = np.round(timestamps[-1] - timestamps[0], 3)
            completion_tokens = tokens[-1]
            assert output_seqlen <= completion_tokens <= output_seqlen + 1, \
                f'Error. session_id({session_id}) request {output_seqlen} ' \
                f'tokens, but generate {completion_tokens} tokens.\n' \
                f'prompt: {prompt}'
            total_tokens = tokens[-1] + input_seqlen
            stats.append([
                first_token_latency, completion_tokens, output_seqlen,
                total_tokens, token_latency
            ])
            self.pbar.update(1)
        res_queue.put((session_id, stats))
def process_request(self,
requests,
concurrency: int = 1,
stream_output: bool = True):
res_queue = Queue()
req_queue = Queue()
threads = []
        self.pbar = tqdm(total=len(requests))

        # feed request to q
        for req in requests:
            req_queue.put(req)
        for i in range(concurrency):
            req_queue.put([None, None, None])

        start = time.time()

        # start threads
        for i in range(concurrency):
            t = Thread(target=self._inference,
                       args=(req_queue, res_queue, i, stream_output))
            t.start()
            threads.append(t)

        # wait for finish
        for t in threads:
            t.join()

        elapsed_time = time.time() - start

        stats = []
        while not res_queue.empty():
            session_id, _stats = res_queue.get()
            # print(f'\n{"-" * 50}\n'
            #       f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n')
            stats.append(np.array(_stats))

        stats = np.concatenate(stats).reshape(-1, 5)
@@ -170,7 +163,7 @@ def main(tritonserver_addr: str,
        prompt_tokens = total_tokens - completion_tokens
        completion_token_throughput = completion_tokens / elapsed_time
        total_token_throughput = total_tokens / elapsed_time
        rqs = len(requests) / elapsed_time
        rqm = rqs * 60

        if (np.abs(stats[:, 1] - stats[:, 2]) <= 1).min() is False:
@@ -178,12 +171,14 @@ def main(tritonserver_addr: str,
                  f'Request {request_output_tokens:.0f}, '
                  f'but got {completion_tokens:.0f}')

        print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n'
              f'elapsed_time: {elapsed_time:.3f}s\n')
        if stream_output:
            print(f'first_token latency(min, max, ave): '
                  f'{first_token_latency_min:.3f}s, '
                  f'{first_token_latency_max:.3f}s, '
                  f'{first_token_latency_ave:.3f}s\n')
        print(
            f'number of prompt tokens: {prompt_tokens:.0f}\n'
            f'number of completion tokens: {completion_tokens:.0f}\n'
            f'token throughput (completion token): {completion_token_throughput:.3f} token/s\n'  # noqa
@@ -191,8 +186,72 @@ def main(tritonserver_addr: str,
            f'RPS (request per second): {rqs:.3f} req/s\n'
            f'RPM (request per minute): {rqm:.3f} req/min\n'
            f'{"-" * 50}\n')
        if self.csv:
with open(self.csv, 'w') as csvfile:
writer = csv.writer(csvfile)
writer.writerow([
'batch', 'num_prompts', 'prompt_tokens',
'completion_tokens', '1st_token_latency(min)(s)',
'1st_token_latency(max)(s)', '1st_token_latency(ave)(s)',
'output token thr(token/s)', 'total token thr(token/s)',
'RPM'
])
writer.writerow([
concurrency,
len(requests), prompt_tokens, completion_tokens,
f'{first_token_latency_min:.3f}' if stream_output else '-',
f'{first_token_latency_max:.3f}' if stream_output else '-',
f'{first_token_latency_ave:.3f}' if stream_output else '-',
f'{completion_token_throughput:.3f}',
f'{total_token_throughput:.3f}', f'{rqm:.3f}'
])
def main(server_addr: str,
tokenizer_path: str,
dataset: str,
concurrency: int = 32,
num_prompts: int = 1000,
top_k: int = 1,
top_p: float = 1.0,
temperature: float = 1.0,
stream_output: bool = True,
csv: str = './profile_tis.csv',
seed: int = 0):
"""Benchmark the request througput of the triton inference server.
Args:
server_addr (str): Address of the triton inference server with format 0.0.0.0:0
tokenizer_path (str): Path to the tokenizer model in localhost
dataset (str): Path to the dataset
concurrency (int, optional): Number of working threads to process the sampled prompts.
Defaults to 32.
num_prompts (int, optional): Number of prompts to process. Defaults to 1000.
top_k (int, optional): The number of highest probability vocabulary tokens
to keep for top-k-filtering. Defaults to 1.
top_p (float, optional): the set of most probable tokens with
probabilities that add up to top_p or higher
are kept for generation. Defaults to 1.0.
temperature (float, optional): The value used to modulate the next token probabilities.
Defaults to 1.0.
stream_output (bool, optional): Indicator for streaming output. Defaults to True.
seed (int, optional): Seed used in sampling prompts from dataset. Defaults to 0.
""" # noqa
random.seed(seed)
engine = Engine(server_addr,
tokenizer_path,
top_k=top_k,
top_p=top_p,
temperature=temperature,
log_level='ERROR',
csv=csv)
requests = sample_requests(dataset, num_prompts, engine.tokenizer)
engine.process_request(requests, concurrency, stream_output)
if __name__ == '__main__':
......
# Copyright (c) OpenMMLab. All rights reserved.
import csv
import json
import os
import random
import time
from queue import Queue
@@ -8,6 +10,7 @@ from typing import List, Tuple

import fire
import numpy as np
from tqdm import tqdm

from lmdeploy.tokenizer import Tokenizer
from lmdeploy.turbomind import TurboMind
@@ -56,25 +59,29 @@ def sample_requests(


class Engine:
    def __init__(self, model_path: str, tp: int, csv: str, **kwargs):
        # avoid turbomind checking chat template name by setting
        # `model_name='llama'`
        tm_model = TurboMind(model_path=model_path,
                             model_name='llama',
                             tp=tp,
                             **kwargs)
        self.tm_model = tm_model
        self.tokenizer = tm_model.tokenizer
        self.csv = csv
        self.pbar = None

    def _inference(self, req_queue: Queue, res_queue: Queue, session_id: int,
                   stream_output: bool):
        model_inst = self.tm_model.create_instance()
        stats = []
        for prompt, input_seqlen, output_seqlen in iter(
                req_queue.get, [None, None, None]):
            input_ids = self.tokenizer(prompt).input_ids
            offset = 0
            timestamps = []
            tokens = []
            timestamps.append(time.perf_counter())
            for outputs in model_inst.stream_infer(
                    session_id,
                    input_ids=input_ids,
@@ -93,15 +100,16 @@ class Engine:
            first_token_latency = np.round(timestamps[1] - timestamps[0], 3)
            token_latency = np.round(timestamps[-1] - timestamps[0], 3)
            completion_tokens = tokens[-1]
            assert output_seqlen <= completion_tokens <= output_seqlen + 1, \
                f'Error. session_id({session_id}) request {output_seqlen} ' \
                f'tokens, but generate {completion_tokens} tokens.\n' \
                f'prompt: {prompt}'
            total_tokens = tokens[-1] + input_seqlen
            stats.append([
                first_token_latency, completion_tokens, output_seqlen,
                total_tokens, token_latency
            ])
            self.pbar.update(1)
        res_queue.put((session_id, stats))
    def process_request(self,
@@ -112,6 +120,8 @@ class Engine:
        req_queue = Queue()
        threads = []

        self.pbar = tqdm(total=len(requests))

        # feed request to q
        for req in requests:
            req_queue.put(req)
@@ -136,8 +146,8 @@ class Engine:
        stats = []
        while not res_queue.empty():
            session_id, _stats = res_queue.get()
            # print(f'\n{"-" * 50}\n'
            #       f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n')
            stats.append(np.array(_stats))

        stats = np.concatenate(stats).reshape(-1, 5)
@@ -146,7 +156,6 @@ class Engine:
        first_token_latency_max = np.max(stats[:, 0], axis=0)
        first_token_latency_ave = np.mean(stats[:, 0], axis=0)
        completion_tokens = np.sum(stats[:, 1], axis=0)
        total_tokens = np.sum(stats[:, 3], axis=0)
        prompt_tokens = total_tokens - completion_tokens
        completion_token_throughput = completion_tokens / elapsed_time
@@ -154,11 +163,6 @@ class Engine:
        rqs = len(requests) / elapsed_time
        rqm = rqs * 60

        print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n'
              f'elapsed_time: {elapsed_time:.3f}s\n')
        if stream_output:
@@ -175,18 +179,71 @@ class Engine:
              f'RPM (request per minute): {rqm:.3f} req/min\n'
              f'{"-" * 50}\n')
if self.csv:
with open(self.csv, 'w') as csvfile:
writer = csv.writer(csvfile)
writer.writerow([
'batch', 'num_prompts', 'prompt_tokens',
'completion_tokens', '1st_token_latency(min)(s)',
'1st_token_latency(max)(s)', '1st_token_latency(ave)(s)',
'output token thr(token/s)', 'total token thr(token/s)',
'RPM'
])
writer.writerow([
concurrency,
len(requests), prompt_tokens, completion_tokens,
f'{first_token_latency_min:.3f}' if stream_output else '-',
f'{first_token_latency_max:.3f}' if stream_output else '-',
f'{first_token_latency_ave:.3f}' if stream_output else '-',
f'{completion_token_throughput:.3f}',
f'{total_token_throughput:.3f}', f'{rqm:.3f}'
])
def main(dataset: str,
         model_path: str,
         concurrency: int = 64,
         num_prompts: int = 2000,
         tp: int = 1,
         top_k: int = 1,
         top_p: float = 1.0,
         temperature: float = 1.0,
         stream_output: bool = True,
         csv: str = './profile_throughput.csv',
         log_level: str = 'ERROR',
         seed: int = 0):
"""Benchmark the request throughput of lmdeploy in localhost.
Args:
dataset (str): Path to the dataset
model_path (str): Path to a model in localhost or a model_repo_id in huggingface.co
concurrency (int, optional): Number of working threads to process the sampled prompts.
Defaults to 64.
num_prompts (int, optional): Number of prompts to process. Defaults to 2000.
tp (int, optional): Number of GPUs for tensor parallel. Defaults to 1.
top_k (int, optional): The number of highest probability vocabulary tokens
to keep for top-k-filtering. Defaults to 1.
top_p (float, optional): the set of most probable tokens with
probabilities that add up to top_p or higher
are kept for generation. Defaults to 1.0.
temperature (float, optional): The value used to modulate the next token probabilities.
Defaults to 1.0.
stream_output (bool, optional): Indicator for streaming output. Defaults to True.
csv (str, optional): The path to save the result.
log_level(str, optional): The log level. Defaults to ERROR
seed (int, optional): Seed used in sampling prompts from dataset. Defaults to 0.
""" # noqa
random.seed(seed)
os.environ['TM_LOG_LEVEL'] = log_level
engine = Engine(model_path,
tp=tp,
top_k=top_k,
top_p=top_p,
temperature=temperature,
csv=csv)
requests = sample_requests(dataset, num_prompts, engine.tokenizer)
engine.process_request(requests, concurrency, stream_output)
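

# --- Illustrative usage (not part of this commit) ---
# Running the local throughput benchmark from Python; the command-line entry
# point takes the same arguments. The model and dataset paths below are
# placeholders, not values taken from this repository.
def _example_profile_throughput():
    main('/path/to/dataset.json',
         '/path/to/model',
         concurrency=64,
         num_prompts=2000,
         tp=1,
         csv='./profile_throughput.csv')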
......
@@ -9,7 +9,7 @@ The user can open the http url print by the following command in a browser.
- **Please check the http url for the detailed api usage!!!**

```shell
lmdeploy serve api_server ./workspace --server_name 0.0.0.0 --server_port ${server_port} --instance_num 64 --tp 1
```

We provide four RESTful APIs in total. Three of them are in OpenAI format.
......
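
Below is a minimal sketch of calling the OpenAI-style chat endpoint through `APIClient`, the client used by the `profile_restful_api.py` script in this commit; it assumes an `api_server` is already running at the given address, which is a placeholder.

```python
from lmdeploy.serve.openai.api_client import APIClient

client = APIClient('http://0.0.0.0:23333')
model_name = client.available_models[0]
for output in client.chat_completions_v1(model=model_name,
                                         messages='Hello!',
                                         max_tokens=64):
    print(output)
```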
@@ -9,7 +9,7 @@
Important things are said three times.

```shell
lmdeploy serve api_server ./workspace 0.0.0.0 --server_port ${server_port} --instance_num 64 --tp 1
```

We provide four RESTful APIs in total, three of which follow the OpenAI format.
......
@@ -48,7 +48,7 @@ class SubCliServe(object):
                   model_path: str,
                   server_name: str = '0.0.0.0',
                   server_port: int = 23333,
                   instance_num: int = 64,
                   tp: int = 1,
                   allow_origins: List[str] = ['*'],
                   allow_credentials: bool = True,
......
@@ -241,6 +241,11 @@ class AsyncEngine:
                                 len(input_ids), tokens, finish_reason)
                    response_size = tokens

                # `response_size` might not be updated because of the
                # `if response.endswith('�')` check above
                if response_size != tokens:
                    yield GenOut(response, self.steps[str(session_id)],
                                 len(input_ids), tokens, finish_reason)
                # update step
                self.steps[str(session_id)] += len(input_ids) + tokens
                if sequence_end or stop:
......
@@ -21,8 +21,6 @@ from lmdeploy.serve.openai.protocol import ( # noqa: E501
    EmbeddingsRequest, ErrorResponse, GenerateRequest, GenerateResponse,
    ModelCard, ModelList, ModelPermission, UsageInfo)


class VariableInterface:
    """A IO interface maintaining variables."""
@@ -476,12 +474,13 @@ async def chat_interactive_v1(request: GenerateRequest,
def main(model_path: str,
         server_name: str = '0.0.0.0',
         server_port: int = 23333,
         instance_num: int = 64,
         tp: int = 1,
         allow_origins: List[str] = ['*'],
         allow_credentials: bool = True,
         allow_methods: List[str] = ['*'],
         allow_headers: List[str] = ['*'],
         log_level: str = 'ERROR',
         **kwargs):
    """An example to perform model inference through the command line
    interface.
@@ -496,7 +495,10 @@ def main(model_path: str,
        allow_credentials (bool): whether to allow credentials for CORS
        allow_methods (List[str]): a list of allowed HTTP methods for CORS
        allow_headers (List[str]): a list of allowed HTTP headers for CORS
        log_level(str): set log level whose value among [CRITICAL, ERROR, WARNING, INFO, DEBUG]
    """  # noqa E501
    os.environ['TM_LOG_LEVEL'] = log_level
    if allow_origins:
        app.add_middleware(
            CORSMiddleware,
......
@@ -67,7 +67,6 @@ class Chatbot:
        model_name (str): name of the to-be-deployed model
        log_level (int): the level of the log
        display (bool): display the generated text on console or not
    """

    def __init__(self,
@@ -76,8 +75,6 @@ class Chatbot:
                 ignore_eos: bool = False,
                 log_level: int = logging.INFO,
                 display: bool = False,
                 **model_kwargs):
        self.tritonserver_addr = tritonserver_addr
        self.model_name = model_name
@@ -97,6 +94,7 @@ class Chatbot:
        if ignore_eos:
            stop_words = None
            bad_words = np.array([[[self.eos_id], [1]]], dtype=np.int32)
            self.eos_id = -1
        self.cfg = mmengine.Config(
            dict(session_len=self.model.session_len,
                 top_p=self.model.top_p,
@@ -107,8 +105,6 @@ class Chatbot:
                 bad_words=bad_words))
        self.log_level = log_level
        self.display = display

    def stream_infer(self,
                     session_id: int,
@@ -416,8 +412,6 @@ class Chatbot:
    def _get_prompt(self, prompt: str, sequence_start: bool):
        """return the concatenated prompt according to the model's chat
        template."""
        return self.model.get_prompt(prompt, sequence_start)

    def _stream_infer(self,
@@ -468,9 +462,7 @@ class Chatbot:
            input_ids = np.array([[1]], dtype=np.uint32)
            input_lengths = np.array([[1]], dtype=np.uint32)
        input_tokens = input_lengths.squeeze()

        if request_output_len is None:
            request_output_len = max(
                128,
@@ -506,8 +498,7 @@ class Chatbot:
        producer.start()
        for status, res, n_token in self.stream_consumer(
                self.postprocess, que, session, input_tokens, preseq_length,
                cancel, logger, self.display, self.eos_id):
            yield status, res, n_token

        producer.join()
@@ -600,8 +591,7 @@ class Chatbot:
    @staticmethod
    def stream_consumer(postprocess, res_queue, session, n_input_token,
                        preseq_length, cancel, logger, display, eos_id):
        """Consume the response from the triton inference server.

        Args:
@@ -614,7 +604,6 @@ class Chatbot:
            cancel (bool): indicator for cancelling the session
            logger (util.Logger):
            display (bool): display the text in the console interface or not
            eos_id (int): eos token id

        Yields:
@@ -658,11 +647,6 @@ class Chatbot:
                    session.sequence_length = session.sequence_length - 1
                    output_ids = output_ids[:, :, :-1]

                output_str = postprocess(
                    output_ids, np.array([[n_token]], dtype=np.uint32))
                text = output_str[0].decode()
@@ -681,7 +665,10 @@ class Chatbot:
            logger.error(f'catch exception: {e}')
            logger.error(
                f'session {session.session_id}: prompt: {session.prompt}')
        # `n_token` might not be updated since `if text.endswith('�')`
        if n_token != output_ids.shape[-1]:
            n_token = output_ids.shape[-1]
            session.response += text
        # put session back to queue so that `_stream_infer` can update it in
        # `self.sessions`
        while not res_queue.empty():
......
@@ -586,7 +586,8 @@ class TurboMindInstance:
        outputs = []
        for output, len_ in zip(output_ids, sequence_length):
            output, len_ = output, len_.item()
            if len(output) > 0 and output[-1].item(
            ) == self.eos_id and not ignore_eos:
                outputs.append((output[:-1], len_ - 1))
            elif len(output) > 0 and output[-1].item() in self.stop_tokens:
                outputs.append((output[:-1], len_))
......