Commit d3cea8c9 authored by sunxx1's avatar sunxx1
Browse files

Merge branch 'main' into 'main'

增加了pytorch框架下的音频处理模型FastSpeech和ECAPA-TDNN的测试代码

See merge request dcutoolkit/deeplearing/dlexamples_new!31
parents 13a50bfe eb779cd5
{
"resblock": "1",
"num_gpus": 0,
"batch_size": 16,
"learning_rate": 0.0002,
"adam_b1": 0.8,
"adam_b2": 0.99,
"lr_decay": 0.999,
"seed": 1234,
"upsample_rates": [8,8,2,2],
"upsample_kernel_sizes": [16,16,4,4],
"upsample_initial_channel": 512,
"resblock_kernel_sizes": [3,7,11],
"resblock_dilation_sizes": [[1,3,5], [1,3,5], [1,3,5]],
"segment_size": 8192,
"num_mels": 80,
"num_freq": 1025,
"n_fft": 1024,
"hop_size": 256,
"win_size": 1024,
"sampling_rate": 22050,
"fmin": 0,
"fmax": 8000,
"fmax_for_loss": null,
"num_workers": 4,
"dist_config": {
"dist_backend": "nccl",
"dist_url": "tcp://localhost:54321",
"world_size": 1
}
}
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import Conv1d, ConvTranspose1d
from torch.nn.utils import weight_norm, remove_weight_norm
# Negative slope shared by the LeakyReLU activations in this module.
LRELU_SLOPE = 0.1
def init_weights(m, mean=0.0, std=0.01):
    """Re-initialise the weights of any Conv* module from N(mean, std).

    Non-convolutional modules are left untouched; intended to be passed
    to ``nn.Module.apply``.
    """
    if "Conv" in m.__class__.__name__:
        m.weight.data.normal_(mean, std)
def get_padding(kernel_size, dilation=1):
    """Padding that keeps a stride-1 dilated convolution length-preserving."""
    return int(dilation * (kernel_size - 1) / 2)
class ResBlock(torch.nn.Module):
    """Multi-receptive-field residual block of the HiFi-GAN generator.

    Holds two stacks of weight-normalised 1D convolutions: ``convs1``
    with the configured dilations and ``convs2`` with dilation 1.  Each
    pass through ``forward`` applies LeakyReLU -> dilated conv ->
    LeakyReLU -> plain conv around a residual connection.
    """

    def __init__(self, h, channels, kernel_size=3, dilation=(1, 3, 5)):
        super(ResBlock, self).__init__()
        self.h = h

        def _wn_conv(d):
            # Stride-1, "same"-padded, weight-normalised convolution.
            return weight_norm(
                Conv1d(
                    channels,
                    channels,
                    kernel_size,
                    1,
                    dilation=d,
                    padding=get_padding(kernel_size, d),
                )
            )

        self.convs1 = nn.ModuleList([_wn_conv(d) for d in dilation])
        self.convs1.apply(init_weights)
        self.convs2 = nn.ModuleList([_wn_conv(1) for _ in dilation])
        self.convs2.apply(init_weights)

    def forward(self, x):
        for dilated, plain in zip(self.convs1, self.convs2):
            residual = x
            x = F.leaky_relu(x, LRELU_SLOPE)
            x = dilated(x)
            x = F.leaky_relu(x, LRELU_SLOPE)
            x = plain(x)
            x = x + residual
        return x

    def remove_weight_norm(self):
        # The name resolves to the module-level torch helper inside the body.
        for layer in self.convs1:
            remove_weight_norm(layer)
        for layer in self.convs2:
            remove_weight_norm(layer)
class Generator(torch.nn.Module):
    """HiFi-GAN generator: upsamples an 80-band mel spectrogram to audio.

    A pre-convolution feeds a stack of transposed-convolution
    upsamplers; after each upsampling stage the outputs of
    ``num_kernels`` parallel ResBlocks are averaged.  A final
    convolution plus tanh yields a waveform in [-1, 1].
    """

    def __init__(self, h):
        super(Generator, self).__init__()
        self.h = h
        self.num_kernels = len(h.resblock_kernel_sizes)
        self.num_upsamples = len(h.upsample_rates)
        # 80 mel channels in, widest channel count first.
        self.conv_pre = weight_norm(
            Conv1d(80, h.upsample_initial_channel, 7, 1, padding=3)
        )

        self.ups = nn.ModuleList()
        for i, (rate, kernel) in enumerate(
            zip(h.upsample_rates, h.upsample_kernel_sizes)
        ):
            in_ch = h.upsample_initial_channel // (2 ** i)
            # Channel count halves at every upsampling stage.
            self.ups.append(
                weight_norm(
                    ConvTranspose1d(
                        in_ch,
                        in_ch // 2,
                        kernel,
                        rate,
                        padding=(kernel - rate) // 2,
                    )
                )
            )

        # One ResBlock per (upsample stage, kernel size) pair; forward()
        # indexes them as i * num_kernels + j, matching this build order.
        self.resblocks = nn.ModuleList()
        for i in range(len(self.ups)):
            ch = h.upsample_initial_channel // (2 ** (i + 1))
            for k, d in zip(h.resblock_kernel_sizes, h.resblock_dilation_sizes):
                self.resblocks.append(ResBlock(h, ch, k, d))

        self.conv_post = weight_norm(Conv1d(ch, 1, 7, 1, padding=3))
        self.ups.apply(init_weights)
        self.conv_post.apply(init_weights)

    def forward(self, x):
        x = self.conv_pre(x)
        for i in range(self.num_upsamples):
            x = self.ups[i](F.leaky_relu(x, LRELU_SLOPE))
            # Average the parallel multi-receptive-field ResBlock outputs.
            acc = self.resblocks[i * self.num_kernels](x)
            for j in range(1, self.num_kernels):
                acc = acc + self.resblocks[i * self.num_kernels + j](x)
            x = acc / self.num_kernels
        # Default slope (0.01) here, as in the reference HiFi-GAN code.
        x = F.leaky_relu(x)
        x = self.conv_post(x)
        return torch.tanh(x)

    def remove_weight_norm(self):
        print("Removing weight norm...")
        for layer in self.ups:
            remove_weight_norm(layer)
        for block in self.resblocks:
            block.remove_weight_norm()
        remove_weight_norm(self.conv_pre)
        remove_weight_norm(self.conv_post)
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from .fastspeech2 import FastSpeech2
from .loss import FastSpeech2Loss
from .optimizer import ScheduledOptim
\ No newline at end of file
import os
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformer import Encoder, Decoder, PostNet
from .modules import VarianceAdaptor
from utils.tools import get_mask_from_lengths
class FastSpeech2(nn.Module):
    """FastSpeech2: non-autoregressive text-to-mel-spectrogram model.

    Pipeline: Encoder -> VarianceAdaptor (pitch / energy / duration) ->
    Decoder -> linear projection to mel bins, plus a residual PostNet
    refinement.  Optionally adds a learned speaker embedding when
    ``model_config["multi_speaker"]`` is set.
    """

    def __init__(self, preprocess_config, model_config):
        super(FastSpeech2, self).__init__()
        self.model_config = model_config
        self.encoder = Encoder(model_config)
        self.variance_adaptor = VarianceAdaptor(preprocess_config, model_config)
        self.decoder = Decoder(model_config)
        # Project decoder hidden states to n_mel_channels bins per frame.
        self.mel_linear = nn.Linear(
            model_config["transformer"]["decoder_hidden"],
            preprocess_config["preprocessing"]["mel"]["n_mel_channels"],
        )
        self.postnet = PostNet()
        # Speaker embedding table, created only for multi-speaker setups;
        # the speaker count is read from the preprocessed speakers.json.
        self.speaker_emb = None
        if model_config["multi_speaker"]:
            with open(
                os.path.join(
                    preprocess_config["path"]["preprocessed_path"], "speakers.json"
                ),
                "r",
            ) as f:
                n_speaker = len(json.load(f))
            self.speaker_emb = nn.Embedding(
                n_speaker,
                model_config["transformer"]["encoder_hidden"],
            )

    def forward(
        self,
        speakers,
        texts,
        src_lens,
        max_src_len,
        mels=None,
        mel_lens=None,
        max_mel_len=None,
        p_targets=None,
        e_targets=None,
        d_targets=None,
        p_control=1.0,
        e_control=1.0,
        d_control=1.0,
    ):
        """Synthesise mel spectrograms from phoneme sequences.

        The ``*_targets`` are supplied during training (teacher forcing);
        at inference they are None and the ``*_control`` factors scale the
        predicted pitch/energy/duration.  Returns a 10-tuple:
        (mel, postnet_mel, pitch_pred, energy_pred, log_duration_pred,
        duration_rounded, src_masks, mel_masks, src_lens, mel_lens).
        """
        # Masks built from lengths; presumably True marks padded positions
        # (semantics defined in utils.tools.get_mask_from_lengths - confirm).
        src_masks = get_mask_from_lengths(src_lens, max_src_len)
        mel_masks = (
            get_mask_from_lengths(mel_lens, max_mel_len)
            if mel_lens is not None
            else None
        )
        output = self.encoder(texts, src_masks)
        if self.speaker_emb is not None:
            # Broadcast one speaker embedding across all source positions.
            output = output + self.speaker_emb(speakers).unsqueeze(1).expand(
                -1, max_src_len, -1
            )
        # The adaptor expands the sequence to frame level and returns the
        # variance predictions alongside the (possibly new) mel masks/lengths.
        (
            output,
            p_predictions,
            e_predictions,
            log_d_predictions,
            d_rounded,
            mel_lens,
            mel_masks,
        ) = self.variance_adaptor(
            output,
            src_masks,
            mel_masks,
            max_mel_len,
            p_targets,
            e_targets,
            d_targets,
            p_control,
            e_control,
            d_control,
        )
        output, mel_masks = self.decoder(output, mel_masks)
        output = self.mel_linear(output)
        # PostNet predicts a residual correction on top of the coarse mel.
        postnet_output = self.postnet(output) + output
        return (
            output,
            postnet_output,
            p_predictions,
            e_predictions,
            log_d_predictions,
            d_rounded,
            src_masks,
            mel_masks,
            src_lens,
            mel_lens,
        )
import torch
import torch.nn as nn
class FastSpeech2Loss(nn.Module):
    """Combined FastSpeech2 training loss.

    Sums L1 mel losses (before and after PostNet) with MSE losses on
    pitch, energy and log-duration, each evaluated only at positions
    kept by the (inverted) padding masks.
    """

    def __init__(self, preprocess_config, model_config):
        super(FastSpeech2Loss, self).__init__()
        self.pitch_feature_level = preprocess_config["preprocessing"]["pitch"][
            "feature"
        ]
        self.energy_feature_level = preprocess_config["preprocessing"]["energy"][
            "feature"
        ]
        self.mse_loss = nn.MSELoss()
        self.mae_loss = nn.L1Loss()

    def forward(self, inputs, predictions):
        """Return (total, mel, postnet_mel, pitch, energy, duration) losses."""
        mel_targets, _, _, pitch_targets, energy_targets, duration_targets = inputs[6:]
        (
            mel_predictions,
            postnet_mel_predictions,
            pitch_predictions,
            energy_predictions,
            log_duration_predictions,
            _,
            src_masks,
            mel_masks,
            _,
            _,
        ) = predictions

        # Invert the masks so masked_select keeps the wanted positions.
        src_masks = ~src_masks
        mel_masks = ~mel_masks
        # Durations are regressed in log space (offset by 1 frame).
        log_duration_targets = torch.log(duration_targets.float() + 1)
        mel_targets = mel_targets[:, : mel_masks.shape[1], :]
        # No-op slice kept from the reference implementation.
        mel_masks = mel_masks[:, : mel_masks.shape[1]]

        # Targets are constants; no gradient must flow into them.
        for target in (log_duration_targets, pitch_targets, energy_targets, mel_targets):
            target.requires_grad = False

        if self.pitch_feature_level == "phoneme_level":
            pitch_predictions = pitch_predictions.masked_select(src_masks)
            pitch_targets = pitch_targets.masked_select(src_masks)
        elif self.pitch_feature_level == "frame_level":
            pitch_predictions = pitch_predictions.masked_select(mel_masks)
            pitch_targets = pitch_targets.masked_select(mel_masks)

        if self.energy_feature_level == "phoneme_level":
            energy_predictions = energy_predictions.masked_select(src_masks)
            energy_targets = energy_targets.masked_select(src_masks)
        if self.energy_feature_level == "frame_level":
            energy_predictions = energy_predictions.masked_select(mel_masks)
            energy_targets = energy_targets.masked_select(mel_masks)

        log_duration_predictions = log_duration_predictions.masked_select(src_masks)
        log_duration_targets = log_duration_targets.masked_select(src_masks)

        frame_mask = mel_masks.unsqueeze(-1)
        mel_predictions = mel_predictions.masked_select(frame_mask)
        postnet_mel_predictions = postnet_mel_predictions.masked_select(frame_mask)
        mel_targets = mel_targets.masked_select(frame_mask)

        mel_loss = self.mae_loss(mel_predictions, mel_targets)
        postnet_mel_loss = self.mae_loss(postnet_mel_predictions, mel_targets)
        pitch_loss = self.mse_loss(pitch_predictions, pitch_targets)
        energy_loss = self.mse_loss(energy_predictions, energy_targets)
        duration_loss = self.mse_loss(log_duration_predictions, log_duration_targets)

        total_loss = (
            mel_loss + postnet_mel_loss + duration_loss + pitch_loss + energy_loss
        )
        return (
            total_loss,
            mel_loss,
            postnet_mel_loss,
            pitch_loss,
            energy_loss,
            duration_loss,
        )
import os
import json
import copy
import math
from collections import OrderedDict
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
from utils.tools import get_mask_from_lengths, pad
# Place variance-adaptor tensors on the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class VarianceAdaptor(nn.Module):
    """Variance Adaptor: injects pitch, energy and duration information.

    Predicts a log-duration per input token, optionally expands the
    sequence to frame level with the LengthRegulator, and adds quantised
    pitch/energy embeddings at either phoneme or frame level.

    Bug fix vs. the original: both energy branches in ``forward``
    previously passed ``p_control`` to ``get_energy_embedding``, so the
    ``e_control`` argument was silently ignored at inference time.  They
    now pass ``e_control`` (identical behaviour at the defaults of 1.0).
    """

    def __init__(self, preprocess_config, model_config):
        super(VarianceAdaptor, self).__init__()
        self.duration_predictor = VariancePredictor(model_config)
        self.length_regulator = LengthRegulator()
        self.pitch_predictor = VariancePredictor(model_config)
        self.energy_predictor = VariancePredictor(model_config)

        self.pitch_feature_level = preprocess_config["preprocessing"]["pitch"][
            "feature"
        ]
        self.energy_feature_level = preprocess_config["preprocessing"]["energy"][
            "feature"
        ]
        assert self.pitch_feature_level in ["phoneme_level", "frame_level"]
        assert self.energy_feature_level in ["phoneme_level", "frame_level"]

        pitch_quantization = model_config["variance_embedding"]["pitch_quantization"]
        energy_quantization = model_config["variance_embedding"]["energy_quantization"]
        n_bins = model_config["variance_embedding"]["n_bins"]
        assert pitch_quantization in ["linear", "log"]
        assert energy_quantization in ["linear", "log"]

        # Bin edges come from dataset statistics written at preprocess time.
        with open(
            os.path.join(preprocess_config["path"]["preprocessed_path"], "stats.json")
        ) as f:
            stats = json.load(f)
            pitch_min, pitch_max = stats["pitch"][:2]
            energy_min, energy_max = stats["energy"][:2]

        self.pitch_bins = nn.Parameter(
            self._make_bins(pitch_quantization, pitch_min, pitch_max, n_bins),
            requires_grad=False,
        )
        self.energy_bins = nn.Parameter(
            self._make_bins(energy_quantization, energy_min, energy_max, n_bins),
            requires_grad=False,
        )
        self.pitch_embedding = nn.Embedding(
            n_bins, model_config["transformer"]["encoder_hidden"]
        )
        self.energy_embedding = nn.Embedding(
            n_bins, model_config["transformer"]["encoder_hidden"]
        )

    @staticmethod
    def _make_bins(quantization, lo, hi, n_bins):
        # n_bins - 1 edges delimit n_bins buckets for torch.bucketize;
        # "log" spaces the edges geometrically between lo and hi.
        if quantization == "log":
            return torch.exp(torch.linspace(np.log(lo), np.log(hi), n_bins - 1))
        return torch.linspace(lo, hi, n_bins - 1)

    def get_pitch_embedding(self, x, target, mask, control):
        """Predict pitch and return (prediction, embedding).

        With a target (training), the embedding is looked up from the
        ground truth; otherwise the prediction, scaled by ``control``,
        is bucketised instead.
        """
        prediction = self.pitch_predictor(x, mask)
        if target is not None:
            embedding = self.pitch_embedding(torch.bucketize(target, self.pitch_bins))
        else:
            prediction = prediction * control
            embedding = self.pitch_embedding(
                torch.bucketize(prediction, self.pitch_bins)
            )
        return prediction, embedding

    def get_energy_embedding(self, x, target, mask, control):
        """Predict energy and return (prediction, embedding); see
        ``get_pitch_embedding`` for the target/control semantics."""
        prediction = self.energy_predictor(x, mask)
        if target is not None:
            embedding = self.energy_embedding(torch.bucketize(target, self.energy_bins))
        else:
            prediction = prediction * control
            embedding = self.energy_embedding(
                torch.bucketize(prediction, self.energy_bins)
            )
        return prediction, embedding

    def forward(
        self,
        x,
        src_mask,
        mel_mask=None,
        max_len=None,
        pitch_target=None,
        energy_target=None,
        duration_target=None,
        p_control=1.0,
        e_control=1.0,
        d_control=1.0,
    ):
        """Apply duration/pitch/energy modelling; returns a 7-tuple
        (x, pitch_pred, energy_pred, log_duration_pred, duration_rounded,
        mel_len, mel_mask)."""
        log_duration_prediction = self.duration_predictor(x, src_mask)

        if self.pitch_feature_level == "phoneme_level":
            pitch_prediction, pitch_embedding = self.get_pitch_embedding(
                x, pitch_target, src_mask, p_control
            )
            x = x + pitch_embedding
        if self.energy_feature_level == "phoneme_level":
            # BUG FIX: was p_control, which made e_control a dead parameter.
            energy_prediction, energy_embedding = self.get_energy_embedding(
                x, energy_target, src_mask, e_control
            )
            x = x + energy_embedding

        if duration_target is not None:
            # Teacher forcing: expand with ground-truth durations.
            x, mel_len = self.length_regulator(x, duration_target, max_len)
            duration_rounded = duration_target
        else:
            # Inference: invert the log-duration, scale by the duration
            # control factor and clamp to non-negative frame counts.
            duration_rounded = torch.clamp(
                (torch.round(torch.exp(log_duration_prediction) - 1) * d_control),
                min=0,
            )
            x, mel_len = self.length_regulator(x, duration_rounded, max_len)
            mel_mask = get_mask_from_lengths(mel_len)

        if self.pitch_feature_level == "frame_level":
            pitch_prediction, pitch_embedding = self.get_pitch_embedding(
                x, pitch_target, mel_mask, p_control
            )
            x = x + pitch_embedding
        if self.energy_feature_level == "frame_level":
            # BUG FIX: was p_control (see above).
            energy_prediction, energy_embedding = self.get_energy_embedding(
                x, energy_target, mel_mask, e_control
            )
            x = x + energy_embedding

        return (
            x,
            pitch_prediction,
            energy_prediction,
            log_duration_prediction,
            duration_rounded,
            mel_len,
            mel_mask,
        )
class LengthRegulator(nn.Module):
    """Length Regulator: repeats each encoder state by its duration."""

    def __init__(self):
        super(LengthRegulator, self).__init__()

    def LR(self, x, duration, max_len):
        """Expand every batch item and pad the results to a common length."""
        expanded_batch = [
            self.expand(seq, durations) for seq, durations in zip(x, duration)
        ]
        mel_len = [seq.shape[0] for seq in expanded_batch]
        if max_len is not None:
            output = pad(expanded_batch, max_len)
        else:
            output = pad(expanded_batch)
        return output, torch.LongTensor(mel_len).to(device)

    def expand(self, batch, predicted):
        """Repeat each vector in ``batch`` by its (clamped) duration."""
        pieces = []
        for i, vec in enumerate(batch):
            reps = max(int(predicted[i].item()), 0)
            pieces.append(vec.expand(reps, -1))
        return torch.cat(pieces, 0)

    def forward(self, x, duration, max_len):
        return self.LR(x, duration, max_len)
class VariancePredictor(nn.Module):
    """Duration, Pitch and Energy Predictor.

    Two Conv->ReLU->LayerNorm->Dropout stages followed by a linear
    projection to one scalar per position; masked positions are zeroed.
    """

    def __init__(self, model_config):
        super(VariancePredictor, self).__init__()
        self.input_size = model_config["transformer"]["encoder_hidden"]
        self.filter_size = model_config["variance_predictor"]["filter_size"]
        self.kernel = model_config["variance_predictor"]["kernel_size"]
        self.conv_output_size = model_config["variance_predictor"]["filter_size"]
        self.dropout = model_config["variance_predictor"]["dropout"]

        # FIX: the second convolution hard-coded padding=1, which keeps the
        # sequence length only for kernel_size == 3.  (kernel - 1) // 2 is
        # identical for that default and correct for any odd kernel size.
        same_padding = (self.kernel - 1) // 2
        self.conv_layer = nn.Sequential(
            OrderedDict(
                [
                    (
                        "conv1d_1",
                        Conv(
                            self.input_size,
                            self.filter_size,
                            kernel_size=self.kernel,
                            padding=same_padding,
                        ),
                    ),
                    ("relu_1", nn.ReLU()),
                    ("layer_norm_1", nn.LayerNorm(self.filter_size)),
                    ("dropout_1", nn.Dropout(self.dropout)),
                    (
                        "conv1d_2",
                        Conv(
                            self.filter_size,
                            self.filter_size,
                            kernel_size=self.kernel,
                            padding=same_padding,
                        ),
                    ),
                    ("relu_2", nn.ReLU()),
                    ("layer_norm_2", nn.LayerNorm(self.filter_size)),
                    ("dropout_2", nn.Dropout(self.dropout)),
                ]
            )
        )
        self.linear_layer = nn.Linear(self.conv_output_size, 1)

    def forward(self, encoder_output, mask):
        """Return one scalar per position; positions where ``mask`` is True
        are zeroed (mask presumably marks padding - confirm in callers)."""
        out = self.conv_layer(encoder_output)
        out = self.linear_layer(out)
        out = out.squeeze(-1)
        if mask is not None:
            out = out.masked_fill(mask, 0.0)
        return out
class Conv(nn.Module):
    """1D convolution operating on (batch, time, channels) tensors.

    Wraps ``nn.Conv1d`` (which expects channels-first input) by
    transposing before and after the convolution.
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size=1,
        stride=1,
        padding=0,
        dilation=1,
        bias=True,
        w_init="linear",
    ):
        """
        :param in_channels: dimension of input
        :param out_channels: dimension of output
        :param kernel_size: size of kernel
        :param stride: size of stride
        :param padding: size of padding
        :param dilation: dilation rate
        :param bias: boolean. if True, bias is included.
        :param w_init: unused; kept for interface compatibility.
        """
        super(Conv, self).__init__()
        self.conv = nn.Conv1d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            bias=bias,
        )

    def forward(self, x):
        # (B, T, C) -> (B, C, T) -> conv -> back to (B, T, C).
        channels_first = x.contiguous().transpose(1, 2)
        out = self.conv(channels_first)
        return out.contiguous().transpose(1, 2)
import torch
import numpy as np
class ScheduledOptim:
    """A simple wrapper class for learning rate scheduling.

    Implements the Transformer "Noam" warmup schedule scaled by
    ``encoder_hidden ** -0.5``, with an extra multiplicative anneal for
    every milestone in ``anneal_steps`` already passed.
    """

    def __init__(self, model, train_config, model_config, current_step):
        opt_cfg = train_config["optimizer"]
        self._optimizer = torch.optim.Adam(
            model.parameters(),
            betas=opt_cfg["betas"],
            eps=opt_cfg["eps"],
            weight_decay=opt_cfg["weight_decay"],
        )
        self.n_warmup_steps = opt_cfg["warm_up_step"]
        self.anneal_steps = opt_cfg["anneal_steps"]
        self.anneal_rate = opt_cfg["anneal_rate"]
        self.current_step = current_step
        self.init_lr = np.power(model_config["transformer"]["encoder_hidden"], -0.5)

    def step_and_update_lr(self):
        """Advance the schedule by one step, then apply the optimizer step."""
        self._update_learning_rate()
        self._optimizer.step()

    def zero_grad(self):
        self._optimizer.zero_grad()

    def load_state_dict(self, path):
        self._optimizer.load_state_dict(path)

    def _get_lr_scale(self):
        # Noam schedule: min(step^-0.5, step * warmup^-1.5) ...
        warmup_term = np.power(self.n_warmup_steps, -1.5) * self.current_step
        scale = np.min([np.power(self.current_step, -0.5), warmup_term])
        # ... annealed once per milestone already passed.
        for milestone in self.anneal_steps:
            if self.current_step > milestone:
                scale = scale * self.anneal_rate
        return scale

    def _update_learning_rate(self):
        """Learning rate scheduling per step."""
        self.current_step += 1
        new_lr = self.init_lr * self._get_lr_scale()
        for param_group in self._optimizer.param_groups:
            param_group["lr"] = new_lr
import argparse
import yaml
from preprocessor import ljspeech, aishell3, libritts
def main(config):
    """Dispatch alignment preparation to the preprocessor whose name
    appears in the configured dataset string."""
    dataset = config["dataset"]
    if "LJSpeech" in dataset:
        ljspeech.prepare_align(config)
    if "AISHELL3" in dataset:
        aishell3.prepare_align(config)
    if "LibriTTS" in dataset:
        libritts.prepare_align(config)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("config", type=str, help="path to preprocess.yaml")
    args = parser.parse_args()
    # FIX: use a context manager so the config file handle is closed
    # deterministically (yaml.load(open(...)) leaked it).
    with open(args.config, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    main(config)
import argparse
import yaml
from preprocessor.preprocessor import Preprocessor
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("config", type=str, help="path to preprocess.yaml")
    args = parser.parse_args()
    # FIX: close the YAML config file deterministically instead of
    # leaking the handle from yaml.load(open(...)).
    with open(args.config, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    preprocessor = Preprocessor(config)
    preprocessor.build_from_path()
import os
import librosa
import numpy as np
from scipy.io import wavfile
from tqdm import tqdm
def prepare_align(config):
    """Prepare AISHELL-3-style audio and transcripts for forced alignment.

    Reads each split's content.txt, peak-normalises and resamples the
    referenced wav files into the raw_path layout, and writes one .lab
    transcript per utterance.
    """
    in_dir = config["path"]["corpus_path"]
    out_dir = config["path"]["raw_path"]
    sampling_rate = config["preprocessing"]["audio"]["sampling_rate"]
    max_wav_value = config["preprocessing"]["audio"]["max_wav_value"]
    for dataset in ["train", "test"]:
        print("Processing {}ing set...".format(dataset))
        with open(os.path.join(in_dir, dataset, "content.txt"), encoding="utf-8") as f:
            for line in tqdm(f):
                wav_name, text = line.strip("\n").split("\t")
                # First 7 characters of the file name are taken as the
                # speaker id - presumably the corpus naming convention; confirm.
                speaker = wav_name[:7]
                # content.txt interleaves two token streams; keep every
                # second token of the transcript.
                text = text.split(" ")[1::2]
                wav_path = os.path.join(in_dir, dataset, "wav", speaker, wav_name)
                if not os.path.exists(wav_path):
                    continue
                os.makedirs(os.path.join(out_dir, speaker), exist_ok=True)
                # FIX: librosa >= 0.10 requires the sample rate as the
                # keyword-only argument `sr`; the positional form raises.
                wav, _ = librosa.load(wav_path, sr=sampling_rate)
                # Peak-normalise to the configured int16 range.
                wav = wav / max(abs(wav)) * max_wav_value
                wavfile.write(
                    os.path.join(out_dir, speaker, wav_name),
                    sampling_rate,
                    wav.astype(np.int16),
                )
                with open(
                    os.path.join(out_dir, speaker, "{}.lab".format(wav_name[:11])),
                    "w",
                ) as f1:
                    f1.write(" ".join(text))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment