"...text-generation-inference.git" did not exist on "6f42942772100d846c71dc15a4a37cb45b648b73"
Unverified commit 2bf481fb authored by lvhan028 and committed by GitHub

update scripts for deploying llama family models to fastertransformer triton models (#4)

* add scripts for deploying llama family models via fastertransformer

* fix

* fix

* set symlinks True when copying triton models templates

* pack model repository for triton inference server

* add exception

* fix

* update config.pbtxt and launching scripts
parent 4f47f78c
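The commit adds a fire-based deploy.py entry point (shown in full later in this diff). A minimal sketch of driving it directly from Python, where the module name `deploy` and every path are assumptions for illustration rather than part of the commit:

# hypothetical invocation of the new entry point; module name and paths are placeholders
from deploy import main

main(model_name='llama-7b',              # must be one of `supported_models`
     model_path='/path/to/llama-7b',     # directory holding *.pth and params.json
     model_format='llama',               # 'llama' (Meta checkpoints) or 'hf'
     tokenizer_path='/path/to/llama-7b/tokenizer.model',
     dst_path='./workspace',             # workspace is recreated on every run
     tp=1)                               # number of GPUs for tensor parallelism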
......@@ -2,3 +2,4 @@
.idea/
__pycache__/
*.egg-info/
workspace/
......@@ -28,7 +28,10 @@ def main(triton_server_addr: str, model_name: str, session_id: int):
else:
request_id = f'{session_id}-{nth_round}'
for status, res, tokens in chatbot.stream_infer(
session_id, prompt, request_id=request_id):
session_id,
prompt,
request_id=request_id,
request_output_len=512):
continue
print(f'session {session_id}, {status}, {tokens}, {res}')
nth_round += 1
......
......@@ -249,6 +249,8 @@ class Chatbot:
return f'USER: {prompt} ASSISTANT:'
else:
return f'</s>USER: {prompt} ASSISTANT:'
else:
return prompt
def _stream_infer(self,
session: Session,
......
# Copyright (c) OpenMMLab. All rights reserved.
import configparser
import json
import os
import os.path as osp
import re
import shutil
from pathlib import Path
import fire
import safetensors
import torch
from sentencepiece import SentencePieceProcessor
supported_models = [
'vicuna-7b', 'vicuna-13b', 'llama-7b', 'llama-13b', 'llama-30b',
'llama-65b'
]
supported_formats = ['llama', 'hf']
def create_workspace(_path: str):
try:
if osp.exists(_path):
shutil.rmtree(_path)
os.makedirs(_path)
print(f'create workspace in directory {_path}')
return True
except Exception as e:
print(f'create workspace in {_path} failed: {e}')
return False
def destroy_workspace(_path: str):
try:
shutil.rmtree(_path)
print(f'destroy workspace in directory {_path}')
return True
except Exception as e:
print(f'destroy workspace in {_path} failed: {e}')
return False
def copy_triton_model_templates(_path: str):
try:
cur_path = osp.abspath(__file__)
dir_path = osp.dirname(cur_path)
triton_models_path = osp.join(dir_path, 'triton_models')
dst_path = osp.join(_path, 'triton_models')
shutil.copytree(triton_models_path, dst_path, symlinks=True)
print(f'copy triton model templates from "{triton_models_path}" to '
f'"{dst_path}" successfully')
shutil.copy(osp.join(dir_path, 'service_docker_up.sh'), _path)
return dst_path
except Exception as e:
print(f'copy triton model templates from "{triton_models_path}"'
f' to "{dst_path}" failed: {e}')
return None
def tokenizer_info(model_path: str):
assert os.path.isfile(model_path), model_path
sp_model = SentencePieceProcessor(model_file=model_path)
# BOS / EOS token IDs
n_words = sp_model.vocab_size()
bos_id = sp_model.bos_id()
eos_id = sp_model.eos_id()
return n_words, bos_id, eos_id
def export(model_name: str,
num_layer: int,
norm_eps: float,
model_params: dict,
tokenizer_path: str,
out_dir: str,
tp: int,
size_per_head: int = 128):
out_dir = osp.join(out_dir, 'weights')
os.makedirs(out_dir, exist_ok=True)
def save_bin(param: torch.Tensor, name):
print(name, param.shape)
if param.dtype in [torch.float, torch.bfloat16]:
param = param.half()
param.contiguous().numpy().tofile(osp.join(out_dir, name))
# the callers pass in transposed weights, so the splitting axes are reversed accordingly
for param_name, param_data in model_params.items():
if param_name == 'tok_embeddings.weight':
_vocab_size, dim = param_data.shape
head_num = dim // size_per_head
split_dim = None
key, ext = param_name.split('.')[-2:]
copy = False
if key in ['w1', 'w3', 'w_qkv']:
split_dim = -1
if key == 'w1':
inter_size = param_data.shape[-1]
elif key in ['w2', 'wo']:
if ext in ['scales', 'zeros']:
copy = True
else:
split_dim = 0
if split_dim is not None:
print(f'*** splitting {param_name}, shape={param_data.shape}, '
f'split_dim={split_dim}')
assert param_data.shape[split_dim] % tp == 0
split_size = param_data.shape[split_dim] // tp
splits = torch.split(param_data, split_size, dim=split_dim)
for i, split in enumerate(splits):
prefix, ext = osp.splitext(param_name)
save_bin(split, f'{prefix}.{i}{ext}')
elif copy:
print(f'### copying {param_name}, shape={param_data.shape}')
copies = [param_data] * tp
for i, copy in enumerate(copies):
prefix, ext = osp.splitext(param_name)
save_bin(copy, f'{prefix}.{i}{ext}')
else:
save_bin(param_data, param_name)
# export config and save it to {out_dir}/config.ini
vocab_size, bos_id, eos_id = tokenizer_info(tokenizer_path)
assert _vocab_size == vocab_size, \
f'different vocab size {_vocab_size} vs {vocab_size}'
cfg = dict(llama=dict(
model_name=model_name,
head_num=head_num,
size_per_head=size_per_head,
vocab_size=vocab_size,
num_layer=num_layer,
rotary_embedding=size_per_head,
inter_size=inter_size,
norm_eps=norm_eps,
start_id=bos_id,
end_id=eos_id,
weight_type='fp16',
# parameters for fastertransformer
max_batch_size=32,
max_context_token_num=4,
session_len=2048,
step_length=1,
cache_max_entry_count=48,
cache_chunk_size=8,
use_context_fmha=1))
config = configparser.ConfigParser()
for section, key_values in cfg.items():
config[section] = key_values
config_path = osp.join(out_dir, 'config.ini')
with open(config_path, 'w') as f:
config.write(f)
return True
def deploy_llama(model_name: str, model_path: str, tokenizer_path: str,
triton_models_path: str, tp: int):
if osp.exists(tokenizer_path):
shutil.copy(tokenizer_path,
osp.join(triton_models_path, 'tokenizer/tokenizer.model'))
else:
print(f'tokenizer model {tokenizer_path} does not exist')
return False
# read model arguments from params.json
try:
params_path = osp.join(model_path, 'params.json')
with open(params_path) as f:
model_arg = json.load(f)
num_layer = model_arg['n_layers']
norm_eps = model_arg['norm_eps']
except Exception as e:
print(f'get "n_layers" and "norm_eps" from {params_path} failed: {e}')
return False
# convert weights from llama to fastertransformer
checkpoints = []
for pattern in ['*.pth', '*.pt']:
checkpoints += sorted(Path(model_path).glob(pattern))
print(checkpoints)
n_ckpt = len(checkpoints)
model_params = {}
def get_param(_name, _size):
print(_name, _size)
if _name not in model_params:
model_params[_name] = torch.zeros(_size,
dtype=torch.float16,
device='cpu')
return model_params[_name]
for i, ckpt_path in enumerate(checkpoints):
ckpt = torch.load(ckpt_path, map_location='cpu')
for param_name, param_data in ckpt.items():
key = param_name.split('.')[-2]
# column-parallel
if key in ['w1', 'w3', 'wq', 'wk', 'wv', 'output']:
size = param_data.size(0)
param = get_param(
param_name,
[size * n_ckpt, param_data.size(1)])
param.data[size * i:size * (i + 1), :] = param_data
# row-parallel
elif key in ['w2', 'wo', 'tok_embeddings']:
size = param_data.size(-1)
param = get_param(param_name,
[param_data.size(0), size * n_ckpt])
param.data[:, size * i:size * (i + 1)] = param_data
elif i == 0:
param = get_param(param_name, param_data.size())
param.data = param_data
del ckpt
for name, param in model_params.items():
# transpose all weights as FasterTransformer is expecting column-major
# weights: (output_dims, input_dims) -> (input_dims, output_dims)
key = name.split('.')[-2]
if key in ['w1', 'w3', 'wq', 'wk', 'wv', 'w2', 'wo']:
param.data = param.data.t()
# concat qkv projection
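# after the transpose above, each of wq/wk/wv has shape
# (input_dim, output_dim); stacking along dim=1 gives w_qkv the shape
# (input_dim, 3, output_dim), which export() later splits along the last
# axis for tensor parallelism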
for i in range(1000):
_qkv = [f'layers.{i}.attention.{k}.weight' for k in ['wq', 'wk', 'wv']]
try:
qkv = tuple(map(model_params.pop, _qkv))
except KeyError:
break
qkv = torch.stack(qkv, dim=1)
model_params[f'layers.{i}.attention.w_qkv.weight'] = qkv
print(qkv.shape, qkv.dtype)
assert num_layer == i, f'mismatched layers: {num_layer} vs {i}'
return export(model_name, num_layer, norm_eps, model_params,
tokenizer_path, triton_models_path, tp)
def permute(x: torch.Tensor):
SIZE_PER_HEAD = 128
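# note: reorders the per-head rotary dimensions of the (already transposed)
# q/k weights from the huggingface layout back to the fb (Meta) layout;
# see the comment at the call site in deploy_hf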
if x.shape[-1] > 1: # qweights
dim = x.shape[-1]
n_heads = dim // SIZE_PER_HEAD
return x.view(-1, n_heads, 2,
dim // n_heads // 2).transpose(2, 3).reshape(-1, dim)
else: # scales, zeros
dim = x.shape[0]
n_heads = dim // SIZE_PER_HEAD
return x.view(n_heads, 2, dim // n_heads // 2,
1).transpose(1, 2).reshape(dim, 1)
def check_zero(x: torch.Tensor):
_sum = x.flatten().sum().item()
assert _sum == 0, str(_sum)
def deploy_hf(model_name: str, model_path: str, tokenizer_path: str,
triton_models_path: str, tp: int):
if tokenizer_path is None:
tokenizer_path = osp.join(model_path, 'tokenizer.model')
if osp.exists(tokenizer_path):
shutil.copy(tokenizer_path,
osp.join(triton_models_path, 'tokenizer/tokenizer.model'))
else:
print(f'tokenizer model {tokenizer_path} does not exist')
exit(-1)
# read model arguments from params.json
try:
params_path = osp.join(model_path, 'config.json')
with open(params_path) as f:
model_arg = json.load(f)
num_layer = model_arg['num_hidden_layers']
norm_eps = model_arg['rms_norm_eps']
except Exception as e:
print(f'get "num_hidden_layers" and "rms_norm_eps" from '
f'{params_path} failed: {e}')
return False
# convert weights from hf to fastertransformer
model_params = {}
_qweight = 'weight'
_suffixes = [_qweight]
_files = [file for file in os.listdir(model_path) if file.endswith('.bin')]
_files = sorted(_files)
_params = {}
for _file in _files:
_tmp = torch.load(osp.join(model_path, _file), map_location='cpu')
_params.update(_tmp)
def get_tensor(name):
return _params[name]
def get_tensor_transposed(name):
return _params[name].t()
for i in range(1000):
try:
# attention weights
_qkvo = [f'model.layers.{i}.self_attn.{t}_proj' for t in 'qkvo']
for suffix in _suffixes:
q, k, v, o = map(get_tensor_transposed,
map(('{}.' + suffix).format, _qkvo))
if suffix == 'bias':
check_zero(q), check_zero(k), check_zero(v), check_zero(o)
else:
# q, k has different layout for fb & hf, convert to fb's
# layout
q = permute(q)
k = permute(k)
if suffix == _qweight: # weight, qweight
# insert a dimension for splitting heads later
qkv = torch.stack((q, k, v), dim=1)
else: # scales, zeros
qkv = torch.stack((q, k, v), dim=0).squeeze(dim=-1)
for k, v in [('w_qkv', qkv), ('wo', o)]:
model_params[f'layers.{i}.attention.{k}.{suffix}'] = v
# ffn weights
_w123 = [
f'model.layers.{i}.mlp.{t}_proj'
for t in ['gate', 'down', 'up']
]
for suffix in _suffixes:
w1, w2, w3 = map(get_tensor_transposed,
map(('{}.' + suffix).format, _w123))
if suffix == 'bias':
check_zero(w1), check_zero(w2), check_zero(w3)
else:
if suffix in ['scales', 'zeros']:
w1, w2, w3 = map(lambda x: x.squeeze(dim=-1),
[w1, w2, w3])
for k, v in [('w1', w1), ('w2', w2), ('w3', w3)]:
model_params[
f'layers.{i}.feed_forward.{k}.{suffix}'] = v
other = [('attention_norm.weight', 'input_layernorm.weight'),
('ffn_norm.weight', 'post_attention_layernorm.weight')]
for ft, hf in other:
model_params[f'layers.{i}.' +
ft] = get_tensor(f'model.layers.{i}.' + hf)
except safetensors.SafetensorError:
break
except KeyError:
break
assert num_layer == i, f'mismatched layers: {num_layer} vs {i}'
other = [('tok_embeddings.weight', 'model.embed_tokens.weight'),
('norm.weight', 'model.norm.weight'),
('output.weight', 'lm_head.weight')]
for ft, hf in other:
model_params[ft] = get_tensor(hf)
return export(model_name, num_layer, norm_eps, model_params,
tokenizer_path, triton_models_path, tp)
def pack_model_repository(workspace_path: str):
model_repo_dir = osp.join(workspace_path, 'model_repository')
os.makedirs(model_repo_dir, exist_ok=True)
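# note: the symlink targets are relative to model_repository/, so the packed
# workspace still resolves when mounted into the serving container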
os.symlink(src=osp.join('../triton_models/interactive'),
dst=osp.join(model_repo_dir, 'fastertransformer'))
os.symlink(src=osp.join('../triton_models/preprocessing'),
dst=osp.join(model_repo_dir, 'preprocessing'))
os.symlink(src=osp.join('../triton_models/postprocessing'),
dst=osp.join(model_repo_dir, 'postprocessing'))
def main(model_name: str,
model_path: str,
model_format: str,
tokenizer_path: str = None,
dst_path: str = './workspace',
tp: int = 1):
"""deploy llama family models via fastertransformer.
Args:
model_name (str): the name of the to-be-deployed model, such as
llama-7b, llama-13b, etc.
model_path (str): the directory path of the model
model_format (str): the format of the model, 'llama' or 'hf'. 'llama'
stands for Meta's llama format, and 'hf' means huggingface format
tokenizer_path (str): the path of tokenizer model
dst_path (str): the destination path that saves outputs
tp (int): the number of GPUs used for tensor parallelism
"""
if model_name.lower() not in supported_models:
print(f'"{model_name}" is not supported. The supported models are: '
f'{supported_models}')
exit(-1)
if model_format not in supported_formats:
print(f'the model format "{model_format}" is not supported. '
f'The supported formats are: {supported_formats}')
exit(-1)
if model_format == 'llama' and tokenizer_path is None:
print('The model format is llama. The tokenizer model path must be '
'specified')
exit(-1)
if not create_workspace(dst_path):
exit(-1)
triton_models_path = copy_triton_model_templates(dst_path)
if triton_models_path is None:
exit(-1)
model_name = model_name.lower()
if model_format == 'llama':
res = deploy_llama(model_name, model_path, tokenizer_path,
triton_models_path, tp)
else:
res = deploy_hf(model_name, model_path, tokenizer_path,
triton_models_path, tp)
# update `tensor_para_size` in `triton_models/interactive/config.pbtxt`
with open(osp.join(triton_models_path, 'interactive/config.pbtxt'),
'a') as f:
param = 'parameters {\n key: "tensor_para_size"\n value: {\n ' \
'string_value: ' + f'"{tp}"\n' + ' }\n}\n'
f.write(param)
if not res:
print(f'deploy model "{model_name}" via fastertransformer failed')
destroy_workspace(dst_path)
exit(-1)
# pack model repository for triton inference server
pack_model_repository(dst_path)
# update the value of $TP in `service_docker_up.sh`
file_path = osp.join(dst_path, 'service_docker_up.sh')
with open(file_path, 'r') as f:
content = f.read()
content = re.sub('TP=1', f'TP={tp}', content)
with open(file_path, 'w') as f:
f.write(content)
if __name__ == '__main__':
fire.Fire(main)
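As a quick sanity check of the export step above, the config.ini written into triton_models/weights/ can be read back with configparser. A minimal sketch, assuming the default ./workspace output path:

# hypothetical read-back of the generated config; the path is a placeholder
import configparser
import os.path as osp

cfg = configparser.ConfigParser()
cfg.read(osp.join('./workspace/triton_models/weights', 'config.ini'))
llama = cfg['llama']
print(llama['model_name'], llama['head_num'], llama['weight_type'])
print('session_len =', llama.getint('session_len'))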
#!/bin/bash
show_help() {
echo "Usage: $0 [-h] [--help] [-l] [--lib-dir]"
echo
echo "Options:"
echo " -h, --help Show this help message and exit"
echo " --lib-dir Specify the directory of fastertransformer libraries"
}
# check if '-h' or '--help' in the arguments
for arg in "$@"
do
if [ "$arg" == "-h" ] || [ "$arg" == "--help" ]; then
show_help
exit 0
fi
done
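# note: TP below is rewritten by deploy.py to match its --tp argument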
TP=1
DEVICES="0"
for ((i = 1; i < ${TP}; ++i)); do
DEVICES="${DEVICES},$i"
done
DEVICES="\"device=${DEVICES}\""
SCRIPT_DIR="$(dirname "$0")"
SCRIPT_ABS_DIR="$(realpath "$SCRIPT_DIR")"
if [ -z "$1" ]; then
docker run \
--gpus $DEVICES \
--rm \
-v "${SCRIPT_ABS_DIR}":/workspace/models \
--shm-size 16g \
-p 33336:22 \
-p 33337-33400:33337-33400 \
--cap-add=SYS_PTRACE \
--cap-add=SYS_ADMIN \
--security-opt seccomp=unconfined \
--name llmdeploy \
-it --env NCCL_LAUNCH_MODE=GROUP lvhan028/fastertransformer:v0.0.1 \
tritonserver \
--model-repository=/workspace/models/model_repository \
--allow-http=0 \
--allow-grpc=1 \
--grpc-port=33337 \
--log-verbose=0 \
--allow-metrics=1
fi
for ((i = 1; i <= $#; i++)); do
arg=${!i}
case "$arg" in
--lib-dir)
if [ "$i" -eq "$#" ]; then
show_help
exit -1
fi
LIB_PATH=${@:i+1:1}
docker run \
--gpus $DEVICES \
--rm \
-v "${LIB_PATH}":/opt/tritonserver/backends/fastertransformer \
-v ""${SCRIPT_ABS_DIR}"":/workspace/models \
--shm-size 16g \
-p 33336:22 \
-p 33337-33400:33337-33400 \
--cap-add=SYS_PTRACE \
--cap-add=SYS_ADMIN \
--security-opt seccomp=unconfined \
--name llmdeploy \
-it --env NCCL_LAUNCH_MODE=GROUP lvhan028/fastertransformer:v0.0.1 \
tritonserver \
--model-repository=/workspace/models/model_repository \
--allow-http=0 \
--allow-grpc=1 \
--grpc-port=33337 \
--log-verbose=0 \
--allow-metrics=1
break
;;
esac
done
......@@ -241,12 +241,7 @@ output [
dims: [ -1, -1 ]
}
]
parameters {
key: "tensor_para_size"
value: {
string_value: "1"
}
}
parameters {
key: "pipeline_para_size"
value: {
......
......@@ -57,7 +57,9 @@ class TritonPythonModel:
self.model_config = model_config = json.loads(args['model_config'])
# Parse model output configs and convert Triton types to numpy types
input_names = ['INPUT_ID', 'REQUEST_INPUT_LEN']
input_names = [
'INPUT_ID', 'REQUEST_INPUT_LEN', 'BAD_WORDS_IDS', 'STOP_WORDS_IDS'
]
for input_name in input_names:
setattr(
self,
......@@ -102,6 +104,8 @@ class TritonPythonModel:
# Get input tensors
query = pb_utils.get_input_tensor_by_name(request,
'QUERY').as_numpy()
request_output_len = pb_utils.get_input_tensor_by_name(
request, 'REQUEST_OUTPUT_LEN').as_numpy()
# Preprocessing input data.
input_id, request_input_len = self._create_request(query)
......@@ -111,6 +115,12 @@ class TritonPythonModel:
input_id_tensor = pb_utils.Tensor(
'INPUT_ID',
np.array(input_id).astype(self.input_id_dtype))
request_input_len_tensor = pb_utils.Tensor(
'REQUEST_INPUT_LEN',
np.array(request_input_len).astype(
self.request_input_len_dtype))
request_output_len_tensor = pb_utils.Tensor(
'REQUEST_OUTPUT_LEN', request_output_len)
# Create InferenceResponse. You can set an error here in case
# there was a problem with handling this inference request.
......@@ -119,8 +129,10 @@ class TritonPythonModel:
#
# pb_utils.InferenceResponse(
# output_tensors=..., TritonError("An error occurred"))
inference_response = pb_utils.InferenceResponse(
output_tensors=[input_id_tensor])
inference_response = pb_utils.InferenceResponse(output_tensors=[
input_id_tensor, request_input_len_tensor,
request_output_len_tensor
])
responses.append(inference_response)
# You should return a list of pb_utils.InferenceResponse. Length
......
name: "preprocessing"
backend: "python"
max_batch_size: 1
input [
{
name: "QUERY"
data_type: TYPE_STRING
dims: [ -1 ]
},
{
name: "BAD_WORDS_DICT"
data_type: TYPE_STRING
dims: [ -1 ]
optional: true
},
{
name: "STOP_WORDS_DICT"
data_type: TYPE_STRING
dims: [ -1 ]
optional: true
},
{
name: "REQUEST_OUTPUT_LEN"
data_type: TYPE_UINT32
dims: [ -1 ]
}
]
output [
......@@ -13,6 +31,31 @@ output [
name: "INPUT_ID"
data_type: TYPE_UINT32
dims: [ -1 ]
},
{
name: "REQUEST_INPUT_LEN"
data_type: TYPE_UINT32
dims: [ 1 ]
},
{
name: "BAD_WORDS_IDS"
data_type: TYPE_INT32
dims: [ 2, -1 ]
},
{
name: "STOP_WORDS_IDS"
data_type: TYPE_INT32
dims: [ 2, -1 ]
},
{
name: "REQUEST_OUTPUT_LEN"
data_type: TYPE_UINT32
dims: [ -1 ]
},
{
name: "PROMPT_LEARNING_TASK_NAME_IDS"
data_type: TYPE_UINT32
dims: [ 1 ]
}
]
......
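Finally, a rough client-side sketch of exercising the extended preprocessing model over gRPC. The tritonclient calls, shapes, and endpoint below are assumptions for illustration (the gRPC port 33337 is taken from service_docker_up.sh); they are not part of this commit:

# hypothetical gRPC client for the preprocessing model defined above;
# batch shapes follow max_batch_size: 1 in its config.pbtxt
import numpy as np
import tritonclient.grpc as grpcclient

client = grpcclient.InferenceServerClient(url='localhost:33337')

query = np.array([[b'Hello, world']], dtype=np.object_)  # QUERY, TYPE_STRING
output_len = np.array([[512]], dtype=np.uint32)          # REQUEST_OUTPUT_LEN

inputs = [
    grpcclient.InferInput('QUERY', list(query.shape), 'BYTES'),
    grpcclient.InferInput('REQUEST_OUTPUT_LEN', list(output_len.shape), 'UINT32'),
]
inputs[0].set_data_from_numpy(query)
inputs[1].set_data_from_numpy(output_len)

result = client.infer(model_name='preprocessing', inputs=inputs)
print(result.as_numpy('INPUT_ID'))           # token ids produced by the tokenizer
print(result.as_numpy('REQUEST_INPUT_LEN'))  # number of input tokens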