Commit ccfcffb1 authored by chenzk

v1.0

FROM image.sourcefind.cn:5000/dcu/admin/base/pytorch:2.1.0-centos7.6-dtk23.10-py38
ENV DEBIAN_FRONTEND=noninteractive
# RUN yum update && yum install -y git cmake wget build-essential
RUN source /opt/dtk-23.10/env.sh
# Install pip dependencies
COPY requirements.txt requirements.txt
RUN pip3 install -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com -r requirements.txt
torch>=2.1.0dev
lightning==2.1.2
lightning[app]
jsonargparse[signatures] # CLI
pandas
pyarrow
tokenizers
sentencepiece
wandb
zstd
# for finetuning
bitsandbytes==0.40.0
transformers==4.31.0
peft==0.4.0
accelerate==0.21.0
einops==0.6.1
evaluate==0.4.0
scikit-learn==1.2.2
sentencepiece==0.1.99
wandb==0.15.3
# other optional dependencies are
# sentencepiece # pythia, falcon, redpajama
# tokenizers # llama-based models
# bitsandbytes>=0.41.1 # quantize/bnb.py
# scipy # TODO: remove when https://github.com/TimDettmers/bitsandbytes/pull/525 is released
# datasets # quantize/gptq.py
# zstandard # scripts/prepare_redpajama.py
# git+https://github.com/EleutherAI/lm-evaluation-harness.git@master # eval
from lit_gpt.model import GPT
from lit_gpt.config import Config
from lit_gpt.tokenizer import Tokenizer
from lit_gpt.fused_cross_entropy import FusedCrossEntropyLoss
from lightning_utilities.core.imports import RequirementCache
if not bool(RequirementCache("torch>=2.1.0dev")):
raise ImportError(
"Lit-GPT requires torch nightly (future torch 2.1). Please follow the installation instructions in the"
" repository README.md"
)
_LIGHTNING_AVAILABLE = RequirementCache("lightning>=2.1.0.dev0")
if not bool(_LIGHTNING_AVAILABLE):
raise ImportError(
"Lit-GPT requires Lightning nightly (future lightning 2.1). Please run:\n"
f" pip uninstall -y lightning; pip install -r requirements.txt\n{str(_LIGHTNING_AVAILABLE)}"
)
__all__ = ["GPT", "Config", "Tokenizer"]
"""Implementation of the paper:
LLaMA-Adapter: Efficient Fine-tuning of Language Models with Zero-init Attention
https://arxiv.org/abs/2303.16199
Port for Lit-GPT
"""
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union
import torch
import torch.nn as nn
from typing_extensions import Self
from lit_gpt.config import Config as BaseConfig
from lit_gpt.model import GPT as BaseModel
from lit_gpt.model import CausalSelfAttention as BaseCausalSelfAttention
from lit_gpt.model import KVCache, RoPECache, apply_rope
@dataclass
class Config(BaseConfig):
adapter_prompt_length: int = 10
adapter_start_layer: int = 2
class GPT(BaseModel):
"""The implementation is identical to `lit_gpt.model.GPT` with the exception that
the `Block` saves the layer index and passes it down to the attention layer."""
def __init__(self, config: Config) -> None:
nn.Module.__init__(self)
assert config.padded_vocab_size is not None
self.config = config
self.lm_head = nn.Linear(config.n_embd, config.padded_vocab_size, bias=False)
self.transformer = nn.ModuleDict(
dict(
wte=nn.Embedding(config.padded_vocab_size, config.n_embd),
h=nn.ModuleList(Block(config, i) for i in range(config.n_layer)),
ln_f=config.norm_class(config.n_embd, eps=config.norm_eps),
)
)
self.rope_cache: Optional[RoPECache] = None
self.mask_cache: Optional[torch.Tensor] = None
self.kv_caches: List[KVCache] = []
self.adapter_kv_caches: List[KVCache] = []
def reset_cache(self) -> None:
super().reset_cache()
self.adapter_kv_caches.clear()
def forward(
self,
idx: torch.Tensor,
max_seq_length: Optional[int] = None,
input_pos: Optional[torch.Tensor] = None,
lm_head_chunk_size: int = 0,
) -> Union[torch.Tensor, List[torch.Tensor]]:
B, T = idx.size()
use_kv_cache = input_pos is not None
block_size = self.config.block_size
if max_seq_length is None:
max_seq_length = block_size
if use_kv_cache: # not relevant otherwise
assert (
max_seq_length >= T
), f"Cannot forward sequence of length {T}, max seq length is only {max_seq_length}"
assert max_seq_length <= block_size, f"Cannot attend to {max_seq_length}, block size is only {block_size}"
assert block_size >= T, f"Cannot forward sequence of length {T}, block size is only {block_size}"
if self.rope_cache is None:
self.rope_cache = self.build_rope_cache(idx)
# passing `attn_mask` to SDPA downgrades it to use the inefficient implementation. since we only need the mask
# for the kv-cache support (only during inference), we only create it in that situation
# this will be resolved by https://github.com/pytorch/pytorch/issues/96099
if use_kv_cache and self.mask_cache is None:
self.mask_cache = self.build_mask_cache(idx)
cos, sin = self.rope_cache
if use_kv_cache:
cos = cos.index_select(0, input_pos)
sin = sin.index_select(0, input_pos)
mask = self.mask_cache.index_select(2, input_pos)
mask = mask[:, :, :, :max_seq_length]
else:
cos = cos[:T]
sin = sin[:T]
mask = None
# forward the model itself
x = self.transformer.wte(idx) # token embeddings of shape (b, t, n_embd)
if not use_kv_cache:
for block in self.transformer.h:
x, *_ = block(x, (cos, sin), max_seq_length)
else:
self.kv_caches = self.kv_caches or self.build_kv_caches(x, max_seq_length, cos.size(-1))
self.adapter_kv_caches = self.adapter_kv_caches or [None for _ in range(self.config.n_layer)]
for i, block in enumerate(self.transformer.h):
x, self.kv_caches[i], self.adapter_kv_caches[i] = block(
x, (cos, sin), max_seq_length, mask, input_pos, self.kv_caches[i], self.adapter_kv_caches[i]
)
x = self.transformer.ln_f(x)
if lm_head_chunk_size > 0:
# chunk the lm head logits to reduce the peak memory used by autograd
return [self.lm_head(x_i) for x_i in x.split(lm_head_chunk_size, dim=1)]
return self.lm_head(x) # (b, t, vocab_size)
@classmethod
def from_name(cls, name: str, **kwargs: Any) -> Self:
return cls(Config.from_name(name, **kwargs))
def _init_weights(self, module: nn.Module) -> None:
"""Meant to be used with `gpt.apply(gpt._init_weights)`. Unused method left for completeness."""
super()._init_weights(module)
if isinstance(module, CausalSelfAttention):
module.reset_parameters()
class Block(nn.Module):
"""The implementation is identical to `lit_gpt.model.Block` with the exception that
we replace the attention layer where adaption is implemented."""
def __init__(self, config: Config, block_idx: int) -> None:
super().__init__()
self.norm_1 = config.norm_class(config.n_embd, eps=config.norm_eps)
self.attn = CausalSelfAttention(config, block_idx)
if not config.shared_attention_norm:
self.norm_2 = config.norm_class(config.n_embd, eps=config.norm_eps)
self.mlp = config.mlp_class(config)
self.config = config
def forward(
self,
x: torch.Tensor,
rope: RoPECache,
max_seq_length: int,
mask: Optional[torch.Tensor] = None,
input_pos: Optional[torch.Tensor] = None,
kv_cache: Optional[KVCache] = None,
adapter_kv_cache: Optional[KVCache] = None,
) -> Tuple[torch.Tensor, Optional[KVCache], Optional[KVCache]]:
n_1 = self.norm_1(x)
h, new_kv_cache, new_adapter_kv_cache = self.attn(
n_1, rope, max_seq_length, mask, input_pos, kv_cache, adapter_kv_cache
)
if self.config.parallel_residual:
n_2 = n_1 if self.config.shared_attention_norm else self.norm_2(x)
x = x + h + self.mlp(n_2)
else:
if self.config.shared_attention_norm:
raise NotImplementedError(
"No checkpoint amongst the ones we support uses this configuration"
" (non-parallel residual and shared attention norm)."
)
x = x + h
x = x + self.mlp(self.norm_2(x))
return x, new_kv_cache, new_adapter_kv_cache
class CausalSelfAttention(BaseCausalSelfAttention):
"""A modification of `lit_gpt.model.CausalSelfAttention` that adds the attention
over the adaption prompt."""
def __init__(self, config: Config, block_idx: int) -> None:
super().__init__(config)
if block_idx >= config.adapter_start_layer:
# adapter embedding layer
self.adapter_wte = nn.Embedding(config.adapter_prompt_length, config.n_embd)
# gate for adaption
self.gating_factor = torch.nn.Parameter(torch.zeros(1, 1, config.n_head, 1))
self.reset_parameters()
self.block_idx = block_idx
def forward(
self,
x: torch.Tensor,
rope: RoPECache,
max_seq_length: int,
mask: Optional[torch.Tensor] = None,
input_pos: Optional[torch.Tensor] = None,
kv_cache: Optional[KVCache] = None,
adapter_kv_cache: Optional[KVCache] = None,
) -> Tuple[torch.Tensor, Optional[KVCache], Optional[KVCache]]:
B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)
qkv = self.attn(x)
# assemble into a number of query groups to support MHA, MQA and GQA together (see `config.n_query_groups`)
q_per_kv = self.config.n_head // self.config.n_query_groups
total_qkv = q_per_kv + 2 # each group has 1+ queries, 1 key, and 1 value
qkv = qkv.view(B, T, self.config.n_query_groups, total_qkv, self.config.head_size)
qkv = qkv.permute(0, 2, 3, 1, 4) # (B, n_query_groups, total_qkv, T, hs)
# split batched computation into three
q, k, v = qkv.split((q_per_kv, 1, 1), dim=2)
# repeat k and v if necessary
if self.config.n_query_groups != 1: # doing this would require a full kv cache with MQA (inefficient!)
# for MHA this is a no-op
k = k.expand(B, self.config.n_query_groups, q_per_kv, T, self.config.head_size)
v = v.expand(B, self.config.n_query_groups, q_per_kv, T, self.config.head_size)
q = q.reshape(B, -1, T, self.config.head_size) # (B, nh_q, T, hs)
k = k.reshape(B, -1, T, self.config.head_size) # (B, nh_k, T, hs)
v = v.reshape(B, -1, T, self.config.head_size) # (B, nh_v, T, hs)
n_elem = int(self.config.rotary_percentage * self.config.head_size)
cos, sin = rope
q_roped = apply_rope(q[..., :n_elem], cos, sin)
k_roped = apply_rope(k[..., :n_elem], cos, sin)
q = torch.cat((q_roped, q[..., n_elem:]), dim=-1)
k = torch.cat((k_roped, k[..., n_elem:]), dim=-1)
if kv_cache is not None:
cache_k, cache_v = kv_cache
cache_k, cache_v = cache_k.to(dtype=k.dtype), cache_v.to(dtype=v.dtype)
# check if reached token limit
if input_pos[-1] >= max_seq_length:
input_pos = torch.tensor(max_seq_length - 1, device=input_pos.device)
# shift 1 position to the left
cache_k = torch.roll(cache_k, -1, dims=2)
cache_v = torch.roll(cache_v, -1, dims=2)
k = cache_k.index_copy_(2, input_pos, k)
v = cache_v.index_copy_(2, input_pos, v)
kv_cache = k, v
y = self.scaled_dot_product_attention(q, k, v, mask=mask)
if self.block_idx >= self.config.adapter_start_layer:
aT = self.config.adapter_prompt_length
if adapter_kv_cache is not None:
ak, av = adapter_kv_cache
else:
prefix = self.adapter_wte.weight.reshape(1, aT, C)
aqkv = self.attn(prefix)
aqkv = aqkv.view(1, aT, self.config.n_query_groups, q_per_kv + 2, self.config.head_size)
aqkv = aqkv.permute(0, 2, 3, 1, 4)
_, ak, av = aqkv.split((q_per_kv, 1, 1), dim=2)
if self.config.n_query_groups != 1:
# for MHA this is a no-op
ak = ak.repeat_interleave(q_per_kv, dim=2)
av = av.repeat_interleave(q_per_kv, dim=2)
ak = ak.view(1, -1, aT, self.config.head_size) # (1, nh_ak, aT, hs)
av = av.view(1, -1, aT, self.config.head_size) # (1, nh_av, aT, hs)
adapter_kv_cache = (ak, av)
amask = torch.ones(T, aT, dtype=torch.bool, device=x.device)
ay = self.scaled_dot_product_attention(q, ak, av, amask)
y = y + self.gating_factor * ay
y = y.reshape(B, T, C) # re-assemble all head outputs side by side
# output projection
y = self.proj(y)
return y, kv_cache, adapter_kv_cache
def reset_parameters(self) -> None:
torch.nn.init.zeros_(self.gating_factor)
def _load_from_state_dict(self, state_dict: Dict, prefix: str, *args: Any, **kwargs: Any) -> None:
"""For compatibility with older checkpoints."""
if (key := prefix + "gating_factor") in state_dict and state_dict[key].size(1) == self.config.n_head:
state_dict[key] = state_dict[key].permute(0, 2, 1, 3)
super()._load_from_state_dict(state_dict, prefix, *args, **kwargs)
def mark_only_adapter_as_trainable(model: GPT) -> None:
"""Sets `requires_grad=False` for all non-adapter weights."""
for name, param in model.named_parameters():
param.requires_grad = adapter_filter(name, param)
def adapter_filter(key: str, value: Any) -> bool:
return "adapter_wte" in key or "gating_factor" in key
"""Implementation of the paper:
LLaMA-Adapter V2: Parameter-Efficient Visual Instruction Model
https://arxiv.org/abs/2304.15010
Port for Lit-GPT
"""
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Type
import torch
import torch.nn as nn
from typing_extensions import Self
import lit_gpt
from lit_gpt.adapter import GPT as BaseModel
from lit_gpt.adapter import Block as BaseBlock
from lit_gpt.adapter import Config as BaseConfig
from lit_gpt.adapter import KVCache, RoPECache
from lit_gpt.model import CausalSelfAttention as BaseCausalSelfAttention
from lit_gpt.model import apply_rope
from lit_gpt.utils import map_old_state_dict_weights
@dataclass
class Config(BaseConfig):
@property
def mlp_class(self) -> Type:
return getattr(lit_gpt.adapter_v2, self._mlp_class)
def adapter_filter(key: str, value: Any) -> bool:
adapter_substrings = (
# regular adapter v1 parameters
"adapter_wte",
"gating_factor",
# adapter v2: new bias and scale used in Linear
"adapter_scale",
"adapter_bias",
# adapter v2: Norm parameters are now trainable
"norm_1",
"norm_2",
"ln_f",
)
return any(s in key for s in adapter_substrings)
class AdapterV2Linear(torch.nn.Module):
def __init__(self, in_features: int, out_features: int, **kwargs) -> None:
super().__init__()
self.linear = torch.nn.Linear(in_features, out_features, **kwargs)
self.adapter_bias = torch.nn.Parameter(torch.zeros(out_features), requires_grad=False)
self.adapter_scale = torch.nn.Parameter(torch.ones(out_features), requires_grad=False)
self.reset_parameters()
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.adapter_scale * (self.linear(x) + self.adapter_bias)
def reset_parameters(self) -> None:
nn.init.zeros_(self.adapter_bias)
nn.init.ones_(self.adapter_scale)
class GPT(BaseModel):
def __init__(self, config: Config) -> None:
# Skip the parent class __init__ altogether and replace it to avoid useless allocations
nn.Module.__init__(self)
assert config.padded_vocab_size is not None
self.config = config
self.lm_head = AdapterV2Linear(config.n_embd, config.padded_vocab_size, bias=False)
self.transformer = nn.ModuleDict(
dict(
wte=nn.Embedding(config.padded_vocab_size, config.n_embd),
h=nn.ModuleList(Block(config, i) for i in range(config.n_layer)),
ln_f=config.norm_class(config.n_embd, eps=config.norm_eps),
)
)
self.rope_cache: Optional[RoPECache] = None
self.mask_cache: Optional[torch.Tensor] = None
self.kv_caches: List[KVCache] = []
self.adapter_kv_caches: List[KVCache] = []
@classmethod
def from_name(cls, name: str, **kwargs: Any) -> Self:
return cls(Config.from_name(name, **kwargs))
def _init_weights(self, module: nn.Module) -> None:
"""Meant to be used with `gpt.apply(gpt._init_weights)`. Unused method left for completeness."""
super()._init_weights(module)
if isinstance(module, CausalSelfAttention):
module.reset_parameters()
if isinstance(module, AdapterV2Linear):
module.reset_parameters()
def _load_from_state_dict(self, state_dict: Dict, prefix: str, *args: Any, **kwargs: Any) -> None:
"""For compatibility with base checkpoints."""
mapping = {"lm_head.weight": "lm_head.linear.weight"}
state_dict = map_old_state_dict_weights(state_dict, mapping, prefix)
super()._load_from_state_dict(state_dict, prefix, *args, **kwargs)
class Block(BaseBlock):
"""The implementation is identical to `lit_gpt.model.Block` with the exception that
we replace the attention layer where adaption is implemented."""
def __init__(self, config: Config, block_idx: int) -> None:
# Skip the parent class __init__ altogether and replace it to avoid useless allocations
nn.Module.__init__(self)
self.norm_1 = config.norm_class(config.n_embd, eps=config.norm_eps)
self.attn = CausalSelfAttention(config, block_idx)
if not config.shared_attention_norm:
self.norm_2 = config.norm_class(config.n_embd, eps=config.norm_eps)
self.mlp = config.mlp_class(config)
self.config = config
class CausalSelfAttention(BaseCausalSelfAttention):
def __init__(self, config: Config, block_idx: int) -> None:
"""Causal self-attention with calculating qkv matrices with a single matrix* and Low Ranking Adaptation for
parameter-efficient fine-tuning.
*Instead of creating multiple heads and concatenating the result (in addition to creating separate matrices for
query, key and value for each head) we can do this in a single pass with a single weight matrix.
"""
# Skip the parent class __init__ altogether and replace it to avoid useless allocations
nn.Module.__init__(self)
shape = (config.n_head + 2 * config.n_query_groups) * config.head_size
# key, query, value projections for all heads, but in a batch
self.attn = AdapterV2Linear(in_features=config.n_embd, out_features=shape, bias=config.bias)
# output projection
self.proj = AdapterV2Linear(config.n_embd, config.n_embd, bias=config.bias)
if block_idx >= config.adapter_start_layer:
# adapter embedding layer
self.adapter_wte = nn.Embedding(config.adapter_prompt_length, config.n_embd)
# gate for adaption
self.gating_factor = torch.nn.Parameter(torch.zeros(1, 1, config.n_head, 1))
self.reset_parameters()
self.block_idx = block_idx
self.config = config
def forward(
self,
x: torch.Tensor,
rope: RoPECache,
max_seq_length: int,
mask: Optional[torch.Tensor] = None,
input_pos: Optional[torch.Tensor] = None,
kv_cache: Optional[KVCache] = None,
adapter_kv_cache: Optional[KVCache] = None,
) -> Tuple[torch.Tensor, Optional[KVCache], Optional[KVCache]]:
B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)
qkv = self.attn(x)
# assemble into a number of query groups to support MHA, MQA and GQA together (see `config.n_query_groups`)
q_per_kv = self.config.n_head // self.config.n_query_groups
total_qkv = q_per_kv + 2 # each group has 1+ queries, 1 key, and 1 value
qkv = qkv.view(B, T, self.config.n_query_groups, total_qkv, self.config.head_size)
qkv = qkv.permute(0, 2, 3, 1, 4) # (B, n_query_groups, total_qkv, T, hs)
# split batched computation into three
q, k, v = qkv.split((q_per_kv, 1, 1), dim=2)
# repeat k and v if necessary
if self.config.n_query_groups != 1: # doing this would require a full kv cache with MQA (inefficient!)
# for MHA this is a no-op
k = k.expand(B, self.config.n_query_groups, q_per_kv, T, self.config.head_size)
v = v.expand(B, self.config.n_query_groups, q_per_kv, T, self.config.head_size)
q = q.reshape(B, -1, T, self.config.head_size) # (B, nh_q, T, hs)
k = k.reshape(B, -1, T, self.config.head_size) # (B, nh_k, T, hs)
v = v.reshape(B, -1, T, self.config.head_size) # (B, nh_v, T, hs)
n_elem = int(self.config.rotary_percentage * self.config.head_size)
cos, sin = rope
q_roped = apply_rope(q[..., :n_elem], cos, sin)
k_roped = apply_rope(k[..., :n_elem], cos, sin)
q = torch.cat((q_roped, q[..., n_elem:]), dim=-1)
k = torch.cat((k_roped, k[..., n_elem:]), dim=-1)
if kv_cache is not None:
cache_k, cache_v = kv_cache
cache_k, cache_v = cache_k.to(dtype=k.dtype), cache_v.to(dtype=v.dtype)
# check if reached token limit
if input_pos[-1] >= max_seq_length:
input_pos = torch.tensor(max_seq_length - 1, device=input_pos.device)
# shift 1 position to the left
cache_k = torch.roll(cache_k, -1, dims=2)
cache_v = torch.roll(cache_v, -1, dims=2)
k = cache_k.index_copy_(2, input_pos, k)
v = cache_v.index_copy_(2, input_pos, v)
kv_cache = k, v
y = self.scaled_dot_product_attention(q, k, v, mask=mask)
if self.block_idx >= self.config.adapter_start_layer:
aT = self.config.adapter_prompt_length
if adapter_kv_cache is not None:
ak, av = adapter_kv_cache
else:
prefix = self.adapter_wte.weight.reshape(1, aT, C)
aqkv = self.attn(prefix)
aqkv = aqkv.view(1, aT, self.config.n_query_groups, q_per_kv + 2, self.config.head_size)
aqkv = aqkv.permute(0, 2, 3, 1, 4)
_, ak, av = aqkv.split((q_per_kv, 1, 1), dim=2)
if self.config.n_query_groups != 1:
# for MHA this is a no-op
ak = ak.repeat_interleave(q_per_kv, dim=2)
av = av.repeat_interleave(q_per_kv, dim=2)
ak = ak.view(1, -1, aT, self.config.head_size) # (1, nh_ak, aT, hs)
av = av.view(1, -1, aT, self.config.head_size) # (1, nh_av, aT, hs)
adapter_kv_cache = (ak, av)
amask = torch.ones(T, aT, dtype=torch.bool, device=x.device)
ay = self.scaled_dot_product_attention(q, ak, av, amask)
y = y + self.gating_factor * ay
y = y.reshape(B, T, C) # re-assemble all head outputs side by side
# output projection
y = self.proj(y)
return y, kv_cache, adapter_kv_cache
def reset_parameters(self) -> None:
torch.nn.init.zeros_(self.gating_factor)
def _load_from_state_dict(self, state_dict: Dict, prefix: str, *args: Any, **kwargs: Any) -> None:
"""For compatibility with base checkpoints."""
mapping = {
"attn.weight": "attn.linear.weight",
"attn.bias": "attn.linear.bias",
"proj.weight": "proj.linear.weight",
"proj.bias": "proj.linear.bias",
}
state_dict = map_old_state_dict_weights(state_dict, mapping, prefix)
# For compatibility with older checkpoints
if (key := prefix + "gating_factor") in state_dict and state_dict[key].size(1) == self.config.n_head:
state_dict[key] = state_dict[key].permute(0, 2, 1, 3)
super()._load_from_state_dict(state_dict, prefix, *args, **kwargs)
class GptNeoxMLP(lit_gpt.model.GptNeoxMLP):
def __init__(self, config: Config) -> None:
nn.Module.__init__(self)
self.fc = AdapterV2Linear(config.n_embd, config.intermediate_size, bias=config.bias)
self.proj = AdapterV2Linear(config.intermediate_size, config.n_embd, bias=config.bias)
def _load_from_state_dict(self, state_dict: Dict, prefix: str, *args: Any, **kwargs: Any) -> None:
"""For compatibility with base checkpoints."""
mapping = {
"fc.weight": "fc.linear.weight",
"fc.bias": "fc.linear.bias",
"proj.weight": "proj.linear.weight",
"proj.bias": "proj.linear.bias",
}
state_dict = map_old_state_dict_weights(state_dict, mapping, prefix)
super()._load_from_state_dict(state_dict, prefix, *args, **kwargs)
class LLaMAMLP(lit_gpt.model.LLaMAMLP):
def __init__(self, config: Config) -> None:
nn.Module.__init__(self)
self.fc_1 = AdapterV2Linear(config.n_embd, config.intermediate_size, bias=config.bias)
self.fc_2 = AdapterV2Linear(config.n_embd, config.intermediate_size, bias=config.bias)
self.proj = AdapterV2Linear(config.intermediate_size, config.n_embd, bias=config.bias)
def _load_from_state_dict(self, state_dict: Dict, prefix: str, *args: Any, **kwargs: Any) -> None:
"""For compatibility with base checkpoints."""
mapping = {
"fc_1.weight": "fc_1.linear.weight",
"fc_1.bias": "fc_1.linear.bias",
"fc_2.weight": "fc_2.linear.weight",
"fc_2.bias": "fc_2.linear.bias",
"proj.weight": "proj.linear.weight",
"proj.bias": "proj.linear.bias",
}
state_dict = map_old_state_dict_weights(state_dict, mapping, prefix)
super()._load_from_state_dict(state_dict, prefix, *args, **kwargs)
def mark_only_adapter_v2_as_trainable(model: GPT) -> None:
"""Sets requires_grad=False for all non-adapter weights"""
for name, param in model.named_parameters():
param.requires_grad = adapter_filter(name, param)
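# --- Usage sketch (illustrative, not part of the upstream adapter_v2.py) ---
# Same assumptions as the adapter v1 sketch: "pythia-70m" is a hypothetical config name.
# Besides the prompt embeddings and gates, `adapter_filter` here also keeps the norm
# layers and the AdapterV2Linear bias/scale terms trainable.
if __name__ == "__main__":
    config = Config.from_name("pythia-70m")  # hypothetical checkpoint name
    model = GPT(config)
    mark_only_adapter_v2_as_trainable(model)
    trainable = [name for name, p in model.named_parameters() if p.requires_grad]
    print(sum("adapter_scale" in n for n in trainable), "adapter_scale tensors")
    print(sum("norm_1" in n for n in trainable), "norm_1 tensors")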
# Copyright (c) 2023, Tri Dao.
import torch
import torch.nn as nn
import xentropy_cuda_lib
# `all_gather_into_tensor` and `reduce_scatter_tensor` are new placeholders for
# `_all_gather_base` and `_reduce_scatter_base`. They require the most recent
# version of PyTorch. The following 2 lines are for backward compatibility with
# older PyTorch.
if "all_gather_into_tensor" not in dir(torch.distributed):
torch.distributed.all_gather_into_tensor = torch.distributed._all_gather_base
class SoftmaxCrossEntropyLossFn(torch.autograd.Function):
@staticmethod
def forward(
ctx,
logits,
labels,
smoothing=0.0,
ignored_index=-100,
inplace_backward=False,
process_group=None,
):
"""
logits: (batch, vocab_size)
labels: (batch,)
If process_group is not None, we're doing Tensor Parallel: each process is responsible for
one part of the vocab. The loss needs to be aggregated across processes.
"""
batch, vocab_size = logits.shape
assert labels.shape == (batch,)
world_size = 1 if process_group is None else torch.distributed.get_world_size(process_group)
ctx.total_classes = world_size * vocab_size
if world_size == 1:
losses, lse = xentropy_cuda_lib.forward(logits, labels, smoothing)
losses.masked_fill_(labels == ignored_index, 0)
labels_local = labels
else:
rank = torch.distributed.get_rank(process_group)
vocab_start_index, vocab_end_index = rank * vocab_size, (rank + 1) * vocab_size
# Mask labels that fall outside this partition's vocab range (True means the label is masked out).
labels_mask = (labels < vocab_start_index) | (labels >= vocab_end_index)
ignored_mask = labels == ignored_index
labels_local = torch.where(ignored_mask, labels, labels - vocab_start_index)
# For tensor parallel cross entropy with smoothing, we want to pass in the total number
# of classes so that smoothing can be applied correctly. If total_classes=-1, use the
# last dimension of the input tensor.
losses, lse_local = xentropy_cuda_lib.forward(
logits, labels_local, smoothing, world_size * vocab_size
)
assert lse_local.shape == (batch,)
assert losses.shape == (batch,)
losses.masked_fill_(ignored_mask, 0)
# For labels == ignored_index, the loss is always 0.
# If there's no smoothing, if labels are in the vocab of this partition, losses contains
# lse_local - predicted logit, and 0 otherwise.
# If there's smoothing=0.1, for labels in the vocab of this partition, losses contains
# 0.9 * (lse_local - predicted logit) + 0.1 * (lse_local - sum logit / total_classes)
# For labels not in the vocab of this partition, losses contains
# 0.1 * (lse_local - sum logit / total_classes).
lse_allgather = torch.empty(
world_size, batch, dtype=lse_local.dtype, device=lse_local.device
)
torch.distributed.all_gather_into_tensor(
lse_allgather, lse_local.contiguous(), group=process_group
)
handle_losses = torch.distributed.all_reduce(
losses, op=torch.distributed.ReduceOp.SUM, group=process_group, async_op=True
)
lse = torch.logsumexp(lse_allgather, dim=0)
# If there's no smoothing, the total losses are lse_local - predicted_logit,
# we just have to subtract the lse_local and add the lse (global).
# If there's smoothing=0.1, the total losses are
# 0.9 * (lse_local - predicted_logit) + 0.1 * (sum of all lse_local - sum logit / total_classes)
# We want 0.9 * (lse - predicted_logit) + 0.1 * (lse - sum logit / total_classes).
rank_per_sample = torch.div(labels, vocab_size, rounding_mode="floor")
lse_local = lse_allgather[
rank_per_sample, torch.arange(batch, device=lse_allgather.device)
]
handle_losses.wait()
if smoothing == 0.0:
losses += lse - lse_local
else:
losses += (1 - smoothing) * (lse - lse_local) + smoothing * (
lse - lse_allgather.sum(dim=0)
)
losses.masked_fill_(ignored_mask, 0)
ctx.save_for_backward(logits, lse, labels_local)
ctx.smoothing = smoothing
ctx.ignored_index = ignored_index
ctx.inplace_backward = inplace_backward
return losses
@staticmethod
def backward(ctx, grad_loss):
logits, lse, labels = ctx.saved_tensors
grad_loss = grad_loss.contiguous()
grad_loss.masked_fill_(labels == ctx.ignored_index, 0)
grad_logits = xentropy_cuda_lib.backward(
grad_loss, logits, lse, labels, ctx.smoothing, ctx.inplace_backward, ctx.total_classes
)
return grad_logits, None, None, None, None, None, None
class FusedCrossEntropyLoss(nn.Module):
def __init__(
self,
ignore_index=-100,
reduction="mean",
label_smoothing=0.0,
inplace_backward=True,
process_group=None,
):
super().__init__()
if reduction not in ["mean", "none"]:
raise NotImplementedError("Only support reduction = 'mean' or 'none'")
self.ignore_index = ignore_index
self.reduction = reduction
self.label_smoothing = label_smoothing
self.inplace_backward = inplace_backward
self.process_group = process_group
def forward(self, input, target):
assert input.is_cuda and target.is_cuda
# SoftmaxCrossEntropyLoss implicitly casts to float
if len(input.shape) == 3:
input = input.view(-1, input.size(-1))
target = target.view(-1)
loss = SoftmaxCrossEntropyLossFn.apply(
input,
target,
self.label_smoothing,
self.ignore_index,
self.inplace_backward,
self.process_group,
)
if self.reduction == "mean":
return loss.sum() / (target != self.ignore_index).sum()
else:
return loss
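# --- Usage sketch (illustrative, not part of the upstream file) ---
# A quick sanity check of the fused loss against PyTorch's reference cross entropy.
# It assumes a CUDA device and the compiled `xentropy_cuda_lib` extension; the
# (batch, vocab_size) shape matches what the model produces after flattening
# (B, T, vocab) logits.
if __name__ == "__main__":
    logits = torch.randn(8, 32000, device="cuda", dtype=torch.float16)
    targets = torch.randint(0, 32000, (8,), device="cuda")
    fused = FusedCrossEntropyLoss()(logits, targets)
    reference = nn.CrossEntropyLoss()(logits.float(), targets)
    print(fused.item(), reference.item())  # the two values should closely agree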
# Copyright (c) 2023, Tri Dao.
import math
from typing import Optional, Tuple
import rotary_emb
import torch
from einops import rearrange, repeat
class ApplyRotaryEmb(torch.autograd.Function):
@staticmethod
def forward(ctx, x, cos, sin, interleaved=False, inplace=False):
"""
x: (batch_size, seqlen, nheads, headdim)
cos, sin: (seqlen, rotary_dim / 2)
interleaved: if True, rotate pairs of even and odd dimensions (GPT-J style) instead
of 1st half and 2nd half (GPT-NeoX style).
rotary_dim must be <= headdim
Apply rotary embedding to the first rotary_dim of x.
"""
batch, seqlen, nheads, headdim = x.shape
rotary_seqlen, rotary_dim = cos.shape
rotary_dim *= 2
assert rotary_dim <= headdim
assert seqlen <= rotary_seqlen
assert sin.shape == (rotary_seqlen, rotary_dim // 2)
x_ro = x[..., :rotary_dim]
x1, x2 = x_ro.chunk(2, dim=-1) if not interleaved else (x_ro[..., ::2], x_ro[..., 1::2])
out = torch.empty_like(x) if not inplace else x
out_ro = out[..., :rotary_dim]
if inplace:
o1, o2 = x1, x2
else:
o1, o2 = (
out_ro.chunk(2, dim=-1)
if not interleaved
else (out_ro[..., ::2], out_ro[..., 1::2])
)
rotary_emb.apply_rotary(
x1,
x2,
rearrange(cos[:seqlen], "s d -> s 1 d"),
rearrange(sin[:seqlen], "s d -> s 1 d"),
o1,
o2,
False,
)
if not inplace and rotary_dim < headdim:
out[..., rotary_dim:].copy_(x[..., rotary_dim:])
ctx.save_for_backward(cos, sin)
ctx.interleaved = interleaved
ctx.inplace = inplace
return out if not inplace else x
@staticmethod
def backward(ctx, do):
cos, sin = ctx.saved_tensors
_, seqlen, _, headdim = do.shape
rotary_dim = cos.shape[-1]
rotary_dim *= 2
inplace = ctx.inplace
do_ro = do[..., :rotary_dim]
do1, do2 = (
do_ro.chunk(2, dim=-1) if not ctx.interleaved else (do_ro[..., ::2], do_ro[..., 1::2])
)
dx = torch.empty_like(do) if not inplace else do
if inplace:
dx1, dx2 = do1, do2
else:
dx_ro = dx[..., :rotary_dim]
dx1, dx2 = (
dx_ro.chunk(2, dim=-1)
if not ctx.interleaved
else (dx_ro[..., ::2], dx_ro[..., 1::2])
)
rotary_emb.apply_rotary(
do1,
do2,
rearrange(cos[:seqlen], "s d -> s 1 d"),
rearrange(sin[:seqlen], "s d -> s 1 d"),
dx1,
dx2,
True,
)
if not inplace and rotary_dim < headdim:
dx[..., rotary_dim:].copy_(do[..., rotary_dim:])
return dx, None, None, None, None
apply_rotary_emb_func = ApplyRotaryEmb.apply
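# --- Usage sketch (illustrative, not part of the upstream file) ---
# Shape conventions for the fused kernel: x is (batch, seqlen, nheads, headdim) and
# cos/sin are (seqlen, rotary_dim / 2). Assumes a CUDA device and the compiled
# `rotary_emb` extension; the cache construction below mirrors `build_rope_cache`.
if __name__ == "__main__":
    batch, seqlen, nheads, headdim = 2, 16, 4, 64
    x = torch.randn(batch, seqlen, nheads, headdim, device="cuda", dtype=torch.bfloat16)
    theta = 1.0 / (10000 ** (torch.arange(0, headdim, 2, device="cuda") / headdim))
    idx_theta = torch.outer(torch.arange(seqlen, device="cuda"), theta)
    cos, sin = idx_theta.cos().bfloat16(), idx_theta.sin().bfloat16()
    out = apply_rotary_emb_func(x, cos, sin, False, False)  # interleaved=False, inplace=False
    print(out.shape)  # torch.Size([2, 16, 4, 64])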
"""Full definition of a GPT NeoX Language Model, all of it in this single file.
Based on the nanoGPT implementation: https://github.com/karpathy/nanoGPT and
https://github.com/EleutherAI/gpt-neox/tree/main/megatron/model.
"""
import math
from typing import Any, List, Optional, Tuple
import torch
import torch.nn as nn
from lightning_utilities.core.imports import RequirementCache
from typing_extensions import Self
from flash_attn import flash_attn_func
from lit_gpt.config import Config
from xformers.ops import SwiGLU
from .fused_rotary_embedding import apply_rotary_emb_func
RoPECache = Tuple[torch.Tensor, torch.Tensor]
KVCache = Tuple[torch.Tensor, torch.Tensor]
FlashAttention2Available = RequirementCache("flash-attn>=2.0.0.post1")
class GPT(nn.Module):
def __init__(self, config: Config) -> None:
super().__init__()
assert config.padded_vocab_size is not None
self.config = config
self.lm_head = nn.Linear(config.n_embd, config.padded_vocab_size, bias=False)
self.transformer = nn.ModuleDict(
dict(
wte=nn.Embedding(config.padded_vocab_size, config.n_embd),
h=nn.ModuleList(Block(config) for _ in range(config.n_layer)),
ln_f=config.norm_class(config.n_embd, eps=config.norm_eps),
)
)
self.rope_cache: Optional[RoPECache] = None
self.mask_cache: Optional[torch.Tensor] = None
self.kv_caches: List[KVCache] = []
def _init_weights(self, module: nn.Module, n_layer) -> None:
"""Meant to be used with `gpt.apply(gpt._init_weights)`."""
# GPT-NeoX https://arxiv.org/pdf/2204.06745.pdf
if isinstance(module, nn.Embedding):
torch.nn.init.normal_(module.weight, mean=0.0, std=math.sqrt(2.0 / 5 / self.config.n_embd))
# RWKV: set it to 1e-4
# torch.nn.init.uniform_(module.weight, -1e-4, 1e-4)
elif isinstance(module, nn.Linear):
torch.nn.init.normal_(module.weight, mean=0.0, std=math.sqrt(2.0 / 5 / self.config.n_embd))
if module.bias is not None:
torch.nn.init.zeros_(module.bias)
# GPT-NeoX
for name, p in module.named_parameters():
if (name == "proj.weight" and isinstance(module, LLaMAMLP)) or (name == "w3.weight" and isinstance(module, SwiGLU) or (name == "proj.weight" and isinstance(module, CausalSelfAttention))):  # if using the xFormers SwiGLU, the fc_2 projection is named w3
nn.init.normal_(p, mean=0.0, std=1 / math.sqrt(self.config.n_embd) / n_layer)
def reset_cache(self) -> None:
self.kv_caches.clear()
if self.mask_cache is not None and self.mask_cache.device.type == "xla":
# https://github.com/Lightning-AI/lit-gpt/pull/83#issuecomment-1558150179
self.rope_cache = None
self.mask_cache = None
def forward(
self, idx: torch.Tensor, max_seq_length: Optional[int] = None, input_pos: Optional[torch.Tensor] = None
) -> torch.Tensor:
B, T = idx.size()
use_kv_cache = input_pos is not None
block_size = self.config.block_size
if max_seq_length is None:
max_seq_length = block_size
if use_kv_cache: # not relevant otherwise
assert (
max_seq_length >= T
), f"Cannot forward sequence of length {T}, max seq length is only {max_seq_length}"
assert max_seq_length <= block_size, f"Cannot attend to {max_seq_length}, block size is only {block_size}"
assert block_size >= T, f"Cannot forward sequence of length {T}, block size is only {block_size}"
if self.rope_cache is None:
self.rope_cache = self.build_rope_cache(idx)
# passing `attn_mask` to SDPA downgrades it to use the inefficient implementation. since we only need the mask
# for the kv-cache support (only during inference), we only create it in that situation
# this will be resolved by https://github.com/pytorch/pytorch/issues/96099
if use_kv_cache and self.mask_cache is None:
self.mask_cache = self.build_mask_cache(idx)
cos, sin = self.rope_cache
if use_kv_cache:
cos = cos.index_select(0, input_pos)
sin = sin.index_select(0, input_pos)
mask = self.mask_cache.index_select(2, input_pos)
mask = mask[:, :, :, :max_seq_length]
else:
cos = cos[:T]
sin = sin[:T]
mask = None
# forward the model itself
x = self.transformer.wte(idx) # token embeddings of shape (b, t, n_embd)
if not use_kv_cache:
for block in self.transformer.h:
x, *_ = block(x, (cos, sin), max_seq_length)
else:
self.kv_caches = self.kv_caches or self.build_kv_caches(x, max_seq_length, cos.size(-1) * 2)
for i, block in enumerate(self.transformer.h):
x, self.kv_caches[i] = block(x, (cos, sin), max_seq_length, mask, input_pos, self.kv_caches[i])
x = self.transformer.ln_f(x)
return self.lm_head(x) # (b, t, vocab_size)
@classmethod
def from_name(cls, name: str, **kwargs: Any) -> Self:
return cls(Config.from_name(name, **kwargs))
def build_rope_cache(self, idx: torch.Tensor) -> RoPECache:
return build_rope_cache(
seq_len=self.config.block_size,
n_elem=int(self.config.rotary_percentage * self.config.head_size),
dtype=torch.bfloat16,
device=idx.device,
condense_ratio=self.config.condense_ratio,
)
def build_mask_cache(self, idx: torch.Tensor) -> torch.Tensor:
ones = torch.ones((self.config.block_size, self.config.block_size), device=idx.device, dtype=torch.bool)
return torch.tril(ones).unsqueeze(0).unsqueeze(0)
def build_kv_caches(self, idx: torch.Tensor, max_seq_length: int, rope_cache_length: int) -> List[KVCache]:
B = idx.size(0)
heads = 1 if self.config.n_query_groups == 1 else self.config.n_query_groups
k_cache_shape = (
B,
max_seq_length,
heads,
rope_cache_length + self.config.head_size - int(self.config.rotary_percentage * self.config.head_size),
)
v_cache_shape = (B, max_seq_length, heads, self.config.head_size)
device = idx.device
return [
(torch.zeros(k_cache_shape, device=device), torch.zeros(v_cache_shape, device=device))
for _ in range(self.config.n_layer)
]
class Block(nn.Module):
def __init__(self, config: Config) -> None:
super().__init__()
self.norm_1 = config.norm_class(config.n_embd, eps=config.norm_eps)
self.attn = CausalSelfAttention(config)
if not config.shared_attention_norm:
self.norm_2 = config.norm_class(config.n_embd, eps=config.norm_eps)
self.mlp = config.mlp_class(config)
self.config = config
def forward(
self,
x: torch.Tensor,
rope: RoPECache,
max_seq_length: int,
mask: Optional[torch.Tensor] = None,
input_pos: Optional[torch.Tensor] = None,
kv_cache: Optional[KVCache] = None,
) -> Tuple[torch.Tensor, Optional[KVCache]]:
n_1 = self.norm_1(x)
h, new_kv_cache = self.attn(n_1, rope, max_seq_length, mask, input_pos, kv_cache)
if self.config.parallel_residual:
n_2 = n_1 if self.config.shared_attention_norm else self.norm_2(x)
x = x + h + self.mlp(n_2)
else:
if self.config.shared_attention_norm:
raise NotImplementedError(
"No checkpoint amongst the ones we support uses this configuration"
" (non-parallel residual and shared attention norm)."
)
x = x + h
x = x + self.mlp(self.norm_2(x))
return x, new_kv_cache
class CausalSelfAttention(nn.Module):
def __init__(self, config: Config) -> None:
super().__init__()
shape = (config.n_head + 2 * config.n_query_groups) * config.head_size
# key, query, value projections for all heads, but in a batch
self.attn = nn.Linear(config.n_embd, shape, bias=config.bias)
# output projection
self.proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
self.config = config
def forward(
self,
x: torch.Tensor,
rope: RoPECache,
max_seq_length: int,
mask: Optional[torch.Tensor] = None,
input_pos: Optional[torch.Tensor] = None,
kv_cache: Optional[KVCache] = None,
) -> Tuple[torch.Tensor, Optional[KVCache]]:
B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)
qkv = self.attn(x)
# assemble into a number of query groups to support MHA, MQA and GQA together (see `config.n_query_groups`)
q_per_kv = self.config.n_head // self.config.n_query_groups
total_qkv = q_per_kv + 2 # each group has 1+ queries, 1 key, and 1 value
qkv = qkv.view(B, T, self.config.n_query_groups, total_qkv, self.config.head_size) # (B, T, n_query_groups, total_qkv, hs)
# qkv = qkv.permute(0, 2, 3, 1, 4) # (B, n_query_groups, total_qkv, T, hs)
# split batched computation into three
q, k, v = qkv.split((q_per_kv, 1, 1), dim=-2)
# repeat k and v if necessary
# Peiyuan: we do not need to do this as FlashAttention 2 already supports GQA
# if self.config.n_query_groups != 1: # doing this would require a full kv cache with MQA (inefficient!)
# # for MHA this is a no-op
# k = k.expand(B, self.config.n_query_groups, q_per_kv, T, self.config.head_size)
# v = v.expand(B, self.config.n_query_groups, q_per_kv, T, self.config.head_size)
q = q.reshape(B, T, -1, self.config.head_size) # (B, T, nh_q, hs)
k = k.reshape(B, T, -1, self.config.head_size)
v = v.reshape(B, T, -1, self.config.head_size)
cos, sin = rope
# applying RoPE in fp32 significantly stabilizes training
# fused rope expect (batch_size, seqlen, nheads, headdim)
q = apply_rotary_emb_func(q, cos, sin, False, True)
k = apply_rotary_emb_func(k, cos, sin, False, True)
# n_elem = int(self.config.rotary_percentage * self.config.head_size)
# q_roped = apply_rope(q[..., :n_elem], cos.repeat(1,2), sin.repeat(1,2))
# k_roped = apply_rope(k[..., :n_elem], cos.repeat(1,2), sin.repeat(1,2))
# print( (q_roped - q).sum())
# q = torch.cat((q_roped, q[..., n_elem:]), dim=-1)
# k = torch.cat((k_roped, k[..., n_elem:]), dim=-1)
if kv_cache is not None:
cache_k, cache_v = kv_cache
cache_k, cache_v = cache_k.to(dtype=k.dtype), cache_v.to(dtype=v.dtype)
# check if reached token limit
if input_pos[-1] >= max_seq_length:
input_pos = torch.tensor(max_seq_length - 1, device=input_pos.device)
# shift 1 position to the left
cache_k = torch.roll(cache_k, -1, dims=1)
cache_v = torch.roll(cache_v, -1, dims=1)
k = cache_k.index_copy_(1, input_pos, k)
v = cache_v.index_copy_(1, input_pos, v)
kv_cache = k, v
y = self.scaled_dot_product_attention(q, k, v, mask=mask)
y = y.reshape(B, T, C) # re-assemble all head outputs side by side
# output projection
y = self.proj(y)
return y, kv_cache
def scaled_dot_product_attention(
self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor, mask: Optional[torch.Tensor] = None
):
scale = 1.0 / math.sqrt(self.config.head_size)
if (
FlashAttention2Available
and mask is None
and q.device.type == "cuda"
and q.dtype in (torch.float16, torch.bfloat16)
):
from flash_attn import flash_attn_func
return flash_attn_func(q, k, v, dropout_p=0.0, softmax_scale=scale, causal=True)
q = q.transpose(1, 2)
k = k.transpose(1, 2)
v = v.transpose(1, 2)
if q.size() != k.size():
k = k.repeat_interleave(q.shape[1]//k.shape[1], dim=1)
v = v.repeat_interleave(q.shape[1]//v.shape[1], dim=1)
y = torch.nn.functional.scaled_dot_product_attention(
q, k, v, attn_mask=mask, dropout_p=0.0, scale=scale, is_causal=mask is None
)
return y.transpose(1, 2)
class GptNeoxMLP(nn.Module):
def __init__(self, config: Config) -> None:
super().__init__()
self.fc = nn.Linear(config.n_embd, config.intermediate_size, bias=config.bias)
self.proj = nn.Linear(config.intermediate_size, config.n_embd, bias=config.bias)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.fc(x)
x = torch.nn.functional.gelu(x)
return self.proj(x)
class LLaMAMLP(nn.Module):
def __init__(self, config: Config) -> None:
super().__init__()
# self.fc_1 = nn.Linear(config.n_embd, config.intermediate_size, bias=config.bias)
# self.fc_2 = nn.Linear(config.n_embd, config.intermediate_size, bias=config.bias)
# self.proj = nn.Linear(config.intermediate_size, config.n_embd, bias=config.bias)
self.swiglu = SwiGLU(config.n_embd,config.intermediate_size, bias=False, _pack_weights=False)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x_fc_1 = self.fc_1(x)
# x_fc_2 = self.fc_2(x)
# x = torch.nn.functional.silu(x_fc_1) * x_fc_2
# return self.proj(x)
return self.swiglu(x)
def build_rope_cache(
seq_len: int, n_elem: int, dtype: torch.dtype, device: torch.device, base: int = 10000, condense_ratio: int = 1
) -> RoPECache:
"""Enhanced Transformer with Rotary Position Embedding.
Derived from: https://github.com/labmlai/annotated_deep_learning_paper_implementations/blob/master/labml_nn/
transformers/rope/__init__.py. MIT License:
https://github.com/labmlai/annotated_deep_learning_paper_implementations/blob/master/license.
"""
# $\Theta = {\theta_i = 10000^{\frac{2(i-1)}{d}}, i \in [1, 2, ..., \frac{d}{2}]}$
theta = 1.0 / (base ** (torch.arange(0, n_elem, 2, device=device) / n_elem))
# Create position indexes `[0, 1, ..., seq_len - 1]`
seq_idx = torch.arange(seq_len, device=device) / condense_ratio
# Calculate the product of position index and $\theta_i$
idx_theta = torch.outer(seq_idx, theta)
cos, sin = torch.cos(idx_theta), torch.sin(idx_theta)
# added by peiyuan to ensure the same data type as q and k, so the fused rotary embedding can be used
if dtype == torch.bfloat16:
return cos.bfloat16(), sin.bfloat16()
# this is to mimic the behaviour of complex32, else we will get different results
if dtype in (torch.float16, torch.bfloat16, torch.int8):
return cos.half(), sin.half()
return cos, sin
def apply_rope(x: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor) -> torch.Tensor:
head_size = x.size(-1)
x1 = x[..., : head_size // 2] # (B, nh, T, hs/2)
x2 = x[..., head_size // 2 :] # (B, nh, T, hs/2)
rotated = torch.cat((-x2, x1), dim=-1) # (B, nh, T, hs)
roped = (x * cos) + (rotated * sin)
return roped.type_as(x)
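# --- Usage sketch (illustrative, not part of the upstream file) ---
# The cache built by `build_rope_cache` stores only half of the rotary dimension (the
# layout expected by the fused kernel), so the pure-PyTorch `apply_rope` fallback needs
# the cos/sin tensors repeated along the last dimension, as hinted by the commented-out
# code in `CausalSelfAttention.forward`. Small CPU-only example:
if __name__ == "__main__":
    B, nh, T, hs = 1, 2, 8, 16
    cos, sin = build_rope_cache(seq_len=T, n_elem=hs, dtype=torch.float32, device=torch.device("cpu"))
    q = torch.randn(B, nh, T, hs)  # (B, nh, T, hs) layout used by the non-fused path
    q_roped = apply_rope(q, cos.repeat(1, 2), sin.repeat(1, 2))
    print(q_roped.shape)  # torch.Size([1, 2, 8, 16])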
# Very loosely inspired by indexed_dataset in Fairseq, Megatron
# https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/data/indexed_dataset.py
import os
import random
import struct
import numpy as np
import torch
from torch.utils.data import IterableDataset, get_worker_info
dtypes = {1: np.uint8, 2: np.int8, 3: np.int16, 4: np.int32, 5: np.int64, 6: np.float32, 7: np.float64, 8: np.uint16}
def code(dtype):
for k in dtypes:
if dtypes[k] == dtype:
return k
raise ValueError(dtype)
HDR_MAGIC = b"LITPKDS"
HDR_SIZE = 24 # bytes
class PackedDataset(IterableDataset):
def __init__(
self, filenames, n_chunks, block_size, seed=12345, shuffle=True, wrap=False, num_processes=1, process_rank=0
):
self._filenames = filenames
self._n_chunks = n_chunks
self._block_size = block_size
self._seed = seed
self._shuffle = shuffle
self._wrap = wrap
self._num_processes = num_processes
self._process_rank = process_rank
def __iter__(self):
worker_info = get_worker_info()
num_workers = worker_info.num_workers if worker_info is not None else 1
worker_id = worker_info.id if worker_info is not None else 0
num_shards = num_workers * self._num_processes
shard_id = self._process_rank * num_workers + worker_id
max_num_files = len(self._filenames) // num_shards * num_shards
filenames = self._filenames[shard_id:max_num_files:num_shards]
return PackedDatasetIterator(
filenames=filenames,
n_chunks=self._n_chunks,
block_size=self._block_size,
seed=self._seed,
shuffle=self._shuffle,
wrap=self._wrap,
)
class PackedDatasetBuilder(object):
def __init__(self, outdir, prefix, chunk_size, sep_token, dtype="auto", vocab_size=None):
if dtype == "auto":
if vocab_size is None:
raise ValueError("vocab_size cannot be None when dtype='auto'")
if vocab_size is not None and vocab_size < 65500:
self._dtype = np.uint16
else:
self._dtype = np.int32
else:
self._dtype = dtype
self._counter = 0
self._chunk_size = chunk_size
self._outdir = outdir
self._prefix = prefix
self._sep_token = sep_token
self._arr = np.zeros(self._chunk_size, dtype=self._dtype)
self._arr.fill(self._sep_token)
self._idx = 0
self._version = 1
self._filenames = []
def _write_chunk(self):
filename = f"{self._prefix}_{self._counter:010d}.bin"
filename = os.path.join(self._outdir, filename)
with open(filename, "wb") as f:
f.write(HDR_MAGIC)
f.write(struct.pack("<Q", self._version))
f.write(struct.pack("<B", code(self._dtype)))
f.write(struct.pack("<Q", self._chunk_size))
f.write(self._arr.tobytes(order="C"))
self._filenames.append(filename)
self._counter += 1
self._arr.fill(self._sep_token)
self._idx = 0
@property
def dtype(self):
return self._dtype
@property
def filenames(self):
return self._filenames.copy()
def add_array(self, arr):
while self._idx + arr.shape[0] > self._chunk_size:
part_len = self._chunk_size - self._idx
self._arr[self._idx : self._idx + part_len] = arr[:part_len]
self._write_chunk()
arr = arr[part_len:]
arr_len = arr.shape[0]
self._arr[self._idx : self._idx + arr_len] = arr
self._idx += arr_len
def write_reminder(self):
self._write_chunk()
class PackedDatasetIterator:
def __init__(self, filenames, n_chunks, block_size, seed, shuffle, wrap):
self._seed = seed
self._shuffle = shuffle
self._rng = np.random.default_rng(seed) if shuffle else None
self._block_idxs = None
self._wrap = wrap
# TODO: instead of filenames, we could have a single text stream
# (or text file) with the sequence of all files to be
# fetched/loaded.
self._filenames = filenames
self._file_idx = 0
self._n_chunks = n_chunks
self._dtype = None
self._block_size = block_size
self._n_blocks = None
self._mmaps = []
self._buffers = []
self._block_idxs = []
self._curr_idx = 0
self._load_n_chunks()
def _read_header(self, path):
with open(path, "rb") as f:
magic = f.read(len(HDR_MAGIC))
assert magic == HDR_MAGIC, "File doesn't match expected format."
version = struct.unpack("<Q", f.read(8))
assert version == (1,)
(dtype_code,) = struct.unpack("<B", f.read(1))
dtype = dtypes[dtype_code]
(chunk_size,) = struct.unpack("<Q", f.read(8))
return dtype, chunk_size
def _close_mmaps(self):
for mmap in self._mmaps:
mmap._mmap.close()
def _load_n_chunks(self):
self._close_mmaps()
self._mmaps = []
self._buffers = []
if self._n_chunks > len(self._filenames[self._file_idx :]):
# if not self._wrap:
# raise StopIteration
self._file_idx = 0
for i in range(self._n_chunks):
filename = self._filenames[self._file_idx + i]
if self._dtype is None:
self._dtype, self._chunk_size = self._read_header(filename)
self._n_blocks = self._chunk_size // self._block_size
# TODO: check header matches with previous files
mmap = np.memmap(filename, mode="r", order="C", offset=HDR_SIZE)
self._mmaps.append(mmap)
self._buffers.append(memoryview(mmap))
self._file_idx += self._n_chunks
n_all_blocks = self._n_chunks * self._n_blocks
self._block_idxs = self._rng.permutation(n_all_blocks) if self._shuffle else range(n_all_blocks)
self._curr_idx = 0
def __del__(self):
self._close_mmaps()
del self._mmaps
del self._buffers
def __iter__(self):
return self
def __next__(self):
if self._curr_idx >= len(self._block_idxs):
self._load_n_chunks()
# TODO: trigger fetching next next n_chunks if remote
block_idx = self._block_idxs[self._curr_idx]
chunk_id = block_idx // self._n_blocks
buffer = self._buffers[chunk_id]
elem_id = (block_idx % self._n_blocks) * self._block_size
offset = np.dtype(self._dtype).itemsize * elem_id
arr = np.frombuffer(buffer, dtype=self._dtype, count=self._block_size, offset=offset)
self._curr_idx += 1
return torch.from_numpy(arr.astype(np.int64))
class CombinedDataset(IterableDataset):
def __init__(self, datasets, seed, weights=None):
self._seed = seed
self._datasets = datasets
self._weights = weights
n_datasets = len(datasets)
if weights is None:
self._weights = [1 / n_datasets] * n_datasets
def __iter__(self):
return CombinedDatasetIterator(self._datasets, self._seed, self._weights)
class CombinedDatasetIterator:
def __init__(self, datasets, seed, weights):
self._datasets = [iter(el) for el in datasets]
self._weights = weights
self._rng = random.Random(seed)
def __next__(self):
(dataset,) = self._rng.choices(self._datasets, weights=self._weights, k=1)
return next(dataset)
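# --- Usage sketch (illustrative, not part of the upstream file) ---
# End-to-end flow: pack token arrays into fixed-size binary chunks with
# `PackedDatasetBuilder`, then stream them back as `block_size`-token blocks with
# `PackedDataset`. The output directory, prefix and sizes are placeholders.
if __name__ == "__main__":
    import tempfile
    outdir = tempfile.mkdtemp()
    builder = PackedDatasetBuilder(outdir=outdir, prefix="demo", chunk_size=16, sep_token=0, vocab_size=32000)
    builder.add_array(np.arange(1, 40, dtype=np.uint16))
    builder.write_reminder()  # flush the partially filled last chunk
    dataset = PackedDataset(builder.filenames, n_chunks=1, block_size=8, shuffle=False)
    for i, block in enumerate(dataset):
        print(i, block[:4])
        if i == 2:
            break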
import json
from pathlib import Path
from typing import Optional
import torch
class Tokenizer:
def __init__(self, checkpoint_dir: Path) -> None:
# some checkpoints have both files, `.model` takes precedence
if (vocabulary_path := checkpoint_dir / "tokenizer.model").is_file():
from sentencepiece import SentencePieceProcessor
self.processor = SentencePieceProcessor(model_file=str(vocabulary_path))
self.backend = "sentencepiece"
self.bos_id = self.processor.bos_id()
self.eos_id = self.processor.eos_id()
elif (vocabulary_path := checkpoint_dir / "tokenizer.json").is_file():
from tokenizers import Tokenizer as HFTokenizer
self.processor = HFTokenizer.from_file(str(vocabulary_path))
self.backend = "huggingface"
with open(checkpoint_dir / "tokenizer_config.json") as fp:
config = json.load(fp)
bos_token = config.get("bos_token")
self.bos_id = self.token_to_id(bos_token) if bos_token is not None else None
self.eos_id = self.token_to_id(config["eos_token"])
else:
raise NotImplementedError
@property
def vocab_size(self) -> int:
if self.backend == "huggingface":
return self.processor.get_vocab_size(with_added_tokens=False)
if self.backend == "sentencepiece":
return self.processor.vocab_size()
raise RuntimeError
def token_to_id(self, token: str) -> int:
if self.backend == "huggingface":
id_ = self.processor.token_to_id(token)
elif self.backend == "sentencepiece":
id_ = self.processor.piece_to_id(token)
else:
raise RuntimeError
if id_ is None:
raise ValueError(f"token {token!r} not found in the collection.")
return id_
def encode(
self,
string: str,
device: Optional[torch.device] = None,
bos: bool = False,
eos: bool = True,
max_length: int = -1,
) -> torch.Tensor:
if self.backend == "huggingface":
tokens = self.processor.encode(string).ids
elif self.backend == "sentencepiece":
tokens = self.processor.encode(string)
else:
raise RuntimeError
if bos:
bos_id = self.bos_id
if bos_id is None:
raise NotImplementedError("This tokenizer does not define a bos token")
tokens = [bos_id] + tokens
if eos:
tokens = tokens + [self.eos_id]
if max_length > 0:
tokens = tokens[:max_length]
return torch.tensor(tokens, dtype=torch.int, device=device)
def decode(self, tensor: torch.Tensor) -> str:
tokens = [tensor.item()] if tensor.ndim == 0 else tensor.tolist()
return self.processor.decode(tokens)
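# --- Usage sketch (illustrative, not part of the upstream file) ---
# `checkpoint_dir` must contain either `tokenizer.model` (sentencepiece backend) or
# `tokenizer.json` plus `tokenizer_config.json` (huggingface backend). The path below
# is a placeholder.
if __name__ == "__main__":
    tokenizer = Tokenizer(Path("checkpoints/some-model"))  # hypothetical directory
    ids = tokenizer.encode("Hello world", eos=False)
    print(ids)
    print(tokenizer.decode(ids))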
# Model code
modelCode=
# Model name
modelName=tinyllama_pytorch
# Model description
modelDescription=With only 1.1B parameters, it reduces the Llama 2 model size and training data volume, and can be used as a drop-in replacement in many Llama-based open-source projects.
# Application scenarios
appScenario=inference,training,manufacturing,broadcast media,finance,energy,healthcare,smart home,education
# Framework type
frameType=pytorch