Commit d7cad875 authored by Sugon_ldc

add new files

import argparse
import functools
import gc
import os
import evaluate
import numpy as np
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import WhisperForConditionalGeneration, WhisperProcessor
from utils.data_utils import DataCollatorSpeechSeq2SeqWithPadding, remove_punctuation, to_simple
from utils.reader import CustomDataset
from utils.utils import print_arguments, add_arguments
parser = argparse.ArgumentParser(description=__doc__)
add_arg = functools.partial(add_arguments, argparser=parser)
add_arg("test_data", type=str, default="dataset/test.json", help="测试集的路径")
add_arg("model_path", type=str, default="models/whisper-tiny-finetune", help="合并模型的路径,或者是huggingface上模型的名称")
add_arg("batch_size", type=int, default=16, help="评估的batch size")
add_arg("num_workers", type=int, default=8, help="读取数据的线程数量")
add_arg("language", type=str, default="Chinese", help="设置语言,可全称也可简写,如果为None则评估的是多语言")
add_arg("remove_pun", type=bool, default=True, help="是否移除标点符号")
add_arg("to_simple", type=bool, default=True, help="是否转为简体中文")
add_arg("timestamps", type=bool, default=False, help="评估时是否使用时间戳数据")
add_arg("min_audio_len", type=float, default=0.5, help="最小的音频长度,单位秒")
add_arg("max_audio_len", type=float, default=30, help="最大的音频长度,单位秒")
add_arg("local_files_only", type=bool, default=True, help="是否只在本地加载模型,不尝试下载")
add_arg("task", type=str, default="transcribe", choices=['transcribe', 'translate'], help="模型的任务")
add_arg("metric", type=str, default="cer", choices=['cer', 'wer'], help="评估方式")
args = parser.parse_args()
print_arguments(args)
# Check that the model path is valid
assert 'openai' == os.path.dirname(args.model_path) or os.path.exists(args.model_path), \
    f"Model file {args.model_path} does not exist. Check that the model has been merged successfully, or that it is an existing model on HuggingFace."
def main():
    # Load the Whisper processor, which bundles the feature extractor and the tokenizer
processor = WhisperProcessor.from_pretrained(args.model_path,
language=args.language,
task=args.task,
no_timestamps=not args.timestamps,
local_files_only=args.local_files_only)
    # Load the model
model = WhisperForConditionalGeneration.from_pretrained(args.model_path,
device_map="auto",
local_files_only=args.local_files_only)
    # language may be None for multilingual evaluation, so only set it when given
    if args.language is not None:
        model.generation_config.language = args.language.lower()
model.eval()
    # Load the test data
test_dataset = CustomDataset(data_list_path=args.test_data,
processor=processor,
timestamps=args.timestamps,
min_duration=args.min_audio_len,
max_duration=args.max_audio_len)
print(f"测试数据:{len(test_dataset)}")
# 数据padding器
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)
eval_dataloader = DataLoader(test_dataset, batch_size=args.batch_size,
num_workers=args.num_workers, collate_fn=data_collator)
    # Load the evaluation metric (the local metrics/cer.py or metrics/wer.py implementation)
metric = evaluate.load(f'metrics/{args.metric}.py')
    # Run evaluation
for step, batch in enumerate(tqdm(eval_dataloader)):
with torch.cuda.amp.autocast():
with torch.no_grad():
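                # The first 4 label tokens are Whisper's decoder prompt (start-of-transcript, language, task, timestamp flag), so they are reused as decoder_input_ids below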
generated_tokens = (
model.generate(
input_features=batch["input_features"].cuda(),
decoder_input_ids=batch["labels"][:, :4].cuda(),
max_new_tokens=255).cpu().numpy())
labels = batch["labels"].cpu().numpy()
labels = np.where(labels != -100, labels, processor.tokenizer.pad_token_id)
        # Decode the predicted and reference tokens to text
decoded_preds = processor.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
decoded_labels = processor.tokenizer.batch_decode(labels, skip_special_tokens=True)
        # Remove punctuation
if args.remove_pun:
decoded_preds = remove_punctuation(decoded_preds)
decoded_labels = remove_punctuation(decoded_labels)
        # Convert Traditional Chinese to Simplified Chinese
if args.to_simple:
decoded_preds = to_simple(decoded_preds)
decoded_labels = to_simple(decoded_labels)
metric.add_batch(predictions=decoded_preds, references=decoded_labels)
        # Free the memory used by this batch
del generated_tokens, labels, batch
gc.collect()
    # Compute the final score
    m = metric.compute()
    print(f"Evaluation result: {args.metric}={round(m, 5)}")
if __name__ == '__main__':
main()
import argparse
import functools
import os
from peft import LoraConfig, get_peft_model, AdaLoraConfig, PeftModel, prepare_model_for_kbit_training
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments, WhisperForConditionalGeneration, WhisperProcessor
from utils.callback import SavePeftModelCallback
from utils.data_utils import DataCollatorSpeechSeq2SeqWithPadding
from utils.model_utils import load_from_checkpoint
from utils.reader import CustomDataset
from utils.utils import print_arguments, make_inputs_require_grad, add_arguments
parser = argparse.ArgumentParser(description=__doc__)
add_arg = functools.partial(add_arguments, argparser=parser)
add_arg("train_data", type=str, default="dataset/train.json", help="训练数据集的路径")
add_arg("test_data", type=str, default="dataset/test.json", help="测试数据集的路径")
add_arg("base_model", type=str, default="openai/whisper-tiny", help="Whisper的基础模型")
add_arg("output_dir", type=str, default="output/", help="训练保存模型的路径")
add_arg("warmup_steps", type=int, default=50, help="训练预热步数")
add_arg("logging_steps", type=int, default=100, help="打印日志步数")
add_arg("eval_steps", type=int, default=1000, help="多少步数评估一次")
add_arg("save_steps", type=int, default=1000, help="多少步数保存模型一次")
add_arg("num_workers", type=int, default=8, help="读取数据的线程数量")
add_arg("learning_rate", type=float, default=1e-3, help="学习率大小")
add_arg("min_audio_len", type=float, default=0.5, help="最小的音频长度,单位秒")
add_arg("max_audio_len", type=float, default=30, help="最大的音频长度,单位秒,不能大于30秒")
add_arg("use_adalora", type=bool, default=True, help="是否使用AdaLora而不是Lora")
add_arg("fp16", type=bool, default=True, help="是否使用fp16训练模型")
add_arg("use_8bit", type=bool, default=False, help="是否将模型量化为8位")
add_arg("timestamps", type=bool, default=False, help="训练时是否使用时间戳数据")
add_arg("use_compile", type=bool, default=False, help="是否使用Pytorch2.0的编译器")
add_arg("local_files_only", type=bool, default=False, help="是否只在本地加载模型,不尝试下载")
add_arg("num_train_epochs", type=int, default=3, help="训练的轮数")
add_arg("language", type=str, default="Chinese", help="设置语言,可全称也可简写,如果为None则训练的是多语言")
add_arg("task", type=str, default="transcribe", choices=['transcribe', 'translate'], help="模型的任务")
add_arg("augment_config_path", type=str, default=None, help="数据增强配置文件路径")
add_arg("resume_from_checkpoint", type=str, default=None, help="恢复训练的检查点路径")
add_arg("per_device_train_batch_size", type=int, default=8, help="训练的batch size")
add_arg("per_device_eval_batch_size", type=int, default=8, help="评估的batch size")
add_arg("gradient_accumulation_steps", type=int, default=1, help="梯度累积步数")
add_arg("push_to_hub", type=bool, default=False, help="是否将模型权重推到HuggingFace Hub")
add_arg("hub_model_id", type=str, default=None, help="HuggingFace Hub上的模型仓库ID")
args = parser.parse_args()
print_arguments(args)
def main():
    # Load the Whisper processor, which bundles the feature extractor and the tokenizer
processor = WhisperProcessor.from_pretrained(args.base_model,
language=args.language,
task=args.task,
no_timestamps=not args.timestamps,
local_files_only=args.local_files_only)
    # Load the datasets
train_dataset = CustomDataset(data_list_path=args.train_data,
processor=processor,
language=args.language,
timestamps=args.timestamps,
min_duration=args.min_audio_len,
max_duration=args.max_audio_len,
augment_config_path=args.augment_config_path)
test_dataset = CustomDataset(data_list_path=args.test_data,
processor=processor,
language=args.language,
timestamps=args.timestamps,
min_duration=args.min_audio_len,
max_duration=args.max_audio_len)
print(f"训练数据:{len(train_dataset)},测试数据:{len(test_dataset)}")
# 数据padding器
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)
    # Set up the device map for single- or multi-GPU training
device_map = "auto"
world_size = int(os.environ.get("WORLD_SIZE", 1))
ddp = world_size != 1
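    # Under torchrun (WORLD_SIZE > 1) each process keeps the whole model on its own LOCAL_RANK GPU instead of sharding with "auto"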
if ddp:
device_map = {"": int(os.environ.get("LOCAL_RANK") or 0)}
    # Load the Whisper model
model = WhisperForConditionalGeneration.from_pretrained(args.base_model,
load_in_8bit=args.use_8bit,
device_map=device_map,
local_files_only=args.local_files_only)
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []
    # Prepare the (optionally 8-bit) model for k-bit training
model = prepare_model_for_kbit_training(model)
    # Register a forward hook that makes the encoder inputs require gradients; without it multi-GPU training fails
model.model.encoder.conv1.register_forward_hook(make_inputs_require_grad)
    print('Loading LoRA modules...')
if args.resume_from_checkpoint:
        # Load the LoRA parameters when resuming training
print("Loading adapters from checkpoint.")
model = PeftModel.from_pretrained(model, args.resume_from_checkpoint, is_trainable=True)
else:
        print('Adding LoRA modules...')
target_modules = ["k_proj", "q_proj", "v_proj", "out_proj", "fc1", "fc2"]
print(target_modules)
if args.use_adalora:
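            # AdaLoRA starts each adapter at rank init_r and prunes the rank budget down to target_r between steps tinit and tfinal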
config = AdaLoraConfig(init_r=12, target_r=4, beta1=0.85, beta2=0.85, tinit=200, tfinal=1000, deltaT=10,
lora_alpha=32, lora_dropout=0.1, orth_reg_weight=0.5, target_modules=target_modules)
else:
config = LoraConfig(r=32, lora_alpha=64, target_modules=target_modules, lora_dropout=0.05, bias="none")
model = get_peft_model(model, config)
if args.base_model.endswith("/"):
args.base_model = args.base_model[:-1]
output_dir = os.path.join(args.output_dir, os.path.basename(args.base_model))
    # Define the training arguments
training_args = \
        Seq2SeqTrainingArguments(output_dir=output_dir,  # Directory where checkpoints and logs are saved
                                 per_device_train_batch_size=args.per_device_train_batch_size,  # Training batch size
                                 per_device_eval_batch_size=args.per_device_eval_batch_size,  # Evaluation batch size
                                 gradient_accumulation_steps=args.gradient_accumulation_steps,  # Gradient accumulation steps
                                 learning_rate=args.learning_rate,  # Learning rate
                                 warmup_steps=args.warmup_steps,  # Warm-up steps
                                 num_train_epochs=args.num_train_epochs,  # Number of fine-tuning epochs
                                 save_strategy="steps",  # Save checkpoints by step count
                                 evaluation_strategy="steps",  # Evaluate the model by step count
                                 load_best_model_at_end=True,  # Load the best model at the end of training
                                 fp16=args.fp16,  # Whether to train in half precision
                                 report_to=["tensorboard"],  # Log to TensorBoard
                                 save_steps=args.save_steps,  # How often to save a checkpoint
                                 eval_steps=args.eval_steps,  # How often to evaluate the model
                                 torch_compile=args.use_compile,  # Use the PyTorch 2.0 compiler
                                 save_total_limit=5,  # Keep only this many of the latest checkpoints
                                 optim='adamw_torch',  # Optimizer
                                 ddp_find_unused_parameters=False if ddp else None,  # Distributed training setting
                                 dataloader_num_workers=args.num_workers,  # Number of data-loading workers
                                 logging_steps=args.logging_steps,  # How often to print logs
                                 remove_unused_columns=False,  # Keep input columns the model's forward() does not use (needed by the collator)
                                 label_names=["labels"],  # Keys in the input dict that correspond to the labels
push_to_hub=args.push_to_hub,
)
if training_args.local_rank == 0 or training_args.local_rank == -1:
print('=' * 90)
model.print_trainable_parameters()
print('=' * 90)
    # Create the trainer
trainer = Seq2SeqTrainer(args=training_args,
model=model,
train_dataset=train_dataset,
eval_dataset=test_dataset,
data_collator=data_collator,
tokenizer=processor.feature_extractor,
callbacks=[SavePeftModelCallback])
model.config.use_cache = False
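    # Override the Trainer's checkpoint loading with the project's helper (utils.model_utils), presumably so resuming restores the LoRA adapter weights correctly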
trainer._load_from_checkpoint = load_from_checkpoint
    # Start training
trainer.train(resume_from_checkpoint=args.resume_from_checkpoint)
    # Save the final training state
trainer.save_state()
    # Re-enable the cache for faster inference
model.config.use_cache = True
if training_args.local_rank == 0 or training_args.local_rank == -1:
model.save_pretrained(os.path.join(output_dir, "checkpoint-final"))
    # Optionally push the model weights to the HuggingFace Hub
if training_args.push_to_hub:
hub_model_id = args.hub_model_id if args.hub_model_id is not None else output_dir
model.push_to_hub(hub_model_id)
if __name__ == '__main__':
main()
import argparse
import functools
import platform
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline, AutoModelForCausalLM
from utils.utils import print_arguments, add_arguments
parser = argparse.ArgumentParser(description=__doc__)
add_arg = functools.partial(add_arguments, argparser=parser)
add_arg("audio_path", type=str, default="dataset/test.wav", help="预测的音频路径")
add_arg("model_path", type=str, default="models/whisper-tiny-finetune/", help="合并模型的路径,或者是huggingface上模型的名称")
add_arg("use_gpu", type=bool, default=True, help="是否使用gpu进行预测")
add_arg("language", type=str, default="chinese", help="设置语言,如果为None则预测的是多语言")
add_arg("num_beams", type=int, default=1, help="解码搜索大小")
add_arg("batch_size", type=int, default=16, help="预测batch_size大小")
add_arg("use_compile", type=bool, default=False, help="是否使用Pytorch2.0的编译器")
add_arg("task", type=str, default="transcribe", choices=['transcribe', 'translate'], help="模型的任务")
add_arg("assistant_model_path", type=str, default=None, help="助手模型,可以提高推理速度,例如openai/whisper-tiny")
add_arg("local_files_only", type=bool, default=True, help="是否只在本地加载模型,不尝试下载")
add_arg("use_flash_attention_2", type=bool, default=False, help="是否使用FlashAttention2加速")
add_arg("use_bettertransformer", type=bool, default=False, help="是否使用BetterTransformer加速")
args = parser.parse_args()
print_arguments(args)
# Select the device
device = "cuda:0" if torch.cuda.is_available() and args.use_gpu else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() and args.use_gpu else torch.float32
# Load the Whisper processor (feature extractor and tokenizer)
processor = AutoProcessor.from_pretrained(args.model_path)
# Load the model
model = AutoModelForSpeechSeq2Seq.from_pretrained(
args.model_path, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True,
use_flash_attention_2=args.use_flash_attention_2
)
if args.use_bettertransformer and not args.use_flash_attention_2:
model = model.to_bettertransformer()
# Use the PyTorch 2.0 compiler
if args.use_compile:
if torch.__version__ >= "2" and platform.system().lower() != 'windows':
model = torch.compile(model)
model.to(device)
# Load the assistant model
generate_kwargs_pipeline = None
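# Passing an assistant model to the pipeline enables assisted (speculative) decoding, which can speed up generation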
if args.assistant_model_path is not None:
assistant_model = AutoModelForCausalLM.from_pretrained(
args.assistant_model_path, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
)
assistant_model.to(device)
generate_kwargs_pipeline = {"assistant_model": assistant_model}
# Build the inference pipeline
infer_pipe = pipeline("automatic-speech-recognition",
model=model,
tokenizer=processor.tokenizer,
feature_extractor=processor.feature_extractor,
max_new_tokens=128,
chunk_length_s=30,
batch_size=args.batch_size,
torch_dtype=torch_dtype,
generate_kwargs=generate_kwargs_pipeline,
device=device)
# Generation parameters
generate_kwargs = {"task": args.task, "num_beams": args.num_beams}
if args.language is not None:
generate_kwargs["language"] = args.language
# Run inference
result = infer_pipe(args.audio_path, return_timestamps=True, generate_kwargs=generate_kwargs)
for chunk in result["chunks"]:
print(f"[{chunk['timestamp'][0]}-{chunk['timestamp'][1]}s] {chunk['text']}")
import argparse
import functools
import os
from faster_whisper import WhisperModel
from utils.utils import print_arguments, add_arguments
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
parser = argparse.ArgumentParser(description=__doc__)
add_arg = functools.partial(add_arguments, argparser=parser)
add_arg("audio_path", type=str, default="dataset/test.wav", help="预测的音频路径")
add_arg("model_path", type=str, default="models/whisper-tiny-finetune-ct2", help="转换后的模型路径,转换方式看文档")
add_arg("language", type=str, default="zh", help="设置语言,必须简写,如果为None则自动检测语言")
add_arg("task", type=str, default="transcribe", choices=['transcribe', 'translate'], help="模型的任务")
add_arg("use_gpu", type=bool, default=True, help="是否使用gpu进行预测")
add_arg("use_int8", type=bool, default=False, help="是否使用int8进行预测")
add_arg("beam_size", type=int, default=10, help="解码搜索大小")
add_arg("num_workers", type=int, default=1, help="预测器的并发数量")
add_arg("vad_filter", type=bool, default=False, help="是否使用VAD过滤掉部分没有讲话的音频")
add_arg("local_files_only", type=bool, default=True, help="是否只在本地加载模型,不尝试下载")
args = parser.parse_args()
print_arguments(args)
# Check that the model files exist
assert os.path.exists(args.model_path), f"Model path {args.model_path} does not exist"
# Load the model
if args.use_gpu:
if not args.use_int8:
model = WhisperModel(args.model_path, device="cuda", compute_type="float16", num_workers=args.num_workers,
local_files_only=args.local_files_only)
else:
model = WhisperModel(args.model_path, device="cuda", compute_type="int8_float16", num_workers=args.num_workers,
local_files_only=args.local_files_only)
else:
model = WhisperModel(args.model_path, device="cpu", compute_type="int8", num_workers=args.num_workers,
local_files_only=args.local_files_only)
# Support the large-v3 model, which uses 128 mel bins
if 'large-v3' in args.model_path:
model.feature_extractor.mel_filters = \
model.feature_extractor.get_mel_filters(model.feature_extractor.sampling_rate,
model.feature_extractor.n_fft, n_mels=128)
# Warm up
_, _ = model.transcribe("dataset/test.wav", beam_size=5)
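# Note: transcribe() returns a lazy generator, so decoding only runs once the segments are iterated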
# Speech recognition
segments, info = model.transcribe(args.audio_path, beam_size=args.beam_size, language=args.language, task=args.task,
vad_filter=args.vad_filter)
for segment in segments:
text = segment.text
print(f"[{round(segment.start, 2)} - {round(segment.end, 2)}]:{text}\n")
import _thread
import argparse
import functools
import os
import platform
import time
import tkinter.messagebox
from tkinter import *
from tkinter.filedialog import askopenfilename
import numpy as np
import soundcard
import soundfile
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline, AutoModelForCausalLM
from zhconv import convert
from utils.utils import print_arguments, add_arguments
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
parser = argparse.ArgumentParser(description=__doc__)
add_arg = functools.partial(add_arguments, argparser=parser)
add_arg("model_path", type=str, default="models/whisper-tiny-finetune/", help="合并模型的路径,或者是huggingface上模型的名称")
add_arg("language", type=str, default="chinese", help="设置语言,如果为None则预测的是多语言")
add_arg("use_gpu", type=bool, default=True, help="是否使用gpu进行预测")
add_arg("num_beams", type=int, default=1, help="解码搜索大小")
add_arg("batch_size", type=int, default=16, help="预测batch_size大小")
add_arg("use_compile", type=bool, default=False, help="是否使用Pytorch2.0的编译器")
add_arg("task", type=str, default="transcribe", choices=['transcribe', 'translate'], help="模型的任务")
add_arg("assistant_model_path", type=str, default=None, help="助手模型,可以提高推理速度,例如openai/whisper-tiny")
add_arg("local_files_only", type=bool, default=True, help="是否只在本地加载模型,不尝试下载")
add_arg("use_flash_attention_2", type=bool, default=False, help="是否使用FlashAttention2加速")
add_arg("use_bettertransformer", type=bool, default=False, help="是否使用BetterTransformer加速")
args = parser.parse_args()
print_arguments(args)
class SpeechRecognitionApp:
def __init__(self, window: Tk, args):
self.window = window
self.wav_path = None
self.predicting = False
self.playing = False
self.recording = False
        # Recording parameters
        self.frames = []
        self.sample_rate = 16000
        self.interval_time = 0.5
        self.block_size = int(self.sample_rate * self.interval_time)
        # Maximum recording duration, in seconds
        self.max_record = 600
        # Directory where recordings are saved
        self.output_path = 'dataset/record'
        # Window title
        self.window.title("夜雨飘零 Speech Recognition")
        # Fix the window size
        self.window.geometry('870x500')
        self.window.resizable(False, False)
        # Button: select an audio file to recognize
        self.short_button = Button(self.window, text="Select File", width=20, command=self.predict_audio_thread)
        self.short_button.place(x=10, y=10)
        # Button: record and recognize
        self.record_button = Button(self.window, text="Record and Recognize", width=20, command=self.record_audio_thread)
        self.record_button.place(x=170, y=10)
        # Button: play the audio
        self.play_button = Button(self.window, text="Play Audio", width=20, command=self.play_audio_thread)
        self.play_button.place(x=330, y=10)
        # Output text box
        self.result_label = Label(self.window, text="Output log:")
        self.result_label.place(x=10, y=70)
        self.result_text = Text(self.window, width=120, height=30)
        self.result_text.place(x=10, y=100)
        # Checkbox options
        self.check_frame = Frame(self.window)
        self.joint_text_check_var = BooleanVar()
        self.joint_text_check = Checkbutton(self.check_frame, text='Join text', variable=self.joint_text_check_var)
        self.joint_text_check.grid(column=0, row=0)
        self.to_simple_check_var = BooleanVar()
        self.to_simple_check = Checkbutton(self.check_frame, text='Traditional to Simplified', variable=self.to_simple_check_var)
        self.to_simple_check.grid(column=1, row=0)
        self.to_simple_check.select()
        self.task_check_var = BooleanVar()
        self.task_check = Checkbutton(self.check_frame, text='Transcribe audio', variable=self.task_check_var)
        self.task_check.grid(column=2, row=0)
        self.task_check.select()
        self.check_frame.grid(row=1)
        self.check_frame.place(x=600, y=10)
        # Select the device
device = "cuda:0" if torch.cuda.is_available() and args.use_gpu else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() and args.use_gpu else torch.float32
        # Load the Whisper processor (feature extractor and tokenizer)
processor = AutoProcessor.from_pretrained(args.model_path)
        # Load the model
model = AutoModelForSpeechSeq2Seq.from_pretrained(
args.model_path, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True,
use_flash_attention_2=args.use_flash_attention_2
)
if args.use_bettertransformer and not args.use_flash_attention_2:
model = model.to_bettertransformer()
        # Use the PyTorch 2.0 compiler
if args.use_compile:
if torch.__version__ >= "2" and platform.system().lower() != 'windows':
model = torch.compile(model)
model.to(device)
        # Load the assistant model
generate_kwargs_pipeline = None
if args.assistant_model_path is not None:
assistant_model = AutoModelForCausalLM.from_pretrained(
args.assistant_model_path, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
)
assistant_model.to(device)
generate_kwargs_pipeline = {"assistant_model": assistant_model}
        # Build the inference pipeline
self.infer_pipe = pipeline("automatic-speech-recognition",
model=model,
tokenizer=processor.tokenizer,
feature_extractor=processor.feature_extractor,
max_new_tokens=128,
chunk_length_s=30,
batch_size=2,
torch_dtype=torch_dtype,
generate_kwargs=generate_kwargs_pipeline,
device=device)
        # Warm up
_ = self.infer_pipe("dataset/test.wav")
    # Thread: recognize a selected audio file
    def predict_audio_thread(self):
        if not self.predicting:
            self.wav_path = askopenfilename(filetypes=[("Audio files", "*.wav"), ("Audio files", "*.mp3")],
                                            initialdir='./dataset')
            if self.wav_path == '': return
            self.result_text.delete('1.0', 'end')
            self.result_text.insert(END, "Selected audio file: %s\n" % self.wav_path)
            self.result_text.insert(END, "Recognizing...\n")
            _thread.start_new_thread(self.predict_audio, (self.wav_path,))
        else:
            tkinter.messagebox.showwarning('Warning', 'A prediction is already running, please wait for it to finish!')
    # Recognize an audio file
def predict_audio(self, wav_path):
self.predicting = True
self.result_text.delete('1.0', 'end')
try:
task = "transcribe" if self.task_check_var.get() else "translate"
            # Generation parameters
generate_kwargs = {"task": task, "num_beams": args.num_beams}
if args.language is not None:
generate_kwargs["language"] = args.language
            # Run inference
result = self.infer_pipe(wav_path, return_timestamps=True, generate_kwargs=generate_kwargs)
            # Output the joined text, or one line per segment
if self.joint_text_check_var.get():
text = result['text']
                # Convert Traditional Chinese to Simplified Chinese
if self.to_simple_check_var.get():
text = convert(text, 'zh-cn')
self.result_text.delete('1.0', 'end')
self.result_text.insert(END, f"{text}\n")
else:
for chunk in result["chunks"]:
text = chunk['text']
                    # Convert Traditional Chinese to Simplified Chinese
if self.to_simple_check_var.get():
text = convert(text, 'zh-cn')
self.result_text.insert(END, f"[{chunk['timestamp'][0]} - {chunk['timestamp'][1]}]:{text}\n")
self.predicting = False
except Exception as e:
print(e)
self.predicting = False
    # Thread: record audio and recognize
def record_audio_thread(self):
if not self.playing and not self.recording:
self.result_text.delete('1.0', 'end')
self.recording = True
_thread.start_new_thread(self.record_audio, ())
else:
if self.playing:
                tkinter.messagebox.showwarning('Warning', 'Audio is playing, cannot record!')
else:
                # Stop recording
                self.recording = False
    # Thread: play the audio
def play_audio_thread(self):
if self.wav_path is None or self.wav_path == '':
            tkinter.messagebox.showwarning('Warning', 'The audio path is empty!')
else:
if not self.playing and not self.recording:
_thread.start_new_thread(self.play_audio, ())
else:
if self.recording:
                    tkinter.messagebox.showwarning('Warning', 'Recording in progress, cannot play audio!')
else:
                    # Stop playing
self.playing = False
def record_audio(self):
self.frames = []
        self.record_button.configure(text='Stop Recording')
        self.result_text.insert(END, "Recording...\n")
        # Open the default input device
input_device = soundcard.default_microphone()
recorder = input_device.recorder(samplerate=self.sample_rate, channels=1, blocksize=self.block_size)
with recorder:
while True:
if len(self.frames) * self.interval_time > self.max_record: break
                # Record a block of audio data
data = recorder.record(numframes=self.block_size)
data = data.squeeze()
self.frames.append(data)
self.result_text.delete('1.0', 'end')
self.result_text.insert(END, f"已经录音{len(self.frames) * self.interval_time}\n")
if not self.recording: break
        # Concatenate the recorded blocks
data = np.concatenate(self.frames)
        # Save the audio data
os.makedirs(self.output_path, exist_ok=True)
self.wav_path = os.path.join(self.output_path, '%s.wav' % str(int(time.time())))
soundfile.write(self.wav_path, data=data, samplerate=self.sample_rate)
self.recording = False
        self.record_button.configure(text='Record and Recognize')
self.result_text.delete('1.0', 'end')
_thread.start_new_thread(self.predict_audio, (self.wav_path,))
    # Play the audio
    def play_audio(self):
        self.play_button.configure(text='Stop Playing')
self.playing = True
default_speaker = soundcard.default_speaker()
data, sr = soundfile.read(self.wav_path)
with default_speaker.player(samplerate=sr) as player:
for i in range(0, data.shape[0], sr):
if not self.playing: break
d = data[i:i + sr]
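                # Normalize each one-second chunk to full scale before playback (note: an all-zero chunk would divide by zero)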
player.play(d / np.max(np.abs(d)))
self.playing = False
        self.play_button.configure(text='Play Audio')
tk = Tk()
myapp = SpeechRecognitionApp(tk, args)
if __name__ == '__main__':
tk.mainloop()
import argparse
import functools
import os
import platform
import torch
import uvicorn
from fastapi import FastAPI, File, Body, UploadFile, Request
from starlette.staticfiles import StaticFiles
from starlette.templating import Jinja2Templates
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline, AutoModelForCausalLM
from zhconv import convert
from utils.data_utils import remove_punctuation
from utils.utils import add_arguments, print_arguments
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
parser = argparse.ArgumentParser(description=__doc__)
add_arg = functools.partial(add_arguments, argparser=parser)
add_arg("host", type=str, default="0.0.0.0", help="监听主机的IP地址")
add_arg("port", type=int, default=5000, help="服务所使用的端口号")
add_arg("model_path", type=str, default="models/whisper-tiny-finetune/", help="合并模型的路径,或者是huggingface上模型的名称")
add_arg("use_gpu", type=bool, default=True, help="是否使用gpu进行预测")
add_arg("num_beams", type=int, default=1, help="解码搜索大小")
add_arg("batch_size", type=int, default=16, help="预测batch_size大小")
add_arg("use_compile", type=bool, default=False, help="是否使用Pytorch2.0的编译器")
add_arg("assistant_model_path", type=str, default=None, help="助手模型,可以提高推理速度,例如openai/whisper-tiny")
add_arg("local_files_only", type=bool, default=True, help="是否只在本地加载模型,不尝试下载")
add_arg("use_flash_attention_2", type=bool, default=False, help="是否使用FlashAttention2加速")
add_arg("use_bettertransformer", type=bool, default=False, help="是否使用BetterTransformer加速")
args = parser.parse_args()
print_arguments(args)
# Select the device
device = "cuda:0" if torch.cuda.is_available() and args.use_gpu else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() and args.use_gpu else torch.float32
# Load the Whisper processor (feature extractor and tokenizer)
processor = AutoProcessor.from_pretrained(args.model_path)
# Load the model
model = AutoModelForSpeechSeq2Seq.from_pretrained(
args.model_path, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True,
use_flash_attention_2=args.use_flash_attention_2
)
if args.use_bettertransformer and not args.use_flash_attention_2:
model = model.to_bettertransformer()
# Use the PyTorch 2.0 compiler
if args.use_compile:
if torch.__version__ >= "2" and platform.system().lower() != 'windows':
model = torch.compile(model)
model.to(device)
# Load the assistant model
generate_kwargs_pipeline = None
if args.assistant_model_path is not None:
assistant_model = AutoModelForCausalLM.from_pretrained(
args.assistant_model_path, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
)
assistant_model.to(device)
generate_kwargs_pipeline = {"assistant_model": assistant_model}
# Build the inference pipeline
infer_pipe = pipeline("automatic-speech-recognition",
model=model,
tokenizer=processor.tokenizer,
feature_extractor=processor.feature_extractor,
max_new_tokens=128,
chunk_length_s=30,
batch_size=args.batch_size,
torch_dtype=torch_dtype,
generate_kwargs=generate_kwargs_pipeline,
device=device)
# Warm up
_ = infer_pipe("dataset/test.wav")
app = FastAPI(title="夜雨飘零语音识别")
app.mount('/static', StaticFiles(directory='static'), name='static')
templates = Jinja2Templates(directory="templates")
model_semaphore = None
def release_model_semaphore():
model_semaphore.release()
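# Helper shared by the HTTP endpoint below: runs the ASR pipeline on the uploaded audio bytes and post-processes each timestamped chunk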
def recognition(file: File, to_simple: int, remove_pun: int, language: str = None, task: str = "transcribe"):
    # Generation parameters
    generate_kwargs = {"task": task, "num_beams": args.num_beams}
    if language is not None:
        generate_kwargs["language"] = language
    # Run inference
    result = infer_pipe(file, return_timestamps=True, generate_kwargs=generate_kwargs)
results = []
for chunk in result["chunks"]:
text = chunk['text']
if to_simple == 1:
text = convert(text, 'zh-cn')
if remove_pun == 1:
text = remove_punctuation(text)
ret = {"text": text, "start": chunk['timestamp'][0], "end": chunk['timestamp'][1]}
results.append(ret)
return results
@app.post("/recognition")
async def api_recognition(to_simple: int = Body(1, description="Whether to convert Traditional Chinese to Simplified Chinese", embed=True),
                          remove_pun: int = Body(0, description="Whether to remove punctuation", embed=True),
                          language: str = Body(None, description="Language; if None, prediction is multilingual", embed=True),
                          task: str = Body("transcribe", description="Recognition task type, transcribe or translate", embed=True),
                          audio: UploadFile = File(..., description="Audio file")):
if language == "None": language = None
data = await audio.read()
results = recognition(file=data, to_simple=to_simple, remove_pun=remove_pun, language=language, task=task)
ret = {"results": results, "code": 0}
return ret
@app.get("/")
async def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request, "id": id})
if __name__ == '__main__':
uvicorn.run(app, host=args.host, port=args.port)
import argparse
import functools
import os
from transformers import WhisperForConditionalGeneration, WhisperFeatureExtractor, WhisperTokenizerFast,\
WhisperProcessor
from peft import PeftModel, PeftConfig
from utils.utils import print_arguments, add_arguments
parser = argparse.ArgumentParser(description=__doc__)
add_arg = functools.partial(add_arguments, argparser=parser)
add_arg("lora_model", type=str, default="output/whisper-tiny/checkpoint-best/", help="微调保存的模型路径")
add_arg('output_dir', type=str, default='models/', help="合并模型的保存目录")
add_arg("local_files_only", type=bool, default=False, help="是否只在本地加载模型,不尝试下载")
args = parser.parse_args()
print_arguments(args)
# Check that the model files exist
assert os.path.exists(args.lora_model), f"Model path {args.lora_model} does not exist"
# Load the LoRA configuration
peft_config = PeftConfig.from_pretrained(args.lora_model)
# Load the base Whisper model
base_model = WhisperForConditionalGeneration.from_pretrained(peft_config.base_model_name_or_path, device_map={"": "cpu"},
local_files_only=args.local_files_only)
# Attach the LoRA adapter to the base model
model = PeftModel.from_pretrained(base_model, args.lora_model, local_files_only=args.local_files_only)
feature_extractor = WhisperFeatureExtractor.from_pretrained(peft_config.base_model_name_or_path,
local_files_only=args.local_files_only)
tokenizer = WhisperTokenizerFast.from_pretrained(peft_config.base_model_name_or_path,
local_files_only=args.local_files_only)
processor = WhisperProcessor.from_pretrained(peft_config.base_model_name_or_path,
local_files_only=args.local_files_only)
# Merge the LoRA weights into the base model
model = model.merge_and_unload()
model.train(False)
# Build the output directory path
if peft_config.base_model_name_or_path.endswith("/"):
peft_config.base_model_name_or_path = peft_config.base_model_name_or_path[:-1]
save_directory = os.path.join(args.output_dir, f'{os.path.basename(peft_config.base_model_name_or_path)}-finetune')
os.makedirs(save_directory, exist_ok=True)
# Save the merged model to the target directory
model.save_pretrained(save_directory, max_shard_size='4GB')
feature_extractor.save_pretrained(save_directory)
tokenizer.save_pretrained(save_directory)
processor.save_pretrained(save_directory)
print(f'The merged model has been saved to: {save_directory}')
# Copyright 2021 The HuggingFace Evaluate Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Character Error Ratio (CER) metric. """
from typing import List
import datasets
import jiwer
import jiwer.transforms as tr
from datasets.config import PY_VERSION
from packaging import version
import evaluate
if PY_VERSION < version.parse("3.8"):
import importlib_metadata
else:
import importlib.metadata as importlib_metadata
SENTENCE_DELIMITER = ""
if version.parse(importlib_metadata.version("jiwer")) < version.parse("2.3.0"):
class SentencesToListOfCharacters(tr.AbstractTransform):
def __init__(self, sentence_delimiter: str = " "):
self.sentence_delimiter = sentence_delimiter
def process_string(self, s: str):
return list(s)
def process_list(self, inp: List[str]):
chars = []
for sent_idx, sentence in enumerate(inp):
chars.extend(self.process_string(sentence))
if self.sentence_delimiter is not None and self.sentence_delimiter != "" and sent_idx < len(inp) - 1:
chars.append(self.sentence_delimiter)
return chars
cer_transform = tr.Compose(
[tr.RemoveMultipleSpaces(), tr.Strip(), SentencesToListOfCharacters(SENTENCE_DELIMITER)]
)
else:
cer_transform = tr.Compose(
[
tr.RemoveMultipleSpaces(),
tr.Strip(),
tr.ReduceToSingleSentence(SENTENCE_DELIMITER),
tr.ReduceToListOfListOfChars(),
]
)
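# Both transforms split text into individual characters, so jiwer's word-level alignment effectively measures character-level errors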
_CITATION = """\
@inproceedings{inproceedings,
author = {Morris, Andrew and Maier, Viktoria and Green, Phil},
year = {2004},
month = {01},
pages = {},
title = {From WER and RIL to MER and WIL: improved evaluation measures for connected speech recognition.}
}
"""
_DESCRIPTION = """\
Character error rate (CER) is a common metric of the performance of an automatic speech recognition system.
CER is similar to Word Error Rate (WER), but operates on characters instead of words. Please refer to the docs of WER for further information.
Character error rate can be computed as:
CER = (S + D + I) / N = (S + D + I) / (S + D + C)
where
S is the number of substitutions,
D is the number of deletions,
I is the number of insertions,
C is the number of correct characters,
N is the number of characters in the reference (N=S+D+C).
CER's output is not always a number between 0 and 1, in particular when there is a high number of insertions. This value is often associated with the percentage of characters that were incorrectly predicted. The lower the value, the better the
performance of the ASR system, with a CER of 0 being a perfect score.
"""
_KWARGS_DESCRIPTION = """
Computes CER score of transcribed segments against references.
Args:
references: list of references for each speech input.
    predictions: list of transcriptions to score.
    concatenate_texts: Whether or not to concatenate sentences before evaluation; set to True for a more accurate result.
Returns:
(float): the character error rate
Examples:
>>> predictions = ["this is the prediction", "there is an other sample"]
>>> references = ["this is the reference", "there is another one"]
>>> cer = evaluate.load("cer")
>>> cer_score = cer.compute(predictions=predictions, references=references)
>>> print(cer_score)
0.34146341463414637
"""
@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class CER(evaluate.Metric):
def _info(self):
return evaluate.MetricInfo(
description=_DESCRIPTION,
citation=_CITATION,
inputs_description=_KWARGS_DESCRIPTION,
features=datasets.Features(
{
"predictions": datasets.Value("string", id="sequence"),
"references": datasets.Value("string", id="sequence"),
}
),
codebase_urls=["https://github.com/jitsi/jiwer/"],
reference_urls=[
"https://en.wikipedia.org/wiki/Word_error_rate",
"https://sites.google.com/site/textdigitisation/qualitymeasures/computingerrorrates",
],
)
def _compute(self, predictions, references, concatenate_texts=False):
if concatenate_texts:
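            # Because the transform reduces text to characters, the "wer" value reported by jiwer is in fact the character error rate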
return jiwer.compute_measures(
references,
predictions,
truth_transform=cer_transform,
hypothesis_transform=cer_transform,
)["wer"]
incorrect = 0
total = 0
for prediction, reference in zip(predictions, references):
measures = jiwer.compute_measures(
reference,
prediction,
truth_transform=cer_transform,
hypothesis_transform=cer_transform,
)
incorrect += measures["substitutions"] + measures["deletions"] + measures["insertions"]
total += measures["substitutions"] + measures["deletions"] + measures["hits"]
return incorrect / total
# Copyright 2021 The HuggingFace Evaluate Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Word Error Ratio (WER) metric. """
import datasets
from jiwer import compute_measures
import evaluate
_CITATION = """\
@inproceedings{inproceedings,
author = {Morris, Andrew and Maier, Viktoria and Green, Phil},
year = {2004},
month = {01},
pages = {},
title = {From WER and RIL to MER and WIL: improved evaluation measures for connected speech recognition.}
}
"""
_DESCRIPTION = """\
Word error rate (WER) is a common metric of the performance of an automatic speech recognition system.
The general difficulty of measuring performance lies in the fact that the recognized word sequence can have a different length from the reference word sequence (supposedly the correct one). The WER is derived from the Levenshtein distance, working at the word level instead of the phoneme level. The WER is a valuable tool for comparing different systems as well as for evaluating improvements within one system. This kind of measurement, however, provides no details on the nature of translation errors and further work is therefore required to identify the main source(s) of error and to focus any research effort.
This problem is solved by first aligning the recognized word sequence with the reference (spoken) word sequence using dynamic string alignment. Examination of this issue is seen through a theory called the power law that states the correlation between perplexity and word error rate.
Word error rate can then be computed as:
WER = (S + D + I) / N = (S + D + I) / (S + D + C)
where
S is the number of substitutions,
D is the number of deletions,
I is the number of insertions,
C is the number of correct words,
N is the number of words in the reference (N=S+D+C).
This value indicates the average number of errors per reference word. The lower the value, the better the
performance of the ASR system, with a WER of 0 being a perfect score.
"""
_KWARGS_DESCRIPTION = """
Compute WER score of transcribed segments against references.
Args:
references: List of references for each speech input.
predictions: List of transcriptions to score.
concatenate_texts (bool, default=False): Whether to concatenate all input texts or compute WER iteratively.
Returns:
(float): the word error rate
Examples:
>>> predictions = ["this is the prediction", "there is an other sample"]
>>> references = ["this is the reference", "there is another one"]
>>> wer = evaluate.load("wer")
>>> wer_score = wer.compute(predictions=predictions, references=references)
>>> print(wer_score)
0.5
"""
@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class WER(evaluate.Metric):
def _info(self):
return evaluate.MetricInfo(
description=_DESCRIPTION,
citation=_CITATION,
inputs_description=_KWARGS_DESCRIPTION,
features=datasets.Features(
{
"predictions": datasets.Value("string", id="sequence"),
"references": datasets.Value("string", id="sequence"),
}
),
codebase_urls=["https://github.com/jitsi/jiwer/"],
reference_urls=[
"https://en.wikipedia.org/wiki/Word_error_rate",
],
)
def _compute(self, predictions=None, references=None, concatenate_texts=False):
if concatenate_texts:
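            # jiwer accepts lists of sentences and computes a single corpus-level WER over all pairs at once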
return compute_measures(references, predictions)["wer"]
else:
incorrect = 0
total = 0
for prediction, reference in zip(predictions, references):
measures = compute_measures(reference, prediction)
incorrect += measures["substitutions"] + measures["deletions"] + measures["insertions"]
total += measures["substitutions"] + measures["deletions"] + measures["hits"]
return incorrect / total
numpy>=1.23.1
soundfile>=0.12.1
librosa>=0.10.0
dataclasses>=0.6
transformers>=4.39.3
bitsandbytes>=0.41.0
datasets>=2.11.0
evaluate>=0.4.0
ctranslate2>=3.21.0
faster-whisper>=0.10.0
jiwer>=2.5.1
peft>=0.6.2
accelerate>=0.21.0
zhconv>=1.4.2
tqdm>=4.62.1
soundcard>=0.4.2
uvicorn>=0.21.1
fastapi>=0.95.1
starlette>=0.26.1
tensorboardX>=2.2
#!/bin/bash
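# Fine-tune each Whisper size with LoRA on 2 GPUs, merge the LoRA weights into the base model, then evaluate the merged models.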
CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 finetune.py --base_model=openai/whisper-tiny --use_8bit=False --per_device_train_batch_size=8 --per_device_eval_batch_size=8 --gradient_accumulation_steps=1
CUDA_VISIBLE_DEVICES=0 python merge_lora.py --lora_model=output/whisper-tiny/checkpoint-final
CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 finetune.py --base_model=openai/whisper-base --use_8bit=False --per_device_train_batch_size=8 --per_device_eval_batch_size=8 --gradient_accumulation_steps=1
CUDA_VISIBLE_DEVICES=0 python merge_lora.py --lora_model=output/whisper-base/checkpoint-final
CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 finetune.py --base_model=openai/whisper-small --use_8bit=True --per_device_train_batch_size=8 --per_device_eval_batch_size=8 --gradient_accumulation_steps=1
CUDA_VISIBLE_DEVICES=0 python merge_lora.py --lora_model=output/whisper-small/checkpoint-final
CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 finetune.py --base_model=openai/whisper-medium --use_8bit=True --per_device_train_batch_size=4 --per_device_eval_batch_size=2 --gradient_accumulation_steps=2
CUDA_VISIBLE_DEVICES=0 python merge_lora.py --lora_model=output/whisper-medium/checkpoint-final
CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 finetune.py --base_model=openai/whisper-large-v2 --use_8bit=True --per_device_train_batch_size=2 --per_device_eval_batch_size=2 --gradient_accumulation_steps=4
CUDA_VISIBLE_DEVICES=0 python merge_lora.py --lora_model=output/whisper-large-v2/checkpoint-final
CUDA_VISIBLE_DEVICES=0 python evaluation.py --model_path=models/whisper-tiny-finetune
CUDA_VISIBLE_DEVICES=0 python evaluation.py --model_path=models/whisper-base-finetune
CUDA_VISIBLE_DEVICES=0 python evaluation.py --model_path=models/whisper-small-finetune
CUDA_VISIBLE_DEVICES=0 python evaluation.py --model_path=models/whisper-medium-finetune
CUDA_VISIBLE_DEVICES=0 python evaluation.py --model_path=models/whisper-large-v2-finetune
* {
box-sizing: border-box;
}
body {
font-family: "Helvetica Neue", "Roboto", sans-serif;
background-color: #f2f2f2;
margin: 0;
padding: 0;
}
#header {
background-color: #fff;
color: #333;
display: flex;
justify-content: center;
align-items: center;
height: 80px;
}
h1 {
font-size: 36px;
margin: 0;
}
#content {
background-color: #fff;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.2);
margin: 50px auto;
max-width: 800px;
padding: 20px;
}
#content div {
display: flex;
flex-wrap: wrap;
justify-content: space-between;
margin-bottom: 20px;
}
#content a {
background-color: #fff;
border-radius: 5px;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.2);
color: #333;
padding: 10px;
text-align: center;
text-decoration: none;
transition: background-color 0.2s;
width: 20%;
}
#content a:hover {
background-color: #f2f2f2;
}
#content img {
cursor: pointer;
height: 50px;
transition: transform 0.2s;
width: 50px;
}
#content img:hover {
transform: scale(1.1);
}
#result {
background-color: #fff;
border-radius: 5px;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.2);
padding: 10px;
}
#result textarea {
border: none;
border-radius: 5px;
font-size: 16px;
height: 300px;
margin-top: 10px;
padding: 10px;
resize: none;
width: 100%;
}
@media only screen and (max-width: 600px) {
#content a {
width: 100%;
}
}