Unverified Commit f42a35e6 authored by Yoach Lacombe, committed by GitHub

Add bark (#24086)



* first raw version of the bark integration

* working code on small models with single run

* add converting script from suno weights 2 hf

* many changes

* correct past_kv output

* working implementation for inference

* update the converting script according to the architecture changes

* add a working end-to-end inference code

* remove some comments and make small changes

* remove unnecessary comment

* add docstrings and ensure no unnecessary intermediary output during audio generation

* remove done TODOs

* make style + add config docstrings

* modification for batch inference support on the whole model

* add details to .generation_audio method

* add copyright

* convert EncodecModel from original library to transformers implementation

* add two classes in order to facilitate model and sub-model loading from the hub

* add support of loading the whole model

* add BarkProcessor

* correct modeling according to processor output

* Add proper __init__ and auto support

* Add up-to-date copyright/license message

* add relative import instead of absolute

* cleaner head_dim computation

* small comment removal or changes

* more verbose LayerNorm init method

* specify eps for clearer comprehension

* more verbose variable naming in the MLP module

* remove unnecessary BarkBlock parameter

* clearer code in the forward pass of the BarkBlock

* remove _initialize_modules method for cleaner code

* Remove unnecessary methods from sub-models

* move code to remove unnecessary function

* rename a variable for clarity and change an assert

* move code and change variable name for clarity

* remove unnecessary asserts

* correct small bug

* correct a comment

* change variable names for clarity

* remove asserts

* change import from absolute to relative

* correct small error due to comma missing + correct import

* Add attribute Bark config

* add first version of tests

* update attention_map

* add tie_weights and resize_token_embeddings for fineModel

* correct getting attention_mask in generate_text_semantic

* remove Bark inference trick

* leave more choices in BarkProcessor

* remove _no_split_modules

* fix error in forward of block and introduce clearer notations

* correct converting script with last changes

* make style + add draft bark.mdx

* correct BarkModelTest::test_generate_text_semantic

* add Bark in main README

* add dummy_pt_objects for Bark

* add missing models in the main init

* correct test_decoder_model_past_with_large_inputs

* disable torchscript test

* change docstring of BarkProcessor

* Add test_processor_bark

* make style

* correct copyrights

* add bark.mdx + make style, quality and consistency

* Apply suggestions from code review
Co-authored-by: Sanchit Gandhi <93869735+sanchit-gandhi@users.noreply.github.com>

* Remove unnecessary test method

* simplify logic of a test

* Only check first ids for slow audio generation

* split full end-to-end generation tests

* remove unnecessary comment

* change submodel names for clearer naming

* remove ModuleDict from modeling_bark

* combine two if statements

* ensure that an edge-case misuse won't happen

* modify variable name

* move code snippet to the right place (coarse instead of semantic)

* change BarkSemanticModule -> BarkSemanticModel

* align BarkProcessor with transformers paradigm

* correct BarkProcessor tests with last commit changes

* change _validate_voice_preset to an instance method instead of a class method

* tie_weights already called with post_init

* add codec_model config to configuration

* update bark modeling tests with recent BarkProcessor changes

* remove SubModelPretrainedModel + change speakers embeddings prompt type in BarkModel

* change absolute imports to relative

* remove TODO

* change docstrings

* add examples to docs and docstrings

* make style

* use BatchFeature in BarkProcessor instead of dict

* continue improving docstrings and docs + make style

* correct docstrings examples

* more comprehensible speaker_embeddings load/save

* rename speaker_embeddings_dict -> speaker_embeddings

* correct bark.mdx + add bark to documentation_tests

* correct docstrings configuration_bark

* integrate last nit suggestions

* integrate BarkGeneration configs

* make style

* remove bark tests from documentation_tests.txt because timeout - tested manually

* add proper generation config initialization

* small bark.mdx documentation changes

* rename bark.mdx -> bark.md

* add torch.no_grad behind BarkModel.generate_audio()

* replace assert by ValueError in convert_suno_to_hf.py

* integrate a series of short comments from reviewer

* move SemanticLogitsProcessors and remove .detach() from Bark docs and docstrings

* actually remove SemanticLogitsProcessor from modeling_bark.py

* BarkProcessor returns a single output instead of tuple + correct docstrings

* make style + correct bug

* add initializer_range to BarkConfig + correct slow modeling tests

* add .clone() to history_prompt.coarse_prompt to avoid modifying input array

* Making sure no extra "`" are present

* remove extra characters in modeling_bark.py

* Correct output if history_prompt is None

* remove TODOs

* remove ravel comment

* completing generation_configuration_bark.py docstrings

* change docstrings - number of audio codebooks instead of Encodec codebooks

* change 'bias' docstrings in configuration_bark.py

* format code

* rename BarkModel.generate_audio -> BarkModel.generate_speech

* modify AutoConfig instead of EncodecConfig in BarkConfig

* correct AutoConfig wrong init

* refactor BarkModel and sub-models generate_coarse, generate_fine, generate_text_semantic

* remove SemanticLogitsProcessor and replace it with SuppressTokensLogitsProcessor

* move nb_codebook related config arguments to BarkFineConfig

* rename bark.mdx -> bark.md

* correcting BarkModelConfig from_pretrained + remove keys_to_ignore

* correct bark.md with correct hub path

* correct code bug in bark.md

* correct list tokens_to_suppress

* modify Processor to load nested speaker embeddings in a safer way

* correct batch sampling in BarkFineModel.generate_fine

* Apply suggestions from code review

Small docstrings correction and code improvements
Co-authored-by: amyeroberts <22614925+amyeroberts@users.noreply.github.com>

* give more details about num_layers in docstrings

* correct indentation mistake

* correct submodelconfig order of docstring variables

* put audio models in alphabetical order in utils/check_repo.py

* remove useless line from test_modeling_bark.py

* makes BarkCoarseModelTest inherits from (ModelTesterMixin, GenerationTesterMixin, unittest.TestCase) instead of BarkSemanticModelTest

* make a Tester class for each sub-model instead of inheriting

* add test_resize_embeddings=True for Bark sub-models

* add Copied from transformers.models.gpt_neo.modeling_gpt_neo.GPTNeoSelfAttention._split_heads

* remove 'Copied from Bark' comment

* remove unnecessary comment

* change np.min -> min in modeling_bark.py

* refactored all custom layers to have Bark prefix

* add attention_mask as an argument of generate_text_semantic

* refactor sub-models start docstrings to have more precise config class definition

* move _tied_weights_keys overriding

* add docstrings to generate_xxx in modeling_bark.py

* add loading whole BarkModel to convert_suno_to_hf

* refactor attribute and variable names

* make style convert_suno

* update bark checkpoints

* remove never entered if statement

* move bark_modeling docstrings after BarkPretrainedModel class definition

* refactor modeling_bark.py: kv -> key_values

* small nits - code refactoring and removing unnecessary lines from _init_weights

* nits - replace inplace method by variable assigning

* remove *optional* when necessary

* remove some lines in generate_speech

* add default value for optional parameter

* Refactor preprocess_histories_before_coarse -> preprocess_histories
Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>

* correct usage after refactoring

* refactor Bark's generate_xxx -> generate and modify docstrings and tests accordingly

* update docstrings python in configuration_bark.py

* add bark files in utils/documentation_tests.txt

* correct docstrings python snippet

* add the ability to use parameters in the form of e.g coarse_temperature

* add semantic_max_new_tokens in python snippet in docstrings for quicker generation

* Reformat sub-models kwargs in BarkModel.generate
Co-authored-by: amyeroberts <22614925+amyeroberts@users.noreply.github.com>

* correct kwargs in BarkModel.generate

* correct attention_mask kwarg in BarkModel.generate

* add tests for sub-models args in BarkModel.generate and correct BarkFineModel.test_generate_fp16

* enrich BarkModel.generate docstrings with a description of how to use the kwargs

---------
Co-authored-by: Sanchit Gandhi <93869735+sanchit-gandhi@users.noreply.github.com>
Co-authored-by: amyeroberts <22614925+amyeroberts@users.noreply.github.com>
Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>
parent c21c3737
# coding=utf-8
# Copyright 2023 The Suno AI Authors and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" PyTorch BARK model."""
import math
from typing import Dict, Optional, Tuple, Union
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
from ...generation.logits_process import AlternatingCodebooksLogitsProcessor, SuppressTokensLogitsProcessor
from ...modeling_outputs import CausalLMOutputWithPast, MaskedLMOutput
from ...modeling_utils import PreTrainedModel
from ...utils import add_start_docstrings, add_start_docstrings_to_model_forward, logging
from ..auto import AutoModel
from .configuration_bark import (
BarkCoarseConfig,
BarkConfig,
BarkFineConfig,
BarkSemanticConfig,
BarkSubModelConfig,
)
from .generation_configuration_bark import (
BarkCoarseGenerationConfig,
BarkFineGenerationConfig,
BarkSemanticGenerationConfig,
)
logger = logging.get_logger(__name__)
_CHECKPOINT_FOR_DOC = "suno/bark-small"
_CONFIG_FOR_DOC = "BarkConfig"
BARK_PRETRAINED_MODEL_ARCHIVE_LIST = [
"suno/bark-small",
"suno/barh",
# See all Bark models at https://huggingface.co/models?filter=bark
]
class BarkSelfAttention(nn.Module):
# adapted from GPTNeoSelfAttention and Bark code
# BarkSelfAttention can have two attention types, i.e. full attention or causal attention
def __init__(self, config, is_causal=False):
super().__init__()
# regularization
self.dropout = config.dropout
self.attn_dropout = nn.Dropout(config.dropout)
self.resid_dropout = nn.Dropout(config.dropout)
self.embed_dim = config.hidden_size
self.num_heads = config.num_heads
self.head_dim = self.embed_dim // self.num_heads
if config.hidden_size % config.num_heads != 0:
raise ValueError(
f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:"
f" {self.num_heads})."
)
# key, query, value projections for all heads, but in a batch
self.att_proj = nn.Linear(config.hidden_size, 3 * config.hidden_size, bias=config.bias)
# output projection
self.out_proj = nn.Linear(config.hidden_size, config.hidden_size, bias=config.bias)
self.is_causal = is_causal
if is_causal:
block_size = config.block_size
bias = torch.tril(torch.ones((block_size, block_size), dtype=bool)).view(1, 1, block_size, block_size)
self.register_buffer("bias", bias)
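# `bias` is a lower-triangular boolean mask of shape (1, 1, block_size, block_size), e.g. for
# block_size = 4:
#   [[1, 0, 0, 0],
#    [1, 1, 0, 0],
#    [1, 1, 1, 0],
#    [1, 1, 1, 1]]
# position i can only attend to positions <= i; the zeros (future positions) are masked out
# in `_attn` below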
# Copied from transformers.models.gpt_neo.modeling_gpt_neo.GPTNeoSelfAttention._split_heads
def _split_heads(self, tensor, num_heads, attn_head_size):
"""
Splits hidden_size dim into attn_head_size and num_heads
"""
new_shape = tensor.size()[:-1] + (num_heads, attn_head_size)
tensor = tensor.view(new_shape)
return tensor.permute(0, 2, 1, 3) # (batch, head, seq_length, head_features)
def _merge_heads(self, tensor, num_heads, attn_head_size):
"""
Merges attn_head_size dim and num_attn_heads dim into hidden_size
"""
# re-assemble all head outputs side by side
# (batch, num_heads, seq_len, attn_head_size) -> (batch, seq_len, num_heads*attn_head_size)
tensor = tensor.transpose(1, 2).contiguous()
tensor = tensor.view(tensor.size()[:-2] + (num_heads * attn_head_size,))
return tensor
def _attn(self, query, key, value, attention_mask=None, head_mask=None):
# unlike GPTNeo's SelfAttention, divide by the square root of the dimension of the query and the key
attn_weights = torch.matmul(query, key.transpose(-1, -2)) * (1.0 / math.sqrt(self.head_dim))
if self.is_causal:
query_length, key_length = query.size(-2), key.size(-2)
# mask out future positions (where the causal `bias` buffer is 0) with the dtype's minimum value
attn_weights = attn_weights.masked_fill(
self.bias[:, :, key_length - query_length : key_length, :key_length] == 0,
torch.finfo(attn_weights.dtype).min,
)
if attention_mask is not None:
# Apply the attention mask
attn_weights = attn_weights + attention_mask
attn_weights = nn.functional.softmax(attn_weights, dim=-1)
attn_weights = attn_weights.to(value.dtype)
attn_weights = self.attn_dropout(attn_weights)
# Mask heads if we want to
if head_mask is not None:
attn_weights = attn_weights * head_mask
# (batch, num_heads, seq_len, seq_len) x (batch, num_heads, seq_len, attn_head_size)
# -> (batch, num_heads, seq_len, attn_head_size)
attn_output = torch.matmul(attn_weights, value)
return attn_output, attn_weights
def forward(
self,
hidden_states,
attention_mask=None,
past_key_values=None,
head_mask=None,
use_cache=False,
output_attentions=False,
):
# calculate query, key, values for all heads in batch and move head forward to be the batch dim
query, key, value = self.att_proj(hidden_states).split(self.embed_dim, dim=2)
query = self._split_heads(query, self.num_heads, self.head_dim)
key = self._split_heads(key, self.num_heads, self.head_dim)
value = self._split_heads(value, self.num_heads, self.head_dim)
if past_key_values is not None:
past_key = past_key_values[0]
past_value = past_key_values[1]
key = torch.cat((past_key, key), dim=-2)
value = torch.cat((past_value, value), dim=-2)
if use_cache is True:
present = (key, value)
else:
present = None
attn_output, attn_weights = self._attn(query, key, value, attention_mask, head_mask)
attn_output = self._merge_heads(attn_output, self.num_heads, self.head_dim)
attn_output = self.out_proj(attn_output)
attn_output = self.resid_dropout(attn_output)
outputs = (attn_output, present)
if output_attentions:
outputs += (attn_weights,)
return outputs
class BarkLayerNorm(nn.Module):
"""LayerNorm but with an optional bias. PyTorch doesn't support simply bias=False."""
def __init__(self, hidden_size, bias=True):
super().__init__()
self.weight = nn.Parameter(torch.ones(hidden_size))
self.bias = nn.Parameter(torch.zeros(hidden_size)) if bias else None
def forward(self, input):
return F.layer_norm(input, self.weight.shape, self.weight, self.bias, eps=1e-5)
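# F.layer_norm accepts bias=None, in which case only the learned scale (self.weight) is applied
# on top of the normalization; this is what makes the optional bias possible here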
class BarkMLP(nn.Module):
def __init__(self, config):
super().__init__()
self.in_proj = nn.Linear(config.hidden_size, 4 * config.hidden_size, bias=config.bias)
self.out_proj = nn.Linear(4 * config.hidden_size, config.hidden_size, bias=config.bias)
self.dropout = nn.Dropout(config.dropout)
self.gelu = nn.GELU()
def forward(self, hidden_states):
hidden_states = self.in_proj(hidden_states)
hidden_states = self.gelu(hidden_states)
hidden_states = self.out_proj(hidden_states)
hidden_states = self.dropout(hidden_states)
return hidden_states
class BarkBlock(nn.Module):
def __init__(self, config, is_causal=False):
super().__init__()
if is_causal:
# if causal, use a custom LayerNorm so that the LayerNorm bias is optional
# this custom LayerNorm sticks with Bark's choice of leaving the bias optional in the
# autoregressive models (corresponding to the "Text" and the "Coarse" modules)
self.layernorm_1 = BarkLayerNorm(config.hidden_size, bias=config.bias)
self.layernorm_2 = BarkLayerNorm(config.hidden_size, bias=config.bias)
else:
self.layernorm_1 = nn.LayerNorm(config.hidden_size)
self.layernorm_2 = nn.LayerNorm(config.hidden_size)
self.attn = BarkSelfAttention(config, is_causal=is_causal)
self.mlp = BarkMLP(config)
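# the forward pass below follows a pre-LayerNorm transformer block, roughly:
#   hidden_states = hidden_states + attn(layernorm_1(hidden_states))
#   hidden_states = hidden_states + mlp(layernorm_2(hidden_states))
# with key/value caching handled inside the attention layer when `use_cache` is set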
def forward(
self,
hidden_states,
past_key_values=None,
attention_mask=None,
head_mask=None,
use_cache=False,
output_attentions=False,
):
intermediary_hidden_states = self.layernorm_1(hidden_states)
attn_outputs = self.attn(
intermediary_hidden_states,
past_key_values=past_key_values,
attention_mask=attention_mask,
head_mask=head_mask,
use_cache=use_cache,
output_attentions=output_attentions,
)
attn_output = attn_outputs[0] # output_attn: output, present_key_values, (attn_weights)
outputs = attn_outputs[1:]
intermediary_hidden_states = hidden_states + attn_output
intermediary_hidden_states = intermediary_hidden_states + self.mlp(
self.layernorm_2(intermediary_hidden_states)
)
if use_cache:
outputs = (intermediary_hidden_states,) + outputs
else:
outputs = (intermediary_hidden_states,) + outputs[1:]
return outputs # hidden_states, ((present), attentions)
class BarkPreTrainedModel(PreTrainedModel):
"""
An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained
models.
"""
config_class = BarkConfig
supports_gradient_checkpointing = False
def _init_weights(self, module):
"""Initialize the weights."""
if isinstance(module, (nn.Linear,)):
# Slightly different from the TF version which uses truncated_normal for initialization
# cf https://github.com/pytorch/pytorch/pull/5617
module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
if module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.Embedding):
module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
if module.padding_idx is not None:
module.weight.data[module.padding_idx].zero_()
elif isinstance(module, nn.LayerNorm):
module.bias.data.zero_()
module.weight.data.fill_(1.0)
def __init__(self, *inputs, **kwargs):
super().__init__(*inputs, **kwargs)
def _set_gradient_checkpointing(self, module, value=False):
if isinstance(module, BarkCausalModel) or isinstance(module, BarkFineModel) or isinstance(module, BarkModel):
module.gradient_checkpointing = value
BARK_MODEL_START_DOCSTRING = """
This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the
library implements for all its models (such as downloading or saving, resizing the input embeddings, pruning heads
etc.)
This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass.
Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage
and behavior.
Parameters:
config ([`{config}`]):
Model configuration class with all the parameters of the model. Initializing with a config file does not
load the weights associated with the model, only the configuration. Check out the
[`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""
BARK_START_DOCSTRING = r"""
This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the
library implements for all its models (such as downloading or saving, resizing the input embeddings, pruning heads
etc.)
This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass.
Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage
and behavior.
Parameters:
config ([`BarkConfig`]):
Model configuration class with all the parameters of the model. Initializing with a config file does not
load the weights associated with the model, only the configuration. Check out the
[`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""
BARK_FINE_INPUTS_DOCSTRING = r"""
Args:
codebook_idx (`int`):
Index of the codebook that will be predicted.
input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length, number_of_codebooks)`):
Indices of input sequence tokens in the vocabulary. Padding will be ignored by default should you provide
it. Initially, indices of the first two codebooks are obtained from the `coarse` sub-model. The rest is
predicted recursively by attending the previously predicted channels. The model predicts on windows of
length 1024.
attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
[What are attention masks?](../glossary#attention-mask)
position_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0,
config.max_position_embeddings - 1]`.
[What are position IDs?](../glossary#position-ids)
head_mask (`torch.Tensor` of shape `(encoder_layers, encoder_attention_heads)`, *optional*):
Mask to nullify selected heads of the attention modules in the encoder. Mask values selected in `[0, 1]`:
- 1 indicates the head is **not masked**,
- 0 indicates the head is **masked**.
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): NOT IMPLEMENTED YET.
input_embeds (`torch.FloatTensor` of shape `(batch_size, input_sequence_length, hidden_size)`, *optional*):
Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. If
`past_key_values` is used, optionally only the last `input_embeds` have to be input (see
`past_key_values`). This is useful if you want more control over how to convert `input_ids` indices into
associated vectors than the model's internal embedding lookup matrix.
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
tensors for more detail.
output_hidden_states (`bool`, *optional*):
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
more detail.
return_dict (`bool`, *optional*):
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
"""
BARK_CAUSAL_MODEL_INPUTS_DOCSTRING = r"""
Args:
input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
Indices of input sequence tokens in the vocabulary. Padding will be ignored by default should you provide
it. Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
[`PreTrainedTokenizer.__call__`] for details. [What are input IDs?](../glossary#input-ids)
past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache` is passed or when `config.use_cache=True`):
Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of shape
`(batch_size, num_heads, sequence_length, embed_size_per_head)`.
Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see
`past_key_values` input) to speed up sequential decoding.
If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids` (those that
don't have their past key value states given to this model) of shape `(batch_size, 1)` instead of all
`input_ids` of shape `(batch_size, sequence_length)`.
attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
[What are attention masks?](../glossary#attention-mask)
position_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0,
config.max_position_embeddings - 1]`.
[What are position IDs?](../glossary#position-ids)
head_mask (`torch.Tensor` of shape `(encoder_layers, encoder_attention_heads)`, *optional*):
Mask to nullify selected heads of the attention modules in the encoder. Mask values selected in `[0, 1]`:
- 1 indicates the head is **not masked**,
- 0 indicates the head is **masked**.
input_embeds (`torch.FloatTensor` of shape `(batch_size, input_sequence_length, hidden_size)`, *optional*):
Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation.
Here, due to `Bark` particularities, if `past_key_values` is used, `input_embeds` will be ignored and you
have to use `input_ids`. If `past_key_values` is not used and `use_cache` is set to `True`, `input_embeds`
is used in priority instead of `input_ids`.
use_cache (`bool`, *optional*):
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see
`past_key_values`).
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
tensors for more detail.
output_hidden_states (`bool`, *optional*):
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
more detail.
return_dict (`bool`, *optional*):
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
"""
# GPT2-like autoregressive model
class BarkCausalModel(BarkPreTrainedModel):
config_class = BarkSubModelConfig
def __init__(self, config):
super().__init__(config)
self.config = config
# initialize as an autoregressive GPT-like model
self.input_embeds_layer = nn.Embedding(config.input_vocab_size, config.hidden_size)
self.position_embeds_layer = nn.Embedding(config.block_size, config.hidden_size)
self.drop = nn.Dropout(config.dropout)
self.layers = nn.ModuleList([BarkBlock(config, is_causal=True) for _ in range(config.num_layers)])
self.layernorm_final = BarkLayerNorm(config.hidden_size, bias=config.bias)
self.lm_head = nn.Linear(config.hidden_size, config.output_vocab_size, bias=False)
self.gradient_checkpointing = False
# Initialize weights and apply final processing
self.post_init()
def get_input_embeddings(self):
return self.input_embeds_layer
def set_input_embeddings(self, new_embeddings):
self.input_embeds_layer = new_embeddings
def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwargs):
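# On the first generation step of the semantic model, `generate` is called with dummy
# `input_ids` and the real prompt passed through `input_embeds` (see
# `BarkSemanticModel.generate`), so when `use_cache` is set and there are no
# `past_key_values` yet, `input_embeds` takes priority over `input_ids`. Once
# `past_key_values` exists, only the last token id is fed and `input_embeds` is dropped.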
input_embeds = kwargs.get("input_embeds", None)
attention_mask = kwargs.get("attention_mask", None)
position_ids = kwargs.get("position_ids", None)
if past_key_values is not None:
# only last token for inputs_ids if past is defined in kwargs
seq_len = input_ids.shape[1]
input_ids = input_ids[:, [-1]]
# input_embeds have already been used and is not required anymore
input_embeds = None
else:
if input_embeds is not None and kwargs.get("use_cache"):
seq_len = input_embeds.shape[1]
else:
seq_len = input_ids.shape[1]
# ensure that attention_mask and position_ids shapes are aligned with the weird Bark hack of reducing
# sequence length on the first forward pass
if attention_mask is not None:
attention_mask = attention_mask[:, :seq_len]
if position_ids is not None:
position_ids = position_ids[:, :seq_len]
if attention_mask is not None and position_ids is None:
# create position_ids on the fly for batch generation
position_ids = attention_mask.long().cumsum(-1) - 1
position_ids.masked_fill_(attention_mask == 0, 1)
if past_key_values:
position_ids = position_ids[:, -1].unsqueeze(-1)
else:
position_ids = None
if input_embeds is not None and kwargs.get("use_cache"):
return {
"input_ids": None,
"input_embeds": input_embeds,
"past_key_values": past_key_values,
"use_cache": kwargs.get("use_cache"),
"position_ids": position_ids,
"attention_mask": attention_mask,
}
return {
"input_ids": input_ids,
"past_key_values": past_key_values,
"use_cache": kwargs.get("use_cache"),
"position_ids": position_ids,
"attention_mask": attention_mask,
}
@add_start_docstrings_to_model_forward(BARK_CAUSAL_MODEL_INPUTS_DOCSTRING)
def forward(
self,
input_ids: Optional[torch.Tensor] = None,
past_key_values: Optional[Tuple[torch.FloatTensor]] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.Tensor] = None,
head_mask: Optional[torch.Tensor] = None,
labels: Optional[torch.LongTensor] = None,
input_embeds: Optional[torch.Tensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple[torch.Tensor], CausalLMOutputWithPast]:
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
use_cache = use_cache if use_cache is not None else self.config.use_cache
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# Verify if input_embeds already exists
# then compute embeddings.
if input_ids is not None and input_embeds is not None:
raise ValueError("You cannot specify both input_ids and input_embeds at the same time")
elif input_embeds is not None and past_key_values is None:
# we want to return the input_embeds in priority so that it is in line with a weird hack
# of Bark which concatenate two bits of the input_embeds on the first forward pass of the semantic model
pass
elif input_ids is not None:
input_embeds = self.input_embeds_layer(input_ids) # token embeddings of shape (b, t, n_embd)
elif input_embeds is not None:
pass
else:
raise ValueError("You have to specify either input_ids or input_embeds")
input_shape = input_embeds.size()[:-1]
batch_size = input_embeds.shape[0]
seq_length = input_shape[-1]
device = input_ids.device if input_ids is not None else input_embeds.device
if past_key_values is None:
past_length = 0
past_key_values = tuple([None] * len(self.layers))
else:
past_length = past_key_values[0][0].size(-2)
if position_ids is None:
position_ids = torch.arange(past_length, seq_length + past_length, dtype=torch.long, device=device)
position_ids = position_ids.unsqueeze(0) # shape (1, seq_length)
position_embeds = self.position_embeds_layer(position_ids) # position embeddings of shape (1, t, n_embd)
# Attention mask.
if attention_mask is not None:
if batch_size <= 0:
raise ValueError("batch_size has to be defined and > 0")
attention_mask = attention_mask.view(batch_size, -1)
# We create a 3D attention mask from a 2D tensor mask.
# Sizes are [batch_size, 1, 1, to_seq_length]
# So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length]
# this attention mask is more simple than the triangular masking of causal attention
# used in OpenAI GPT, we just need to prepare the broadcast dimension here.
attention_mask = attention_mask[:, None, None, :]
# Since attention_mask is 1.0 for positions we want to attend and 0.0 for
# masked positions, this operation will create a tensor which is 0.0 for
# positions we want to attend and the dtype's smallest value for masked positions.
# Since we are adding it to the raw scores before the softmax, this is
# effectively the same as removing these entirely.
attention_mask = attention_mask.to(dtype=self.dtype) # fp16 compatibility
attention_mask = (1.0 - attention_mask) * torch.finfo(self.dtype).min
# Prepare head mask if needed
# 1.0 in head_mask indicate we keep the head
# attention_probs has shape bsz x num_heads x N x N
# head_mask has shape num_layers x batch x num_heads x N x N
head_mask = self.get_head_mask(head_mask, self.config.num_layers)
hidden_states = self.drop(input_embeds + position_embeds)
output_shape = input_shape + (hidden_states.size(-1),)
if self.gradient_checkpointing and self.training:
if use_cache:
logger.warning_once(
"`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
)
use_cache = False
present_key_values = () if use_cache else None
all_self_attentions = () if output_attentions else None
all_hidden_states = () if output_hidden_states else None
for i, (block, past_layer_key_values) in enumerate(zip(self.layers, past_key_values)):
if output_hidden_states:
all_hidden_states = all_hidden_states + (hidden_states,)
if self.gradient_checkpointing and self.training:
def create_custom_forward(module):
def custom_forward(*inputs):
# None for past_key_value
return module(*inputs, use_cache, output_attentions)
return custom_forward
outputs = torch.utils.checkpoint.checkpoint(
create_custom_forward(block),
hidden_states,
None,
attention_mask,
head_mask[i],
)
else:
outputs = block(
hidden_states,
past_key_values=past_layer_key_values,
attention_mask=attention_mask,
head_mask=head_mask[i],
use_cache=use_cache,
output_attentions=output_attentions,
)
hidden_states = outputs[0]
if use_cache:
present_key_values = present_key_values + (outputs[1],)
if output_attentions:
all_self_attentions = all_self_attentions + (outputs[2 if use_cache else 1],)
hidden_states = self.layernorm_final(hidden_states)
hidden_states = hidden_states.view(output_shape)
# Add last hidden state
if output_hidden_states:
all_hidden_states = all_hidden_states + (hidden_states,)
logits = self.lm_head(hidden_states)
loss = None
if labels is not None:
raise NotImplementedError(
"Training is not implemented yet for Bark - ensure you do not pass `labels` to the model."
)
if not return_dict:
return tuple(
v for v in [None, logits, present_key_values, all_hidden_states, all_self_attentions] if v is not None
)
return CausalLMOutputWithPast(
loss=loss,
logits=logits,
past_key_values=present_key_values,
hidden_states=all_hidden_states,
attentions=all_self_attentions,
)
@staticmethod
def _reorder_cache(
past_key_values: Tuple[Tuple[torch.Tensor]], beam_idx: torch.Tensor
) -> Tuple[Tuple[torch.Tensor]]:
"""
This function is used to re-order the `past_key_values` cache if [`~PreTrainedModel.beam_search`] or
[`~PreTrainedModel.beam_sample`] is called. This is required to match `past_key_values` with the correct
beam_idx at every generation step.
"""
# Necessary for beam_search
return tuple(
tuple(past_state.index_select(0, beam_idx.to(past_state.device)) for past_state in layer_past)
for layer_past in past_key_values
)
@add_start_docstrings(
"""Bark semantic (or text) model. It shares the same architecture as the coarse model.
It is a GPT-2 like autoregressive model with a language modeling head on top.""",
BARK_MODEL_START_DOCSTRING.format(config="BarkSemanticConfig"),
)
class BarkSemanticModel(BarkCausalModel):
base_model_prefix = "semantic"
config_class = BarkSemanticConfig
def generate(
self,
input_ids: torch.Tensor,
semantic_generation_config: BarkSemanticGenerationConfig = None,
history_prompt: Optional[Dict[str, torch.Tensor]] = None,
attention_mask: Optional[torch.Tensor] = None,
**kwargs,
) -> torch.LongTensor:
"""
Generates text semantic tokens from an input prompt and an additional optional `Bark` speaker prompt.
Args:
input_ids (`Optional[torch.Tensor]` of shape (batch_size, seq_len), *optional*):
Input ids, i.e tokenized input sentences. Will be truncated up to
semantic_generation_config.max_input_semantic_length tokens. Note that the output audios will be as
long as the longest generation among the batch.
semantic_generation_config (`BarkSemanticGenerationConfig`):
Generation config indicating how to generate the semantic tokens.
history_prompt (`Optional[Dict[str,torch.Tensor]]`, *optional*):
Optional `Bark` speaker prompt.
attention_mask (`Optional[torch.Tensor]`, *optional*):
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
[What are attention masks?](../glossary#attention-mask)
Returns:
torch.LongTensor: Output semantic tokens.
"""
if semantic_generation_config is None:
raise ValueError("`semantic_generation_config` has to be provided")
batch_size = input_ids.shape[0]
max_input_semantic_length = semantic_generation_config.max_input_semantic_length
input_ids = input_ids + semantic_generation_config.text_encoding_offset
if attention_mask is not None:
input_ids = input_ids.masked_fill((1 - attention_mask).bool(), semantic_generation_config.text_pad_token)
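# adding `text_encoding_offset` shifts the tokenizer ids into the id range reserved for text
# tokens (disjoint from the semantic token ids in the shared input embedding table);
# padded positions (attention_mask == 0) are then replaced by `text_pad_token`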
if history_prompt is not None:
semantic_history = history_prompt["semantic_prompt"][-max_input_semantic_length:]
semantic_history = nn.functional.pad(
semantic_history,
(0, max_input_semantic_length - len(semantic_history)),
value=semantic_generation_config.semantic_pad_token,
mode="constant",
)
else:
semantic_history = torch.tensor(
[semantic_generation_config.semantic_pad_token] * max_input_semantic_length, dtype=torch.int
).to(self.device)
semantic_history = torch.repeat_interleave(semantic_history[None], batch_size, dim=0)
infer_array = torch.tensor(
[[semantic_generation_config.semantic_infer_token]] * batch_size, dtype=torch.int
).to(self.device)
input_embeds = torch.cat(
[
self.input_embeds_layer(input_ids[:, :max_input_semantic_length])
+ self.input_embeds_layer(semantic_history[:, : max_input_semantic_length + 1]),
self.input_embeds_layer(infer_array),
],
dim=1,
)
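# Bark's "merge context" trick: the embeddings of the (offset) text tokens are summed with the
# embeddings of the semantic history instead of being concatenated along the sequence
# dimension, and only the `semantic_infer_token` embedding is appended, giving an embedded
# prompt of length max_input_semantic_length + 1 (hence the dummy `input_ids` length below)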
tokens_to_suppress = list(
range(semantic_generation_config.semantic_vocab_size, semantic_generation_config.semantic_pad_token)
)
tokens_to_suppress.extend(
list(range(semantic_generation_config.semantic_pad_token + 1, self.config.output_vocab_size))
)
suppress_tokens_logits_processor = SuppressTokensLogitsProcessor(tokens_to_suppress)
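# only ids in [0, semantic_vocab_size) plus the semantic_pad_token (also used as EOS) can be
# sampled; every other id of the output vocabulary is suppressed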
# pass input_ids in order to stay consistent with the transformers generate method even though it is not used
# (except to get the input seq_len - that's why we keep the first 257 tokens)
semantic_output = super().generate(
torch.ones((batch_size, max_input_semantic_length + 1), dtype=torch.int).to(self.device),
input_embeds=input_embeds,
logits_processor=[suppress_tokens_logits_processor],
generation_config=semantic_generation_config,
**kwargs,
) # size: 10048
# take the generated semantic tokens
semantic_output = semantic_output[:, max_input_semantic_length + 1 :]
return semantic_output
@add_start_docstrings(
"""Bark coarse acoustics model.
It shares the same architecture as the semantic (or text) model. It is a GPT-2 like autoregressive model with a
language modeling head on top.""",
BARK_MODEL_START_DOCSTRING.format(config="BarkCoarseConfig"),
)
class BarkCoarseModel(BarkCausalModel):
base_model_prefix = "coarse_acoustics"
config_class = BarkCoarseConfig
def preprocess_histories(
self,
max_coarse_history: int,
semantic_to_coarse_ratio: float,
batch_size: int,
semantic_generation_config: BarkSemanticGenerationConfig,
codebook_size: int,
history_prompt: Optional[Dict[str, torch.Tensor]] = None,
):
"""
Preprocess the optional `Bark` speaker prompts before `self.generate`.
Args:
max_coarse_history (`int`):
Maximum size of coarse tokens used.
semantic_to_coarse_ratio (`float`):
Ratio of coarse token frequency to semantic token frequency, accounting for the number of coarse codebooks.
batch_size (`int`):
Batch size, i.e the number of samples.
semantic_generation_config (`BarkSemanticGenerationConfig`):
Generation config indicating how to generate the semantic tokens.
codebook_size (`int`):
Codebook channel size, i.e. the size of the output vocabulary per codebook channel.
history_prompt (`Optional[Dict[str,torch.Tensor]]`):
Optional `Bark` speaker prompt.
Returns:
`tuple(torch.FloatTensor)`:
- **x_semantic_history** (`torch.FloatTensor`) -- Processed semantic speaker prompt.
- **x_coarse_history** (`torch.FloatTensor`) -- Processed coarse speaker prompt.
"""
if history_prompt is not None:
x_semantic_history = torch.repeat_interleave(history_prompt["semantic_prompt"][None], batch_size, dim=0)
# clone to avoid modifying history_prompt.coarse_prompt
x_coarse_history = history_prompt["coarse_prompt"].clone()
# offset x_coarse_history
if codebook_size is not None:
for n in range(1, x_coarse_history.shape[0]):
# offset
x_coarse_history[n, :] += codebook_size * n
# flatten x_coarse_history
x_coarse_history = torch.transpose(x_coarse_history, 0, 1).view(-1)
x_coarse_history = x_coarse_history + semantic_generation_config.semantic_vocab_size
x_coarse_history = torch.repeat_interleave(x_coarse_history[None], batch_size, dim=0)
# e.g: after SEMANTIC_VOCAB_SIZE (10000), 1024 tokens dedicated to first codebook, 1024 next tokens
# dedicated to second codebook.
max_semantic_history = int(np.floor(max_coarse_history / semantic_to_coarse_ratio))
# trim histories correctly
n_semantic_hist_provided = min(
[
max_semantic_history,
x_semantic_history.shape[1] - x_semantic_history.shape[1] % 2,
int(np.floor(x_coarse_history.shape[1] / semantic_to_coarse_ratio)),
]
)
n_coarse_hist_provided = int(round(n_semantic_hist_provided * semantic_to_coarse_ratio))
x_semantic_history = x_semantic_history[:, -n_semantic_hist_provided:].int()
x_coarse_history = x_coarse_history[:, -n_coarse_hist_provided:].int()
# bit of a hack for time alignment (sounds better) - from Bark original implementation
x_coarse_history = x_coarse_history[:, :-2]
else:
# shape: (batch_size, 0)
x_semantic_history = torch.tensor([[]] * batch_size, dtype=torch.int).to(self.device)
x_coarse_history = torch.tensor([[]] * batch_size, dtype=torch.int).to(self.device)
return x_semantic_history, x_coarse_history
def generate(
self,
semantic_output: torch.Tensor,
semantic_generation_config: BarkSemanticGenerationConfig = None,
coarse_generation_config: BarkCoarseGenerationConfig = None,
codebook_size: int = 1024,
history_prompt: Optional[Dict[str, torch.Tensor]] = None,
**kwargs,
) -> torch.LongTensor:
"""
Generates coarse acoustics tokens from input text semantic tokens and an additional optional `Bark` speaker
prompt.
Args:
semantic_output (`torch.Tensor` of shape (batch_size, seq_len), *optional*):
Input text semantic ids, i.e the output of `BarkSemanticModel.generate`.
semantic_generation_config (`BarkSemanticGenerationConfig`):
Generation config indicating how to generate the semantic tokens.
coarse_generation_config (`BarkCoarseGenerationConfig`):
Generation config indicating how to generate the coarse tokens.
codebook_size (`int`, *optional*, defaults to 1024):
Codebook channel size, i.e. the size of the output vocabulary per codebook channel.
history_prompt (`Optional[Dict[str,torch.Tensor]]`, *optional*):
Optional `Bark` speaker prompt.
Returns:
torch.LongTensor: Output coarse acoustics tokens.
"""
if semantic_generation_config is None:
raise ValueError("`semantic_generation_config` has to be provided")
if coarse_generation_config is None:
raise ValueError("`coarse_generation_config` has to be provided")
max_coarse_input_length = coarse_generation_config.max_coarse_input_length
max_coarse_history = coarse_generation_config.max_coarse_history
sliding_window_len = coarse_generation_config.sliding_window_len
# replace semantic_pad_token (eos_tok and pad_tok here) with coarse_semantic_pad_token i.e the pad_token
# used in the next model
semantic_output.masked_fill_(
semantic_output == semantic_generation_config.semantic_pad_token,
coarse_generation_config.coarse_semantic_pad_token,
)
semantic_to_coarse_ratio = (
coarse_generation_config.coarse_rate_hz
/ semantic_generation_config.semantic_rate_hz
* coarse_generation_config.n_coarse_codebooks
)
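# e.g. with Bark's default rates (coarse rate 75 Hz, semantic rate ~49.9 Hz) and 2 coarse
# codebooks, this ratio is roughly 3 coarse tokens per semantic token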
max_semantic_history = int(np.floor(max_coarse_history / semantic_to_coarse_ratio))
# beware, depends on the seq_len of the longest sequence of the batch.
# Also, the seq_len might be one token too long because of an added
# pad_token as compared to Bark original implementation.
max_generated_len = np.floor(
semantic_output.shape[1] * semantic_to_coarse_ratio / coarse_generation_config.n_coarse_codebooks
)
max_generated_len = int(round(max_generated_len * coarse_generation_config.n_coarse_codebooks))
batch_size = semantic_output.shape[0]
x_semantic_history, x_coarse = self.preprocess_histories(
history_prompt=history_prompt,
max_coarse_history=max_coarse_history,
semantic_to_coarse_ratio=semantic_to_coarse_ratio,
batch_size=batch_size,
semantic_generation_config=semantic_generation_config,
codebook_size=codebook_size,
)
base_semantic_idx = x_semantic_history.shape[1]
semantic_output = torch.hstack([x_semantic_history, semantic_output])
n_window_steps = int(np.ceil(max_generated_len / sliding_window_len))
total_generated_len = 0
len_coarse_history = x_coarse.shape[1]
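# sliding-window generation: at each step, the context is rebuilt from (i) the semantic tokens
# aligned with what has been generated so far (padded/truncated to max_coarse_input_length),
# (ii) the coarse_infer_token and (iii) the last max_coarse_history coarse tokens, before
# generating at most sliding_window_len new coarse tokens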
for _ in range(n_window_steps):
semantic_idx = base_semantic_idx + int(round(total_generated_len / semantic_to_coarse_ratio))
# pad from right side
input_coarse = semantic_output[:, np.max([0, semantic_idx - max_semantic_history]) :]
input_coarse = input_coarse[:, :max_coarse_input_length]
input_coarse = F.pad(
input_coarse,
(0, max_coarse_input_length - input_coarse.shape[-1]),
"constant",
coarse_generation_config.coarse_semantic_pad_token,
)
input_coarse = torch.hstack(
[
input_coarse,
torch.tensor([[coarse_generation_config.coarse_infer_token]] * batch_size).to(self.device),
x_coarse[:, -max_coarse_history:],
]
)
alternatingLogitsProcessor = AlternatingCodebooksLogitsProcessor(
input_coarse.shape[1],
semantic_generation_config.semantic_vocab_size,
codebook_size,
)
output_coarse = super().generate(
input_coarse,
logits_processor=[alternatingLogitsProcessor],
max_new_tokens=min(sliding_window_len, max_generated_len - total_generated_len),
generation_config=coarse_generation_config,
**kwargs,
)
input_coarse_len = input_coarse.shape[1]
x_coarse = torch.hstack([x_coarse, output_coarse[:, input_coarse_len:]])
total_generated_len = x_coarse.shape[1] - len_coarse_history
del output_coarse
coarse_output = x_coarse[:, len_coarse_history:]
return coarse_output
@add_start_docstrings(
"""Bark fine acoustics model. It is a non-causal GPT-like model with `config.n_codes_total` embedding layers and
language modeling heads, one for each codebook.""",
BARK_MODEL_START_DOCSTRING.format(config="BarkFineConfig"),
)
class BarkFineModel(BarkPreTrainedModel):
base_model_prefix = "fine_acoustics"
config_class = BarkFineConfig
main_input_name = "codebook_idx"
def __init__(self, config):
# non-causal gpt-like model with one embedding layer and one lm_head for each codebook of Encodec
super().__init__(config)
self.config = config
# initialize a modified non causal GPT-like model
# note that there is one embedding layer and one lm_head for each codebook of Encodec
self.input_embeds_layers = nn.ModuleList(
[nn.Embedding(config.input_vocab_size, config.hidden_size) for _ in range(config.n_codes_total)]
)
self.position_embeds_layer = nn.Embedding(config.block_size, config.hidden_size)
self.drop = nn.Dropout(config.dropout)
self.layers = nn.ModuleList([BarkBlock(config, is_causal=False) for _ in range(config.num_layers)])
self.layernorm_final = nn.LayerNorm(config.hidden_size)
self.lm_heads = nn.ModuleList(
[
nn.Linear(config.hidden_size, config.output_vocab_size, bias=False)
for _ in range(config.n_codes_given, config.n_codes_total)
]
)
self.gradient_checkpointing = False
self.n_codes_total = config.n_codes_total
# Initialize weights and apply final processing
self.post_init()
def get_input_embeddings(self):
# one embedding layer for each codebook
return self.input_embeds_layers
def set_input_embeddings(self, new_embeddings):
# one embedding layer for each codebook
self.input_embeds_layers = new_embeddings
def get_output_embeddings(self):
# one lm_head for each codebook
return self.lm_heads
def set_output_embeddings(self, new_output_embeddings):
# one lm_head for each codebook
self.lm_heads = new_output_embeddings
def _resize_token_embeddings(self, new_num_tokens):
old_embeddings_list = self.get_input_embeddings()
new_embeddings_list = nn.ModuleList(
[self._get_resized_embeddings(old_embeddings, new_num_tokens) for old_embeddings in old_embeddings_list]
)
self.set_input_embeddings(new_embeddings_list)
# if word embeddings are not tied, make sure that lm head is resized as well
if self.get_output_embeddings() is not None and not self.config.tie_word_embeddings:
old_lm_head_list = self.get_output_embeddings()
new_lm_head_list = nn.ModuleList(
[self._get_resized_lm_head(old_lm_head, new_num_tokens) for old_lm_head in old_lm_head_list]
)
self.set_output_embeddings(new_lm_head_list)
return self.get_input_embeddings()
def tie_weights(self):
"""
Tie the weights between the input embeddings list and the output embeddings list.
If the `torchscript` flag is set in the configuration, parameter sharing can't be handled, so we clone the
weights instead.
"""
if getattr(self.config, "tie_word_embeddings", True):
self._tied_weights_keys = []
output_embeddings = self.get_output_embeddings()
input_embeddings = self.get_input_embeddings()
for i in range(self.config.n_codes_total - self.config.n_codes_given):
# self.input_embeds_layers[i + 1].weight = self.lm_heads[i].weight
self._tie_or_clone_weights(output_embeddings[i], input_embeddings[i + 1])
self._tied_weights_keys.append(f"lm_heads.{i}.weight")
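# e.g. if n_codes_given is 1, lm_heads[0] (which predicts codebook 1) shares its weight with
# input_embeds_layers[1], lm_heads[1] with input_embeds_layers[2], and so on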
for module in self.modules():
if hasattr(module, "_tie_weights"):
module._tie_weights()
@add_start_docstrings_to_model_forward(BARK_FINE_INPUTS_DOCSTRING)
def forward(
self,
codebook_idx: int, # an additional idx corresponding to the id of the codebook that will be predicted
input_ids: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.Tensor] = None,
head_mask: Optional[torch.Tensor] = None,
labels: Optional[torch.LongTensor] = None,
input_embeds: Optional[torch.Tensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple[torch.Tensor], MaskedLMOutput]:
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
if codebook_idx == 0:
raise ValueError("Cannot predict 0th codebook - 0th codebook should be predicted by the coarse model")
if input_ids is not None and input_embeds is not None:
raise ValueError("You cannot specify both input_ids and input_embeds at the same time")
if input_ids is None and input_embeds is None:
raise ValueError("You have to specify either input_ids or input_embeds")
if input_ids is not None:
# the input embeddings are the sum of the embeddings of all codebooks up to and including codebook_idx
# forward the GPT model itself
input_embeds = [
input_embeds_layer(input_ids[:, :, i]).unsqueeze(-1)
for i, input_embeds_layer in enumerate(self.input_embeds_layers)
] # token embeddings of shape (b, t, n_embd)
input_embeds = torch.cat(input_embeds, dim=-1)
input_embeds = input_embeds[:, :, :, : codebook_idx + 1].sum(dim=-1)
input_shape = input_embeds.size()[:-1]
batch_size = input_embeds.shape[0]
seq_length = input_shape[1]
device = input_ids.device if input_ids is not None else input_embeds.device
if position_ids is None:
position_ids = torch.arange(0, seq_length, dtype=torch.long, device=device)
position_ids = position_ids.unsqueeze(0) # shape (1, seq_length)
position_embeds = self.position_embeds_layer(position_ids) # position embeddings of shape (1, t, n_embd)
# Attention mask.
if attention_mask is not None:
if batch_size <= 0:
raise ValueError("batch_size has to be defined and > 0")
attention_mask = attention_mask.view(batch_size, -1)
attention_mask = attention_mask[:, None, None, :]
attention_mask = attention_mask.to(dtype=self.dtype) # fp16 compatibility
attention_mask = (1.0 - attention_mask) * torch.finfo(self.dtype).min
head_mask = self.get_head_mask(head_mask, self.config.num_layers)
hidden_states = self.drop(input_embeds + position_embeds)
output_shape = input_shape + (hidden_states.size(-1),)
all_self_attentions = () if output_attentions else None
all_hidden_states = () if output_hidden_states else None
for i, block in enumerate(self.layers):
if output_hidden_states:
all_hidden_states = all_hidden_states + (hidden_states,)
outputs = block(
hidden_states,
attention_mask=attention_mask,
head_mask=head_mask[i],
output_attentions=output_attentions,
)
hidden_states = outputs[0]
if output_attentions:
all_self_attentions = all_self_attentions + (outputs[1],)
hidden_states = self.layernorm_final(hidden_states)
hidden_states = hidden_states.view(output_shape)
# Add last hidden state
if output_hidden_states:
all_hidden_states = all_hidden_states + (hidden_states,)
logits = self.lm_heads[codebook_idx - self.config.n_codes_given](hidden_states)
loss = None
if labels is not None:
raise NotImplementedError("Training is not implemented yet")
if not return_dict:
return tuple(v for v in [None, logits, all_hidden_states, all_self_attentions] if v is not None)
return MaskedLMOutput(
loss=loss,
logits=logits,
hidden_states=all_hidden_states,
attentions=all_self_attentions,
)
def can_generate(self) -> bool:
"""
Returns True. Despite being an autoencoder, BarkFineModel shares some characteristics with generative models
due to the way audio is generated.
"""
return True
def generate(
self,
coarse_output: torch.Tensor,
semantic_generation_config: BarkSemanticGenerationConfig = None,
coarse_generation_config: BarkCoarseGenerationConfig = None,
fine_generation_config: BarkFineGenerationConfig = None,
codebook_size: int = 1024,
history_prompt: Optional[Dict[str, torch.Tensor]] = None,
**kwargs,
) -> torch.LongTensor:
"""
Generates fine acoustics tokens from input coarse acoustics tokens and an additional optional `Bark` speaker
prompt.
Args:
coarse_output (`torch.Tensor` of shape (batch_size, seq_len)):
Input coarse acoustics ids, i.e the output of `BarkCoarseModel.generate`.
semantic_generation_config (`BarkSemanticGenerationConfig`):
Generation config indicating how to generate the semantic tokens.
coarse_generation_config (`BarkCoarseGenerationConfig`):
Generation config indicating how to generate the coarse tokens.
fine_generation_config (`BarkFineGenerationConfig`):
Generation config indicating how to generate the fine tokens.
codebook_size (`int`, *optional*, defaults to 1024):
Codebook channel size, i.e. the size of the output vocabulary per codebook channel.
history_prompt (`Optional[Dict[str,torch.Tensor]]`, *optional*):
Optional `Bark` speaker prompt.
Returns:
torch.LongTensor: Output fine acoustics tokens.
"""
if semantic_generation_config is None:
raise ValueError("`semantic_generation_config` has to be provided")
if coarse_generation_config is None:
raise ValueError("`coarse_generation_config` has to be provided")
if fine_generation_config is None:
raise ValueError("`fine_generation_config` has to be provided")
# since we don't really use GenerationConfig through the fine model (autoencoder)
# and since only temperature is used from the classic GenerationConfig parameters
# manually impose the kwargs priority over the generation config
temperature = kwargs.get("temperature", fine_generation_config.temperature)
max_fine_history_length = fine_generation_config.max_fine_history_length
max_fine_input_length = fine_generation_config.max_fine_input_length
# shape: (batch, n_coarse_codebooks * seq_len)
# new_shape: (batch, seq_len, n_coarse_codebooks)
coarse_output = coarse_output.view(coarse_output.shape[0], -1, coarse_generation_config.n_coarse_codebooks)
# brings ids into the range [0, codebook_size -1]
coarse_output = torch.remainder(coarse_output - semantic_generation_config.semantic_vocab_size, codebook_size)
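# coarse ids were generated with an offset of semantic_vocab_size (plus codebook_size for the
# second codebook channel); subtracting the offset and taking the remainder maps them back to
# plain codec ids in [0, codebook_size - 1], as expected by the fine model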
batch_size = coarse_output.shape[0]
if history_prompt is not None:
x_fine_history = torch.repeat_interleave(history_prompt["fine_prompt"].T[None], batch_size, dim=0)
# transpose to get to shape (seq_len, n_fine_codebooks)
else:
x_fine_history = None
n_coarse = coarse_generation_config.n_coarse_codebooks
# pad the remaining codebooks (from n_coarse up to n_fine_codebooks) with codebook_size
fine_input = F.pad(
coarse_output,
(0, fine_generation_config.n_fine_codebooks - n_coarse),
"constant",
codebook_size,
)
# prepend history if available (max max_fine_history_length)
if x_fine_history is not None:
fine_input = torch.cat([x_fine_history[:, -max_fine_history_length:, :], fine_input], dim=1)
# len of the fine_history that has been added to fine_input
n_history = x_fine_history[:, -max_fine_history_length:, :].shape[1]
else:
n_history = 0
n_remove_from_end = 0
# need to pad if too short (since non-causal model)
if fine_input.shape[1] < max_fine_input_length:
n_remove_from_end = max_fine_input_length - fine_input.shape[1]
fine_input = F.pad(fine_input, (0, 0, 0, n_remove_from_end), mode="constant", value=codebook_size)
# we can be lazy about the fractional loop and just keep overwriting codebook positions.
# if we had to pad because the input was too short, coarse_output.shape[1] - (max_fine_input_length - n_history)
# equals -n_remove_from_end < 0, so n_loops is always 1.
# otherwise the expression is >= 0 and the window below may slide several times.
n_loops = (coarse_output.shape[1] - (max_fine_input_length - n_history)) / max_fine_history_length
n_loops = int(np.ceil(n_loops))
n_loops = max(0, n_loops) + 1
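# small worked example (hypothetical sizes, for intuition only): with max_fine_input_length=1024,
# max_fine_history_length=512, n_history=512 and coarse_output.shape[1]=1536,
# n_loops = ceil((1536 - 512) / 512) + 1 = 3, i.e. a window of 1024 positions slides twice by 512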
for n_outer in range(n_loops):
start_idx = min([n_outer * max_fine_history_length, fine_input.shape[1] - max_fine_input_length])
start_fill_idx = min(
[n_history + n_outer * max_fine_history_length, fine_input.shape[1] - max_fine_history_length]
)
rel_start_fill_idx = start_fill_idx - start_idx
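# start_idx: left edge of the sliding window fed to the model
# start_fill_idx: first absolute position whose predictions are written back into fine_input
# rel_start_fill_idx: the same position relative to the window; everything before it is context only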
input_buffer = fine_input[:, start_idx : start_idx + max_fine_input_length, :]
for n_inner in range(n_coarse, fine_generation_config.n_fine_codebooks):
logits = self.forward(n_inner, input_buffer).logits
if temperature is None:
relevant_logits = logits[:, rel_start_fill_idx:, :codebook_size]
codebook_preds = torch.argmax(relevant_logits, -1)
else:
relevant_logits = logits[:, :, :codebook_size] / temperature
# apply softmax
probs = F.softmax(relevant_logits, dim=-1)[:, rel_start_fill_idx:max_fine_input_length]
# reshape to 2D: (batch_size, seq_len, codebook_size) -> (batch_size*seq_len, codebook_size)
probs = probs.reshape((-1, codebook_size))
# multinomial then reshape: (batch_size * seq_len,) -> (batch_size, seq_len)
codebook_preds = torch.multinomial(probs, num_samples=1).view(batch_size, -1)
codebook_preds = codebook_preds.to(torch.int32)
input_buffer[:, rel_start_fill_idx:, n_inner] = codebook_preds
del logits, codebook_preds
# transfer into fine_input
for n_inner in range(n_coarse, fine_generation_config.n_fine_codebooks):
fine_input[
:, start_fill_idx : start_fill_idx + (max_fine_input_length - rel_start_fill_idx), n_inner
] = input_buffer[:, rel_start_fill_idx:, n_inner]
del input_buffer
fine_input = fine_input.transpose(1, 2)[:, :, n_history:]
if n_remove_from_end > 0:
fine_input = fine_input[:, :, :-n_remove_from_end]
if fine_input.shape[-1] != coarse_output.shape[-2]:
raise ValueError("input and output should have the same seq_len")
return fine_input
@add_start_docstrings(
"""
The full Bark model, a text-to-speech model composed of 4 sub-models:
- [`BarkSemanticModel`] (also referred to as the 'text' model): a causal autoregressive transformer model that
takes tokenized text as input and predicts semantic text tokens that capture the meaning of the text.
- [`BarkCoarseModel`] (also referred to as the 'coarse acoustics' model): also a causal autoregressive
transformer, which takes as input the output of the previous model. It aims at predicting the first two audio
codebooks necessary for `encodec`.
- [`BarkFineModel`] (the 'fine acoustics' model), this time a non-causal autoencoder transformer, which
iteratively predicts the last codebooks based on the sum of the previous codebook embeddings.
- having predicted all the codebook channels, Bark uses the [`EncodecModel`] to decode the output audio array.
It should be noted that each of the first three modules can support conditional speaker embeddings to condition
the output sound according to a specific predefined voice.
""",
BARK_START_DOCSTRING,
)
class BarkModel(BarkPreTrainedModel):
config_class = BarkConfig
def __init__(self, config):
super().__init__(config)
self.semantic = BarkSemanticModel(config.semantic_config)
self.coarse_acoustics = BarkCoarseModel(config.coarse_acoustics_config)
self.fine_acoustics = BarkFineModel(config.fine_acoustics_config)
self.codec_model = AutoModel.from_config(config.codec_config)
self.config = config
def codec_decode(self, fine_output):
"""Turn quantized audio codes into audio array using encodec."""
fine_output = fine_output.transpose(0, 1)
emb = self.codec_model.quantizer.decode(fine_output)
out = self.codec_model.decoder(emb)
audio_arr = out.squeeze(1) # squeeze the codebook dimension
return audio_arr
@torch.no_grad()
def generate(
self,
input_ids: Optional[torch.Tensor] = None,
history_prompt: Optional[Dict[str, torch.Tensor]] = None,
**kwargs,
) -> torch.LongTensor:
"""
Generates audio from an input prompt and an additional optional `Bark` speaker prompt.
Args:
input_ids (`Optional[torch.Tensor]` of shape (batch_size, seq_len), *optional*):
Input ids. Will be truncated to a maximum of 256 tokens. Note that the output audios will be as long as
the longest generation among the batch.
history_prompt (`Optional[Dict[str,torch.Tensor]]`, *optional*):
Optional `Bark` speaker prompt. Note that for now, this model takes only one speaker prompt per batch.
kwargs (*optional*): Remaining dictionary of keyword arguments. Keyword arguments come in two types:
- Without a prefix, they will be passed as `**kwargs` to the `generate` method of each sub-model.
- With a *semantic_*, *coarse_* or *fine_* prefix, they will be passed to the `generate` method of the
semantic, coarse and fine sub-models respectively. Prefixed arguments take priority over the
corresponding keywords without a prefix.
This means you can, for example, specify a generation strategy for all sub-models except one.
Returns:
torch.LongTensor: Output generated audio.
Example:
```python
>>> from transformers import AutoProcessor, BarkModel
>>> processor = AutoProcessor.from_pretrained("ylacombe/bark-small")
>>> model = BarkModel.from_pretrained("ylacombe/bark-small")
>>> # To add a voice preset, you can pass `voice_preset` to `BarkProcessor.__call__(...)`
>>> voice_preset = "v2/en_speaker_6"
>>> inputs = processor("Hello, my dog is cute, I need him in my life", voice_preset=voice_preset)
>>> audio_array = model.generate(**inputs, semantic_max_new_tokens=100)
>>> audio_array = audio_array.cpu().numpy().squeeze()
```
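A second, illustrative sketch (values mirror the integration tests of this PR, not recommended settings),
showing how prefixed keyword arguments are routed to a single sub-model:
```python
>>> # `coarse_do_sample` and `coarse_temperature` only reach the coarse sub-model,
>>> # `fine_temperature` only reaches the fine sub-model, `do_sample=False` reaches all of them
>>> audio_array = model.generate(
...     **inputs, do_sample=False, coarse_do_sample=True, coarse_temperature=0.7, fine_temperature=0.3
... )
```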
"""
# TODO (joao): workaround until nested generation config is compatible with PreTrainedModel
# todo: dict
semantic_generation_config = BarkSemanticGenerationConfig(**self.generation_config.semantic_config)
coarse_generation_config = BarkCoarseGenerationConfig(**self.generation_config.coarse_acoustics_config)
fine_generation_config = BarkFineGenerationConfig(**self.generation_config.fine_acoustics_config)
kwargs_semantic = {
# if "attention_mask" is set, it should not be passed to CoarseModel and FineModel
"attention_mask": kwargs.pop("attention_mask", None)
}
kwargs_coarse = {}
kwargs_fine = {}
for key, value in kwargs.items():
if key.startswith("semantic_"):
key = key[len("semantic_") :]
kwargs_semantic[key] = value
elif key.startswith("coarse_"):
key = key[len("coarse_") :]
kwargs_coarse[key] = value
elif key.startswith("fine_"):
key = key[len("fine_") :]
kwargs_fine[key] = value
else:
# If the key is already in one of the sub-model kwargs, it has been set with a
# sub-model-specific value and we don't override it
if key not in kwargs_semantic:
kwargs_semantic[key] = value
if key not in kwargs_coarse:
kwargs_coarse[key] = value
if key not in kwargs_fine:
kwargs_fine[key] = value
# 1. Generate from the semantic model
semantic_output = self.semantic.generate(
input_ids,
history_prompt=history_prompt,
semantic_generation_config=semantic_generation_config,
**kwargs_semantic,
)
# 2. Generate from the coarse model
coarse_output = self.coarse_acoustics.generate(
semantic_output,
history_prompt=history_prompt,
semantic_generation_config=semantic_generation_config,
coarse_generation_config=coarse_generation_config,
codebook_size=self.generation_config.codebook_size,
**kwargs_coarse,
)
# 3. "generate" from the fine model
output = self.fine_acoustics.generate(
coarse_output,
history_prompt=history_prompt,
semantic_generation_config=semantic_generation_config,
coarse_generation_config=coarse_generation_config,
fine_generation_config=fine_generation_config,
codebook_size=self.generation_config.codebook_size,
**kwargs_fine,
)
# 4. Decode the output and generate audio array
audio = self.codec_decode(output)
return audio
def can_generate(self) -> bool:
"""
Returns True. Although `BarkModel` does not use the default [`~generation.GenerationMixin.generate`] pipeline,
it defines its own `generate` method and thus needs a BarkGenerationConfig.
"""
return True
# coding=utf-8
# Copyright 2023 The Suno AI Authors and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor class for Bark
"""
import json
import os
from typing import Optional
import numpy as np
from ...feature_extraction_utils import BatchFeature
from ...processing_utils import ProcessorMixin
from ...utils import logging
from ...utils.hub import get_file_from_repo
from ..auto import AutoTokenizer
logger = logging.get_logger(__name__)
class BarkProcessor(ProcessorMixin):
r"""
Constructs a Bark processor which wraps a text tokenizer and optional Bark voice presets into a single processor.
Args:
tokenizer ([`PreTrainedTokenizer`]):
An instance of [`PreTrainedTokenizer`].
speaker_embeddings (`Dict[Dict[str]]`, *optional*, defaults to `None`):
Optional nested speaker embeddings dictionary. The first level contains voice preset names (e.g.
`"en_speaker_4"`). The second level contains `"semantic_prompt"`, `"coarse_prompt"` and `"fine_prompt"`
embeddings. The values are the paths of the corresponding `np.ndarray` files. See
[here](https://suno-ai.notion.site/8b8e8749ed514b0cbf3f699013548683?v=bc67cff786b04b50b3ceb756fd05f68c) for
a list of `voice_preset_names`.
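For illustration, the expected nesting looks like this (the preset name and paths are hypothetical and follow
the layout written by [`~BarkProcessor.save_pretrained`]):
```python
{
    "repo_or_path": "ylacombe/bark-small",
    "en_speaker_4": {
        "semantic_prompt": "speaker_embeddings/en_speaker_4_semantic_prompt.npy",
        "coarse_prompt": "speaker_embeddings/en_speaker_4_coarse_prompt.npy",
        "fine_prompt": "speaker_embeddings/en_speaker_4_fine_prompt.npy",
    },
}
```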
"""
tokenizer_class = "AutoTokenizer"
attributes = ["tokenizer"]
preset_shape = {
"semantic_prompt": 1,
"coarse_prompt": 2,
"fine_prompt": 2,
}
def __init__(self, tokenizer, speaker_embeddings=None):
super().__init__(tokenizer)
self.speaker_embeddings = speaker_embeddings
@classmethod
def from_pretrained(
cls, pretrained_processor_name_or_path, speaker_embeddings_dict_path="speaker_embeddings_path.json", **kwargs
):
r"""
Instantiate a Bark processor associated with a pretrained model.
Args:
pretrained_model_name_or_path (`str` or `os.PathLike`):
This can be either:
- a string, the *model id* of a pretrained [`BarkProcessor`] hosted inside a model repo on
huggingface.co. Valid model ids can be located at the root-level, like `bert-base-uncased`, or
namespaced under a user or organization name, like `dbmdz/bert-base-german-cased`.
- a path to a *directory* containing a processor saved using the [`~BarkProcessor.save_pretrained`]
method, e.g., `./my_model_directory/`.
speaker_embeddings_dict_path (`str`, *optional*, defaults to `"speaker_embeddings_path.json"`):
The name of the `.json` file containing the speaker_embeddings dictionary located in
`pretrained_model_name_or_path`. If `None`, no speaker embeddings are loaded.
**kwargs
Additional keyword arguments passed along to
[`~tokenization_utils_base.PreTrainedTokenizer.from_pretrained`].
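Example:
A minimal sketch, assuming the `ylacombe/bark-small` checkpoint used elsewhere in this PR:
```python
>>> from transformers import BarkProcessor

>>> # load the tokenizer and the preloaded speaker embeddings dictionary
>>> processor = BarkProcessor.from_pretrained("ylacombe/bark-small")

>>> # or skip loading speaker embeddings entirely
>>> processor = BarkProcessor.from_pretrained("ylacombe/bark-small", speaker_embeddings_dict_path=None)
```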
"""
if speaker_embeddings_dict_path is not None:
speaker_embeddings_path = get_file_from_repo(
pretrained_processor_name_or_path,
speaker_embeddings_dict_path,
subfolder=kwargs.pop("subfolder", None),
cache_dir=kwargs.pop("cache_dir", None),
force_download=kwargs.pop("force_download", False),
proxies=kwargs.pop("proxies", None),
resume_download=kwargs.pop("resume_download", False),
local_files_only=kwargs.pop("local_files_only", False),
use_auth_token=kwargs.pop("use_auth_token", None),
revision=kwargs.pop("revision", None),
)
if speaker_embeddings_path is None:
logger.warning(
f"""`{os.path.join(pretrained_processor_name_or_path,speaker_embeddings_dict_path)}` does not exists
, no preloaded speaker embeddings will be used - Make sure to provide a correct path to the json
dictionnary if wanted, otherwise set `speaker_embeddings_dict_path=None`."""
)
speaker_embeddings = None
else:
with open(speaker_embeddings_path) as speaker_embeddings_json:
speaker_embeddings = json.load(speaker_embeddings_json)
else:
speaker_embeddings = None
tokenizer = AutoTokenizer.from_pretrained(pretrained_processor_name_or_path, **kwargs)
return cls(tokenizer=tokenizer, speaker_embeddings=speaker_embeddings)
def save_pretrained(
self,
save_directory,
speaker_embeddings_dict_path="speaker_embeddings_path.json",
speaker_embeddings_directory="speaker_embeddings",
push_to_hub: bool = False,
**kwargs,
):
"""
Saves the attributes of this processor (tokenizer...) in the specified directory so that it can be reloaded
using the [`~BarkProcessor.from_pretrained`] method.
Args:
save_directory (`str` or `os.PathLike`):
Directory where the tokenizer files and the speaker embeddings will be saved (directory will be created
if it does not exist).
speaker_embeddings_dict_path (`str`, *optional*, defaults to `"speaker_embeddings_path.json"`):
The name of the `.json` file that will contain the speaker_embeddings nested path dictionary, if it
exists, and that will be located in `save_directory`.
speaker_embeddings_directory (`str`, *optional*, defaults to `"speaker_embeddings/"`):
The name of the folder in which the speaker_embeddings arrays will be saved.
push_to_hub (`bool`, *optional*, defaults to `False`):
Whether or not to push your model to the Hugging Face model hub after saving it. You can specify the
repository you want to push to with `repo_id` (will default to the name of `save_directory` in your
namespace).
kwargs:
Additional keyword arguments passed along to the [`~utils.PushToHubMixin.push_to_hub`] method.
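Example:
A minimal round-trip sketch (the local directory name is arbitrary):
```python
>>> from transformers import BarkProcessor

>>> processor = BarkProcessor.from_pretrained("ylacombe/bark-small")
>>> processor.save_pretrained("./bark_processor")
>>> reloaded_processor = BarkProcessor.from_pretrained("./bark_processor")
```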
"""
if self.speaker_embeddings is not None:
os.makedirs(os.path.join(save_directory, speaker_embeddings_directory, "v2"), exist_ok=True)
embeddings_dict = {}
embeddings_dict["repo_or_path"] = save_directory
for prompt_key in self.speaker_embeddings:
if prompt_key != "repo_or_path":
voice_preset = self._load_voice_preset(prompt_key)
tmp_dict = {}
for key in self.speaker_embeddings[prompt_key]:
np.save(
os.path.join(
embeddings_dict["repo_or_path"], speaker_embeddings_directory, f"{prompt_key}_{key}"
),
voice_preset[key],
allow_pickle=False,
)
tmp_dict[key] = os.path.join(speaker_embeddings_directory, f"{prompt_key}_{key}.npy")
embeddings_dict[prompt_key] = tmp_dict
with open(os.path.join(save_directory, speaker_embeddings_dict_path), "w") as fp:
json.dump(embeddings_dict, fp)
super().save_pretrained(save_directory, push_to_hub, **kwargs)
def _load_voice_preset(self, voice_preset: str = None, **kwargs):
voice_preset_paths = self.speaker_embeddings[voice_preset]
voice_preset_dict = {}
for key in ["semantic_prompt", "coarse_prompt", "fine_prompt"]:
if key not in voice_preset_paths:
raise ValueError(
f"Voice preset unrecognized, missing {key} as a key in self.speaker_embeddings[{voice_preset}]."
)
path = get_file_from_repo(
self.speaker_embeddings.get("repo_or_path", "/"),
voice_preset_paths[key],
subfolder=kwargs.pop("subfolder", None),
cache_dir=kwargs.pop("cache_dir", None),
force_download=kwargs.pop("force_download", False),
proxies=kwargs.pop("proxies", None),
resume_download=kwargs.pop("resume_download", False),
local_files_only=kwargs.pop("local_files_only", False),
use_auth_token=kwargs.pop("use_auth_token", None),
revision=kwargs.pop("revision", None),
)
if path is None:
raise ValueError(
f"""`{os.path.join(self.speaker_embeddings.get("repo_or_path", "/"),voice_preset_paths[key])}` does not exists
, no preloaded voice preset will be used - Make sure to provide correct paths to the {voice_preset}
embeddings."""
)
voice_preset_dict[key] = np.load(path)
return voice_preset_dict
def _validate_voice_preset_dict(self, voice_preset: Optional[dict] = None, **kwargs):
for key in ["semantic_prompt", "coarse_prompt", "fine_prompt"]:
if key not in voice_preset:
raise ValueError(f"Voice preset unrecognized, missing {key} as a key.")
if not isinstance(voice_preset[key], np.ndarray):
raise ValueError(f"{key} voice preset must be a {str(self.preset_shape[key])}D ndarray.")
if len(voice_preset[key].shape) != self.preset_shape[key]:
raise ValueError(f"{key} voice preset must be a {str(self.preset_shape[key])}D ndarray.")
def __call__(
self,
text=None,
voice_preset=None,
return_tensors="pt",
max_length=256,
add_special_tokens=False,
return_attention_mask=True,
return_token_type_ids=False,
**kwargs,
):
"""
Main method to prepare for the model one or several sequence(s). This method forwards the `text` and `kwargs`
arguments to the AutoTokenizer's [`~AutoTokenizer.__call__`] to encode the text. It also prepares a voice
preset, i.e. a dictionary of arrays that conditions `Bark`'s output. `kwargs` arguments are forwarded
to the tokenizer and to the `cached_file` method if `voice_preset` is a valid filename.
Args:
text (`str`, `List[str]`, `List[List[str]]`):
The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
(pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
`is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
voice_preset (`str`, `Dict[np.ndarray]`):
The voice preset, i.e. the speaker embeddings. It can either be a valid voice preset name, e.g.
`"en_speaker_1"`, a dictionary of `np.ndarray` embeddings for each sub-model of `Bark`, or a valid
file name of a local `.npz` single voice preset.
return_tensors (`str` or [`~utils.TensorType`], *optional*):
If set, will return tensors of a particular framework. Acceptable values are:
- `'pt'`: Return PyTorch `torch.Tensor` objects.
- `'np'`: Return NumPy `np.ndarray` objects.
Returns:
[`BatchEncoding`]: The output of the `tokenizer`, with the voice preset (a [`BatchFeature`] of the
requested tensor type) added under the `"history_prompt"` key when a `voice_preset` is provided.
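Example:
A minimal sketch (the preset name comes from the examples above and the array shapes mirror the processor
tests of this PR):
```python
>>> import numpy as np
>>> from transformers import BarkProcessor

>>> processor = BarkProcessor.from_pretrained("ylacombe/bark-small")

>>> # a named preset from the preloaded speaker embeddings
>>> inputs = processor("Hello, my dog is cute", voice_preset="v2/en_speaker_6")

>>> # or the raw arrays, with shapes following `preset_shape`
>>> voice_preset = {
...     "semantic_prompt": np.ones(35),
...     "coarse_prompt": np.ones((2, 35)),
...     "fine_prompt": np.ones((8, 35)),
... }
>>> inputs = processor("Hello, my dog is cute", voice_preset=voice_preset)
```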
"""
if voice_preset is not None and not isinstance(voice_preset, dict):
if (
isinstance(voice_preset, str)
and self.speaker_embeddings is not None
and voice_preset in self.speaker_embeddings
):
voice_preset = self._load_voice_preset(voice_preset)
else:
if isinstance(voice_preset, str) and not voice_preset.endswith(".npz"):
voice_preset = voice_preset + ".npz"
voice_preset = np.load(voice_preset)
if voice_preset is not None:
self._validate_voice_preset_dict(voice_preset, **kwargs)
voice_preset = BatchFeature(data=voice_preset, tensor_type=return_tensors)
encoded_text = self.tokenizer(
text,
return_tensors=return_tensors,
padding="max_length",
max_length=max_length,
return_attention_mask=return_attention_mask,
return_token_type_ids=return_token_type_ids,
add_special_tokens=add_special_tokens,
**kwargs,
)
if voice_preset is not None:
encoded_text["history_prompt"] = voice_preset
return encoded_text
......@@ -816,6 +816,51 @@ class AutoformerPreTrainedModel(metaclass=DummyObject):
requires_backends(self, ["torch"])
BARK_PRETRAINED_MODEL_ARCHIVE_LIST = None
class BarkCausalModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class BarkCoarseModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class BarkFineModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class BarkModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class BarkPreTrainedModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
class BarkSemanticModel(metaclass=DummyObject):
_backends = ["torch"]
def __init__(self, *args, **kwargs):
requires_backends(self, ["torch"])
BART_PRETRAINED_MODEL_ARCHIVE_LIST = None
......
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Testing suite for the PyTorch Bark model. """
import copy
import inspect
import tempfile
import unittest
from transformers import (
BarkCoarseConfig,
BarkFineConfig,
BarkSemanticConfig,
is_torch_available,
)
from transformers.models.bark.generation_configuration_bark import (
BarkCoarseGenerationConfig,
BarkFineGenerationConfig,
BarkSemanticGenerationConfig,
)
from transformers.testing_utils import require_torch, slow, torch_device
from transformers.utils import cached_property
from ...generation.test_utils import GenerationTesterMixin
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, ids_tensor, random_attention_mask
if is_torch_available():
import torch
from transformers import (
BarkCausalModel,
BarkCoarseModel,
BarkFineModel,
BarkModel,
BarkProcessor,
BarkSemanticModel,
)
class BarkSemanticModelTester:
def __init__(
self,
parent,
batch_size=2,
seq_length=4,
is_training=False, # for now training is not supported
use_input_mask=True,
use_labels=True,
vocab_size=33,
output_vocab_size=33,
hidden_size=16,
num_hidden_layers=2,
num_attention_heads=2,
intermediate_size=15,
dropout=0.1,
window_size=256,
initializer_range=0.02,
n_codes_total=8, # for BarkFineModel
n_codes_given=1, # for BarkFineModel
config_class=None,
model_class=None,
):
self.parent = parent
self.batch_size = batch_size
self.seq_length = seq_length
self.is_training = is_training
self.use_input_mask = use_input_mask
self.use_labels = use_labels
self.vocab_size = vocab_size
self.output_vocab_size = output_vocab_size
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.dropout = dropout
self.window_size = window_size
self.initializer_range = initializer_range
self.bos_token_id = output_vocab_size - 1
self.eos_token_id = output_vocab_size - 1
self.pad_token_id = output_vocab_size - 1
self.n_codes_total = n_codes_total
self.n_codes_given = n_codes_given
self.is_encoder_decoder = False
self.config_class = config_class
self.model_class = model_class
def prepare_config_and_inputs(self):
input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
input_mask = None
if self.use_input_mask:
input_mask = random_attention_mask([self.batch_size, self.seq_length])
config = self.get_config()
head_mask = ids_tensor([self.num_hidden_layers, self.num_attention_heads], 2)
inputs_dict = {
"input_ids": input_ids,
"head_mask": head_mask,
"attention_mask": input_mask,
}
return config, inputs_dict
def get_config(self):
return self.config_class(
vocab_size=self.vocab_size,
output_vocab_size=self.output_vocab_size,
hidden_size=self.hidden_size,
num_layers=self.num_hidden_layers,
num_heads=self.num_attention_heads,
use_cache=True,
bos_token_id=self.bos_token_id,
eos_token_id=self.eos_token_id,
pad_token_id=self.pad_token_id,
window_size=self.window_size,
)
def get_pipeline_config(self):
config = self.get_config()
config.vocab_size = 300
return config
def prepare_config_and_inputs_for_common(self):
config, inputs_dict = self.prepare_config_and_inputs()
return config, inputs_dict
def create_and_check_decoder_model_past_large_inputs(self, config, inputs_dict):
model = self.model_class(config=config).to(torch_device).eval()
input_ids = inputs_dict["input_ids"]
attention_mask = inputs_dict["attention_mask"]
# first forward pass
outputs = model(input_ids, attention_mask=attention_mask, use_cache=True)
output, past_key_values = outputs.to_tuple()
# create hypothetical multiple next tokens and extend to next_input_ids
next_tokens = ids_tensor((self.batch_size, 3), config.vocab_size)
next_attn_mask = ids_tensor((self.batch_size, 3), 2)
# append to next input_ids and next attention_mask
next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
next_attention_mask = torch.cat([attention_mask, next_attn_mask], dim=-1)
output_from_no_past = model(next_input_ids, attention_mask=next_attention_mask)["logits"]
output_from_past = model(next_tokens, attention_mask=next_attention_mask, past_key_values=past_key_values)[
"logits"
]
# select random slice
random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()
self.parent.assertTrue(output_from_past_slice.shape[1] == next_tokens.shape[1])
# test that outputs are equal for slice
self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))
# test no attention_mask works
outputs = model(input_ids, use_cache=True)
_, past_key_values = outputs.to_tuple()
output_from_no_past = model(next_input_ids)["logits"]
output_from_past = model(next_tokens, past_key_values=past_key_values)["logits"]
random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()
# test that outputs are equal for slice
self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))
class BarkCoarseModelTester:
def __init__(
self,
parent,
batch_size=2,
seq_length=4,
is_training=False, # for now training is not supported
use_input_mask=True,
use_labels=True,
vocab_size=33,
output_vocab_size=33,
hidden_size=16,
num_hidden_layers=2,
num_attention_heads=2,
intermediate_size=15,
dropout=0.1,
window_size=256,
initializer_range=0.02,
n_codes_total=8, # for BarkFineModel
n_codes_given=1, # for BarkFineModel
config_class=None,
model_class=None,
):
self.parent = parent
self.batch_size = batch_size
self.seq_length = seq_length
self.is_training = is_training
self.use_input_mask = use_input_mask
self.use_labels = use_labels
self.vocab_size = vocab_size
self.output_vocab_size = output_vocab_size
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.dropout = dropout
self.window_size = window_size
self.initializer_range = initializer_range
self.bos_token_id = output_vocab_size - 1
self.eos_token_id = output_vocab_size - 1
self.pad_token_id = output_vocab_size - 1
self.n_codes_total = n_codes_total
self.n_codes_given = n_codes_given
self.is_encoder_decoder = False
self.config_class = config_class
self.model_class = model_class
def prepare_config_and_inputs(self):
input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
input_mask = None
if self.use_input_mask:
input_mask = random_attention_mask([self.batch_size, self.seq_length])
config = self.get_config()
head_mask = ids_tensor([self.num_hidden_layers, self.num_attention_heads], 2)
inputs_dict = {
"input_ids": input_ids,
"head_mask": head_mask,
"attention_mask": input_mask,
}
return config, inputs_dict
def get_config(self):
return self.config_class(
vocab_size=self.vocab_size,
output_vocab_size=self.output_vocab_size,
hidden_size=self.hidden_size,
num_layers=self.num_hidden_layers,
num_heads=self.num_attention_heads,
use_cache=True,
bos_token_id=self.bos_token_id,
eos_token_id=self.eos_token_id,
pad_token_id=self.pad_token_id,
window_size=self.window_size,
)
def get_pipeline_config(self):
config = self.get_config()
config.vocab_size = 300
return config
def prepare_config_and_inputs_for_common(self):
config, inputs_dict = self.prepare_config_and_inputs()
return config, inputs_dict
def create_and_check_decoder_model_past_large_inputs(self, config, inputs_dict):
model = self.model_class(config=config).to(torch_device).eval()
input_ids = inputs_dict["input_ids"]
attention_mask = inputs_dict["attention_mask"]
# first forward pass
outputs = model(input_ids, attention_mask=attention_mask, use_cache=True)
output, past_key_values = outputs.to_tuple()
# create hypothetical multiple next tokens and extend to next_input_ids
next_tokens = ids_tensor((self.batch_size, 3), config.vocab_size)
next_attn_mask = ids_tensor((self.batch_size, 3), 2)
# append to next input_ids and next attention_mask
next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
next_attention_mask = torch.cat([attention_mask, next_attn_mask], dim=-1)
output_from_no_past = model(next_input_ids, attention_mask=next_attention_mask)["logits"]
output_from_past = model(next_tokens, attention_mask=next_attention_mask, past_key_values=past_key_values)[
"logits"
]
# select random slice
random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()
self.parent.assertTrue(output_from_past_slice.shape[1] == next_tokens.shape[1])
# test that outputs are equal for slice
self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))
# test no attention_mask works
outputs = model(input_ids, use_cache=True)
_, past_key_values = outputs.to_tuple()
output_from_no_past = model(next_input_ids)["logits"]
output_from_past = model(next_tokens, past_key_values=past_key_values)["logits"]
random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()
# test that outputs are equal for slice
self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))
class BarkFineModelTester:
def __init__(
self,
parent,
batch_size=2,
seq_length=4,
is_training=False, # for now training is not supported
use_input_mask=True,
use_labels=True,
vocab_size=33,
output_vocab_size=33,
hidden_size=16,
num_hidden_layers=2,
num_attention_heads=2,
intermediate_size=15,
dropout=0.1,
window_size=256,
initializer_range=0.02,
n_codes_total=8, # for BarkFineModel
n_codes_given=1, # for BarkFineModel
config_class=None,
model_class=None,
):
self.parent = parent
self.batch_size = batch_size
self.seq_length = seq_length
self.is_training = is_training
self.use_input_mask = use_input_mask
self.use_labels = use_labels
self.vocab_size = vocab_size
self.output_vocab_size = output_vocab_size
self.hidden_size = hidden_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.intermediate_size = intermediate_size
self.dropout = dropout
self.window_size = window_size
self.initializer_range = initializer_range
self.bos_token_id = output_vocab_size - 1
self.eos_token_id = output_vocab_size - 1
self.pad_token_id = output_vocab_size - 1
self.n_codes_total = n_codes_total
self.n_codes_given = n_codes_given
self.is_encoder_decoder = False
self.config_class = config_class
self.model_class = model_class
def prepare_config_and_inputs(self):
input_ids = ids_tensor([self.batch_size, self.seq_length, self.n_codes_total], self.vocab_size)
input_mask = None
if self.use_input_mask:
input_mask = random_attention_mask([self.batch_size, self.seq_length])
config = self.get_config()
head_mask = ids_tensor([self.num_hidden_layers, self.num_attention_heads], 2)
# random codebook_idx between self.n_codes_given and self.n_codes_total - 1 (both inclusive)
codebook_idx = ids_tensor((1,), self.n_codes_total - self.n_codes_given).item() + self.n_codes_given
inputs_dict = {
"codebook_idx": codebook_idx,
"input_ids": input_ids,
"head_mask": head_mask,
"attention_mask": input_mask,
}
return config, inputs_dict
def get_config(self):
return self.config_class(
vocab_size=self.vocab_size,
output_vocab_size=self.output_vocab_size,
hidden_size=self.hidden_size,
num_layers=self.num_hidden_layers,
num_heads=self.num_attention_heads,
use_cache=True,
bos_token_id=self.bos_token_id,
eos_token_id=self.eos_token_id,
pad_token_id=self.pad_token_id,
window_size=self.window_size,
)
def get_pipeline_config(self):
config = self.get_config()
config.vocab_size = 300
return config
def prepare_config_and_inputs_for_common(self):
config, inputs_dict = self.prepare_config_and_inputs()
return config, inputs_dict
def create_and_check_decoder_model_past_large_inputs(self, config, inputs_dict):
model = self.model_class(config=config).to(torch_device).eval()
input_ids = inputs_dict["input_ids"]
attention_mask = inputs_dict["attention_mask"]
# first forward pass
outputs = model(input_ids, attention_mask=attention_mask, use_cache=True)
output, past_key_values = outputs.to_tuple()
# create hypothetical multiple next tokens and extend to next_input_ids
next_tokens = ids_tensor((self.batch_size, 3), config.vocab_size)
next_attn_mask = ids_tensor((self.batch_size, 3), 2)
# append to next input_ids and next attention_mask
next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
next_attention_mask = torch.cat([attention_mask, next_attn_mask], dim=-1)
output_from_no_past = model(next_input_ids, attention_mask=next_attention_mask)["logits"]
output_from_past = model(next_tokens, attention_mask=next_attention_mask, past_key_values=past_key_values)[
"logits"
]
# select random slice
random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()
self.parent.assertTrue(output_from_past_slice.shape[1] == next_tokens.shape[1])
# test that outputs are equal for slice
self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))
# test no attention_mask works
outputs = model(input_ids, use_cache=True)
_, past_key_values = outputs.to_tuple()
output_from_no_past = model(next_input_ids)["logits"]
output_from_past = model(next_tokens, past_key_values=past_key_values)["logits"]
random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()
# test that outputs are equal for slice
self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))
@require_torch
class BarkSemanticModelTest(ModelTesterMixin, GenerationTesterMixin, unittest.TestCase):
all_model_classes = (BarkSemanticModel,) if is_torch_available() else ()
all_generative_model_classes = (BarkCausalModel,) if is_torch_available() else ()
is_encoder_decoder = False
fx_compatible = False
test_missing_keys = False
test_pruning = False
test_model_parallel = False
# no model_parallel for now
test_resize_embeddings = True
def setUp(self):
self.model_tester = BarkSemanticModelTester(
self, config_class=BarkSemanticConfig, model_class=BarkSemanticModel
)
self.config_tester = ConfigTester(self, config_class=BarkSemanticConfig, n_embd=37)
def test_config(self):
self.config_tester.run_common_tests()
def test_save_load_strict(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs()
for model_class in self.all_model_classes:
model = model_class(config)
with tempfile.TemporaryDirectory() as tmpdirname:
model.save_pretrained(tmpdirname)
model2, info = model_class.from_pretrained(tmpdirname, output_loading_info=True)
self.assertEqual(info["missing_keys"], [])
def test_decoder_model_past_with_large_inputs(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_decoder_model_past_large_inputs(*config_and_inputs)
def test_inputs_embeds(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
model = model_class(config)
model.to(torch_device)
model.eval()
inputs = copy.deepcopy(self._prepare_for_class(inputs_dict, model_class))
input_ids = inputs["input_ids"]
del inputs["input_ids"]
wte = model.get_input_embeddings()
inputs["input_embeds"] = wte(input_ids)
with torch.no_grad():
model(**inputs)[0]
def test_generate_fp16(self):
config, input_dict = self.model_tester.prepare_config_and_inputs()
input_ids = input_dict["input_ids"]
attention_mask = input_ids.ne(1).to(torch_device)
model = self.all_generative_model_classes[0](config).eval().to(torch_device)
if torch_device == "cuda":
model.half()
model.generate(input_ids, attention_mask=attention_mask)
model.generate(num_beams=4, do_sample=True, early_stopping=False, num_return_sequences=3)
@require_torch
class BarkCoarseModelTest(ModelTesterMixin, GenerationTesterMixin, unittest.TestCase):
# Same tester as BarkSemanticModelTest, except for model_class and config_class
all_model_classes = (BarkCoarseModel,) if is_torch_available() else ()
all_generative_model_classes = (BarkCausalModel,) if is_torch_available() else ()
is_encoder_decoder = False
fx_compatible = False
test_missing_keys = False
test_pruning = False
test_model_parallel = False
# no model_parallel for now
test_resize_embeddings = True
def setUp(self):
self.model_tester = BarkCoarseModelTester(self, config_class=BarkCoarseConfig, model_class=BarkCoarseModel)
self.config_tester = ConfigTester(self, config_class=BarkCoarseConfig, n_embd=37)
def test_config(self):
self.config_tester.run_common_tests()
def test_save_load_strict(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs()
for model_class in self.all_model_classes:
model = model_class(config)
with tempfile.TemporaryDirectory() as tmpdirname:
model.save_pretrained(tmpdirname)
model2, info = model_class.from_pretrained(tmpdirname, output_loading_info=True)
self.assertEqual(info["missing_keys"], [])
def test_decoder_model_past_with_large_inputs(self):
config_and_inputs = self.model_tester.prepare_config_and_inputs()
self.model_tester.create_and_check_decoder_model_past_large_inputs(*config_and_inputs)
def test_inputs_embeds(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
model = model_class(config)
model.to(torch_device)
model.eval()
inputs = copy.deepcopy(self._prepare_for_class(inputs_dict, model_class))
input_ids = inputs["input_ids"]
del inputs["input_ids"]
wte = model.get_input_embeddings()
inputs["input_embeds"] = wte(input_ids)
with torch.no_grad():
model(**inputs)[0]
def test_generate_fp16(self):
config, input_dict = self.model_tester.prepare_config_and_inputs()
input_ids = input_dict["input_ids"]
attention_mask = input_ids.ne(1).to(torch_device)
model = self.all_generative_model_classes[0](config).eval().to(torch_device)
if torch_device == "cuda":
model.half()
model.generate(input_ids, attention_mask=attention_mask)
model.generate(num_beams=4, do_sample=True, early_stopping=False, num_return_sequences=3)
@require_torch
class BarkFineModelTest(ModelTesterMixin, unittest.TestCase):
all_model_classes = (BarkFineModel,) if is_torch_available() else ()
is_encoder_decoder = False
fx_compatible = False
test_missing_keys = False
test_pruning = False
# no model_parallel for now
test_model_parallel = False
# torchscript disabled for now because forward takes an int (`codebook_idx`) as its first argument
test_torchscript = False
test_resize_embeddings = True
def setUp(self):
self.model_tester = BarkFineModelTester(self, config_class=BarkFineConfig, model_class=BarkFineModel)
self.config_tester = ConfigTester(self, config_class=BarkFineConfig, n_embd=37)
def test_config(self):
self.config_tester.run_common_tests()
def test_save_load_strict(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs()
for model_class in self.all_model_classes:
model = model_class(config)
with tempfile.TemporaryDirectory() as tmpdirname:
model.save_pretrained(tmpdirname)
model2, info = model_class.from_pretrained(tmpdirname, output_loading_info=True)
self.assertEqual(info["missing_keys"], [])
def test_inputs_embeds(self):
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
model = model_class(config)
model.to(torch_device)
model.eval()
inputs = copy.deepcopy(self._prepare_for_class(inputs_dict, model_class))
input_ids = inputs["input_ids"]
del inputs["input_ids"]
wte = model.get_input_embeddings()[inputs_dict["codebook_idx"]]
inputs["input_embeds"] = wte(input_ids[:, :, inputs_dict["codebook_idx"]])
with torch.no_grad():
model(**inputs)[0]
def test_generate_fp16(self):
config, input_dict = self.model_tester.prepare_config_and_inputs()
input_ids = input_dict["input_ids"]
# take first codebook channel
model = self.all_model_classes[0](config).eval().to(torch_device)
if torch_device == "cuda":
model.half()
# toy generation_configs
semantic_generation_config = BarkSemanticGenerationConfig(semantic_vocab_size=0)
coarse_generation_config = BarkCoarseGenerationConfig(n_coarse_codebooks=config.n_codes_given)
fine_generation_config = BarkFineGenerationConfig(
max_fine_history_length=config.block_size // 2,
max_fine_input_length=config.block_size,
n_fine_codebooks=config.n_codes_total,
)
codebook_size = config.vocab_size - 1
model.generate(
input_ids,
history_prompt=None,
temperature=None,
semantic_generation_config=semantic_generation_config,
coarse_generation_config=coarse_generation_config,
fine_generation_config=fine_generation_config,
codebook_size=codebook_size,
)
model.generate(
input_ids,
history_prompt=None,
temperature=0.7,
semantic_generation_config=semantic_generation_config,
coarse_generation_config=coarse_generation_config,
fine_generation_config=fine_generation_config,
codebook_size=codebook_size,
)
def test_forward_signature(self):
config, _ = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
model = model_class(config)
signature = inspect.signature(model.forward)
# signature.parameters is an OrderedDict => so arg_names order is deterministic
arg_names = [*signature.parameters.keys()]
expected_arg_names = ["codebook_idx", "input_ids"]
self.assertListEqual(arg_names[:2], expected_arg_names)
def test_model_common_attributes(self):
# one embedding layer per codebook
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
for model_class in self.all_model_classes:
model = model_class(config)
self.assertIsInstance(model.get_input_embeddings()[0], (torch.nn.Embedding))
model.set_input_embeddings(
torch.nn.ModuleList([torch.nn.Embedding(10, 10) for _ in range(config.n_codes_total)])
)
x = model.get_output_embeddings()
self.assertTrue(x is None or isinstance(x[0], torch.nn.Linear))
def test_resize_tokens_embeddings(self):
# resizing tokens_embeddings of a ModuleList
original_config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
if not self.test_resize_embeddings:
return
for model_class in self.all_model_classes:
config = copy.deepcopy(original_config)
model = model_class(config)
model.to(torch_device)
if self.model_tester.is_training is False:
model.eval()
model_vocab_size = config.vocab_size
# Retrieve the embeddings and clone them
model_embed_list = model.resize_token_embeddings(model_vocab_size)
cloned_embeddings_list = [model_embed.weight.clone() for model_embed in model_embed_list]
# Check that resizing the token embeddings with a larger vocab size increases the model's vocab size
model_embed_list = model.resize_token_embeddings(model_vocab_size + 10)
self.assertEqual(model.config.vocab_size, model_vocab_size + 10)
# Check that it actually resizes the embeddings matrix for each codebook
for model_embed, cloned_embeddings in zip(model_embed_list, cloned_embeddings_list):
self.assertEqual(model_embed.weight.shape[0], cloned_embeddings.shape[0] + 10)
# Check that the model can still do a forward pass successfully (every parameter should be resized)
model(**self._prepare_for_class(inputs_dict, model_class))
# Check that resizing the token embeddings with a smaller vocab size decreases the model's vocab size
model_embed_list = model.resize_token_embeddings(model_vocab_size - 15)
self.assertEqual(model.config.vocab_size, model_vocab_size - 15)
for model_embed, cloned_embeddings in zip(model_embed_list, cloned_embeddings_list):
self.assertEqual(model_embed.weight.shape[0], cloned_embeddings.shape[0] - 15)
# Check that the model can still do a forward pass successfully (every parameter should be resized)
# Input ids should be clamped to the maximum size of the vocabulary
inputs_dict["input_ids"].clamp_(max=model_vocab_size - 15 - 1)
model(**self._prepare_for_class(inputs_dict, model_class))
# Check that adding and removing tokens has not modified the first part of the embedding matrix.
# only check for the first embedding matrix
models_equal = True
for p1, p2 in zip(cloned_embeddings_list[0], model_embed_list[0].weight):
if p1.data.ne(p2.data).sum() > 0:
models_equal = False
self.assertTrue(models_equal)
def test_resize_embeddings_untied(self):
# resizing tokens_embeddings of a ModuleList
original_config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
if not self.test_resize_embeddings:
return
original_config.tie_word_embeddings = False
for model_class in self.all_model_classes:
config = copy.deepcopy(original_config)
model = model_class(config).to(torch_device)
# if no output embeddings -> leave test
if model.get_output_embeddings() is None:
continue
# Check that resizing the token embeddings with a larger vocab size increases the model's vocab size
model_vocab_size = config.vocab_size
model.resize_token_embeddings(model_vocab_size + 10)
self.assertEqual(model.config.vocab_size, model_vocab_size + 10)
output_embeds_list = model.get_output_embeddings()
for output_embeds in output_embeds_list:
self.assertEqual(output_embeds.weight.shape[0], model_vocab_size + 10)
# Check bias if present
if output_embeds.bias is not None:
self.assertEqual(output_embeds.bias.shape[0], model_vocab_size + 10)
# Check that the model can still do a forward pass successfully (every parameter should be resized)
model(**self._prepare_for_class(inputs_dict, model_class))
# Check that resizing the token embeddings with a smaller vocab size decreases the model's vocab size
model.resize_token_embeddings(model_vocab_size - 15)
self.assertEqual(model.config.vocab_size, model_vocab_size - 15)
# Check that it actually resizes the embeddings matrix
output_embeds_list = model.get_output_embeddings()
for output_embeds in output_embeds_list:
self.assertEqual(output_embeds.weight.shape[0], model_vocab_size - 15)
# Check bias if present
if output_embeds.bias is not None:
self.assertEqual(output_embeds.bias.shape[0], model_vocab_size - 15)
# Check that the model can still do a forward pass successfully (every parameter should be resized)
# Input ids should be clamped to the maximum size of the vocabulary
inputs_dict["input_ids"].clamp_(max=model_vocab_size - 15 - 1)
# Check that the model can still do a forward pass successfully (every parameter should be resized)
model(**self._prepare_for_class(inputs_dict, model_class))
@require_torch
class BarkModelIntegrationTests(unittest.TestCase):
@cached_property
def model(self):
return BarkModel.from_pretrained("ylacombe/bark-large").to(torch_device)
@cached_property
def processor(self):
return BarkProcessor.from_pretrained("ylacombe/bark-large")
@cached_property
def inputs(self):
input_ids = self.processor("In the light of the moon, a little egg lay on a leaf", voice_preset="en_speaker_6")
input_ids = input_ids.to(torch_device)
return input_ids
@cached_property
def semantic_generation_config(self):
semantic_generation_config = BarkSemanticGenerationConfig(**self.model.generation_config.semantic_config)
return semantic_generation_config
@cached_property
def coarse_generation_config(self):
coarse_generation_config = BarkCoarseGenerationConfig(**self.model.generation_config.coarse_acoustics_config)
return coarse_generation_config
@cached_property
def fine_generation_config(self):
fine_generation_config = BarkFineGenerationConfig(**self.model.generation_config.fine_acoustics_config)
return fine_generation_config
@slow
def test_generate_semantic(self):
input_ids = self.inputs
# fmt: off
# check first ids
expected_output_ids = [7363, 321, 41, 1461, 6915, 952, 326, 41, 41, 927,]
# fmt: on
# greedy decoding
with torch.no_grad():
output_ids = self.model.semantic.generate(
**input_ids,
do_sample=False,
semantic_generation_config=self.semantic_generation_config,
)
self.assertListEqual(output_ids[0, : len(expected_output_ids)].tolist(), expected_output_ids)
@slow
def test_generate_coarse(self):
input_ids = self.inputs
history_prompt = input_ids["history_prompt"]
# fmt: off
# check first ids
expected_output_ids = [11018, 11391, 10651, 11418, 10857, 11620, 10642, 11366, 10312, 11528, 10531, 11516, 10474, 11051, 10524, 11051, ]
# fmt: on
with torch.no_grad():
output_ids = self.model.semantic.generate(
**input_ids,
do_sample=False,
semantic_generation_config=self.semantic_generation_config,
)
output_ids = self.model.coarse_acoustics.generate(
output_ids,
history_prompt=history_prompt,
do_sample=False,
semantic_generation_config=self.semantic_generation_config,
coarse_generation_config=self.coarse_generation_config,
codebook_size=self.model.generation_config.codebook_size,
)
self.assertListEqual(output_ids[0, : len(expected_output_ids)].tolist(), expected_output_ids)
@slow
def test_generate_fine(self):
input_ids = self.inputs
history_prompt = input_ids["history_prompt"]
# fmt: off
expected_output_ids = [
[1018, 651, 857, 642, 312, 531, 474, 524, 524, 776,],
[367, 394, 596, 342, 504, 492, 27, 27, 822, 822,],
[961, 955, 221, 955, 955, 686, 939, 939, 479, 176,],
[638, 365, 218, 944, 853, 363, 639, 22, 884, 456,],
[302, 912, 524, 38, 174, 209, 879, 23, 910, 227,],
[440, 673, 861, 666, 372, 558, 49, 172, 232, 342,],
[244, 358, 123, 356, 586, 520, 499, 877, 542, 637,],
[806, 685, 905, 848, 803, 810, 921, 208, 625, 203,],
]
# fmt: on
with torch.no_grad():
output_ids = self.model.semantic.generate(
**input_ids,
do_sample=False,
semantic_generation_config=self.semantic_generation_config,
)
output_ids = self.model.coarse_acoustics.generate(
output_ids,
history_prompt=history_prompt,
do_sample=False,
semantic_generation_config=self.semantic_generation_config,
coarse_generation_config=self.coarse_generation_config,
codebook_size=self.model.generation_config.codebook_size,
)
# greedy decoding
output_ids = self.model.fine_acoustics.generate(
output_ids,
history_prompt=history_prompt,
temperature=None,
semantic_generation_config=self.semantic_generation_config,
coarse_generation_config=self.coarse_generation_config,
fine_generation_config=self.fine_generation_config,
codebook_size=self.model.generation_config.codebook_size,
)
self.assertListEqual(output_ids[0, :, : len(expected_output_ids[0])].tolist(), expected_output_ids)
@slow
def test_generate_end_to_end(self):
input_ids = self.inputs
with torch.no_grad():
self.model.generate(**input_ids)
self.model.generate(**{key: val for (key, val) in input_ids.items() if key != "history_prompt"})
@slow
def test_generate_end_to_end_with_args(self):
input_ids = self.inputs
with torch.no_grad():
self.model.generate(**input_ids, do_sample=True, temperature=0.6, penalty_alpha=0.6)
self.model.generate(**input_ids, do_sample=True, temperature=0.6, num_beams=4)
@slow
def test_generate_end_to_end_with_sub_models_args(self):
input_ids = self.inputs
with torch.no_grad():
self.model.generate(**input_ids, do_sample=False, coarse_do_sample=True, coarse_temperature=0.7)
self.model.generate(
**input_ids, do_sample=False, coarse_do_sample=True, coarse_temperature=0.7, fine_temperature=0.3
)
self.model.generate(
**input_ids,
do_sample=True,
temperature=0.6,
penalty_alpha=0.6,
semantic_temperature=0.9,
coarse_temperature=0.2,
fine_temperature=0.1,
)
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import tempfile
import unittest
import numpy as np
from transformers import AutoTokenizer, BarkProcessor
from transformers.testing_utils import require_torch, slow
@require_torch
class BarkProcessorTest(unittest.TestCase):
def setUp(self):
self.checkpoint = "ylacombe/bark-small"
self.tmpdirname = tempfile.mkdtemp()
self.voice_preset = "en_speaker_1"
self.input_string = "This is a test string"
self.speaker_embeddings_dict_path = "speaker_embeddings_path.json"
self.speaker_embeddings_directory = "speaker_embeddings"
def get_tokenizer(self, **kwargs):
return AutoTokenizer.from_pretrained(self.checkpoint, **kwargs)
def tearDown(self):
shutil.rmtree(self.tmpdirname)
def test_save_load_pretrained_default(self):
tokenizer = self.get_tokenizer()
processor = BarkProcessor(tokenizer=tokenizer)
processor.save_pretrained(self.tmpdirname)
processor = BarkProcessor.from_pretrained(self.tmpdirname)
self.assertEqual(processor.tokenizer.get_vocab(), tokenizer.get_vocab())
@slow
def test_save_load_pretrained_additional_features(self):
processor = BarkProcessor.from_pretrained(
pretrained_processor_name_or_path=self.checkpoint,
speaker_embeddings_dict_path=self.speaker_embeddings_dict_path,
)
processor.save_pretrained(
self.tmpdirname,
speaker_embeddings_dict_path=self.speaker_embeddings_dict_path,
speaker_embeddings_directory=self.speaker_embeddings_directory,
)
tokenizer_add_kwargs = self.get_tokenizer(bos_token="(BOS)", eos_token="(EOS)")
processor = BarkProcessor.from_pretrained(
self.tmpdirname,
self.speaker_embeddings_dict_path,
bos_token="(BOS)",
eos_token="(EOS)",
)
self.assertEqual(processor.tokenizer.get_vocab(), tokenizer_add_kwargs.get_vocab())
def test_speaker_embeddings(self):
processor = BarkProcessor.from_pretrained(
pretrained_processor_name_or_path=self.checkpoint,
speaker_embeddings_dict_path=self.speaker_embeddings_dict_path,
)
seq_len = 35
nb_codebooks_coarse = 2
nb_codebooks_total = 8
voice_preset = {
"semantic_prompt": np.ones(seq_len),
"coarse_prompt": np.ones((nb_codebooks_coarse, seq_len)),
"fine_prompt": np.ones((nb_codebooks_total, seq_len)),
}
# test providing already loaded voice_preset
inputs = processor(text=self.input_string, voice_preset=voice_preset)
processed_voice_preset = inputs["history_prompt"]
for key in voice_preset:
self.assertListEqual(voice_preset[key].tolist(), processed_voice_preset.get(key, np.array([])).tolist())
# test loading voice preset from npz file
tmpfilename = os.path.join(self.tmpdirname, "file.npz")
np.savez(tmpfilename, **voice_preset)
inputs = processor(text=self.input_string, voice_preset=tmpfilename)
processed_voice_preset = inputs["history_prompt"]
for key in voice_preset:
self.assertListEqual(voice_preset[key].tolist(), processed_voice_preset.get(key, np.array([])).tolist())
# test loading voice preset from the hub
inputs = processor(text=self.input_string, voice_preset=self.voice_preset)
def test_tokenizer(self):
tokenizer = self.get_tokenizer()
processor = BarkProcessor(tokenizer=tokenizer)
encoded_processor = processor(text=self.input_string)
encoded_tok = tokenizer(
self.input_string,
padding="max_length",
max_length=256,
add_special_tokens=False,
return_attention_mask=True,
return_token_type_ids=False,
)
for key in encoded_tok.keys():
self.assertListEqual(encoded_tok[key], encoded_processor[key].squeeze().tolist())
......@@ -167,6 +167,8 @@ IGNORE_NON_TESTED = PRIVATE_MODELS.copy() + [
"SpeechT5SpeechEncoder", # Building part of bigger (tested) model.
"SpeechT5TextDecoder", # Building part of bigger (tested) model.
"SpeechT5TextEncoder", # Building part of bigger (tested) model.
"BarkCausalModel", # Building part of bigger (tested) model.
"BarkModel", # Does not have a forward signature - generation tested with integration tests
]
# Update this list with test files that don't have a tester with a `all_model_classes` variable and which don't
......@@ -188,6 +190,7 @@ TEST_FILES_WITH_NO_COMMON_TESTS = [
"models/vision_text_dual_encoder/test_modeling_tf_vision_text_dual_encoder.py",
"models/vision_text_dual_encoder/test_modeling_flax_vision_text_dual_encoder.py",
"models/decision_transformer/test_modeling_decision_transformer.py",
"models/bark/test_modeling_bark.py",
]
# Update this list for models that are not in any of the auto MODEL_XXX_MAPPING. Being in this list is an exception and
......@@ -332,11 +335,15 @@ IGNORE_NON_AUTO_CONFIGURED = PRIVATE_MODELS.copy() + [
"AltCLIPVisionModel",
"AltRobertaModel",
"TvltForAudioVisualClassification",
"BarkCausalModel",
"BarkCoarseModel",
"BarkFineModel",
"BarkSemanticModel",
"MusicgenModel",
"MusicgenForConditionalGeneration",
"SpeechT5ForSpeechToSpeech",
"SpeechT5ForTextToSpeech",
"SpeechT5HifiGan",
"MusicgenModel",
"MusicgenForConditionalGeneration",
]
# DO NOT edit this list!
......
......@@ -28,6 +28,9 @@ src/transformers/models/auto/feature_extraction_auto.py
src/transformers/models/auto/image_processing_auto.py
src/transformers/models/auto/processing_auto.py
src/transformers/models/auto/tokenization_auto.py
src/transformers/models/bark/configuration_bark.py
src/transformers/models/bark/modeling_bark.py
src/transformers/models/bark/processing_bark.py
src/transformers/models/bart/configuration_bart.py
src/transformers/models/bart/modeling_bart.py
src/transformers/models/bart/tokenization_bart.py
......