Unverified commit d3dbe179 authored by pppppM, committed by GitHub

[Feature] Support AWQ (#108)

* support kv cache offload

* add dataloader docstring

* complete gitignore

* refactor collect mod fn

* add calibration

* fix lint

* add observers and quantizers

* fix lints

* add global available mixin

* fix lints

* split batch inference

* support smoothquant and awq

* update export kv scales

* fix lints

* fix some bugs

* update weight only usage

* update usage

* auto mapping and support smooth internlm

* trust remote code

* fix num head key error

* fix bias error

* align shape and pack order with llm-awq

* modified according to LZHgrla's comments.

* update gitignore

* fix kv qparams export error

* update usage

* decouple calibrate and awq

* update docstrings

* update api name

* update readme

* update readme

* update readme

* update readme

* update kv_qparams and readme

* fix typos
parent 0d9c6c9d
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
.vscode/
.idea/

# C extensions
*.so
# Distribution / packaging
.Python
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
*build*/
!builder/
...@@ -12,4 +47,17 @@ dist/
examples/cpp/llama/*.csv
*.npy
*.weight
*.pyc
# LMDeploy
workspace/
work_dir*/
# Huggingface
*.bin
*config.json
*generate_config.json
# Pytorch
*.pth
*.py~
*.sh~
...@@ -13,6 +13,7 @@ ______________________________________________________________________

## News 🎉

- \[2023/08\] TurboMind supports 4-bit quantization and inference.
- \[2023/07\] TurboMind supports Llama-2 70B with GQA.
- \[2023/07\] TurboMind supports Llama-2 7B/13B.
- \[2023/07\] TurboMind supports tensor-parallel inference of InternLM.

...@@ -153,16 +154,51 @@ pip install deepspeed

## Quantization
### Step 1. Obtain Quantization Parameters
First, run the quantization script to obtain the quantization parameters.
> After execution, the various parameters needed for quantization are stored in `$WORK_DIR`; they will be used in the following steps.
```
python3 -m lmdeploy.lite.apis.calibrate \
--model $HF_MODEL \
--calib_dataset 'c4' \ # Calibration dataset, supports c4, ptb, wikitext2, pileval
--calib_samples 128 \ # Number of samples in the calibration set, if memory is insufficient, you can appropriately reduce this
--calib_seqlen 2048 \ # Length of a single piece of text, if memory is insufficient, you can appropriately reduce this
--work_dir $WORK_DIR \ # Folder storing Pytorch format quantization statistics parameters and post-quantization weight
```
### Step 2. Quantize the Model
`LMDeploy` supports INT4 quantization of weights and INT8 quantization of KV Cache. Run the corresponding script according to your needs.
#### Weight INT4 Quantization
LMDeploy uses the [AWQ](https://arxiv.org/abs/2306.00978) algorithm to quantize model weights.
> This step requires the `$WORK_DIR` produced in Step 1 as input; the quantized weights are also stored in this folder.
```
python3 -m lmdeploy.lite.apis.auto_awq \
--w_bits 4 \ # Bit number for weight quantization
--w_sym False \ # Whether to use symmetric quantization for weights
--w_group_size 128 \ # Group size for weight quantization statistics
--work_dir $WORK_DIR \ # Directory saving quantization parameters from Step 1
```
#### KV Cache INT8 Quantization
In fp16 mode, kv_cache int8 quantization can be enabled, and a single card can serve more users.

First, run the quantization script; the quantization parameters are stored in the `workspace/triton_models/weights` directory converted by `deploy.py`.

```
python3 -m lmdeploy.lite.apis.kv_qparams \
--work_dir $WORK_DIR \ # Directory saving quantization parameters from Step 1
--turbomind_dir $TURBOMIND_DIR \ # Directory of the TurboMind weights converted by deploy.py
--kv_sym False \ # Whether to use symmetric or asymmetric quantization
--num_tp 1 \ # The number of GPUs used for tensor parallelism
```

Then adjust `workspace/triton_models/weights/config.ini`
...@@ -182,6 +218,7 @@ We appreciate all contributions to LMDeploy. Please refer to [CONTRIBUTING.md](.

## Acknowledgement

- [FasterTransformer](https://github.com/NVIDIA/FasterTransformer)
- [llm-awq](https://github.com/mit-han-lab/llm-awq)

## License

...
...@@ -13,6 +13,7 @@ ______________________________________________________________________

## News 🎉

- \[2023/08\] TurboMind supports 4-bit weight quantization and inference
- \[2023/07\] TurboMind supports Llama-2 70B with GQA
- \[2023/07\] TurboMind supports Llama-2 7B/13B
- \[2023/07\] TurboMind supports tensor-parallel inference of InternLM

...@@ -151,16 +152,51 @@ deepspeed --module --num_gpus 2 lmdeploy.pytorch.chat \

## Quantization Deployment

### Step 1. Obtain Quantization Parameters

First, run the quantization script to obtain the quantization parameters.

> After execution, the various parameters needed for quantization are stored in `$WORK_DIR`; they will be used in the following steps.

```
python3 -m lmdeploy.lite.apis.calibrate \
--model $HF_MODEL \
--calib_dataset 'c4' \ # Calibration dataset; supports c4, ptb, wikitext2, pileval
--calib_samples 128 \ # Number of calibration samples; reduce it if GPU memory is insufficient
--calib_seqlen 2048 \ # Length of a single text sample; reduce it if GPU memory is insufficient
--work_dir $WORK_DIR \ # Folder for the Pytorch-format quantization statistics and quantized weights
```

### Step 2. Quantize the Model

Currently, INT4 quantization of weights and INT8 quantization of the KV Cache are supported. Run the corresponding script according to your needs.

#### Weight INT4 Quantization

LMDeploy uses the [AWQ](https://arxiv.org/abs/2306.00978) algorithm to quantize model weights.

> This step requires the `$WORK_DIR` produced in Step 1 as input; the quantized weights are also stored in this folder.

```
python3 -m lmdeploy.lite.apis.auto_awq \
--w_bits 4 \ # Bit width for weight quantization
--w_group_size 128 \ # Group size for weight quantization statistics
--work_dir $WORK_DIR \ # Directory saving the quantization parameters from Step 1
```

#### KV Cache INT8 Quantization

First, export the quantization parameters in TurboMind format (KV Cache INT8 quantization requires `TurboMind`).

> `$TURBOMIND_DIR` is the `workspace/triton_models/weights` directory produced by `deploy.py`.

```
python3 -m lmdeploy.lite.apis.kv_qparams \
--work_dir $WORK_DIR \ # Directory saving the quantization parameters from Step 1
--turbomind_dir $TURBOMIND_DIR \
--kv_sym False \ # Symmetric or asymmetric quantization; defaults to False
--num_tp 1 \ # Number of GPUs used for tensor parallelism; keep it consistent with deploy.py
```

Then adjust `workspace/triton_models/weights/config.ini`

...@@ -180,6 +216,7 @@ python3 -m lmdeploy.lite.apis.kv_qparams \

## Acknowledgement

- [FasterTransformer](https://github.com/NVIDIA/FasterTransformer)
- [llm-awq](https://github.com/mit-han-lab/llm-awq)

## License

...
# Copyright (c) OpenMMLab. All rights reserved.
from pathlib import Path
import fire
import torch
from torch import nn
from transformers import AutoModelForCausalLM, AutoTokenizer
from lmdeploy.lite.quantization.awq import (FC_FCS_MAP, NORM_FCS_MAP,
quant_weights, smooth_layers)
from lmdeploy.lite.utils import collect_target_modules
LAYER_TYPE_MAP = {
'InternLMForCausalLM': 'InternLMDecoderLayer',
'QWenLMHeadModel': 'QWenBlock',
'BaiChuanForCausalLM': 'DecoderLayer',
'LlamaForCausalLM': 'LlamaDecoderLayer',
}
NORM_TYPE_MAP = {
'InternLMForCausalLM': 'InternLMRMSNorm',
'QWenLMHeadModel': 'RMSNorm',
'BaiChuanForCausalLM': 'RMSNorm',
'LlamaForCausalLM': 'LlamaRMSNorm',
}
def auto_awq(model: str,
w_bits: int = 4,
w_sym: bool = False,
w_group_size: int = 128,
work_dir: str = './work_dir',
device: str = 'cuda'):
tokenizer = AutoTokenizer.from_pretrained(model,
use_fast=False,
trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(model,
torch_dtype=torch.float16,
trust_remote_code=True)
layer_type = LAYER_TYPE_MAP[type(model).__name__]
fc2fcs = FC_FCS_MAP[layer_type]
norm2fcs = NORM_FCS_MAP[layer_type]
work_dir = Path(work_dir)
act_scales = torch.load(work_dir / 'inputs_stats.pth')['absmean']
layers = collect_target_modules(model, layer_type)
fcs = {}
for l_name, layer in layers.items():
name2fc = collect_target_modules(layer, nn.Linear, prefix=l_name)
fcs.update(name2fc)
smooth_layers(layers, fc2fcs, norm2fcs, act_scales, w_group_size, device)
quant_weights(model, fcs, w_bits, w_sym, w_group_size, device)
model.save_pretrained(work_dir)
tokenizer.save_pretrained(work_dir)
if __name__ == '__main__':
fire.Fire(auto_awq)
# Copyright (c) OpenMMLab. All rights reserved.
from pathlib import Path
import fire
import torch
from accelerate import (infer_auto_device_map, init_empty_weights,
load_checkpoint_in_model)
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
from lmdeploy.lite.quantization import CalibrationContext
from lmdeploy.lite.utils import collect_target_modules, get_calib_loaders
LAYER_TYPE_MAP = {
'InternLMForCausalLM': 'InternLMDecoderLayer',
'QWenLMHeadModel': 'QWenBlock',
'BaiChuanForCausalLM': 'DecoderLayer',
'LlamaForCausalLM': 'LlamaDecoderLayer',
}
NORM_TYPE_MAP = {
'InternLMForCausalLM': 'InternLMRMSNorm',
'QWenLMHeadModel': 'RMSNorm',
'BaiChuanForCausalLM': 'RMSNorm',
'LlamaForCausalLM': 'LlamaRMSNorm',
}
def calibrate(model: str,
calib_dataset: str = 'c4',
calib_samples: int = 128,
calib_seqlen: int = 2048,
work_dir: str = './work_dir',
device: str = 'cuda') -> None:
"""The main function for loading the model and performing calibration on a
given dataset.
Args:
model (str): The model to be loaded.
calib_dataset (str, optional): The calibration dataset name.
Defaults to 'c4'.
calib_samples (int, optional): The number of samples for calibration.
Defaults to 128.
calib_seqlen (int, optional): The sequence length for calibration.
Defaults to 2048.
work_dir (str): The working directory for outputs.
Defaults to './work_dir'.
device (str, optional): The device to be used for calculation.
Defaults to 'cuda'.
"""
assert calib_dataset in ['c4', 'ptb', 'wikitext2', 'pileval'], \
'Support only `c4`, `ptb`, `wikitext2` or `pileval`.'
# Load tokenizer and configuration
tokenizer = AutoTokenizer.from_pretrained(model,
use_fast=False,
trust_remote_code=True)
hf_config = AutoConfig.from_pretrained(model, trust_remote_code=True)
checkpoint = hf_config._name_or_path
with init_empty_weights():
# Load model
model = AutoModelForCausalLM.from_pretrained(model,
torch_dtype=torch.float16,
trust_remote_code=True)
model.config.use_cache = False
layer_type = LAYER_TYPE_MAP[type(model).__name__]
norm_type = NORM_TYPE_MAP[type(model).__name__]
decoder_layers = collect_target_modules(model, layer_type)
# Infer device map
device_map = infer_auto_device_map(model,
no_split_module_classes=[layer_type])
for name in device_map.keys():
if name in decoder_layers:
device_map[name] = 'cpu'
else:
device_map[name] = 0
load_checkpoint_in_model(model, checkpoint, device_map)
print('Loading calibrate dataset ...')
calib_loader, _ = get_calib_loaders(calib_dataset,
tokenizer,
nsamples=calib_samples,
seqlen=calib_seqlen)
# Initialize calibration context
calib_ctx = CalibrationContext(model,
tokenizer,
layer_type=layer_type,
norm_type=norm_type,
device=device)
with calib_ctx:
all_data = torch.cat([
data if isinstance(data, torch.Tensor) else data[0]
for data in calib_loader
]).to(device)
calib_ctx.calibrate(all_data)
# Create work directory if not exists
work_dir = Path(work_dir)
work_dir.mkdir(parents=True, exist_ok=True)
calib_ctx.export(work_dir)
if __name__ == '__main__':
fire.Fire(calibrate)
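Both `calibrate` and `auto_awq` are plain Python functions wrapped with `fire`, so they can also be driven from a script. A minimal sketch, assuming a placeholder model identifier and work directory (the values below are illustrative, not from the commit):

```
# Hedged sketch: run calibration, then AWQ weight quantization, from Python.
from lmdeploy.lite.apis.auto_awq import auto_awq
from lmdeploy.lite.apis.calibrate import calibrate

model_path = 'internlm/internlm-chat-7b'  # placeholder HF model
work_dir = './work_dir'                   # placeholder output folder

# Collects activation/KV statistics and writes *_stats.pth into work_dir.
calibrate(model_path, calib_dataset='c4', calib_samples=128,
          calib_seqlen=2048, work_dir=work_dir)

# Reads inputs_stats.pth from work_dir, smooths and packs the weights there.
auto_awq(model_path, w_bits=4, w_sym=False, w_group_size=128,
         work_dir=work_dir)
```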
# Copyright (c) OpenMMLab. All rights reserved.
from pathlib import Path
from typing import Union

import fire
import numpy as np
import torch


def _export_sym(key_stats: dict,
value_stats: dict,
bits: int,
out_dir: Union[str, Path],
tp: int = 1) -> None:
"""Export symmetric quantization parameters to specified directory."""
keys_absmax = key_stats['absmax']
values_absmax = value_stats['absmax']
for layer_idx, name in enumerate(keys_absmax.keys()):
k_absmax = keys_absmax[name]
v_absmax = values_absmax[name]
heads, dims = k_absmax.shape
assert heads % tp == 0
mp_k_absmax = torch.chunk(k_absmax, tp)
mp_v_absmax = torch.chunk(v_absmax, tp)
for i in range(tp):
# quant: q = f / scale
# dequant: f = q * scale
k_s = mp_k_absmax[i].max() / (2**(bits - 1) - 1)
v_s = mp_v_absmax[i].max() / (2**(bits - 1) - 1)
kv_qparams = np.array([k_s, v_s], dtype=np.float32)
out_path = out_dir / f'layers.{layer_idx}.past_kv_scale.{i}.weight' # noqa: E501
kv_qparams.tofile(out_path)
print(f'Layer {layer_idx} MP {i} KV scales done.')
def _export_asym(key_stats: dict,
value_stats: dict,
bits: int,
out_dir: Union[str, Path],
tp: int = 1) -> None:
"""Export asymmetric quantization parameters to specified directory."""
keys_min = key_stats['min']
values_min = value_stats['min']
keys_max = key_stats['max']
values_max = value_stats['max']
for layer_idx, name in enumerate(keys_min.keys()):
k_max = keys_max[name]
v_max = values_max[name]
k_min = keys_min[name]
v_min = values_min[name]
heads, dims = k_min.shape
assert heads % tp == 0
tp_k_min = torch.chunk(k_min, tp)
tp_v_min = torch.chunk(v_min, tp)
tp_k_max = torch.chunk(k_max, tp)
tp_v_max = torch.chunk(v_max, tp)
for i in range(tp):
# quant: q = (f - zp) / scale
# dequant: f = q * scale + zp
k_min = tp_k_min[i].min()
v_min = tp_v_min[i].min()
            k_max = tp_k_max[i].max()
            v_max = tp_v_max[i].max()
            k_scale = (k_max - k_min) / (2**bits - 1)
            v_scale = (v_max - v_min) / (2**bits - 1)
            kv_qparams = np.array([k_scale, k_min, v_scale, v_min],
                                  dtype=np.float32)
            out_path = out_dir / f'layers.{layer_idx}.past_kv_scale.{i}.weight'  # noqa: E501
            kv_qparams.tofile(out_path)
            print(f'Layer {layer_idx} MP {i} KV scales&zeros done.')


def main(work_dir: str,
         turbomind_dir: str,
         kv_bits: int = 8,
         kv_sym: bool = True,
         num_tp: int = 1) -> None:
    """Main function to export key and value stats.

    Args:
        work_dir (Union[str, Path]): Directory path where the stats are saved.
        turbomind_dir (Union[str, Path]): Directory path where the results
            are saved.
        kv_bits (int, optional): Number of bits for quantization.
            Defaults to 8.
        kv_sym (bool, optional): Whether to use symmetric quantization.
            Defaults to True.
        num_tp (int, optional): Number of tensor parallel GPUs.
            Defaults to 1.
    """

    work_dir = Path(work_dir)
    tm_dir = Path(turbomind_dir)
    assert tm_dir.exists(), 'The specified TurboMind directory does not exist.'

    key_stats = torch.load(work_dir / 'key_stats.pth')
    value_stats = torch.load(work_dir / 'value_stats.pth')

    if kv_sym:
        _export_sym(key_stats, value_stats, kv_bits, tm_dir, num_tp)
    else:
        _export_asym(key_stats, value_stats, kv_bits, tm_dir, num_tp)


if __name__ == '__main__':
    fire.Fire(main)
# Copyright (c) OpenMMLab. All rights reserved.
from torch import nn
OFFLOAD_MOD = (nn.Linear, )
KV_CACHE_SIGNATURE = 'past_key_value'
# Copyright (c) OpenMMLab. All rights reserved.
from .activation import ActivationObserver, KVCacheObserver
from .calibration import CalibrationContext
from .weight import WeightQuantizer

__all__ = [
    'WeightQuantizer', 'ActivationObserver', 'KVCacheObserver',
    'CalibrationContext'
]
# Copyright (c) OpenMMLab. All rights reserved.
from .observer import ActivationObserver, KVCacheObserver
__all__ = ['ActivationObserver', 'KVCacheObserver']
# Copyright (c) OpenMMLab. All rights reserved.
import torch
from lmdeploy.lite.utils.global_avail import GlobalAvailMixin
class KVCacheObserver(GlobalAvailMixin):
"""A class to observe and record the max, min, and absolute max value of
given tensor."""
def __init__(self, num_head: int, head_dim: int) -> None:
"""Constructor for KVCacheObserver.
Args:
num_head : Number of heads
head_dim : Dimension of each head
"""
self.num_head = num_head
self.head_dim = head_dim
self.max_val = torch.full((num_head, head_dim),
-torch.inf,
dtype=torch.float16)
self.min_val = torch.full((num_head, head_dim),
torch.inf,
dtype=torch.float16)
self.absmax_val = torch.full((num_head, head_dim),
0,
dtype=torch.float16)
@torch.no_grad()
def observe(self, x: torch.Tensor) -> None:
"""Function to observe the input tensor and update the max, min, and
absolute max values.
Args:
x : Input tensor
"""
assert len(x.shape) == 4
x = x.transpose(1, 2)
assert x.size(2) == self.num_head
assert x.size(3) == self.head_dim
cur_max = x.flatten(0, 1).max(0)[0].cpu()
cur_min = x.flatten(0, 1).min(0)[0].cpu()
cur_absmax = x.flatten(0, 1).abs().max(0)[0].cpu()
self.max_val = torch.maximum(self.max_val, cur_max)
self.min_val = torch.minimum(self.min_val, cur_min)
self.absmax_val = torch.maximum(self.absmax_val, cur_absmax)
class ActivationObserver(GlobalAvailMixin):
"""A class to observe and record the max, min, mean, absolute max, and
absolute mean value of a given tensor.
Also keeps track of the number of batches observed.
"""
def __init__(self, dim: int) -> None:
"""Constructor for ActivationObserver.
Args:
dim : Dimension of the tensor
"""
self.dim = dim
self.max_val = torch.full((dim, ), -torch.inf, dtype=torch.float16)
self.min_val = torch.full((dim, ), torch.inf, dtype=torch.float16)
self.absmax_val = torch.full((dim, ), 0, dtype=torch.float16)
self.absmean_val = torch.full((dim, ), 0, dtype=torch.float16)
self.mean_val = torch.full((dim, ), 0, dtype=torch.float16)
self.num_batches_tracked = 0
@torch.no_grad()
def observe(self, x: torch.Tensor) -> None:
"""Function to observe the input tensor and update the max, min, mean,
absolute max, absolute mean values and number of batches tracked.
Args:
x : Input tensor
"""
assert len(x.shape) == 3
assert x.size(2) == self.dim
cur_val = x.flatten(0, 1)
cur_max = cur_val.max(0)[0].cpu()
cur_min = cur_val.min(0)[0].cpu()
cur_mean = cur_val.mean(0).cpu()
cur_abs = cur_val.abs()
cur_absmax = cur_abs.max(0)[0].cpu()
cur_absmean = cur_abs.mean(0).cpu()
self.max_val = torch.maximum(self.max_val, cur_max)
self.min_val = torch.minimum(self.min_val, cur_min)
self.absmax_val = torch.maximum(self.absmax_val, cur_absmax)
# Update mean and absmean value with accumulated sum divided
# by total number of batches
self.mean_val = (
(self.mean_val * self.num_batches_tracked + cur_mean) /
(self.num_batches_tracked + 1))
self.absmean_val = (
(self.absmean_val * self.num_batches_tracked + cur_absmean) /
(self.num_batches_tracked + 1))
# Increment the count of batches tracked
self.num_batches_tracked += 1
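To make the expected tensor layouts concrete, here is a small illustrative sketch (not part of the repo) that feeds random tensors through both observers; the head and hidden sizes are arbitrary:

```
# Illustrative only: shapes follow the asserts inside observe().
import torch

from lmdeploy.lite.quantization.activation import (ActivationObserver,
                                                   KVCacheObserver)

# KV cache layout: [batch, heads, tokens, head_dim]
kv_obs = KVCacheObserver(num_head=32, head_dim=128)
kv_obs.observe(torch.randn(1, 32, 16, 128))
print(kv_obs.absmax_val.shape)  # torch.Size([32, 128])

# Activation layout: [batch, tokens, hidden_dim]
act_obs = ActivationObserver(dim=4096)
act_obs.observe(torch.randn(2, 16, 4096))
print(act_obs.num_batches_tracked)  # 1
```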
# Copyright (c) OpenMMLab. All rights reserved.
from typing import List
import torch
# Maps that describe the structure of your model.
NORM_FCS_MAP = {
'LlamaDecoderLayer': {
'input_layernorm':
['self_attn.k_proj', 'self_attn.q_proj', 'self_attn.v_proj'],
'post_attention_layernorm': ['mlp.gate_proj', 'mlp.up_proj']
},
'InternLMDecoderLayer': {
'input_layernorm':
['self_attn.k_proj', 'self_attn.q_proj', 'self_attn.v_proj'],
'post_attention_layernorm': ['mlp.gate_proj', 'mlp.up_proj']
}
}
FC_FCS_MAP = {
'LlamaDecoderLayer': {
'self_attn.v_proj': ['self_attn.o_proj'],
'mlp.up_proj': ['mlp.down_proj']
},
'InternLMDecoderLayer': {
'self_attn.v_proj': ['self_attn.o_proj'],
'mlp.up_proj': ['mlp.down_proj']
}
}
@torch.no_grad()
def get_weight_scale(weight, q_group_size=-1):
org_shape = weight.shape
if q_group_size > 0:
weight = weight.view(-1, q_group_size)
scale = weight.abs() / weight.abs().amax(dim=1, keepdim=True)
scale = scale.view(org_shape)
scale = scale.mean(0)
return scale
@torch.no_grad()
def smooth_ln_fcs(ln: torch.nn.Module,
fcs: List[torch.nn.Module],
act_scales: torch.Tensor,
group_size: int = -1,
alpha: float = 0.5) -> torch.Tensor:
"""Smooth weights of a layer normalization and its fully connected layers.
:param ln: Layer Normalization module
:param fcs: List of Fully Connected modules
:param act_scales: Activation scales
:param alpha: Scaling factor (default is 0.5)
:return: Scales
"""
device, dtype = fcs[0].weight.device, fcs[0].weight.dtype
act_scales = act_scales.to(device=device, dtype=dtype)
concat_w = torch.cat([fc.weight for fc in fcs], dim=0)
w_scales = get_weight_scale(concat_w, group_size)
scales = (act_scales.pow(alpha) /
w_scales.pow(1 - alpha)).clamp(min=1e-4).to(device).to(dtype)
scales = scales / (scales.max() * scales.min()).sqrt()
ln.weight.div_(scales)
if hasattr(ln, 'bias'):
ln.bias.div_(scales)
for fc in fcs:
fc.weight.mul_(scales.view(1, -1))
for p in ln.parameters():
assert torch.isnan(p).sum() == 0
for fc in fcs:
for p in fc.parameters():
assert torch.isnan(p).sum() == 0
return scales
@torch.no_grad()
def smooth_fc_fcs(pre_fc: torch.nn.Module,
fcs: List[torch.nn.Module],
act_scales: torch.Tensor,
group_size: int = -1,
alpha: float = 0.5) -> torch.Tensor:
"""Smooth weights of a fully connected layer and its downstream layers.
:param pre_fc: Previous Fully Connected layer
:param fcs: List of Fully Connected modules
:param act_scales: Activation scales
:param alpha: Scaling factor (default is 0.5)
:return: Scales
"""
device, dtype = pre_fc.weight.device, pre_fc.weight.dtype
act_scales = act_scales.to(device=device, dtype=dtype)
concat_w = torch.cat([fc.weight for fc in fcs], dim=0)
w_scales = get_weight_scale(concat_w, group_size)
scales = (act_scales.pow(alpha) /
w_scales.pow(1 - alpha)).clamp(min=1e-4).to(device).to(dtype)
scales = scales / (scales.max() * scales.min()).sqrt()
pre_fc.weight.div_(scales.view(-1, 1))
if getattr(pre_fc, 'bias', None) is not None:
pre_fc.bias.div_(scales)
for fc in fcs:
fc.weight.mul_(scales.view(1, -1))
for p in pre_fc.parameters():
assert torch.isnan(p).sum() == 0
for fc in fcs:
for p in fc.parameters():
assert torch.isnan(p).sum() == 0
return scales
def check_awq_supported(layer_type):
"""Check if the smooth function is supported by inspecting layer type."""
norm_fcs_found = False
fc_fcs_found = False
if isinstance(layer_type, str):
if layer_type in NORM_FCS_MAP:
norm_fcs_found = True
if layer_type in FC_FCS_MAP:
fc_fcs_found = True
elif isinstance(layer_type, type):
if layer_type.__name__ in NORM_FCS_MAP:
norm_fcs_found = True
if layer_type.__name__ in FC_FCS_MAP:
fc_fcs_found = True
else:
raise NotImplementedError
if not norm_fcs_found:
raise NotImplementedError
if not fc_fcs_found:
raise NotImplementedError
def quant_weights(model, fcs, bits, symmetry, group_size=-1, device='cuda'):
"""Quantize the weights of the target model's linear layers."""
from lmdeploy.lite.quantization import WeightQuantizer
from lmdeploy.pytorch.modules import WeightOnlyQLinear
for name, fc in fcs.items():
fc.to(device)
quantizer = WeightQuantizer(bits, symmetry, 'per_group', group_size)
q_linear = WeightOnlyQLinear.from_linear(fc, quantizer)
parent_name, _, child_name = name.rpartition('.')
parent = model.get_submodule(parent_name)
fc.to('cpu')
setattr(parent, child_name, q_linear)
print(f'{name} weight packed.')
def smooth_layers(layers,
fc2fcs,
norm2fcs,
a_scales,
group_size=-1,
device='cuda'):
"""Apply weight smoothing based on input scales."""
for l_name, layer in layers.items():
layer.to(device)
for ln_name, fc_names in norm2fcs.items():
a_name = [f'{l_name}.{n}' for n in fc_names][0]
ln = layer.get_submodule(ln_name)
fcs = [layer.get_submodule(n) for n in fc_names]
smooth_ln_fcs(ln, fcs, a_scales[a_name], group_size)
for f_name, fc_names in fc2fcs.items():
a_name = [f'{l_name}.{n}' for n in fc_names][0]
fc = layer.get_submodule(f_name)
fcs = [layer.get_submodule(n) for n in fc_names]
smooth_fc_fcs(fc, fcs, a_scales[a_name], group_size)
layer.to('cpu')
print(f'{l_name} smooth weight done.')
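As a toy numeric illustration of the smoothing rule used in `smooth_ln_fcs` and `smooth_fc_fcs` (default `alpha = 0.5`, no grouping), channels with relatively large activation statistics receive scales above 1, shifting quantization difficulty from activations into the weights. The numbers below are made up purely for illustration:

```
# Toy numbers only; not taken from a real model.
import torch

act_scales = torch.tensor([4.0, 0.25])   # per-channel activation statistics
w_scales = torch.tensor([0.5, 2.0])      # per-channel weight statistics

scales = (act_scales.pow(0.5) / w_scales.pow(0.5)).clamp(min=1e-4)
scales = scales / (scales.max() * scales.min()).sqrt()
print(scales)  # tensor([2.8284, 0.3536]): ln/pre_fc is divided by this, fcs are multiplied
```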
# Copyright (c) OpenMMLab. All rights reserved.
from functools import partial
from typing import Union
import torch
from torch import nn
from transformers import PreTrainedTokenizer
from lmdeploy.lite.quantization.activation import (ActivationObserver,
KVCacheObserver)
from lmdeploy.lite.utils import (bimap_name_mod, collect_target_modules,
concat_decoder_layer_outputs,
split_decoder_layer_inputs)
class CalibrationContext():
"""Calibration context manager for model quantization.
Parameters:
- model: The target model to be calibrated and quantized
- tokenizer: The tokenizer used in the model training
- layer_type: Layer type to be targeted for calibration
- norm_type: Normalization type used for calibration
- device: Device on which model is to be calibrated ('cpu' or 'cuda')
"""
inp_obs_group = 'inputs'
out_obs_group = 'outputs'
key_obs_group = 'keys'
value_obs_group = 'values'
def __init__(self,
model: nn.Module,
tokenizer: PreTrainedTokenizer,
layer_type: Union[str, type],
norm_type: Union[str, type],
device: str = 'cuda') -> None:
"""Initiate calibration context.
Args:
model (nn.Module): Model to be calibrated.
tokenizer (PreTrainedTokenizer): Tokenizer of the given model.
layer_type (Union[str, type]): Type of the layers to be observed.
norm_type (Union[str, type]): Norm type used in the model.
device (str, optional): Device where the model should run.
Defaults to 'cuda'.
"""
self.layer_type = layer_type
self.norm_type = norm_type
self.num_head = self._guess_num_heads(model)
self.head_dim = model.config.hidden_size // self.num_head
self.model = model
self.tokenizer = tokenizer
# Collect modules to observe
self.name2layer = collect_target_modules(self.model, layer_type)
self.name2fc = {}
for l_name, layer in self.name2layer.items():
name2fc = collect_target_modules(layer, nn.Linear, prefix=l_name)
self.name2fc.update(name2fc)
self.name2norm = collect_target_modules(self.model, norm_type)
maps = bimap_name_mod([self.name2layer, self.name2fc, self.name2norm])
self.name2mod, self.mod2name = maps
# Initialize observers
self._init_input_observers(self.name2fc)
self._init_output_observers(self.name2norm)
self._init_output_observers(self.name2fc)
self._init_kv_observers(self.name2layer)
self.device = device
def _guess_num_heads(self, model):
if hasattr(model.config, 'num_attention_heads'):
return model.config.num_attention_heads
elif hasattr(model.config, 'num_key_value_heads'):
return model.config.num_key_value_heads
else:
raise KeyError
def _init_input_observers(self, name2mod):
"""Initialize input observers for given modules."""
for name, mod in name2mod.items():
obs = ActivationObserver(mod.weight.size(-1))
obs.global_available(name, group=self.inp_obs_group)
def _init_output_observers(self, name2mod):
"""Initialize output observers for given modules."""
for name, mod in name2mod.items():
obs = ActivationObserver(mod.weight.size(0))
obs.global_available(name, group=self.out_obs_group)
def _init_kv_observers(self, name2mod):
"""Initialize KV observers for given modules."""
for name in name2mod.keys():
k_obs = KVCacheObserver(self.num_head, self.head_dim)
v_obs = KVCacheObserver(self.num_head, self.head_dim)
k_obs.global_available(name, group=self.key_obs_group)
v_obs.global_available(name, group=self.value_obs_group)
def _insert_input_observers(self):
"""Insert input observers into the target modules.
This function registers a forward pre-hook on each target module to
observe the inputs.
"""
def _input_hook(mod: nn.Module, inp: torch.Tensor):
m_name = self.mod2name[mod]
obs = ActivationObserver.find(m_name, group=self.inp_obs_group)
obs.observe(inp[0])
group = ActivationObserver.find_group(self.inp_obs_group)
for name in group.keys():
mod = self.name2mod[name]
hook_fn = mod.register_forward_pre_hook(_input_hook)
self._hooks.append(hook_fn)
def _insert_output_observers(self):
"""Insert output observers into the target modules.
This function registers a forward hook on each target module to observe
the outputs.
"""
def _output_hook(mod: nn.Module, inp: torch.Tensor, out: torch.Tensor):
m_name = self.mod2name[mod]
obs = ActivationObserver.find(m_name, group=self.out_obs_group)
obs.observe(out)
group = ActivationObserver.find_group(self.out_obs_group)
for name in group.keys():
mod = self.name2mod[name]
hook_fn = mod.register_forward_hook(_output_hook)
self._hooks.append(hook_fn)
def _wrap_decoder_layers(self):
"""Method to wrap the decoder layers' forward functions for observing
their key/value cache during batched forward passes."""
def _forward(mod, *args, **kwargs):
mod.to(self.device)
batch_args, batch_kwargs = split_decoder_layer_inputs(
*args, **kwargs)
batch_outputs = []
samples = len(batch_args)
m_name = self.mod2name[mod]
k_obs = KVCacheObserver.find(m_name, group=self.key_obs_group)
v_obs = KVCacheObserver.find(m_name, group=self.value_obs_group)
for i in range(len(batch_args)):
if k_obs and v_obs:
batch_kwargs[i]['use_cache'] = True
out = self._ori_forwards[mod](*batch_args[i],
**batch_kwargs[i])
out = list(out)
key, value = out.pop(-1)
k_obs.observe(key)
v_obs.observe(value)
del key, value
torch.cuda.empty_cache()
batch_outputs.append(tuple(out))
else:
batch_outputs.append(self._ori_forwards[mod](
*batch_args[i], **batch_kwargs[i]))
outputs = concat_decoder_layer_outputs(batch_outputs)
del batch_outputs, batch_args, batch_kwargs, args
mod.to('cpu')
torch.cuda.empty_cache()
max_memory = torch.cuda.max_memory_allocated() / 1024 / 1024 / 1024
print(f'{m_name}, samples: {samples}, '
f'max gpu memory: {max_memory:.2f} GB')
return outputs
for layer in self.name2layer.values():
self._ori_forwards[layer] = layer.forward
layer.forward = partial(_forward, layer)
def collect_inputs_stats(self):
"""Collect statistics (min, max, absmax values) of the observed inputs.
Returns a dictionary with these collected stats.
"""
inputs_stats = {
'max': {},
'min': {},
'mean': {},
'absmax': {},
'absmean': {}
}
obs_group = ActivationObserver.find_group(self.inp_obs_group)
for name, obs in obs_group.items():
inputs_stats['max'][name] = obs.max_val
inputs_stats['min'][name] = obs.min_val
inputs_stats['mean'][name] = obs.mean_val
inputs_stats['absmax'][name] = obs.absmax_val
inputs_stats['absmean'][name] = obs.absmean_val
return inputs_stats
def collect_outputs_stats(self):
"""Collect statistics (min, max, absmax values) of the observed
outputs.
Returns a dictionary with these collected stats.
"""
outputs_stats = {
'max': {},
'min': {},
'mean': {},
'absmax': {},
'absmean': {}
}
obs_group = ActivationObserver.find_group(self.out_obs_group)
for name, obs in obs_group.items():
outputs_stats['max'][name] = obs.max_val
outputs_stats['min'][name] = obs.min_val
outputs_stats['mean'][name] = obs.mean_val
outputs_stats['absmax'][name] = obs.absmax_val
outputs_stats['absmean'][name] = obs.absmean_val
return outputs_stats
def collect_kv_stats(self):
"""Collect statistics (min, max, absmax values) of the observed keys
and values.
Returns a tuple of two dictionaries with these collected stats.
"""
key_stats = {'max': {}, 'min': {}, 'absmax': {}}
obs_group = KVCacheObserver.find_group(self.key_obs_group)
for name, obs in obs_group.items():
key_stats['max'][name] = obs.max_val
key_stats['min'][name] = obs.min_val
key_stats['absmax'][name] = obs.absmax_val
value_stats = {'max': {}, 'min': {}, 'absmax': {}}
obs_group = KVCacheObserver.find_group(self.value_obs_group)
for name, obs in obs_group.items():
value_stats['max'][name] = obs.max_val
value_stats['min'][name] = obs.min_val
value_stats['absmax'][name] = obs.absmax_val
return key_stats, value_stats
def export(self, out_dir):
"""Export the calibration statistics (inputs, outputs, keys and values)
to specified directory.
Args:
out_dir (Union[str, Path]): The directory path where the stats
will be saved.
"""
inp_stats = self.collect_inputs_stats()
torch.save(inp_stats, out_dir / 'inputs_stats.pth')
out_stats = self.collect_outputs_stats()
torch.save(out_stats, out_dir / 'outputs_stats.pth')
key_stats, value_stats = self.collect_kv_stats()
torch.save(key_stats, out_dir / 'key_stats.pth')
torch.save(value_stats, out_dir / 'value_stats.pth')
def calibrate(self, data):
"""Forward pass through the model in inference mode with given data."""
with torch.inference_mode():
_ = self.model.model(data.to(self.device))
def __enter__(self):
"""Prepares the Calibration object for a 'with' statement by
registering hooks and wrapping layer forward methods."""
self._hooks = list()
self._ori_forwards = {}
for layer in self.name2layer.values():
self._ori_forwards[layer] = layer.forward
self._insert_input_observers()
self._insert_output_observers()
self._wrap_decoder_layers()
def __exit__(self, exc_type, exc_value, traceback):
"""Clean up after a 'with' statement by removing registered hooks,
restoring original forward methods, and if no exception occurred,
collecting all gathered statistics and saving them."""
for h in self._hooks:
h.remove()
for layer in self.name2layer.values():
layer.forward = self._ori_forwards[layer]
# Copyright (c) OpenMMLab. All rights reserved.
from typing import Any, Callable
class Observer:
"""The Observer class applies a user-specified function on its inputs and
stores the results in a buffer.
Args:
observe_fn (Callable[..., Any]): The function to apply on inputs.
"""
def __init__(self, observe_fn: Callable[..., Any]) -> None:
super().__init__()
self.fn = observe_fn
self.buffer = list()
self.enabled = False
def enable_observer(self, enabled: bool = True) -> None:
"""Enable or disable the observer.
Args:
enabled (bool, optional): Whether to enable the observer.
Defaults to True.
"""
self.enabled = enabled
def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""Apply the observer function on the input if the observer is enabled.
Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
if self.enabled:
self.buffer.append(self.fn(*args, **kwargs))
# Copyright (c) OpenMMLab. All rights reserved.
from .quantizer import WeightQuantizer
__all__ = ['WeightQuantizer']
# Copyright (c) OpenMMLab. All rights reserved.
from typing import Callable, Dict, Optional
import torch
from lmdeploy.lite.utils import (QParams, cal_qparams_per_channel_absmax,
cal_qparams_per_channel_minmax,
cal_qparams_per_group_absmax,
cal_qparams_per_group_minmax,
cal_qparams_per_tensor_absmax,
cal_qparams_per_tensor_minmax)
from lmdeploy.lite.utils.global_avail import GlobalAvailMixin
class WeightQuantizer(GlobalAvailMixin):
"""A class for performing weight quantization of neural networks.
The WeightQuantizer class provides various methods to quantize the weights
of a neural network. This helps in reducing the memory requirements and
computational complexity of the model, potentially offering faster
inference and lower power consumption.
Attributes:
bits (int): The bit width for quantization.
symmetry (bool): If True, use absmax scaling; if False,
use min-max scaling.
granularity (str): The granularity of quantization. Available options
are 'per_channel', 'per_tensor', and 'per_group'.
group_size (Optional[int]): If using 'per_group' quantization, this is
the number of channels in each group.
Example:
# Instantiate the weight quantizer with specific quantization settings
quantizer = WeightQuantizer(bits=8,
symmetry=True,
granularity='per_tensor')
# Calculate the quantization parameters for given weights
qparams = quantizer.calculate_qparams(weights)
# Perform fake quantization on the weights
quantized_weights = quantizer.fake_quant(weights, qparams)
"""
CAL_FUNC_MAP: Dict[str, Dict[str, Callable]] = {
'per_group': {
'absmax': cal_qparams_per_group_absmax,
'minmax': cal_qparams_per_group_minmax,
},
'per_channel': {
'absmax': cal_qparams_per_channel_absmax,
'minmax': cal_qparams_per_channel_minmax,
},
'per_tensor': {
'absmax': cal_qparams_per_tensor_absmax,
'minmax': cal_qparams_per_tensor_minmax,
},
}
def __init__(self,
bits: int,
symmetry: bool,
granularity: str,
group_size: Optional[int] = -1):
assert bits in [4, 8], "The 'bits' argument must be either 4 or 8."
self.bits = bits
if granularity not in ['per_channel', 'per_tensor', 'per_group']:
raise NotImplementedError(
"The 'granularity' argument must be one of 'per_channel', "
"'per_tensor', or 'per_group'.")
self.granularity = granularity
if self.granularity == 'per_group':
assert group_size > 0, \
"The 'group_size' argument must be greater than 0."
self.group_size = group_size
# If symmetry is True, use absmax to compute scales
        # If symmetry is False, use minmax to compute scales and zero-points
self.symmetry = symmetry
self.observer = 'absmax' if symmetry else 'minmax'
def calculate_qparams(self, weight: torch.Tensor) -> QParams:
"""Calculate the quantization parameters for the given weight tensor.
Args:
weight (torch.Tensor): The weight tensor with shape
(out_features, in_features).
Returns:
QParams: A namedtuple containing 'scales' and 'zero_points'.
"""
cal_func = self.CAL_FUNC_MAP[self.granularity][self.observer]
if self.granularity == 'per_group':
return cal_func(weight, self.bits, self.group_size)
else:
return cal_func(weight, self.bits)
def quant(self,
weight: torch.Tensor,
qparams: Optional[QParams] = None,
real: bool = False) -> torch.Tensor:
"""Perform fake quantization on the given weight tensor.
Args:
weight (torch.Tensor): The weight tensor with shape
(out_features, in_features).
qparams (Optional[QParams]): A namedtuple containing 'scales'
and 'zero_points'.
real (bool): If True, return the tensor with quantized type.
Returns:
torch.Tensor: The fake quantized weight tensor.
"""
if qparams is None:
qparams = self.calculate_qparams(weight)
scales = qparams.scales
zero_points = qparams.zero_points
out_c, in_c = weight.shape
# Reshape the weights if using per_group quantization
# per tensor scales shape: [1]
# per channel scales shape: [out_c, 1]
# per group scales shape: [out_c, in_c//group_size, 1]
if len(scales.shape) > 2:
# scales shape: [out_c, in_c//group_size, 1]
weight = weight.reshape(out_c, scales.shape[1], -1)
if zero_points is None:
assert self.symmetry
real_qweight = (weight / scales).round()
fake_qweight = real_qweight * scales
else:
assert not self.symmetry
real_qweight = (weight / scales).round() + zero_points
fake_qweight = (real_qweight - zero_points) * scales
if len(scales.shape) > 2:
real_qweight = real_qweight.reshape(out_c, in_c)
fake_qweight = fake_qweight.reshape(out_c, in_c)
if real:
return real_qweight.to(torch.int32)
else:
return fake_qweight
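A short usage sketch of `WeightQuantizer` for 4-bit per-group quantization, mirroring how `quant_weights` in `awq.py` uses it; the weight tensor here is random and the sizes are arbitrary:

```
# Illustration only: quantize a random (out_features, in_features) weight.
import torch

from lmdeploy.lite.quantization import WeightQuantizer

weight = torch.randn(256, 512)
quantizer = WeightQuantizer(bits=4, symmetry=False,
                            granularity='per_group', group_size=128)

qparams = quantizer.calculate_qparams(weight)   # scales: [256, 512 // 128, 1]
fake_q = quantizer.quant(weight, qparams)       # float weights after fake quant
int_q = quantizer.quant(weight, qparams, real=True)  # int32 codes, shape [256, 512]
print(qparams.scales.shape, fake_q.shape, int_q.dtype)
```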
# Copyright (c) OpenMMLab. All rights reserved.
from .batch_split import (concat_decoder_layer_outputs,
                          split_decoder_layer_inputs)
from .cal_qparams import (QParams, cal_qparams_per_channel_absmax,
                          cal_qparams_per_channel_minmax,
                          cal_qparams_per_group_absmax,
                          cal_qparams_per_group_minmax,
                          cal_qparams_per_tensor_absmax,
                          cal_qparams_per_tensor_minmax)
from .calib_dataloader import get_calib_loaders
from .collect import (bimap_name_mod, collect_target_modules,
                      collect_target_weights)
from .global_avail import GlobalAvailMixin

__all__ = [
    'cal_qparams_per_channel_absmax', 'cal_qparams_per_channel_minmax',
    'cal_qparams_per_group_absmax', 'cal_qparams_per_group_minmax',
    'cal_qparams_per_tensor_absmax', 'cal_qparams_per_tensor_minmax',
    'QParams', 'get_calib_loaders', 'collect_target_modules',
    'collect_target_weights', 'GlobalAvailMixin', 'split_decoder_layer_inputs',
    'bimap_name_mod', 'concat_decoder_layer_outputs'
]
# Copyright (c) OpenMMLab. All rights reserved.
from typing import Any, Dict, List, Tuple, Union
import torch
def split_decoder_layer_inputs(
*args: Union[torch.Tensor, Any], **kwargs: Union[torch.Tensor, Any]
) -> Tuple[List[List[Any]], List[Dict[str, Any]]]:
"""This function splits batched decoder layer inputs into individual
elements.
Args:
*args (Union[torch.Tensor, Any]): Positional arguments which could
be a mix of tensors and other types.
**kwargs (Union[torch.Tensor, Any]): Keyword arguments which could
be a mix of tensors and other types.
Returns:
Tuple[List[List[Any]], List[Dict[str, Any]]]: A tuple containing two
lists, one for positional arguments, one for keyword arguments.
Each list contains individual elements from the batch.
"""
if not isinstance(args[0], torch.Tensor):
raise ValueError('The first argument must be a Tensor')
bs = args[0].size(0)
batch_args = []
batch_kwargs = []
for i in range(bs):
new_args = []
# Iterate over each argument. If it's a torch.Tensor and its first
# dimension equals the batch size, then get the value corresponding
# to the current index, else directly add the whole value.
for val in args:
if isinstance(val, torch.Tensor) and val.size(0) == bs:
new_args.append(val[i:i + 1])
else:
new_args.append(val)
new_kwargs = {}
# Execute the same operation for the keyword arguments.
for name, val in kwargs.items():
if isinstance(val, torch.Tensor) and val.size(0) == bs:
new_kwargs[name] = val[i:i + 1]
else:
new_kwargs[name] = val
batch_args.append(new_args)
batch_kwargs.append(new_kwargs)
return batch_args, batch_kwargs
def concat_decoder_layer_outputs(
batch_outputs: List[Tuple[Any]]) -> Tuple[Any]:
"""This function concatenates individual decoder layer outputs into a
batched output.
Args:
batch_outputs (List[Tuple[Any]]): A list of tuples, where each tuple
represents the output from an individual element in the batch.
Returns:
Tuple[Any]: A tuple representing the batched output.
"""
num_returns = len(batch_outputs[0])
def is_past_key_value(data: Any) -> bool:
"""Check whether data is a past key-value pair.
Args:
data (Any): The data to check.
Returns:
bool: True if data is a past key-value pair, False otherwise.
"""
flag = isinstance(data, tuple)
flag = flag and len(data) == 2
flag = flag and isinstance(data[0], torch.Tensor)
flag = flag and isinstance(data[1], torch.Tensor)
return flag
new_outputs = []
# Iterate over all types of return values.
for i in range(num_returns):
# Check if the current element is a past key-value pair.
flag = is_past_key_value(batch_outputs[0][i])
if flag:
# Concatenate the keys and values separately.
key = torch.cat([out[i][0] for out in batch_outputs])
value = torch.cat([out[i][1] for out in batch_outputs])
out_i = (key, value)
else:
# If it's not a past key-value pair, concatenate directly.
out_i = torch.cat([out[i] for out in batch_outputs])
new_outputs.append(out_i)
return tuple(new_outputs)
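A minimal round-trip sketch (illustrative shapes, not from the repo) showing how `split_decoder_layer_inputs` and `concat_decoder_layer_outputs` are meant to pair up around per-sample forward passes:

```
# Split a batch into per-sample calls, then stitch the outputs back together.
import torch

from lmdeploy.lite.utils import (concat_decoder_layer_outputs,
                                 split_decoder_layer_inputs)

hidden = torch.randn(4, 16, 32)          # [batch, tokens, hidden]
mask = torch.ones(4, 1, 16, 16)          # illustrative attention mask
batch_args, batch_kwargs = split_decoder_layer_inputs(hidden,
                                                      attention_mask=mask)
assert len(batch_args) == 4
assert batch_args[0][0].shape == (1, 16, 32)  # one sample per call

# Pretend each per-sample "forward" returns its hidden states unchanged.
outputs = concat_decoder_layer_outputs([(args[0], ) for args in batch_args])
assert outputs[0].shape == hidden.shape
```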
...@@ -12,18 +12,26 @@ class QParams(NamedTuple):

@torch.no_grad()
def cal_qparams_per_channel_absmax(w: torch.Tensor,
                                   n_bits: int,
                                   return_stats: bool = False) -> QParams:
    """Calculate quantization parameters for each channel using absolute max
    value."""

    absmax = w.abs().max(dim=-1, keepdim=True)[0]
    q_max = 2**(n_bits - 1) - 1
    scales = absmax.clamp(min=1e-5).div(q_max)

    if return_stats:
        return QParams(scales=scales, zero_points=None), absmax
    else:
        return QParams(scales=scales, zero_points=None)


@torch.no_grad()
def cal_qparams_per_channel_minmax(w: torch.Tensor,
                                   n_bits: int,
                                   return_stats: bool = False) -> QParams:
    """Calculate quantization parameters for each channel using min and max
    values."""

...@@ -36,12 +44,17 @@ def cal_qparams_per_channel_minmax(w: torch.Tensor, n_bits: int) -> QParams:

    zero_points = (-w_min / scales).round()

    if return_stats:
        return QParams(scales=scales, zero_points=zero_points), (w_min, w_max)
    else:
        return QParams(scales=scales, zero_points=zero_points)


@torch.no_grad()
def cal_qparams_per_group_absmax(w: torch.Tensor,
                                 n_bits: int,
                                 group_size: int,
                                 return_stats: bool = False) -> QParams:
    """Calculate quantization parameters for each group using absolute max
    value."""

...@@ -50,15 +63,20 @@ def cal_qparams_per_group_absmax(w: torch.Tensor, n_bits: int,

        'Input channels should be greater than or equal to group_size.'
    assert inc % group_size == 0, \
        'Input channels should be divisible by group_size.'
    absmax = w.abs().reshape(outc, -1, group_size).max(dim=-1, keepdim=True)[0]
    q_max = 2**(n_bits - 1) - 1
    scales = absmax.clamp(min=1e-5).div(q_max)

    if return_stats:
        return QParams(scales=scales, zero_points=None), absmax
    else:
        return QParams(scales=scales, zero_points=None)


@torch.no_grad()
def cal_qparams_per_group_minmax(w: torch.Tensor,
                                 n_bits: int,
                                 group_size: int,
                                 return_stats: bool = False) -> QParams:
    """Calculate quantization parameters for each group using min and max
    values."""

...@@ -75,11 +93,16 @@ def cal_qparams_per_group_minmax(w: torch.Tensor, n_bits: int,

    scales = (w_max - w_min)
    scales = scales.clamp_(min=1e-5).div_(q_max)
    zero_points = (-w_min / scales).round()

    if return_stats:
        return QParams(scales=scales, zero_points=zero_points), (w_min, w_max)
    else:
        return QParams(scales=scales, zero_points=zero_points)


@torch.no_grad()
def cal_qparams_per_tensor_minmax(w: torch.Tensor,
                                  n_bits: int,
                                  return_stats: bool = False) -> QParams:
    """Calculate quantization parameters for the entire tensor using min and
    max values."""

...@@ -90,15 +113,23 @@ def cal_qparams_per_tensor_minmax(w: torch.Tensor, n_bits: int) -> QParams:

    scales = (w_max - w_min)
    scales = scales.clamp_(min=1e-5).div_(q_max)
    zero_points = (-w_min / scales).round()

    if return_stats:
        return QParams(scales=scales, zero_points=zero_points), (w_min, w_max)
    else:
        return QParams(scales=scales, zero_points=zero_points)


@torch.no_grad()
def cal_qparams_per_tensor_absmax(w: torch.Tensor,
                                  n_bits: int,
                                  return_stats: bool = False) -> QParams:
    """Calculate quantization parameters for the entire tensor using absolute
    max value."""

    absmax = w.abs().max()
    q_max = 2**(n_bits - 1) - 1
    scales = absmax.clamp(min=1e-5).div(q_max)

    if return_stats:
        return QParams(scales=scales, zero_points=None), absmax
    else:
        return QParams(scales=scales, zero_points=None)
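The new `return_stats` flag lets callers reuse the per-channel or per-group statistics without recomputing them. A quick sketch of the symmetric per-channel case, assuming the import path exposed through `lmdeploy.lite.utils`:

```
# Sketch: scales are the absmax statistics divided by the 8-bit q_max (127).
import torch

from lmdeploy.lite.utils import cal_qparams_per_channel_absmax

w = torch.randn(8, 16)
qparams, absmax = cal_qparams_per_channel_absmax(w, n_bits=8, return_stats=True)
assert torch.allclose(qparams.scales, absmax.clamp(min=1e-5) / 127)
```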
@@ -16,6 +16,7 @@ def get_wikitext2(tokenizer, nsamples, seed, seqlen):
        nsamples: Number of samples to take from train set.
        seed: Random seed for sampling.
        seqlen: Maximum sequence length.

    Returns:
        train_loader: List of sampled and tokenized training examples.
        test_enc: Full tokenized Wikitext-2 test set.
@@ -48,6 +49,7 @@ def get_ptb(tokenizer, nsamples, seed, seqlen):
        nsamples: Number of samples to take from train set.
        seed: Random seed for sampling.
        seqlen: Maximum sequence length.

    Returns:
        train_loader: List of sampled and tokenized training examples.
        test_enc: Full tokenized PTB validation set.
@@ -83,6 +85,7 @@ def get_c4(tokenizer, nsamples, seed, seqlen):
        nsamples: Number of samples to take from train set.
        seed: Random seed for sampling.
        seqlen: Maximum sequence length.

    Returns:
        train_loader: List of sampled and tokenized training examples.
        test_enc: Full tokenized C4 validation set.
@@ -149,6 +152,7 @@ def get_ptb_new(tokenizer, nsamples, seed, seqlen):
        nsamples: Number of samples to take from train set.
        seed: Random seed for sampling.
        seqlen: Maximum sequence length.

    Returns:
        train_loader: List of sampled and tokenized training examples.
        test_enc: Full tokenized PTB validation set.
@@ -181,6 +185,7 @@ def get_c4_new(tokenizer, nsamples, seed, seqlen):
        nsamples: Number of samples to take from train set.
        seed: Random seed for sampling.
        seqlen: Maximum sequence length.

    Returns:
        train_loader: List of sampled and tokenized training examples.
        test_enc: Full tokenized C4 validation set.
@@ -234,6 +239,7 @@ def get_pileval(tokenizer, nsamples, seed, seqlen=512):
        nsamples: Number of samples to take from train set.
        seed: Random seed for sampling.
        seqlen: Maximum sequence length.

    Returns:
        train_loader: List of sampled and tokenized training examples.
        test_enc: Full tokenized validation set.
@@ -285,6 +291,7 @@ def get_calib_loaders(name, tokenizer, nsamples=128, seed=0, seqlen=2048):
        nsamples: Number of samples to take from train set.
        seed: Random seed for sampling.
        seqlen: Maximum sequence length.

    Returns:
        train_loader: List of sampled and tokenized training examples.
        test_data: Full tokenized validation set.
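As a usage sketch, the call below follows the `get_calib_loaders` signature shown in the hunk header above. The tokenizer construction and the model path are assumptions for illustration, not part of this diff.

```
from transformers import AutoTokenizer

# Hypothetical model path; any tokenizer matching the model being
# calibrated would work here.
tokenizer = AutoTokenizer.from_pretrained('internlm/internlm-chat-7b',
                                          trust_remote_code=True)

# The README lists 'c4', 'ptb', 'wikitext2' and 'pileval' as supported
# calibration datasets.
calib_loader, _ = get_calib_loaders('c4',
                                    tokenizer,
                                    nsamples=128,
                                    seed=0,
                                    seqlen=2048)
```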
# Copyright (c) OpenMMLab. All rights reserved.
from typing import Dict, List, Tuple, Union

from mmengine.config.lazy import LazyAttr
from torch import nn


def collect_target_modules(model: nn.Module,
                           target: Union[str, type],
                           skip_names: List[str] = [],
                           prefix: str = '') -> Dict[str, nn.Module]:
    """Collects the specific target modules from the model.

    Args:
        model : The PyTorch module from which to collect the target modules.
        target : The specific target to be collected. It can be a class of a
            module or the name of a module.
        skip_names : List of names of modules to be skipped during collection.
        prefix : A string to be added as a prefix to the module names.

    Returns:
        A dictionary mapping from module names to module instances.
    """

    if isinstance(target, LazyAttr):
        target = target.build()

    if not isinstance(target, (type, str)):
        raise TypeError('Target must be a string (name of the module) '
                        'or a type (class of the module)')

    def _is_target(n, m):
        if isinstance(target, str):
            return target == type(m).__name__ and n not in skip_names
        return isinstance(m, target) and n not in skip_names

    name2mod = {}
    for name, mod in model.named_modules():
        m_name = f'{prefix}.{name}' if prefix else name
        if _is_target(name, mod):
            name2mod[m_name] = mod
    return name2mod
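A small sketch of how `collect_target_modules` might be used; the toy model below is an assumption for illustration only.

```
from torch import nn

# Toy model standing in for a transformer block.
model = nn.Sequential(nn.Linear(8, 8), nn.ReLU(), nn.Linear(8, 2))

# Collect by class, or by class name while skipping a named submodule.
linears = collect_target_modules(model, nn.Linear)
first_only = collect_target_modules(model, 'Linear', skip_names=['2'])
print(list(linears))     # ['0', '2']
print(list(first_only))  # ['0']
```

Accepting either a class or a class name lets the collection be driven by a string from a config; the `LazyAttr` branch serves the same purpose for lazy mmengine configs.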
def collect_target_weights(
        model: nn.Module, target: Union[str, type],
        skip_names: List[str]) -> Dict[nn.Module, nn.Parameter]:
    """Collects weights of the specific target modules from the model.

    Args:
        model : The PyTorch module from which to collect the weights of
            target modules.
        target : The specific target whose weights to be collected. It can be
            a class of a module or the name of a module.
        skip_names : Names of modules to be skipped during weight collection.

    Returns:
        A dictionary mapping from module instances to their
        corresponding weights.
    """
    named_modules = collect_target_modules(model, target, skip_names)
    mod2weight = {}
    for _, mod in named_modules.items():
        assert hasattr(
            mod, 'weight'), "The module does not have a 'weight' attribute"
        mod2weight[mod] = mod.weight
    return mod2weight
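Continuing the toy example above (still an assumption, not part of the diff), the result is keyed by module instance rather than by name, which is convenient when the caller already holds module references:

```
# Maps each matched module instance to its weight Parameter.
weights = collect_target_weights(model, nn.Linear, skip_names=[])
for mod, w in weights.items():
    print(type(mod).__name__, tuple(w.shape))  # e.g. Linear (8, 8)
```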
def bimap_name_mod(
    name2mod_mappings: List[Dict[str, nn.Module]]
) -> Tuple[Dict[str, nn.Module], Dict[nn.Module, str]]:
    """Generates bidirectional maps from module names to module instances and
    vice versa.

    Args:
        name2mod_mappings : List of dictionaries each mapping from module
            names to module instances.

    Returns:
        Two dictionaries providing bidirectional mappings between module
        names and module instances.
    """

    name2mod = {}
    mod2name = {}
    for mapping in name2mod_mappings:
        mod2name.update({v: k for k, v in mapping.items()})
        name2mod.update(mapping)

    return name2mod, mod2name
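Finally, a sketch of `bimap_name_mod` on the same hypothetical toy model; it is useful when one step needs name-to-module lookup and another needs module-to-name.

```
fcs = collect_target_modules(model, nn.Linear)
acts = collect_target_modules(model, nn.ReLU)

# Merge both mappings and build the reverse lookup in one call.
name2mod, mod2name = bimap_name_mod([fcs, acts])
name, mod = next(iter(name2mod.items()))
assert mod2name[mod] == name
```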