Unverified Commit 4bd0b487 authored by WRH, committed by GitHub

[Refactor] Support multi-session chat (#178)

* add some dist utils

* add model utils

* add termio and basicstreamer

* typo

* fix world size

* refactor chat and tested llama1

* add internlm adapter and support stoping criteria

* concat with id for internlm

* update docstring

* update and support llama2

* typo

* move docs to docs

* update docstring of session manager

* update docstring

* update docs

* fix accel none in model

* fix and add test for tensor broadcast

* fix session using typing to check type

* add docstrings and comprehensive condition test

* unit test for dist

* fix session

* split unittests of utils

* typo

* update control flow of accel

* move test model

* remove main in unittest

* remove some log

* remove some comments
parent c80f3e49
...@@ -122,11 +122,7 @@ For the deployment of other supported models, such as LLaMA, LLaMA-2, vicuna and
### Inference with PyTorch
For detailed instructions on inference with PyTorch models, see [here](docs/en/pytorch.md).
#### Single GPU
...@@ -149,6 +145,12 @@ deepspeed --module --num_gpus 2 lmdeploy.pytorch.chat \
  --seed 0
```
You need to install deepspeed first to use this feature.
```
pip install deepspeed
```
## Quantization
In fp16 mode, kv_cache int8 quantization can be enabled, and a single card can serve more users.
...
# PyTorch
## Chat in command line
LMDeploy supports chatting with PyTorch models through the submodule `lmdeploy.pytorch.chat`.
This submodule allows users to chat with a language model through the command line, and optionally accelerate the model using backends such as deepspeed.
**Example 1**: Chat with default setting
```python
python -m lmdeploy.pytorch.chat $PATH_TO_HF_MODEL
```
**Example 2**: Disable sampling and chat history
```python
python -m lmdeploy.pytorch.chat \
$PATH_TO_LLAMA_MODEL_IN_HF_FORMAT \
--temperature 0 --max-history 0
```
**Example 3**: Accelerate with deepspeed inference
```python
python -m lmdeploy.pytorch.chat \
$PATH_TO_LLAMA_MODEL_IN_HF_FORMAT \
--accel deepspeed
```
Note: to use deepspeed, you need to install deepspeed first. If you hope to accelerate InternLM, you need a customized version: <https://github.com/wangruohui/DeepSpeed/tree/support_internlm_0.10.0>
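The customized build can be installed directly from that branch, for example:
```
pip install git+https://github.com/wangruohui/DeepSpeed@support_internlm_0.10.0
```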
**Example 4**: Tensor parallel the model on 2 GPUs
```python
deepspeed --module --num_gpus 2 lmdeploy.pytorch.chat \
$PATH_TO_LLAMA_MODEL_IN_HF_FORMAT \
--accel deepspeed
```
This module also allows the following control commands to change generation behaviors during chat.
- `exit`: terminate and exit chat
- `config set key=value`: change generation config `key` to `value`, e.g. `config set temperature=0` disables sampling for following chats
- `clear`: clear chat history
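Below is a short, purely illustrative session showing these commands in use (the model reply is made up):
```
double enter to end input >>> config set temperature=0

double enter to end input >>> What is LMDeploy?

LMDeploy is a toolkit for compressing, deploying and serving LLMs.
double enter to end input >>> clear

double enter to end input >>> exit
```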
### Simple diagram of components
```mermaid
graph LR;
subgraph model specific adapter
p((user_input))-->tokenize-->id((input_ids))-->decorate
tmpl_ids((template_ids))-->decorate;
end
subgraph generate
model[CausalLM_model.generate]-->gen_result(("gen_result"))
gen_result-->hid
gen_result-->attn((attention))
end
subgraph streamer
model-->s[streamer]--value-->decode_single--token-->output
end
subgraph session_manager
prepend_history-->fullid((complete_ids));
trim-->prepend_history
end
decorate-->prepend_history
hid((history_ids))-->trim;
attn-->trim;
fullid-->model
tokenizer((tokenizer))-->decode_single
tokenizer-->tokenize
p-->genconfig(GenConfig)-->model
```
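The following sketch mirrors this diagram in plain Python using the module's own components; the model path is a placeholder and error handling is omitted:
```python
from transformers import GenerationConfig

from lmdeploy.pytorch.adapters import init_adapter
from lmdeploy.pytorch.model import accel_model, init_model
from lmdeploy.pytorch.session import BasicSessionManagerWithHistory
from lmdeploy.pytorch.utils import BasicStreamer, TerminalIO

model, tokenizer = init_model('$PATH_TO_HF_MODEL')   # placeholder path
adapter = init_adapter(model, tokenizer)             # model specific adapter
model = accel_model(model, accel=None)               # or accel='deepspeed'

sm = BasicSessionManagerWithHistory(start_ids=adapter.start_ids,
                                    sep_ids=adapter.sep_ids)
io = TerminalIO()
streamer = BasicStreamer(adapter.decode, io.output)
gen_config = GenerationConfig(max_new_tokens=128)

prompt = io.input()                                  # user_input
input_ids = adapter.encode_and_decorate(prompt)      # tokenize + decorate
input_ids = sm.prepend_history(input_ids)            # prepend_history (+ trim)
output = model.generate(input_ids.cuda(), gen_config,
                        streamer=streamer,
                        stopping_criteria=adapter.stopping_criteria)
sm.add_to_history(output)                            # save for the next round
```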
# Copyright (c) OpenMMLab. All rights reserved.
import logging
import torch.nn as nn
from .base import BasicAdapter, BasicAdapterFast
from .internlm import InternLMAdapter
from .llama2 import Llama2Adapter
logger = logging.getLogger(__name__)
def _get_default_adapter(tokenizer):
if tokenizer.is_fast:
return BasicAdapterFast
else:
return BasicAdapter
def init_adapter(model: nn.Module, tokenizer, adapter=None):
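"""Select a model-specific adapter if the model is recognized, otherwise fall back to a basic adapter chosen by tokenizer type."""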
if adapter is None:
for v in model.modules():
if 'InternLMModel' in v.__class__.__name__:
Adapter = InternLMAdapter
break
elif 'LlamaModel' in v.__class__.__name__:
Adapter = Llama2Adapter
break
else:
Adapter = _get_default_adapter(tokenizer)
elif adapter == 'llama1':
Adapter = _get_default_adapter(tokenizer)
else:
raise ValueError(f'Adapter {adapter} is not allowed.')
logger.info(f'Using adapter {Adapter.__name__}')
return Adapter(tokenizer)
# Copyright (c) OpenMMLab. All rights reserved.
"""Basic adapter suitable for general HuggingFace models."""
import logging
import re
from transformers import (PreTrainedTokenizer, PreTrainedTokenizerBase,
PreTrainedTokenizerFast)
logger = logging.getLogger(__name__)
class BaseAdapter:
"""Base class for all adapters.
Note:
Adapters coordinate with the session manager to prepare input_ids.
The full sequence fed to the model is as follows:
```
adapter.start_ids
adapter.encode_and_decorate(user_input_1)
output_1_generated_by_model
adapter.sep_ids
adapter.encode_and_decorate(user_input_2)
output_2_generated_by_model
adapter.sep_ids
adapter.encode_and_decorate(user_input_3)
```
Thus adapter is responsible for providing model specific
``start_ids``, ``sep_ids``, and method to encode single prompt.
"""
def __init__(self, tokenizer: PreTrainedTokenizerBase):
self.tokenizer = tokenizer
def encode_and_decorate(self, prompt, add_special_tokens=False):
"""Model specific method to encode and decorate prompt."""
raise NotImplementedError
def decode(self, value):
"""Model specific method to decode single value to string."""
raise NotImplementedError
@property
def stopping_criteria(self):
"""Model specific stopping criteria for generation."""
return None
@property
def start_ids(self):
"""Model specific start ids."""
return [self.tokenizer.bos_token_id]
@property
def sep_ids(self):
"""Model specific separation ids."""
return [self.tokenizer.bos_token_id]
class BasicAdapter(BaseAdapter):
"""Basic adapter for slow tokenizers."""
def encode_and_decorate(self, prompt, add_special_tokens=False):
"""Encode prompt.
Note:
we leave <bos> to session manager to add.
"""
input_ids = self.tokenizer.encode(
prompt,
add_special_tokens=add_special_tokens,
return_tensors='pt',
)
logger.debug(f'Encode {prompt} to {input_ids}')
return input_ids
def decode(self, value):
"""Fallback when tokenizer is not fast."""
self.tokenizer: PreTrainedTokenizer
tok = self.tokenizer.decode(value)
return tok + ' '
class BasicAdapterFast(BaseAdapter):
"""Basic adapter for slow tokenizers."""
hex_regex = re.compile(r'^<0x([0-9ABCDEF]+)>$')
def encode_and_decorate(self, prompt, add_special_tokens=False):
"""Encode prompt.
Note:
we leave <bos> to session manager to add.
"""
input_ids = self.tokenizer.encode(
prompt,
add_special_tokens=add_special_tokens,
return_tensors='pt',
)
logger.debug(f'Encode {prompt} to {input_ids}')
return input_ids
def decode(self, value):
"""Decode with fast tokenizers."""
self.tokenizer: PreTrainedTokenizerFast
tok = self.tokenizer._convert_id_to_token(value)
if tok.startswith('▁'): # sentencepiece
space = ' '
tok = tok[1:]
else:
space = ''
if res := self.hex_regex.match(tok):
tok = chr(int(res.group(1), 16))
if tok == '</s>' or tok == '\r':
tok = '\n'
tok = space + tok
logger.debug(f'Decode {value} to {repr(tok)}')
return tok
# Copyright (c) OpenMMLab. All rights reserved.
import logging
import re
import torch
from transformers import (PreTrainedTokenizerFast, StoppingCriteria,
StoppingCriteriaList)
from .base import BaseAdapter
logger = logging.getLogger(__name__)
class InternLMStoppingCriteria(StoppingCriteria):
"""Stopping criteria for HF version of InternLM."""
def __call__(self, input_ids, *args, **kwargs) -> bool:
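# 2 is InternLM's eos token id (</s>) and 103028 is the id of <eoa>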
return input_ids[0, -1] in [2, 103028]
class InternLMAdapter(BaseAdapter):
"""Adapter for InternLM.
InternLM uses the following template, in which '\n' should be tokenized to id 13.
<bos> (no actual newline here, just for better readability)
<|User|>:{prompt}<eoh>\n
<|Bot|>:{model_output}<eoa>\n
<|User|>:{prompt}<eoh>\n
<|Bot|>:{model_output}<eoa>\n
...
<eos>
"""
hex_regex = re.compile(r'^<0x([0-9ABCDEF]+)>$')
# ids of '<|User|>:'
B_USER_ID = torch.tensor([[333, 352, 1621, 352, 27232]])
# ids of '<eoh>\n<|Bot|>:'
E_USER_ID = torch.tensor([[103027, 13, 333, 352, 23845, 352, 27232]])
# ids of '<bos>'
start_ids = [1]
# ids of '\n'
sep_ids = [13]
def __init__(self, tokenizer: PreTrainedTokenizerFast):
self.tokenizer = tokenizer
def encode_and_decorate(self, prompt):
r"""Encode prompt and decorate with template.
Note:
we leave <bos> and chat history for session manager to add,
so we will decorate input_ids to '<|User|>:{prompt}<eoh>\n<|Bot|>:'
"""
input_ids = self.tokenizer.encode(
prompt,
add_special_tokens=False,
return_tensors='pt',
)
# This is f'<|User|>:{prompt}<eoh>\n<|Bot|>:'
# but force \n to 13 instead of 364
input_ids = torch.cat([self.B_USER_ID, input_ids, self.E_USER_ID],
dim=1)
return input_ids
def decode(self, value):
"""Decode generated tokens for InternLM."""
tok = self.tokenizer.decode(value)
if res := self.hex_regex.match(tok):
tok = chr(int(res.group(1), 16))
if tok == '</s>' or tok == '<eoa>' or tok == '\r':
tok = '\n'
logger.debug(f'Decode {value} to {repr(tok)}')
return tok
@property
def stopping_criteria(self):
return StoppingCriteriaList([InternLMStoppingCriteria()])
# Copyright (c) OpenMMLab. All rights reserved.
import logging
import re
from transformers import PreTrainedTokenizerFast
from .base import BasicAdapterFast
logger = logging.getLogger(__name__)
B_INST, E_INST = '[INST]', '[/INST]'
B_SYS, E_SYS = '<<SYS>>\n', '\n<</SYS>>\n\n'
DEFAULT_SYSTEM_PROMPT = """\
You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe. Your answers should not include any harmful, unethical, racist, sexist, toxic, dangerous, or illegal content. Please ensure that your responses are socially unbiased and positive in nature.
If a question does not make any sense, or is not factually coherent, explain why instead of answering something not correct. If you don't know the answer to a question, please don't share false information.""" # noqa: E501
class Llama2Adapter(BasicAdapterFast):
"""Adapter for llama2.
Llama 2 uses the following template, and the first user prompt
should contain a system prompt.
Users can specify the system prompt using a <<SYS>> tag; otherwise
the default system prompt is prepended to the user's input.
<bos>
[INST]<space>
<<SYS>>\n
SYSTEM_PROMPT\n
<</SYS>>\n\n
{user_prompt_1}<space>
[/INST]<space>
{answer_1}<space>
<eos>
<bos>
[INST]<space>
{user_prompt_2}<space>
[/INST]<space>
{answer_2}<space>
<eos>
<bos>
[INST]<space>
{user_prompt_3}(no space here)
...
"""
start_ids = []
sep_ids = []
def __init__(self, tokenizer: PreTrainedTokenizerFast):
super().__init__(tokenizer)
self.prev_round = 0
def encode_and_decorate(self, prompt):
r"""Encode prompt and decorate with template."""
if self.prev_round == 0:
res = re.search(r'<<SYS>>(.*?)<</SYS>>(.*)', prompt)
if res:
prompt = B_SYS + res.group(1).strip() + \
E_SYS + res.group(2).strip()
else:
prompt = B_SYS + DEFAULT_SYSTEM_PROMPT + E_SYS + prompt
prompt = f'{B_INST} {prompt.strip()} {E_INST}'
logger.debug(f'decorated prompt: {repr(prompt)}')
input_ids = self.tokenizer.encode(
prompt,
add_special_tokens=True,
return_tensors='pt',
)
self.prev_round += 1
return input_ids
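A small usage sketch of the adapter above; the checkpoint path is a placeholder, the import path assumes the package layout introduced in this commit, and only the first round consumes the optional `<<SYS>>` block:
```python
from transformers import AutoTokenizer

from lmdeploy.pytorch.adapters.llama2 import Llama2Adapter

tokenizer = AutoTokenizer.from_pretrained('$PATH_TO_LLAMA2_MODEL', use_fast=True)
adapter = Llama2Adapter(tokenizer)

# Round 1: an explicit <<SYS>>...<</SYS>> block replaces the default system prompt
ids = adapter.encode_and_decorate(
    '<<SYS>>You are a terse assistant.<</SYS>> What is LMDeploy?')

# Later rounds are wrapped only in [INST] ... [/INST]
ids = adapter.encode_and_decorate('How do I install it?')
```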
# Copyright (c) OpenMMLab. All rights reserved.
"""Chat through command line.

This submodule allows users to chat with a language model through the command
line, and optionally accelerate the model using backends like deepspeed.

Example 1: Chat with default settings

```python
python -m lmdeploy.pytorch.chat $PATH_TO_HF_MODEL
```

Example 2: Disable sampling

```python
python -m lmdeploy.pytorch.chat \
    $PATH_TO_LLAMA_MODEL_IN_HF_FORMAT \
    --temperature 0
```

Example 3: Accelerate with deepspeed inference

```python
python -m lmdeploy.pytorch.chat \
    $PATH_TO_LLAMA_MODEL_IN_HF_FORMAT \
    --accel deepspeed
```

Note: to use deepspeed, you need to install deepspeed,
and if you hope to accelerate InternLM, you need a customized version
https://github.com/wangruohui/DeepSpeed/tree/support_internlm_0.10.0

Example 4: Tensor parallel the model on 2 GPUs

```python
deepspeed --module --num_gpus 2 lmdeploy.pytorch.chat \
    $PATH_TO_LLAMA_MODEL_IN_HF_FORMAT \
    --accel deepspeed
```

This module also allows the following control commands to change
generation behaviors during chat.

- `exit`: terminate and exit chat
- `config set key=value`: change generation config `key` to `value`,
  e.g. `config set temperature=0` disables sampling for following chats
- `clear`: clear chat history
"""

import itertools
import logging
from typing import Optional

import fire
import torch
from transformers import GenerationConfig, PreTrainedModel

from .adapters import init_adapter
from .dist import get_local_rank, get_rank, get_world_size
from .model import accel_model, init_model
from .session import BasicSessionManagerWithHistory
from .utils import BasicStreamer, TerminalIO, control

logger = logging.getLogger(__name__)


def set_logging(log_file: str, debug: bool):
    torch.set_printoptions(linewidth=120)
    level = logging.DEBUG if debug else logging.INFO
    log_file = log_file or 'chat.log'
    if (r := get_rank()) != 0:
        log_file = log_file + f'.{r}'
    logging.basicConfig(level=level,
                        format=('%(filename)s: '
                                '%(levelname)s: '
                                '%(funcName)s(): '
                                '%(lineno)d:\t'
                                '%(message)s'),
                        filename=log_file,
                        filemode='w')
    print(f'Worker {get_rank()} logging to {log_file}')


def main(
    model_path: str,
    tokenizer_path: Optional[str] = None,
    accel: Optional[str] = None,
    max_new_tokens: int = 128,
    temperature: float = 0.8,
    top_p: float = 0.95,
    seed: int = 0,
    use_fast_tokenizer: bool = True,
    max_alloc: int = 2048,
    max_session_len: Optional[int] = None,
    log_file: Optional[str] = None,
    debug: bool = False,
    adapter: Optional[str] = None,
):
    """Chat with model through terminal.

    Args:
        model_path (str): Path to model.
        tokenizer_path (str): Path to tokenizer.
        accel (str): Model accelerator.
        max_new_tokens (int): Maximum number of tokens to generate.
        temperature (float): Temperature for sampling.
        top_p (float): Top p for sampling.
        seed (int): Random seed.
        use_fast_tokenizer (bool): Whether to use fast tokenizer.
            This argument is directly passed to transformers'
            ``AutoTokenizer.from_pretrained``.
            Generally, users should choose fast tokenizers,
            but if using a fast one raises an error, try forcing a slow one.
        max_alloc (int): Maximum memory to allocate (for deepspeed).
        max_session_len (int): Maximum number of tokens allowed for all chat sessions.
            This includes both history and the current session.
        log_file (str): Path to log file.
        debug (bool): Whether to enable debug mode.
        adapter (str): Force to use an adapter.
            Generally users should not use this argument because the adapter
            is selected based on the type of the model. It is only required
            when the selection is impossible, e.g. distinguishing llama 1/2
            based on the `LlamaForCausalLM` class.
            Currently, only "llama1" is acceptable for llama1 models.
    """  # noqa: E501

    set_logging(log_file, debug)

    # workers should sync in sampling
    torch.manual_seed(seed)
    local_rank = get_local_rank()
    world_size = get_world_size()

    # Init model and tokenizer
    if not tokenizer_path:
        tokenizer_path = model_path
    model, tokenizer = init_model(
        model_path,
        tokenizer_path,
        use_fast_tokenizer=use_fast_tokenizer,
    )

    # Init adapter based on model and tokenizer
    adapter = init_adapter(model, tokenizer, adapter)

    # Accelerate model
    model: PreTrainedModel = accel_model(model,
                                         accel,
                                         max_alloc=max_alloc,
                                         tp_size=world_size)

    # warmup
    warmup_config = GenerationConfig(
...@@ -161,50 +155,58 @@ def main(
        temperature=temperature,
        top_p=top_p,
    )
    model.generate(torch.tensor([[6]], device=get_local_rank()), warmup_config)

    gen_config = GenerationConfig(
        max_new_tokens=max_new_tokens,
        do_sample=temperature > 0,
        temperature=temperature,
        top_p=top_p,
    )

    # Session manager handling history
    max_session_len = max_alloc if max_session_len is None else max_session_len
    sm = BasicSessionManagerWithHistory(max_session_len=max_session_len,
                                        start_ids=adapter.start_ids,
                                        sep_ids=adapter.sep_ids)
    io = TerminalIO()
    streamer = BasicStreamer(adapter.decode, io.output)

    for r in itertools.count(1):
        # User input from IO
        logger.info(f'Round {r}')
        prompt: str = io.input()
        logger.info(f'User input: {prompt}')

        # Allow user to change config during runtime or exit
        if control(prompt, gen_config, sm):
            continue

        # Tokenize and apply model specific templates
        input_ids = adapter.encode_and_decorate(prompt)
        logger.info(f'Input ids:\n{input_ids}')

        # Prepend chat history (tensor concatenation)
        input_ids = sm.prepend_history(input_ids)
        logger.info(f'Input ids with history:\n{input_ids}')

        # Generate; the returned tensor includes both input and generated output
        input_ids = input_ids.cuda(local_rank)
        output = model.generate(input_ids,
                                gen_config,
                                streamer=streamer,
                                stopping_criteria=adapter.stopping_criteria)
        logger.info(f'Output:\n{output}')

        # Save output into session manager and maybe trim some history
        sm.add_to_history(output)


def cli():
    fire.Fire(main)


if __name__ == '__main__':
    cli()
# Copyright (c) OpenMMLab. All rights reserved.
"""Helpers for parallel and distributed inference."""
import functools
import os
import torch
from torch.distributed import broadcast, broadcast_object_list, is_initialized
def get_local_rank():
"""Get local rank of current process.
Assume environment variable ``LOCAL_RANK`` is properly set by some launcher.
See: https://pytorch.org/docs/stable/elastic/run.html#environment-variables
""" # noqa: E501
return int(os.getenv('LOCAL_RANK', '0'))
def get_rank():
"""Get rank of current process.
Assume environment variable ``RANK`` is properly set by some launcher.
See: https://pytorch.org/docs/stable/elastic/run.html#environment-variables
""" # noqa: E501
return int(os.getenv('RANK', '0'))
def get_world_size():
"""Get rank of current process.
Assume environment variable ``WORLD_SIZE`` is properly set by some launcher.
See: https://pytorch.org/docs/stable/elastic/run.html#environment-variables
""" # noqa: E501
return int(os.getenv('WORLD_SIZE', '1'))
def master_only(func):
"""Decorator to run a function only on the master process."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
if is_initialized():
if get_rank() != 0:
return None
return func(*args, **kwargs)
return wrapper
def master_only_and_broadcast_general(func):
"""Decorator to run a function only on the master process and broadcast the
result to all processes."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
if is_initialized():
if get_rank() == 0:
result = [func(*args, **kwargs)]
else:
result = [None]
broadcast_object_list(result, src=0)
result = result[0]
else:
result = func(*args, **kwargs)
return result
return wrapper
def master_only_and_broadcast_tensor(func):
"""Decorator to run a function only on the master process and broadcast the
result to all processes.
Note: Require CUDA tensor.
Note: Not really practical because the shape is not known beforehand;
for CPU tensors, use ``master_only_and_broadcast_general`` instead.
"""
@functools.wraps(func)
def wrapper(*args, size, dtype, **kwargs):
if is_initialized():
if get_rank() == 0:
result = func(*args, **kwargs)
else:
result = torch.empty(size=size,
dtype=dtype,
device=get_local_rank())
broadcast(result, src=0)
# print(f'rank {get_rank()} received {result}')
else:
result = func(*args, **kwargs)
return result
return wrapper
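A minimal sketch of how these decorators are intended to be used, mirroring `TerminalIO` in `utils.py`; the function names here are illustrative:
```python
from lmdeploy.pytorch.dist import (get_rank, master_only,
                                   master_only_and_broadcast_general)


@master_only
def log_to_console(msg):
    # Runs on rank 0 only; other ranks receive None
    print(f'[rank {get_rank()}] {msg}')


@master_only_and_broadcast_general
def read_prompt():
    # Evaluated on rank 0; the result is broadcast to every rank
    return input('>>> ')
```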
# Copyright (c) OpenMMLab. All rights reserved.
import logging
import time
import warnings
from typing import Optional
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from .dist import get_local_rank
logger = logging.getLogger(__name__)
class LoadWoInit:
"""Context manager that disable parameter initialization."""
def __init__(self):
self.constant_ = torch.nn.init.constant_
self.zeros_ = torch.nn.init.zeros_
self.ones_ = torch.nn.init.ones_
self.uniform_ = torch.nn.init.uniform_
self.normal_ = torch.nn.init.normal_
self.kaiming_uniform_ = torch.nn.init.kaiming_uniform_
self.kaiming_normal_ = torch.nn.init.kaiming_normal_
def __enter__(self, *args, **kwargs):
torch.nn.init.constant_ = lambda *args, **kwargs: None
torch.nn.init.zeros_ = lambda *args, **kwargs: None
torch.nn.init.ones_ = lambda *args, **kwargs: None
torch.nn.init.uniform_ = lambda *args, **kwargs: None
torch.nn.init.normal_ = lambda *args, **kwargs: None
torch.nn.init.kaiming_uniform_ = lambda *args, **kwargs: None
torch.nn.init.kaiming_normal_ = lambda *args, **kwargs: None
def __exit__(self, *args, **kwargs):
torch.nn.init.constant_ = self.constant_
torch.nn.init.zeros_ = self.zeros_
torch.nn.init.ones_ = self.ones_
torch.nn.init.uniform_ = self.uniform_
torch.nn.init.normal_ = self.normal_
torch.nn.init.kaiming_uniform_ = self.kaiming_uniform_
torch.nn.init.kaiming_normal_ = self.kaiming_normal_
def init_model(model_path: str,
tokenizer_path: Optional[str] = None,
use_fast_tokenizer=True):
"""Initialize model and tokenizer from given model path.
Args:
model_path (str): Path to model.
tokenizer_path (str): Path to tokenizer.
use_fast_tokenizer (bool): Whether to use fast tokenizer.
Note:
If the model is converted from new version of transformers,
use_fast_tokenizer should be True.
If using depodaca/llama-xb-hf, use_fast_tokenizer should be False.
"""
start = time.monotonic()
if not tokenizer_path:
tokenizer_path = model_path
tokenizer = AutoTokenizer.from_pretrained(tokenizer_path,
use_fast=use_fast_tokenizer,
trust_remote_code=True)
with LoadWoInit():
model = AutoModelForCausalLM.from_pretrained(model_path,
torch_dtype=torch.float16,
trust_remote_code=True)
logger.info(f'Model loaded in {time.monotonic() - start:.1f} seconds')
logger.info(f'Model loaded from {model_path}')
logger.debug(model)
return model, tokenizer
def accel_model(model, accel: Optional[str] = None, max_alloc=2048, tp_size=1):
"""Accelerate model with given accelerator.
Note:
Currently we support only deepspeed or just no acceleration.
"""
logger.info(f'Accelerate model with {accel}')
if accel is None:
# No acceleration, just to cuda
# assume single gpu single process
# user is responsible to assign the gpu id via CUDA_VISIBLE_DEVICES # noqa: E501
model = model.cuda(get_local_rank())
elif accel.lower() == 'deepspeed':
# Use deepspeed inference inject fast kernel and/or tensor parallel
try:
import deepspeed
except ImportError as e:
raise ImportError('--accel=deepspeed is specified but '
'deepspeed is not installed.\n'
'Install with `pip install deepspeed`.') from e
config = dict(
tensor_parallel=dict(tp_size=tp_size), # Use world size in general
dtype=torch.float16,
replace_with_kernel_inject=True,
max_out_tokens=max_alloc,
)
if 'InternLM' in model.__class__.__name__:
try:
# Use customized deepspeed supporting InternLM
# https://github.com/wangruohui/DeepSpeed/tree/support_internlm_0.10.0 (commit cdef2ce) # noqa: E501
from deepspeed.module_inject.containers.internlm import \
InternLMLayerPolicy # noqa: E501
except ImportError:
# InternLM is not officially supported by DeepSpeed
# Set replace_with_kernel_inject=False to use AutoTP
config.update({'replace_with_kernel_inject': False})
warnings.warn(
'\033[0;93m'
'Current installation of deepspeed does not '
'support InternLM. Disable kernel injection. '
'To support InternLM, install customized deepspeed with '
'`pip install git+https://github.com/wangruohui/DeepSpeed@support_internlm_0.10.0`' # noqa: E501
'\033[0m')
else:
for module in model.modules():
# Since remote code is dynamically located,
# we need to do this dynamically
if module.__class__.__name__ == 'InternLMDecoderLayer':
InternLMLayerPolicy._orig_layer_class = module.__class__ # noqa: E501
break
logger.debug(f'Using deepspeed config\n{config}')
model = deepspeed.init_inference(
model=model, # Transformers models
config=config,
)
# for k, v in model.named_parameters():
# logger.debug(f"{k}: v.device")
else:
raise ValueError(f'Unsupported accelerator {accel}.')
logger.debug(model)
return model
# Copyright (c) OpenMMLab. All rights reserved.
import logging
import torch
from transformers.generation.utils import ModelOutput
logger = logging.getLogger(__name__)
class BasicSessionManager:
"""Basic session manager without history."""
def prepend_history(self, input_ids):
return input_ids
def add_to_history(self, output):
pass
class BasicSessionManagerWithHistory:
"""Basic session manager with chat history.
Args:
max_session_len (int): Maximum number of tokens allowed for all chat sessions.
reduce_size (int): Number of tokens to be trimmed when reaching maximum
session length. Default: 256.
start_ids (list[int]): Sequences of ids at the start of the chat session.
sep_ids (list[int]): Sequences of ids separating chat sessions.
""" # noqa: E501
bs = 1
def __init__(self,
max_session_len=2048,
reduce_size=256,
start_ids=[1],
sep_ids=[13]) -> None:
self.start_ids = torch.tensor(start_ids, dtype=torch.long)
self.sep_ids = torch.tensor(sep_ids, dtype=torch.long)
assert self.start_ids.ndim == 1
assert self.sep_ids.ndim == 1
self.max_session_len = max(len(start_ids), max_session_len)
self.reduce_size = min(reduce_size, max_session_len - len(start_ids))
assert self.max_session_len > self.reduce_size
self.new_session()
def new_session(self):
self.history_ids = self.start_ids.repeat(self.bs, 1)
def prepend_history(self, input_ids: torch.Tensor):
"""Prepend history ids to input ids and trim if over-length."""
input_ids = input_ids.to(self.history_ids.device).long()
sep_ids = self.sep_ids.to(self.history_ids.device).long().repeat(1, 1)
input_ids = torch.cat([self.history_ids, sep_ids, input_ids], dim=1)
if input_ids.shape[1] > self.max_session_len:
input_ids = input_ids[:,
(self.reduce_size - self.max_session_len):]
input_ids[:, :len(self.start_ids)] = self.start_ids.repeat(
self.bs, 1)
return input_ids
def add_to_history(self, output):
"""Save history output ids.
Note:
Output returned by HuggingFace generator contains both input
and output ids.
"""
if isinstance(output, ModelOutput):
self.history_ids = output.sequences
elif isinstance(output, torch.Tensor):
self.history_ids = output
else:
raise ValueError(f'Unknown output type {type(output)}')
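A small, illustrative round trip with the session manager above; the token ids are made up:
```python
import torch

from lmdeploy.pytorch.session import BasicSessionManagerWithHistory

sm = BasicSessionManagerWithHistory(start_ids=[1], sep_ids=[13])

# Round 1: only the start ids precede the prompt
ids = sm.prepend_history(torch.tensor([[100, 101, 102]]))
# tensor([[  1,  13, 100, 101, 102]])

# generate() returns input plus output, which becomes the new history
sm.add_to_history(torch.cat([ids, torch.tensor([[200, 201]])], dim=1))

# Round 2: the previous round is prepended before the new prompt
ids = sm.prepend_history(torch.tensor([[103]]))
# tensor([[  1,  13, 100, 101, 102, 200, 201,  13, 103]])
```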
# Copyright (c) OpenMMLab. All rights reserved.
import logging

from transformers.generation.streamers import BaseStreamer

from .dist import get_rank, master_only, master_only_and_broadcast_general

try:
    import readline  # To support command line history # noqa: F401
except ImportError:  # readline not available
    pass

logger = logging.getLogger(__name__)


class TerminalIO:
    """Terminal input and output."""

    end_of_output = '\n'

    @master_only_and_broadcast_general
    def input(self):
        """Read input from terminal."""

        print('\ndouble enter to end input >>> ', end='')
        sentinel = ''  # ends when this string is seen
        try:
            return '\n'.join(iter(input, sentinel))
        except EOFError:
            print('Detect EOF, exit')
            exit()

    @master_only
    def output(self, string):
        """Output to terminal with flush."""

        print(string, end='', flush=True)


class BasicStreamer(BaseStreamer):
    """Basic streamer for HuggingFace models."""

    def __init__(self,
                 decode_func,
                 output_func,
                 end_of_output='\n',
                 skip_prompt=True):
        self.decode = decode_func
        self.output = output_func
        self.end_of_output = end_of_output
        self.skip_prompt = skip_prompt
        self.gen_len = 0

    def put(self, value):
        """Callback before forwarding current token id to model."""
        if self.gen_len == 0 and self.skip_prompt:
            pass
        else:
            token = self.decode(value)
            self.output(token)
        self.gen_len += 1

    def end(self):
        """Callback at the end of generation."""
        self.output(self.end_of_output)
        self.gen_len = 0


def control(prompt, gen_config, sm):
    """Allow user to control generation config and session manager.

    Return:
        True if a control command was applied, False otherwise.
    """
    if prompt == 'exit':
        exit(0)

    if prompt == 'clear':
        sm.new_session()
        logger.info('Session cleared')
        return True

    # Re-config during runtime
    if prompt.startswith('config set'):
        try:
            keqv = prompt.split()[-1]
            k, v = keqv.split('=')
            v = eval(v)
            gen_config.__setattr__(k, v)
            logger.info(f'Worker {get_rank()} set {k} to {repr(v)}')
            logger.info(f'Generator config changed to: {gen_config}')
            return True
        except:  # noqa
            logger.info(
                'illegal instruction, treated as normal conversation. ')
    return False
import unittest
import torch
from lmdeploy.pytorch.dist import (get_rank, master_only,
master_only_and_broadcast_general,
master_only_and_broadcast_tensor)
class SimpleTest(unittest.TestCase):
@master_only
def fake_input(self):
print(f'Evaluate fake input 1 on {get_rank()}')
return 'master only or none'
@master_only_and_broadcast_general
def fake_input21(self):
print(f'Evaluate fake input 21 (str) on {get_rank()}')
return 'master only and_broadcast'
@master_only_and_broadcast_general
def fake_input22(self):
print(f'Evaluate fake input 22 (cpu tensor) on {get_rank()}')
return torch.tensor([6, 66, 666])
@master_only_and_broadcast_tensor
def fake_input3(self):
print(f'Evaluate fake input 3 (gpu tensor) on {get_rank()}')
return torch.tensor([6, 66, 666]).cuda()
def test(self):
torch.distributed.init_process_group(backend='nccl')
rank = get_rank()
# unittest will discard --local_rank, thus set manually
torch.cuda.set_device(rank)
in1 = self.fake_input()
in21 = self.fake_input21()
in22 = self.fake_input22()
in3 = self.fake_input3(dtype=torch.long, size=(1, 3))
if rank == 0:
self.assertEqual(in1, 'master only or none')
else:
self.assertEqual(in1, None)
self.assertEqual(in21, 'master only and_broadcast')
self.assertTrue(torch.allclose(in22, torch.tensor([6, 66, 666])))
self.assertFalse(torch.allclose(in3.cpu(), torch.tensor([6, 6, 666])))
self.assertTrue(torch.allclose(in3.cpu(), torch.tensor([6, 66, 666])))
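This test initializes an NCCL process group, so it has to be started through a distributed launcher that sets RANK, WORLD_SIZE and LOCAL_RANK, for example (the test file path is hypothetical):
```
torchrun --nproc_per_node 2 -m pytest tests/pytorch/test_dist.py
```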
from lmdeploy.pytorch.model import accel_model, init_model
def test_init_model():
cprint = lambda x: print(f'\033[92m{x}\033[0m') # noqa: E731
# Test llama2-7b and internlm-7b
for model_path in ['llama2/huggingface/llama-2-7b', 'internlm-7b']:
model, tokenizer = init_model(model_path)
assert tokenizer.is_fast
cprint('llama2 on CPU')
print(model)
model1 = accel_model(model)
cprint('llama2 on GPU')
print(model1)
cprint('llama2 with kernel injection')
model2 = accel_model(model, accel='deepspeed')
assert 'DeepSpeedSelfAttention' in repr(model2)
assert 'DeepSpeedMLP' in repr(model2)
from lmdeploy.pytorch.utils import BasicStreamer, TerminalIO
def test_terminal_io(monkeypatch):
import io
tio = TerminalIO()
inputs = 'hello\n\n'
# inputs = 'hello\n\n\x1B[A\n\n'
monkeypatch.setattr('sys.stdin', io.StringIO(inputs))
string = tio.input()
tio.output(string)
assert string == 'hello'
# string = tio.input()
# tio.output(string)
# assert string == 'hello'
def test_basic_streamer():
output = []
def decode_func(value):
return value + 10
def output_func(value):
output.append(value)
streamer = BasicStreamer(decode_func, output_func)
for i in range(10):
streamer.put(i)
if i == 5:
streamer.end()
streamer.end()
assert output == [11, 12, 13, 14, 15, '\n', 17, 18, 19, '\n']
output.clear()
streamer = BasicStreamer(decode_func, output_func, skip_prompt=False)
for i in range(10):
streamer.put(i)
if i == 5:
streamer.end()
streamer.end()
assert output == [10, 11, 12, 13, 14, 15, '\n', 16, 17, 18, 19, '\n']