Commit f7db21eb authored by lvzhen

first
{
"datas/single_turn_example.jsonl": 2.0,
"datas/multi_turn_example.jsonl": 1.0
}
#!/usr/bin/env python
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
import argparse
import os
import math
import sys
import torch
from torch.utils.data import DataLoader, RandomSampler
from torch.utils.data.distributed import DistributedSampler
import torch.nn.functional as F
from transformers import (
SchedulerType,
default_data_collator,
get_scheduler,
AutoTokenizer,
AutoModelForCausalLM,
AutoConfig,
)
# from .transformers import AutoModelForCausalLM
# import torch.distributed as dist
import deepspeed
from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam
from transformers.deepspeed import HfDeepSpeedConfig
import json
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
from utils.data.data_utils import create_prompt_dataset, get_dataset
from utils.utils import print_rank_0, to_device, save_hf_format, set_random_seed, get_all_reduce_mean, get_optimizer_grouped_parameters, save_zero_three_model
from utils.ds_utils import get_train_ds_config
from utils.module.lora import convert_linear_layer_to_lora, convert_lora_to_linear_layer, only_optimize_lora_parameters, recover_lora, mark_only_lora_as_trainable, make_model_gradient_checkpointing_compatible
def parse_args():
parser = argparse.ArgumentParser(
description=
"Finetune a transformers model on a causal language modeling task")
parser.add_argument('--data_path',
type=str,
required=True,
help='Path to the training dataset.')
parser.add_argument(
"--model_name_or_path",
type=str,
help=
"Path to pretrained model or model identifier from huggingface.co/models.",
required=True,
)
parser.add_argument(
"--per_device_train_batch_size",
type=int,
default=1,
help="Batch size (per device) for the training dataloader.",
)
parser.add_argument(
"--max_seq_len",
type=int,
default=512,
help="The maximum sequence length.",
)
parser.add_argument(
"--learning_rate",
type=float,
default=1e-5,
help=
"Initial learning rate (after the potential warmup period) to use.",
)
parser.add_argument("--weight_decay",
type=float,
default=0.,
help="Weight decay to use.")
parser.add_argument("--num_train_epochs",
type=int,
default=1,
help="Total number of training epochs to perform.")
parser.add_argument(
"--gradient_accumulation_steps",
type=int,
default=1,
help=
"Number of updates steps to accumulate before performing a backward/update pass.",
)
parser.add_argument(
"--lr_scheduler_type",
type=SchedulerType,
default="cosine",
help="The scheduler type to use.",
choices=[
"linear", "cosine", "cosine_with_restarts", "polynomial",
"constant", "constant_with_warmup"
],
)
parser.add_argument(
"--warmup_proportion",
type=float,
default=0.1,
help="Proportion of steps for the warmup in the lr scheduler.")
parser.add_argument("--output_dir",
type=str,
default=None,
help="Where to store the model.")
parser.add_argument("--seed",
type=int,
default=1234,
help="A seed for reproducible training.")
parser.add_argument("--local_rank",
type=int,
default=-1,
help="local_rank for distributed training on gpus")
parser.add_argument("--mark_only_lora_as_trainable",
action='store_true',
help="mark only lora as trainable")
parser.add_argument('--gradient_checkpointing',
action='store_true',
help='Enable HF gradient checkpointing for model.')
parser.add_argument('--with_loss_mask',
action='store_true',
help='Whether to use a loss mask during training.')
parser.add_argument("--user_token",
type=str,
default="<_user>",
help="user token")
parser.add_argument("--bot_token",
type=str,
default="<_bot>",
help="bot token")
parser.add_argument("--end_token",
type=str,
default="<_end>",
help="end token")
parser.add_argument('--disable_dropout',
action='store_true',
help='Disable the dropout of the model.')
parser.add_argument("--save_steps",
type=int,
help="Save model steps")
# deepspeed features
parser.add_argument('--offload',
action='store_true',
help='Enable ZeRO Offload techniques.')
parser.add_argument(
'--zero_stage',
type=int,
default=0,
help='ZeRO optimization stage.')
## LoRA for efficient training setting
parser.add_argument("--lora_dim",
type=int,
default=0,
help="If > 0, use LoRA for efficient training.")
parser.add_argument("--lora_scaling",
type=int,
default=1,
help="use for scaling LoRA matrix.")
parser.add_argument("--lora_module_name",
type=str,
default="decoder.layers.",
help="The scope of LoRA.")
parser.add_argument('--only_optimize_lora',
action='store_true',
help='Only optimize the LoRA parameters.')
parser.add_argument('--precision',
choices=['fp16', 'bf16'],
required=True,
help='Choose the mixed precision type while training')
parser.add_argument("--gradient_checkpointing_use_reentrant", action="store_true")
parser = deepspeed.add_config_arguments(parser)
args = parser.parse_args()
# Validate settings
if args.gradient_checkpointing and args.lora_dim > 0:
assert (
not args.only_optimize_lora
), "--gradient_checkpointing and --only_optimize_lora cannot be enabled at the same time."
return args
def load_telechat_tokenizer(model_name_or_path, fast_tokenizer=True):
tokenizer = AutoTokenizer.from_pretrained(model_name_or_path,
fast_tokenizer=fast_tokenizer,
padding_side="left",
trust_remote_code=True)
return tokenizer
def create_hf_telechat(model_name_or_path,
precision,
ds_config=None,
disable_dropout=False):
model_config = AutoConfig.from_pretrained(model_name_or_path, trust_remote_code=True)
if disable_dropout:
model_config.dropout = 0.0
# Note: dschf is defined in function scope to avoid global effects
# https://huggingface.co/docs/transformers/main_classes/deepspeed#nontrainer-deepspeed-integration
if ds_config is not None and ds_config["zero_optimization"]["stage"] == 3:
dschf = HfDeepSpeedConfig(ds_config)
else:
dschf = None
model = AutoModelForCausalLM.from_pretrained(model_name_or_path,
trust_remote_code=True,
config=model_config,
torch_dtype=torch.float16 if precision == "fp16" else torch.bfloat16)
return model
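# Token-level cross entropy restricted to masked positions: logits and labels are
# shifted for next-token prediction, the log-probability of each target token is
# gathered, and the negative log-likelihood is averaged over positions where
# loss_mask == 1.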
def masked_cross_entropy_loss(logits, labels, loss_mask):
shift_logits = logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()
shift_loss_mask = loss_mask[..., 1:].contiguous()
shift_logits = F.log_softmax(shift_logits, dim=-1)
loss = -torch.gather(shift_logits, dim=-1, index=shift_labels.unsqueeze(-1)).squeeze(-1)
shift_loss_mask = shift_loss_mask.view(-1)
expected_number_of_tokens = shift_loss_mask.sum()
loss = torch.sum(loss.view(-1) * shift_loss_mask) / expected_number_of_tokens
return loss
def loss_fn(logits, labels, user_token_id, bot_token_id, end_token_id):
# Build a loss mask over the label sequence: positions belonging to each bot
# reply (from <_bot> up to and including <_end>) and each <_user> token are
# scored; everything else (user content, padding) is ignored.
# Note: indexing loss_mask[0] assumes a per-device micro batch size of 1.
loss_mask = torch.zeros(labels.size(), device=labels.device)
indices_user = torch.where(labels == user_token_id)[1].tolist()
indices_bot = torch.where(labels == bot_token_id)[1].tolist()
indices_end = torch.where(labels == end_token_id)[1].tolist()
assert len(indices_user) != 0
assert len(indices_user) == len(indices_bot) == len(indices_end)
for i in range(len(indices_bot)):
bot_idx = indices_bot[i]
end_idx = indices_end[i]
user_idx = indices_user[i]
loss_mask[0][bot_idx:end_idx + 1] = 1
loss_mask[0][user_idx] = 1
loss = masked_cross_entropy_loss(logits, labels, loss_mask)
return loss
def main():
args = parse_args()
if args.local_rank == -1:
device = torch.device("cuda")
else:
torch.cuda.set_device(args.local_rank)
device = torch.device("cuda", args.local_rank)
# Initializes the distributed backend which will take care of synchronizing nodes/GPUs
# torch.distributed.init_process_group(backend='nccl')
deepspeed.init_distributed()
args.global_rank = torch.distributed.get_rank()
ds_config = get_train_ds_config(offload=args.offload,
stage=args.zero_stage,
precision=args.precision)
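# Override the DeepSpeed batch-size fields: the global train_batch_size must equal
# micro_batch_size * world_size * gradient_accumulation_steps. loss_update_steps
# below only controls how often the running loss is averaged and logged.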
ds_config[
'train_micro_batch_size_per_gpu'] = args.per_device_train_batch_size
ds_config[
'train_batch_size'] = args.per_device_train_batch_size * torch.distributed.get_world_size(
) * args.gradient_accumulation_steps
loss_update_steps = args.per_device_train_batch_size * args.gradient_accumulation_steps
# If passed along, set the training seed now.
set_random_seed(args.seed)
torch.distributed.barrier()
tokenizer = load_telechat_tokenizer(args.model_name_or_path, fast_tokenizer=True)
args.user_token_id = tokenizer.convert_tokens_to_ids(args.user_token)
args.bot_token_id = tokenizer.convert_tokens_to_ids(args.bot_token)
args.end_token_id = tokenizer.convert_tokens_to_ids(args.end_token)
model = create_hf_telechat(args.model_name_or_path,
args.precision,
ds_config,
disable_dropout=args.disable_dropout)
if args.lora_dim > 0:
model = convert_linear_layer_to_lora(model, args.lora_module_name, args.lora_scaling,
args.lora_dim)
if args.mark_only_lora_as_trainable:
mark_only_lora_as_trainable(model, 'lora_only')
make_model_gradient_checkpointing_compatible(model)
if args.only_optimize_lora:
model = only_optimize_lora_parameters(model)
# Prepare the data
print(f"train_fname:{args.data_path}")
assert os.path.exists(args.data_path), "Please process data first!"
torch.distributed.barrier()
train_dataset = get_dataset(args.data_path, args.seed)
# DataLoaders creation:
if args.local_rank == -1:
train_sampler = RandomSampler(train_dataset)
else:
train_sampler = DistributedSampler(train_dataset)
train_dataloader = DataLoader(train_dataset,
collate_fn=default_data_collator,
sampler=train_sampler,
batch_size=args.per_device_train_batch_size)
# Split weights in two groups, one with weight decay and the other not.
optimizer_grouped_parameters = get_optimizer_grouped_parameters(
model, args.weight_decay)
AdamOptimizer = DeepSpeedCPUAdam if args.offload else FusedAdam
optimizer = AdamOptimizer(optimizer_grouped_parameters,
lr=args.learning_rate,
betas=(0.9, 0.95),
eps=1e-5)
num_update_steps_per_epoch = math.ceil(
len(train_dataloader) / args.gradient_accumulation_steps)
num_warmup_steps = int(args.warmup_proportion * args.num_train_epochs * num_update_steps_per_epoch)
lr_scheduler = get_scheduler(
name=args.lr_scheduler_type,
optimizer=optimizer,
num_warmup_steps=num_warmup_steps,
num_training_steps=args.num_train_epochs * num_update_steps_per_epoch,
)
model, optimizer, _, lr_scheduler = deepspeed.initialize(
model=model,
optimizer=optimizer,
args=args,
config=ds_config,
lr_scheduler=lr_scheduler,
dist_init_required=True)
if args.gradient_checkpointing:
model.gradient_checkpointing_enable()
# model.gradient_checkpointing_enable(
# gradient_checkpointing_kwargs={"use_reentrant": args.gradient_checkpointing_use_reentrant}
# )
# Train!
print_rank_0("***** Running training *****", args.global_rank)
global_step = 0
cur_batch_loss = 0.0
for epoch in range(args.num_train_epochs):
print_rank_0(
f"Beginning of Epoch {epoch+1}/{args.num_train_epochs}, Total Micro Batches {len(train_dataloader)}",
args.global_rank)
model.train()
total_loss = 0.0
for step, batch in enumerate(train_dataloader):
batch = to_device(batch, device)
if args.with_loss_mask:
outputs = model(batch["input_ids"], attention_mask=batch["attention_mask"], use_cache=False)
logits = outputs.logits
loss = loss_fn(logits, batch["labels"], args.user_token_id, args.bot_token_id, args.end_token_id)
else:
outputs = model(**batch, use_cache=False)
loss = outputs.loss
model.backward(loss)
model.step()
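# Sum the micro-batch loss onto rank 0; every loss_update_steps micro-steps the
# accumulated loss is averaged over steps and ranks and printed from rank 0.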
torch.distributed.reduce(loss, 0)
total_loss += loss
if (step + 1) % loss_update_steps == 0:
cur_batch_loss = total_loss / (loss_update_steps * torch.distributed.get_world_size())
print_rank_0(f"epoch:{epoch+1}, global_step:{global_step+1}, step:{step+1} cur_batch_loss: {cur_batch_loss}", args.global_rank)
global_step += 1
total_loss = 0.0
if global_step > 0 and global_step % args.save_steps == 0:
if args.output_dir is not None:
print_rank_0(f'saving step {global_step} model ...', args.global_rank)
if args.lora_dim > 0:
model = convert_lora_to_linear_layer(model)
print_rank_0('convert lora to linear layer successfully!', args.global_rank)
if args.zero_stage < 3 and args.global_rank <= 0:
save_hf_format(model, tokenizer, args, f"global_step_{global_step}_loss_{cur_batch_loss:.4f}")
if args.zero_stage == 3:
# For zero stage 3, each gpu only has a part of the model, so we need a special save function
save_zero_three_model(model, tokenizer, args, f"global_step_{global_step}_loss_{cur_batch_loss:.4f}")
print_rank_0('save successfully!', args.global_rank)
if args.lora_dim > 0:
print_rank_0('recovering lora...', args.global_rank)
model = recover_lora(model)
print_rank_0('recover successfully!', args.global_rank)
model.tput_timer.update_epoch_count()
if args.output_dir is not None:
print_rank_0('saving the final model ...', args.global_rank)
if args.lora_dim > 0:
model = convert_lora_to_linear_layer(model)
print_rank_0('convert lora to linear layer successfully!', args.global_rank)
if args.zero_stage < 3 and args.global_rank == 0:
save_hf_format(model, tokenizer, args)
if args.zero_stage == 3:
# For zero stage 3, each gpu only has a part of the model, so we need a special save function
save_zero_three_model(model, tokenizer, args)
print_rank_0('save successfully!', args.global_rank)
if __name__ == "__main__":
main()
node1 slots=8
node2 slots=8
#!/usr/bin/env python
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
import argparse
import os
import sys
from transformers import AutoTokenizer
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
from utils.data.data_utils import create_prompt_dataset
from utils.utils import set_random_seed
def parse_args():
parser = argparse.ArgumentParser(
description=
"Finetune a transformers model on a causal language modeling task")
parser.add_argument('--data_path',
type=str,
required=True,
help='A JSON file mapping dataset paths to sampling weights.')
parser.add_argument(
'--data_output_path',
type=str,
default='/tmp/data_files/',
help='Where to save the processed data.'
)
parser.add_argument(
"--tokenizer_path",
type=str,
help=
"Path to the tokenizer",
required=True,
)
parser.add_argument(
"--max_seq_len",
type=int,
default=512,
help="The maximum sequence length.",
)
parser.add_argument("--seed",
type=int,
default=1234,
help="A seed for reproducible training.")
parser.add_argument("--user_token",
type=str,
default="<_user>",
help="user token")
parser.add_argument("--bot_token",
type=str,
default="<_bot>",
help="bot token")
parser.add_argument("--end_token",
type=str,
default="<_end>",
help="end token")
parser.add_argument("--num_workers",
type=int,
default=5,
help="Number of workers when tokenizing dataset")
parser.add_argument("--num_samples",
type=int,
required=True,
help="Number of samples while training")
parser.add_argument('--process_method',
choices=['single', 'multiple'],
required=True,
help='Choose whether to process the dataset with a single process or multiple processes. '
'Note that when combining multiprocessing with multiple nodes, a shared file system is required.')
parser.add_argument("--gradient_checkpointing_use_reentrant", action="store_true")
args = parser.parse_args()
return args
def load_telechat_tokenizer(tokenizer_path, fast_tokenizer=True):
tokenizer = AutoTokenizer.from_pretrained(tokenizer_path,
fast_tokenizer=fast_tokenizer,
padding_side="left",
trust_remote_code=True)
return tokenizer
def main():
args = parse_args()
set_random_seed(args.seed)
tokenizer = load_telechat_tokenizer(args.tokenizer_path, fast_tokenizer=True)
args.user_token_id = tokenizer.convert_tokens_to_ids(args.user_token)
args.bot_token_id = tokenizer.convert_tokens_to_ids(args.bot_token)
args.end_token_id = tokenizer.convert_tokens_to_ids(args.end_token)
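# The special-token ids are stored on args so the tokenization workers can prepend
# the <_user>/<_bot>/<_end> ids when rebuilding each packed sample
# (see process_concat_data in utils/data/data_utils.py).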
create_prompt_dataset(
args.data_path,
args.data_output_path,
args.seed,
tokenizer,
args.max_seq_len,
args.num_workers,
args.num_samples,
args.process_method,
args)
print("Finish processing data!")
if __name__ == "__main__":
main()
#!/bin/bash
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
source /workspace/dtk-24.04.1-miopen/env.sh
export HIP_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
export HSA_FORCE_FINE_GRAIN_PCIE=1
OUTPUT=telechat-lora-test
ZERO_STAGE=2
MAX_LEN=4096
NUM_SAMPLES=1000
DATA_OUTPUT_PATH=datas/data_files
MODEL_PATH=$1
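# Usage: pass the path of the pretrained Telechat model as the first argument,
# e.g. `bash <this_script>.sh /path/to/telechat-model`.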
if [ "$OUTPUT" == "" ]; then
OUTPUT=./output
fi
if [ "$ZERO_STAGE" == "" ]; then
ZERO_STAGE=3
fi
mkdir -p $OUTPUT
python -u process_data.py \
--data_path data.json \
--tokenizer_path $MODEL_PATH \
--data_output_path $DATA_OUTPUT_PATH \
--max_seq_len $MAX_LEN \
--num_samples $NUM_SAMPLES \
--num_workers 10 \
--process_method multiple \
--seed 42
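# LoRA fine-tuning: --lora_dim 8 adds rank-8 LoRA adapters to linear layers in the
# scope given by --lora_module_name ("self_attention."), and
# --mark_only_lora_as_trainable freezes all other parameters. --save_steps 1 saves
# a checkpoint at every logged global step (intended for quick testing).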
deepspeed --master_port 29500 main.py \
--data_path $DATA_OUTPUT_PATH \
--model_name_or_path $MODEL_PATH \
--with_loss_mask \
--per_device_train_batch_size 1 \
--max_seq_len $MAX_LEN \
--learning_rate 3e-5 \
--weight_decay 0.0001 \
--num_train_epochs 1 \
--gradient_accumulation_steps 4 \
--lr_scheduler_type cosine \
--precision fp16 \
--warmup_proportion 0.1 \
--gradient_checkpointing \
--seed 42 \
--zero_stage $ZERO_STAGE \
--save_steps 1 \
--deepspeed \
--lora_dim 8 \
--mark_only_lora_as_trainable \
--lora_module_name "self_attention." \
--output_dir $OUTPUT \
2>&1 | tee $OUTPUT/training.log
#!/bin/bash
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
OUTPUT=test
ZERO_STAGE=3
HOST=my_hostfile
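# my_hostfile is a DeepSpeed hostfile listing the worker nodes and their GPU slots
# (here: node1 and node2 with 8 slots each, per the hostfile included in this commit).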
MAX_LEN=8192
NUM_SAMPLES=1000
DATA_OUTPUT_PATH=datas/data_files
MODEL_PATH=$1
if [ "$OUTPUT" == "" ]; then
OUTPUT=./output
fi
if [ "$ZERO_STAGE" == "" ]; then
ZERO_STAGE=3
fi
mkdir -p $OUTPUT
python -u process_data.py \
--data_path data.json \
--tokenizer_path $MODEL_PATH \
--data_output_path $DATA_OUTPUT_PATH \
--max_seq_len $MAX_LEN \
--num_samples $NUM_SAMPLES \
--num_workers 10 \
--process_method multiple \
--seed 42
deepspeed --master_port 29500 --hostfile=$HOST main.py \
--data_path $DATA_OUTPUT_PATH \
--model_name_or_path $MODEL_PATH \
--with_loss_mask \
--per_device_train_batch_size 1 \
--max_seq_len $MAX_LEN \
--learning_rate 3e-5 \
--weight_decay 0.0001 \
--num_train_epochs 1 \
--gradient_accumulation_steps 4 \
--lr_scheduler_type cosine \
--precision fp16 \
--warmup_proportion 0.1 \
--gradient_checkpointing \
--seed 42 \
--zero_stage $ZERO_STAGE \
--save_steps 100 \
--deepspeed \
--output_dir $OUTPUT \
2>&1 | tee $OUTPUT/training.log
#!/bin/bash
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
OUTPUT=test
ZERO_STAGE=3
MAX_LEN=4096
NUM_SAMPLES=1000
DATA_OUTPUT_PATH=datas/data_files
MODEL_PATH=$1
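# Single-node run on 8 GPUs; pass the path of the pretrained model as the first argument.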
if [ "$OUTPUT" == "" ]; then
OUTPUT=./output
fi
if [ "$ZERO_STAGE" == "" ]; then
ZERO_STAGE=3
fi
mkdir -p $OUTPUT
python -u process_data.py \
--data_path data.json \
--tokenizer_path $MODEL_PATH \
--data_output_path $DATA_OUTPUT_PATH \
--max_seq_len $MAX_LEN \
--num_samples $NUM_SAMPLES \
--num_workers 10 \
--process_method multiple \
--seed 42
deepspeed --master_port 29500 main.py \
--data_path $DATA_OUTPUT_PATH \
--model_name_or_path $MODEL_PATH \
--with_loss_mask \
--per_device_train_batch_size 1 \
--max_seq_len $MAX_LEN \
--learning_rate 3e-5 \
--weight_decay 0.0001 \
--num_train_epochs 1 \
--gradient_accumulation_steps 4 \
--lr_scheduler_type cosine \
--precision fp16 \
--warmup_proportion 0.1 \
--gradient_checkpointing \
--seed 42 \
--zero_stage $ZERO_STAGE \
--save_steps 100 \
--deepspeed \
--output_dir $OUTPUT \
2>&1 | tee $OUTPUT/training.log
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
"""
Part of the code was adopted from https://github.com/microsoft/Megatron-DeepSpeed/blob/main/megatron/data/dataset_utils.py
"""
import torch
from torch.utils.data import Dataset, Subset, ConcatDataset
import numpy as np
import os
import hashlib
from . import raw_datasets
from tqdm import tqdm
from random import shuffle
import json
import re
import math
import random
from multiprocessing import Pool
from functools import partial
from itertools import chain
import glob
def get_raw_dataset(dataset_name, output_path, seed):
return raw_datasets.TelechatDataset(output_path, seed, dataset_name)
def get_shuffle_idx(seed, size):
np_rng = np.random.RandomState(seed=seed)
dtype_ = np.uint32
if size >= (np.iinfo(np.uint32).max - 1):
dtype_ = np.int64
shuffle_idx = np.arange(start=0, stop=size, step=1, dtype=dtype_)
np_rng.shuffle(shuffle_idx)
return shuffle_idx
class PromptDataset(Dataset):
def __init__(self, chosen_dataset) -> None:
super().__init__()
self.dataset = chosen_dataset
def __len__(self):
length = len(self.dataset)
return length
def __getitem__(self, idx):
return {
"input_ids": self.dataset[idx]["input_ids"],
"attention_mask": self.dataset[idx]["attention_mask"],
"labels": self.dataset[idx]["input_ids"]
}
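# Applies the per-dataset sampling weight and normalizes every record into the
# "<_user>...<_bot>...<_end>" turn format:
#   * weight < 1.0: each sample is kept with probability equal to the weight;
#   * weight >= 1.0: each sample is repeated floor(weight) times, plus one extra
#     copy with probability equal to the fractional part.
# Multi-turn inputs (those already containing "<_bot>") are re-assembled turn by
# turn so that every bot reply is terminated by "<_end>".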
def get_weight_data(current_dataset, dataset_weight):
dataset = []
all_lines = []
for i, tmp_data in enumerate(current_dataset):
if dataset_weight < 1.0 and random.random() > dataset_weight: continue
input = tmp_data['input']
input = re.sub(r"^<_user>", "", input, flags=re.S)
input = "<_user>" + input
output = tmp_data['output']
output = re.sub(r"^<_bot>", "", output, flags=re.S)
if "<_bot>" in input: ### multiturn
concat_line = ""
input_turns = input.split("<_user>")[1:]
for item in input_turns:
if "<_bot>" in item:
concat_line += "<_user>" + item + "<_end>"
else:
concat_line += "<_user>" + item + "<_bot>"
concat_line += output + "<_end>"
else: ####single turn
concat_line = str(input) + "<_bot>" + str(output) + "<_end>"
assert concat_line.count("<_user>") == concat_line.count("<_bot>") == concat_line.count("<_end>")
if dataset_weight < 1.0:
all_lines.append(concat_line)
else:
weight_integer = math.floor(dataset_weight)
weight_decimal = dataset_weight - weight_integer
for i in range(math.floor(dataset_weight)):
all_lines.append(concat_line)
if random.random() < weight_decimal:
all_lines.append(concat_line)
return all_lines
def create_dataset(dataset_name, dataset_weight, output_path, seed):
raw_dataset = get_raw_dataset(dataset_name, output_path, seed)
train_dataset = raw_dataset.get_train_data()
train_dataset = get_weight_data(train_dataset, dataset_weight)
return train_dataset
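# Converts one packed text line back into token ids: the line is split on "<_end>"
# into turns, each turn is split on "<_bot>" into user input and bot output, and
# the special-token ids are prepended/appended explicitly. The sequence is then
# left-padded with token id 3 (assumed to be the pad id of the Telechat tokenizer)
# up to max_seq_len; the returned attention_mask is all ones over the padded length.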
def process_concat_data(text, tokenizer, max_seq_len, args):
texts = text.split("<_end>")
sentence_ids = []
for text in texts:
if text != '':
input, output = text.split("<_bot>")
input = re.sub(r"^<_user>", "", input, flags=re.S)
input_ids = [args.user_token_id] + tokenizer(input)["input_ids"]
output_ids = [args.bot_token_id] + tokenizer(output)["input_ids"] + [args.end_token_id]
sentence_ids += (input_ids + output_ids)
sentence_ids = [3] * (max_seq_len - len(sentence_ids)) + sentence_ids
return {"input_ids": torch.tensor(sentence_ids), "attention_mask": torch.ones(len(sentence_ids))}
def process(id, samples, tokenizer, max_seq_len, num_workers, num_samples, output_path, args):
cnt = 0
sample_nums = num_samples
all_lines = []
dataset = []
train_fname = os.path.join(output_path, f"train_data_{id}.pt")
while cnt < sample_nums // num_workers:
index = id
single_process_length = len(samples) // num_workers
#### Collect the tokenized lengths of all samples
lengths = []
chunk_size = 1
all_lines_shard = samples[index * single_process_length:(index + 1) * single_process_length] if index < num_workers - 1 \
else samples[index * single_process_length:]
all_lines_chunk_list = [all_lines_shard[i:i + chunk_size] for i in range(0, len(all_lines_shard), chunk_size)]
for i in tqdm(range(len(all_lines_chunk_list))):
encoded_batch = tokenizer.batch_encode_plus(all_lines_chunk_list[i], padding=False)
for j in range(len(encoded_batch["input_ids"])):
lengths.append(len(encoded_batch["input_ids"][j]))
all_lines_and_length = []
for i, item in tqdm(enumerate(all_lines_shard)):
if lengths[i] < max_seq_len - 10: ### only samples shorter than max_seq_len can be processed
all_lines_and_length.append((item, lengths[i]))
pool = all_lines_and_length
min_threshold = min(lengths)
pad_count = 0
tot = 0
pbar = tqdm(total=len(pool), desc=f"Processing {id}, Concating dataset", disable=(id != 0))
while pool:
ptr = 0
buffer_len = 0
buffer = []
while ptr < len(pool) and (max_seq_len - buffer_len) > min_threshold:
if pool[ptr][1] + buffer_len < max_seq_len - 10: #### leave at least 10 tokens for padding
buffer_len += pool[ptr][1]
buffer.append(pool[ptr][0])
pool.pop(ptr)
pbar.update(1)
else:
ptr += 1
buffer_text = "".join(buffer)
output = buffer_text
pad_count += (max_seq_len - buffer_len)
tot += 1
assert output.count("<_user>") == output.count("<_bot>") == output.count("<_end>")
if output.count("<_user>") == output.count("<_bot>") == output.count("<_end>") and output.count(
"<_user>") >= 1:
all_lines.append(output)
cnt += 1
if cnt >= sample_nums // num_workers: break
pbar.close()
for line in tqdm(all_lines, desc="Convert token ids", disable=(id != 0)):
tokens = process_concat_data(line, tokenizer, max_seq_len, args)
dataset.append(tokens)
train_dataset = PromptDataset(dataset)
torch.save(train_dataset, train_fname)
return dataset
def create_prompt_dataset(data_path,
output_path,
seed,
tokenizer,
max_seq_len,
num_workers,
num_samples,
process_method,
args):
"""
Creates the packed, tokenized training dataset and saves it as train_data_{id}.pt shards under output_path.
"""
os.makedirs(output_path, exist_ok=True)
with open(data_path, "r", encoding="utf-8") as f: data_dic = json.load(f)
train_datasets = []
train_size = 0
for dataset_name, dataset_weight in data_dic.items():
train_dataset = create_dataset(
dataset_name, dataset_weight,
output_path, seed)
train_datasets.extend(train_dataset)
train_size += len(train_dataset)
shuffle(train_datasets)
if process_method == "multiple":
with Pool(processes=num_workers) as pool:
partial_process = partial(process, samples=train_datasets,
tokenizer=tokenizer, max_seq_len=max_seq_len,
num_workers=num_workers, num_samples=num_samples,
output_path=output_path, args=args)
pool.map(partial_process, [i for i in range(num_workers)])
else:
process(0, train_datasets, tokenizer, max_seq_len, 1, num_samples, output_path, args)
def get_dataset(data_path, seed):
files = glob.glob(os.path.join(data_path, "train_data*.pt"))
assert len(files) > 0, "There is no data here!"
train_datasets = []
train_size = 0
for file in files:
train_dataset = torch.load(file)
train_datasets.append(train_dataset)
train_size += len(train_dataset)
train_dataset = ConcatDataset(train_datasets)
shuffle_idx = get_shuffle_idx(seed, train_size)
train_dataset = Subset(train_dataset, shuffle_idx.tolist())
return train_dataset
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
from datasets import load_dataset
class PromptRawDataset(object):
def __init__(self, output_path, seed, dataset_name):
self.output_path = output_path
self.seed = seed
self.raw_datasets = load_dataset(path="json",data_files=dataset_name)
def get_train_data(self):
return
def get_eval_data(self):
return
def get_prompt(self, sample):
return
def get_prompt_and_answer(self, sample):
return
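# The training data is JSONL with one record per line, each containing an "input"
# and an "output" field. Multi-turn history is embedded in "input" with turn
# markers, e.g. (illustrative only):
#   {"input": "question", "output": "answer"}
#   {"input": "<_user>q1<_bot>a1<_user>q2", "output": "a2"}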
class TelechatDataset(PromptRawDataset):
def __init__(self, output_path, seed, dataset_name):
super().__init__(output_path, seed, dataset_name)
self.dataset_name = dataset_name
def get_train_data(self):
dataset = self.raw_datasets["train"]
return dataset
def get_eval_data(self):
dataset = self.raw_datasets["train"]
return dataset
def get_prompt(self, sample):
return "<_user>" + sample['input'] + "<_bot>"
def get_prompt_and_answer(self, sample):
return "<_user>" + sample['input'] + "<_bot>" + sample['output'] + "<_end>"