Commit b77a7567 authored by wanglch

Initial commit
transformers>=4.41.2
datasets>=2.16.0
accelerate>=0.30.1
peft>=0.11.1
trl>=0.8.6
gradio>=4.0.0
pandas>=2.0.0
scipy
einops
sentencepiece
tiktoken
protobuf
uvicorn
pydantic
fastapi
sse-starlette
matplotlib>=3.7.0
fire
packaging
pyyaml
numpy<2.0.0
transformers_stream_generator
modelscope
#!/bin/bash
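# LoRA SFT of XuanYuan-13B-Chat on the fingpt_sentiment dataset via LLaMA-Factory's train.py,
# launched with DeepSpeed on four local GPUs (devices 4-7) and a randomly chosen master port.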
deepspeed --master_port $(shuf -n 1 -i 10000-65535) --include="localhost:4,5,6,7" /home/wanglch/projects/LLaMA-Factory/src/train.py \
--deepspeed /home/wanglch/projects/LLaMA-Factory/deepspeed.json \
--stage sft \
--do_train \
--model_name_or_path /home/wanglch/projects/XuanYuan/XuanYuan-13B-Chat \
--dataset fingpt_sentiment \
--dataset_dir /home/wanglch/projects/LLaMA-Factory/data \
--template qwen \
--finetuning_type lora \
--lora_target all \
--output_dir /home/wanglch/projects/saves/XuanYuan-13B-Chat/lora_multi_dtk/sft \
--overwrite_output_dir \
--cutoff_len 1024 \
--preprocessing_num_workers 1 \
--per_device_train_batch_size 1 \
--per_device_eval_batch_size 1 \
--gradient_accumulation_steps 1 \
--lr_scheduler_type cosine \
--logging_steps 10 \
--warmup_steps 20 \
--save_steps 100 \
--eval_steps 10 \
--evaluation_strategy steps \
--load_best_model_at_end \
--learning_rate 5e-5 \
--num_train_epochs 1.0 \
--max_samples 3000 \
--val_size 0.1 \
--ddp_timeout 180000000 \
--plot_loss True \
--fp16
## LLM-CODE
Pre-training and instruction fine-tuning codebase for the XuanYuan model series, and the companion example code for the book *Large Language Models: Principles and Engineering Practice* (大语言模型-原理与工程实践).
### Runtime environment
- NVIDIA GPU driver version >= 525.85.12
- CUDA version >= 12.3
- Python version >= 3.10.12
- gcc version >= 11.4.0

Required pip package versions:
- argparse==1.4.0
- deepspeed==0.12.5
- datasets==2.15.0
- transformers==4.36.0
- sentencepiece==0.1.99
### Dataset download
Option 1: direct download. The dataset files can be downloaded from Hugging Face:
https://huggingface.co/datasets/Duxiaoman-DI/FinCorpus
Option 2: load it with Python:
```py
from datasets import load_dataset
dataset = load_dataset("Duxiaoman-DI/FinCorpus")
```
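If you want to keep a local copy of the corpus for the preprocessing step, a minimal sketch (the target directory below is illustrative, not a path used by the repository scripts):
```py
from datasets import load_dataset

dataset = load_dataset("Duxiaoman-DI/FinCorpus")
print(dataset)                              # inspect the available splits
dataset.save_to_disk("data/FinCorpus_raw")  # cache a copy on local disk
```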
### Model download
Option 1: direct download. The Llama-2-7b-hf model files can be downloaded from Hugging Face:
https://huggingface.co/meta-llama/Llama-2-7b-hf
Option 2: load it with Python:
```py
# Load model directly
from transformers import AutoTokenizer, AutoModelForCausalLM
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf")
```
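Note that the meta-llama repositories on Hugging Face are gated: you must accept the license on the model page and authenticate (for example with `huggingface-cli login`) before the files can be downloaded. If needed, an access token can also be passed explicitly (the token below is a placeholder):
```py
from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf", token="hf_xxx")
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf", token="hf_xxx")
```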
### Steps to run
```sh
# Switch to your working directory on the host machine
cd /your_host_workspace
# Clone the project code
git clone https://github.com/Duxiaoman-DI/XuanYuan.git
# Switch to the project root inside the container
cd XuanYuan/llm-code
# Clear caches
sh clear_cache.sh
# Run data preprocessing
sh data_preprocess_run.sh
# Run pre-training
sh pretrain_run.sh
# Run SFT (instruction fine-tuning)
sh sft_run.sh
```
# Delete caches
rm -rf __pycache__
import argparse
import deepspeed
def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument('--data_path', type=str, default='', help="Path to the training data")
    parser.add_argument("--model_name_or_path", type=str, required=True, help="Path to the model files")
    parser.add_argument('--save_name', type=str, default='test', help='Directory the model is saved to')
    # optimizer/lr_scheduler
    parser.add_argument("--learning_rate", type=float, default=5e-5, help="Learning rate")
    parser.add_argument("--weight_decay", type=float, default=0.01, help="Weight decay")
    parser.add_argument("--num_warmup_steps", type=int, default=0, help="Number of warmup steps for the lr scheduler")
    parser.add_argument("--seed", type=int, default=1234, help="Random seed")
    # Training-related arguments
    parser.add_argument("--train_mode", type=str, default='pretrain', help="Training mode: 'pretrain' for pre-training, 'sft' for instruction fine-tuning")
    parser.add_argument("--epochs", type=int, default=1, help="Number of training epochs")
    parser.add_argument("--total_num_steps", type=int, default=100000, help="Total number of training steps")
    parser.add_argument("--gradient_accumulation_steps", type=int, default=1, help="Gradient accumulation steps")
    parser.add_argument("--per_device_train_batch_size", type=int, default=16, help="Batch size per device")
    parser.add_argument("--max_length", type=int, default=1024, help="Maximum sequence length")
    parser.add_argument('--gradient_checkpointing', action='store_true', help='Enable gradient checkpointing (off by default); enabling it reduces GPU memory usage')
    parser.add_argument("--log_steps", type=int, default=10, help="Log every N steps")
    parser.add_argument("--save_steps", type=int, default=-1, help="Save the model every N steps")
    # DeepSpeed-related arguments
    parser.add_argument('--ds_offload_cpu', action='store_true', help='Enable CPU offload')
    parser.add_argument('--ds_zero_stage', type=int, default=2, help='DeepSpeed ZeRO stage')
    parser.add_argument('--ds_steps_per_print', type=int, default=100, help='Print DeepSpeed logs every N steps')
    parser.add_argument("--local_rank", type=int, default=-1, help="local_rank for multi-node multi-GPU training")
    parser.add_argument("--global_rank", type=int, default=-1, help="global_rank for multi-node multi-GPU training")
    # Add DeepSpeed's own configuration arguments
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    return args
def get_deepspeed_config(args):
    ds_config = {
        "train_micro_batch_size_per_gpu": args.per_device_train_batch_size,  # batch size per GPU
        'gradient_accumulation_steps': args.gradient_accumulation_steps,  # gradient accumulation steps
        "steps_per_print": args.ds_steps_per_print,  # how often DeepSpeed prints its progress logs
        "zero_optimization": {
            "stage": args.ds_zero_stage,  # ZeRO stage: one of 0, 1, 2, 3
        },
        "scheduler": {
            "type": "WarmupDecayLR",  # learning-rate decay schedule
            "params": {
                "total_num_steps": args.total_num_steps,
                "warmup_min_lr": 0,
                "warmup_max_lr": args.learning_rate,
                "warmup_num_steps": args.num_warmup_steps
            }
        },
        "optimizer": {
            "type": "Adam",  # optimizer
            "params": {
                "lr": args.learning_rate,  # learning rate
                "weight_decay": args.weight_decay,  # weight decay
            }
        },
        "fp16": {
            "enabled": True,  # enable fp16 mixed-precision training
        },
        "gradient_clipping": 1.0,  # gradient clipping threshold
        "prescale_gradients": False,  # whether to scale gradients before the update
        "wall_clock_breakdown": False,  # whether to print DeepSpeed timing breakdowns
    }
    return ds_config
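# Minimal usage sketch (assumes --model_name_or_path is supplied on the command line):
# prints the DeepSpeed config dict that deepspeed.initialize() receives downstream.
if __name__ == "__main__":
    import json
    _args = parse_arguments()
    print(json.dumps(get_deepspeed_config(_args), indent=2))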
# Pretrain data preprocessing
python3 pretrain_data_process.py \
--model_name_or_path ./Llama-2-7b-hf \
--data_path ./opensource_final \
--save_dir data/FinCorpus_tokenized \
--max_length 4096 \
--num_proc 128
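# The tokenized dataset written to --save_dir is intended to be loaded later with
# datasets.load_from_disk() (see get_pt_dataset in dataset.py), typically by passing
# that directory as --data_path to the pre-training run.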
import torch
import json
from dataclasses import dataclass
from datasets import load_from_disk
from dxm_llm_main import log_dist
class JsonlDatasetPT(torch.utils.data.Dataset):
    """
    Loads a jsonl-format dataset for the pre-training task.
    """
    def __init__(self,
                 data_path,   # path to the dataset
                 tokenizer,   # tokenizer instance
                 max_length,  # maximum sequence length
                 ):
        # Load the dataset and tokenize it
        self.dataset = []
        with open(data_path, 'r', encoding='utf-8') as f:
            for line in f:
                text = json.loads(line)['text']
                # Tokenize the text
                inputs = tokenizer.encode_plus(
                    text,
                    add_special_tokens=True,
                    max_length=max_length,
                    padding='max_length',
                    return_tensors='pt',
                    truncation=True
                )
                input_ids = inputs['input_ids'].squeeze()  # shape: [max_length]
                # Append the tokenized sample to the dataset
                self.dataset.append({
                    'input_ids': input_ids,
                })
        log_dist(f'Loaded {len(self.dataset)} examples from {data_path}')
    def __len__(self):
        # Return the dataset size
        return len(self.dataset)
    def __getitem__(self, idx):
        # Return one sample
        return self.dataset[idx]
def get_pt_dataset(args):
    """
    Loads an already tokenized dataset for the pre-training task.
    """
    # Load the dataset from disk; it must have been saved with save_to_disk()
    train_dataset = load_from_disk(args.data_path)
    train_dataset = train_dataset.shuffle(seed=42)
    return train_dataset
class JsonDatasetSFT(torch.utils.data.Dataset):
    """
    Loads a json-lines dataset for the instruction fine-tuning (SFT) task.
    """
    def __init__(self,
                 data_path,   # path to the dataset
                 tokenizer,   # tokenizer instance
                 max_length,  # maximum sequence length
                 ):
        super().__init__()
        self.max_length = max_length
        self.tokenizer = tokenizer
        self.eos_token_id = tokenizer.eos_token_id
        self.pad_token_id = tokenizer.pad_token_id
        self.data = []
        with open(data_path, 'r') as file:
            for line in file:
                sample = json.loads(line)
                self.data.append({
                    "prompt": sample['instruction'],
                    "response": sample['response'],
                })
        log_dist(f'Loaded {len(self.data)} examples from {data_path}')
    def __len__(self):
        # Return the dataset size
        return len(self.data)
    def __getitem__(self, idx):
        # Return one sample
        prompt = self.data[idx]['prompt']
        response = self.data[idx]['response']
        prompt = f"Human: {prompt}\nAssistant: "
        # Tokenize prompt and response separately
        prompt_ids = self.tokenizer(prompt).input_ids
        response_ids = self.tokenizer(response).input_ids
        # The prompt part gets label -100 so that it does not contribute to the loss
        input_ids = prompt_ids + [self.eos_token_id] + response_ids + [self.eos_token_id]
        labels = [-100] * (len(prompt_ids) + 1) + response_ids + [self.eos_token_id]
        if len(input_ids) > self.max_length:
            # Truncate over-long sequences
            input_ids = input_ids[: self.max_length]
            labels = labels[: self.max_length]
        else:
            # Pad short sequences up to max_length
            pad_len = self.max_length - len(input_ids)
            input_ids += [self.pad_token_id] * pad_len
            labels += [-100] * pad_len  # pad labels with -100 (not pad_token_id) so padding is ignored by the loss
input_ids = torch.LongTensor(input_ids)
labels = torch.LongTensor(labels)
attention_mask = input_ids.ne(self.pad_token_id)
return {
"input_ids": input_ids,
"labels": labels,
"attention_mask": attention_mask,
}
@dataclass
class DataCollatorForPT(object):
    """
    Data collator that stacks multiple samples into one batch and builds the labels
    used to compute the loss. Used in pretrain mode.
    """
    pad_token_id: int = 0
    ignore_index: int = -100
    max_length: int = -1  # by default, do not truncate to max_length
    def __call__(self, instances: list) -> dict:
        if self.max_length > 0:
            input_ids = torch.stack([instance['input_ids'][:self.max_length] for instance in instances], dim=0)  # shape: [batch_size, max_length]
        else:
            input_ids = torch.stack([instance['input_ids'] for instance in instances], dim=0)  # shape: [batch_size, max_length]
        labels = input_ids.clone()
        # Set padded positions in labels to ignore_index so they are skipped when computing the loss
        labels[labels == self.pad_token_id] = self.ignore_index
        return dict(
            input_ids=input_ids,
            labels=labels,
        )
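# Usage sketch for the collator (synthetic tensors, illustrative only):
#   collator = DataCollatorForPT(pad_token_id=0)
#   batch = collator([{'input_ids': torch.tensor([5, 6, 7, 0, 0])},
#                     {'input_ids': torch.tensor([8, 9, 0, 0, 0])}])
#   batch['input_ids'].shape  -> torch.Size([2, 5])
#   batch['labels'][0]        -> tensor([   5,    6,    7, -100, -100])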
import sys
import os
import deepspeed
import logging
import random
import numpy as np
import torch
from transformers import set_seed
from deepspeed import comm as dist
import time
from model_hook import *  # load the user-defined hook functions from model_hook.py
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
encoding='utf-8'
)
def log_dist(message: str, level: int = logging.INFO) -> None:
    """Logging helper that only records messages on selected ranks"""
    my_rank = int(os.environ.get("RANK", "0"))
    if my_rank % 8 == 0:
        if level == logging.INFO:
            logging.info(f"[rank{my_rank}] {message}")
        if level == logging.ERROR:
            logging.error(f"[rank{my_rank}] {message}")
        if level == logging.DEBUG:
            logging.debug(f"[rank{my_rank}] {message}")
def get_ds_model(args, dataloader_dict):
    # Get the DeepSpeed configuration
    ds_config = get_ds_config(args)
    # Load the model
    model = get_model_common(args)
    # Count the model parameters
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    log_dist(f"Finally total_params: {total_params} trainable_params: {trainable_params} ratio {trainable_params/total_params if total_params>0 else -1:.4%} ")
    # Get the user-defined optimizer and learning-rate scheduler, if any
    op_lr_dict = get_op_lr(args, model, dataloader_dict)
    if op_lr_dict is None:
        lr_scheduler = None
        optimizer = None
    else:
        lr_scheduler = op_lr_dict.get("lr_scheduler", None)
        optimizer = op_lr_dict.get("optimizer", None)
    # Initialize DeepSpeed
    model, _, _, lr_scheduler = deepspeed.initialize(
        model=model,
        lr_scheduler=lr_scheduler,
        optimizer=optimizer,
        model_parameters=filter(lambda p: p.requires_grad, model.parameters()),
        config=ds_config
    )
    log_dist("deepspeed initialize finished.")
    # Enable gradient checkpointing if requested
    if args.gradient_checkpointing:
        model.gradient_checkpointing_enable()
    return model
# Set all random seeds so that runs are reproducible
def seed_all(seed):
    if seed is not None:
        set_seed(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
def save_hf_format(model, tokenizer, args, sub_folder=""):
    """
    Save the model in Hugging Face format so it can later be loaded with from_pretrained()
    """
    model_to_save = model.module if hasattr(model, 'module') else model
    output_dir = os.path.join(args.save_name, sub_folder)
    os.makedirs(output_dir, exist_ok=True)
    output_model_file = os.path.join(output_dir, "pytorch_model.bin")
    output_config_file = os.path.join(output_dir, "config.json")
    state_dict = model_to_save.state_dict()
    config = model_to_save.config
    torch.save(state_dict, output_model_file)  # save the model weights: pytorch_model.bin
    config.to_json_file(output_config_file)  # save the config file: config.json
    tokenizer.save_pretrained(output_dir)  # save the tokenizer
    print('=====================================')
    print(f'Model saved at: {output_dir}')
    print('=====================================')
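# Note: the files written above (pytorch_model.bin, config.json and the tokenizer files) follow the
# standard Hugging Face layout, so the checkpoint can later be reloaded with
# AutoModelForCausalLM.from_pretrained(output_dir).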
def main():
    # Parse the command-line arguments
    args = parse_args()
    if args.local_rank == 0:
        # Create the directory the model will be saved to
        os.makedirs(args.save_name, exist_ok=True)
    # Set all random seeds so that runs are reproducible
    seed_all(args.seed)
    # Initialize the DeepSpeed distributed training environment
    if args.local_rank > -1 and torch.cuda.is_available():
        # Distributed training: use CUDA
        torch.cuda.set_device(args.local_rank)
        device = torch.device("cuda", args.local_rank)
        print(f'local_rank={args.local_rank} device={device}')
        deepspeed.init_distributed()
        args.global_rank = dist.get_rank()
        print(f"global rank:{args.global_rank} local rank: {args.local_rank}")
    else:
        # Not distributed: fall back to the CPU
        device = torch.device("cpu")
    # Build the dataloader that provides the training data
    dataloader_dict = get_dataloader_common(args)
    # Load the model
    model = get_ds_model(args, dataloader_dict)
    model.train()  # switch to train mode
    dataloader_dict["device"] = device
    # Run the user-defined hook before training starts
    before_train(args, model, dataloader_dict)
    if args.gradient_accumulation_steps >= 1:
        args.log_steps = args.log_steps * args.gradient_accumulation_steps
        args.save_steps = args.save_steps * args.gradient_accumulation_steps
    for epoch in range(0, args.epochs):
        dataloader_dict["sampler"].set_epoch(epoch)  # set the current epoch on the sampler
        train_dataloader = dataloader_dict["train_dataloader"]
        tic = time.time()
        num_total_steps = len(train_dataloader)
        for step, batch in enumerate(train_dataloader):
            batch = {k: v.to(device) for k, v in batch.items()}  # move the batch to the device
            outputs = model(use_cache=False, **batch)  # forward pass
            loss = outputs['loss']  # fetch the loss
            model.backward(loss)  # backward pass
            model.step()  # DeepSpeed parameter update
            # Log every log_steps steps
            if step % args.log_steps == 0:
                time_per_step = (time.time() - tic) / args.log_steps
                speed = args.per_device_train_batch_size * args.max_length / time_per_step
                real_step = step
                # With gradient accumulation, divide the step count by the accumulation steps
                if args.gradient_accumulation_steps >= 1:
                    real_step = step / args.gradient_accumulation_steps
                log_dist(f"epoch{epoch} step{int(real_step)}/{num_total_steps} loss: {loss:.4f}")
                tic = time.time()  # reset the timer
            # Save the model every save_steps steps
            if step > 0 and args.save_steps > 0 and step % args.save_steps == 0:
                # Save the model
                log_dist(f'save model at epoch {epoch} step {step}')
                if args.global_rank == 0:
                    save_hf_format(
                        model, dataloader_dict['tokenizer'], args,
                        sub_folder=f'epoch{epoch}_step-{step}-hf'
                    )
            # Run the user-defined hook at the end of every step
            on_step_end(args, model, dataloader_dict, step, epoch, outputs)
        # Save the model at the end of the epoch
        log_dist(f"save model at end of epoch {epoch}")
        if args.global_rank == 0:
            save_hf_format(model, dataloader_dict['tokenizer'], args,
                           sub_folder=f'epoch{epoch}_step-{step}-hf'
                           )
        # Run the user-defined hook at the end of every epoch
        on_epoch_end(args, model, dataloader_dict, epoch)
    log_dist("Training finished")
    # Run the user-defined hook after training ends
    after_train(args, model, dataloader_dict)
if __name__ == "__main__":
main()
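# Launch sketch (all values are illustrative and not taken from the repository's run scripts):
#   deepspeed --num_gpus 8 dxm_llm_main.py \
#       --model_name_or_path ./Llama-2-7b-hf \
#       --data_path data/FinCorpus_tokenized \
#       --train_mode pretrain \
#       --save_name ./saves/llama2_pretrain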
import time
from transformers import (
AutoModelForCausalLM, AutoTokenizer,
LlamaForCausalLM, LlamaTokenizer,
BloomForCausalLM, BloomTokenizerFast,
)
from torch.utils.data import DataLoader, DistributedSampler
from dataset import get_pt_dataset, DataCollatorForPT, JsonDatasetSFT
from dxm_llm_main import log_dist
from config import get_deepspeed_config, parse_arguments
def get_tokenizer(args):
    '''
    Load the tokenizer
    '''
    # Use the LlamaTokenizer class for llama-family models
    if 'llama' in args.model_name_or_path.lower():
        tokenizer = LlamaTokenizer.from_pretrained(args.model_name_or_path)
    # Use the BloomTokenizerFast class for bloom-family models
    elif 'bloom' in args.model_name_or_path.lower():
        tokenizer = BloomTokenizerFast.from_pretrained(args.model_name_or_path)
    else:
        tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path, fast_tokenizer=True)
    # Use the eos_token as the pad_token so that padding is handled correctly
    tokenizer.pad_token = tokenizer.eos_token
    return tokenizer
def get_model_common(args):
    """
    Load the model weights
    """
    log_dist('=================== Loading Model =====================')
    log_dist(f"loading model from {args.model_name_or_path}")
    tic = time.time()
    # Use the LlamaForCausalLM class for llama-family models
    if 'llama' in args.model_name_or_path.lower():
        model = LlamaForCausalLM.from_pretrained(args.model_name_or_path)
    # Use the BloomForCausalLM class for bloom-family models
    elif 'bloom' in args.model_name_or_path.lower():
        model = BloomForCausalLM.from_pretrained(args.model_name_or_path)
    else:
        model = AutoModelForCausalLM.from_pretrained(args.model_name_or_path, trust_remote_code=True)
    log_dist(f'model loaded. costtime={time.time()-tic:.2f}s')
    log_dist(f"model = {model}")
    return model
def get_dataloader_common(args):
    '''
    Build the dataset and its DataLoader
    '''
    tokenizer = get_tokenizer(args)
    log_dist(f'==================== Loading dataset =================')
    tic = time.time()
    if args.train_mode == 'pretrain':
        # Pre-tokenized corpus data can be loaded directly with load_from_disk()
        train_dataset = get_pt_dataset(args)
        collator = DataCollatorForPT(pad_token_id=tokenizer.pad_token_id)
    elif args.train_mode == 'sft':
        train_dataset = JsonDatasetSFT(args.data_path, tokenizer, args.max_length)
        collator = None
    else:
        raise ValueError(f"train_mode {args.train_mode} is not supported")
    # Distributed random sampling so that each GPU sees a different shard of the data
    # (assumes torch.distributed has already been initialized, see deepspeed.init_distributed() in main())
    sampler = DistributedSampler(train_dataset, shuffle=True, seed=args.seed)
    train_dataloader = DataLoader(
        train_dataset,
        batch_size=args.per_device_train_batch_size,
        num_workers=16,  # use 16 worker processes for data loading
        sampler=sampler,
        collate_fn=collator,
    )
    log_dist(f"Dataset Loaded: {args.data_path} costtime={time.time()-tic:.2f}s")
    log_dist(f" Num samples: {len(train_dataset)}")
    log_dist(f" Num Tokens: {len(train_dataset) * args.max_length / 1e9:.2f}B")
    log_dist(f" Total Steps: {len(train_dataloader)}")
    return {
        "sampler": sampler,
        "train_dataloader": train_dataloader,
        "tokenizer": tokenizer
    }
def get_ds_config(args):
    '''
    Get the DeepSpeed configuration parameters
    '''
    ds_config = get_deepspeed_config(args)  # DeepSpeed configuration, defined in config.py
    return ds_config
def parse_args():
    '''
    Parse the command-line arguments
    '''
    args = parse_arguments()  # argument-parsing function, defined in config.py
    log_dist('============== Arguments ====================')
    for k, v in vars(args).items():
        log_dist(f' {k} = {v}')
    log_dist('=======================================')
    return args
def get_op_lr(args, origin_model, dataloader_dict):
    '''
    Hook: return a custom optimizer and learning-rate scheduler (None lets DeepSpeed create them from its config)
    '''
    return None
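# Hypothetical example of a non-trivial get_op_lr (sketch only, not part of the original code):
# returning a dict makes get_ds_model() pass this optimizer to deepspeed.initialize() instead of
# the Adam optimizer declared in the DeepSpeed config. Assumes `import torch` at the top of the file.
#
# def get_op_lr(args, origin_model, dataloader_dict):
#     optimizer = torch.optim.AdamW(
#         (p for p in origin_model.parameters() if p.requires_grad),
#         lr=args.learning_rate,
#         weight_decay=args.weight_decay,
#     )
#     return {"optimizer": optimizer, "lr_scheduler": None}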
def before_train(args, model_engine, dataloader_dict):
    '''
    Hook: runs before training starts
    '''
    pass
def on_step_end(args, model_engine, dataloader_dict, step_num, epoch_num, outputs):
    '''
    Hook: runs at the end of every training step
    '''
    pass
def on_epoch_end(args, model_engine, dataloader_dict, epoch_num):
    '''
    Hook: runs at the end of every epoch
    '''
    pass
def after_train(args, model_engine, dataloader_dict):
    '''
    Hook: runs after the whole training run finishes
    '''
    pass